confluentinc/flink-sql-ai-meetingcoach-azure
Flink AI Meeting Coach - Real-time Sales Coaching with RAG
This project demonstrates a real-time AI "Meeting Coach" showcasing the use of Confluent Cloud for Apache Flink AI Inference functions to build a real-time Retrieval-Augmented Generation (RAG) pipeline. The demo uses both a static knowledge base of sales documents and real-time simulated meeting data.
Architecture
The Meeting Coach system consumes a chat stream representing a sales meeting. When specific triggers occur (like a customer raising a price objection), it uses a RAG pipeline to retrieve relevant context from a knowledge base of company documents (e.g., battlecards, sales materials, previous meeting notes, objection handling tips) stored in a vector database, and generates real-time coaching suggestions for the salesperson using an LLM.
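Throughout the pipeline, the AI calls are made with Confluent Cloud for Apache Flink's `ML_PREDICT` table function. As a minimal illustration of that pattern only (the table, column, and model names below are placeholders, not the ones this project creates):

```sql
-- Minimal ML_PREDICT pattern: score each row of a table with a registered model.
-- `my_table`, `input_text`, and `my_model` are placeholder names for illustration.
SELECT input_text, response
FROM my_table,
  LATERAL TABLE(ML_PREDICT('my_model', input_text));
```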
Quick Start Deployment
This guide helps you deploy the complete Flink ML Demo infrastructure from scratch with minimal credentials.
Prerequisites
You need accounts and API credentials for:
- Confluent Cloud (for Kafka + Flink)
- Microsoft Azure (for OpenAI services)
- MongoDB Atlas (for vector database)
- This deployment uses MongoDB Atlas M0 Free Tier
- Note: the M0 free tier cluster cannot be created via the Terraform API; it must be created manually in the Atlas UI
Step 1: Get Required Credentials
1.1 Confluent Cloud API Keys
1. Go to Confluent Cloud
2. Create a new Cloud API Key (not cluster-specific)
3. Note down: API Key and API Secret
1.2 Azure Service Principal
1. Go to Azure Portal > App registrations
2. Create a new app registration
3. Go to "Certificates & secrets" > New client secret
4. Go to "Subscriptions" > Your subscription > Access control (IAM)
5. Add role assignment: "Contributor" for your app
6. Note down: Subscription ID, Tenant ID, Client ID, Client Secret
1.3 MongoDB Atlas API Keys
1. Go to MongoDB Atlas > Access Manager > API Keys
2. Create a new Organization API Key with "Project Creator" permissions
3. Note down: Public Key, Private Key, and your Organization ID
Step 2: Configure Environment
2.1. Copy the template:
cp .env.template .env
2.2. Open the `.env` file and fill it out with your credentials:
nano .env
Step 3: Deploy Infrastructure
3.1. Initialize Terraform:
cd terraform
terraform init
3.2. Load environment variables:
Make sure to run this command every time before running terraform plan or terraform apply:
source ./load_tf_vars.sh
3.3. Deploy everything (remember to run `source ./load_tf_vars.sh` first in every new shell session):
terraform apply --auto-approve
Step 4: Run personalized Flink SQL commands to create connections, models, and tables in Confluent Cloud
4.1. Run `terraform/generate_personalized_commands.sh`, which generates `terraform/personalized_setup_commands.md` containing personalized Flink SQL commands with your specific deployment details:
cd terraform
./generate_personalized_commands.sh
4.2. Run the personalized Flink SQL statements from personalized_setup_commands.md in Confluent Cloud to create the Flink connections, models, and tables (an illustrative sketch of these statements follows below).
Use either the Confluent Cloud Flink SQL Workspace or, if logged in to the Confluent CLI, the Confluent Flink shell:
confluent flink shell
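For orientation, the generated statements follow Confluent Cloud's `CREATE CONNECTION` / `CREATE MODEL` pattern. The sketch below is illustrative only: the connection name, endpoint, and exact `WITH` option keys are assumptions here, so use the statements from your generated `personalized_setup_commands.md` verbatim.

```sql
-- Illustrative sketch only; the real statements are in personalized_setup_commands.md.
-- `azure_openai_connection` and the endpoint are hypothetical values.
CREATE CONNECTION azure_openai_connection
  WITH (
    'type' = 'azureopenai',
    'endpoint' = 'https://<your-resource>.openai.azure.com/...',
    'api-key' = '<azure-openai-api-key>'
  );

-- The coaching_response_generator model wraps the Azure OpenAI gpt-4o-mini deployment.
CREATE MODEL coaching_response_generator
  INPUT (prompt STRING)
  OUTPUT (response STRING)
  WITH (
    'provider' = 'azureopenai',
    'task' = 'text_generation',
    'azureopenai.connection' = 'azure_openai_connection'
  );
```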
Data Flow
There are two main tracks that define how data flows within this project:
Knowledge Base Data Prep Pipeline:
- Ingestion: Generate static, synthetic knowledge base documents (e.g., .txt, .md). Use a method like the Kafka file source connector or a custom producer to send these documents as messages to the `knowledge` Kafka topic.
- Processing & Embedding: A Flink SQL job consumes these documents, chunks them using `ML_CHARACTER_TEXT_SPLITTER`, uses `ML_PREDICT` with the `openaiembed` model to call Azure OpenAI for embeddings, and prepares the data for the vector store (see the sketch below).
- Storage: The embedded knowledge is stored in MongoDB Atlas via the `knowledge_mongodb` external table definition and a suitable sink mechanism.
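As a rough sketch of the embedding step, assuming a source table `knowledge` with `document_id` and `document_text` columns and a sink table `knowledge_embeddings` (these names are assumptions for illustration; the actual tables and the chunking arguments come from the personalized setup commands):

```sql
-- Illustrative sketch: table and column names are assumptions, and the
-- ML_CHARACTER_TEXT_SPLITTER chunking step is omitted here; its exact arguments
-- are defined in personalized_setup_commands.md.
INSERT INTO knowledge_embeddings
SELECT
  document_id,
  document_text,
  embeddings   -- output column of the openaiembed model, as declared in its CREATE MODEL
FROM knowledge,
  LATERAL TABLE(ML_PREDICT('openaiembed', document_text));
```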
Real-time Coaching Pipeline:
- Input: Real or simulated chat messages are sent to the `messages_conversation` Kafka topic via the frontend web UI.
- Processing: Flink SQL filters the conversation for prospect messages (`messages_prospect`), then generates embeddings for these messages (`messages_prospect_embeddings`) by calling out to the Azure OpenAI `text-embedding-ada-002` model.
- RAG: The system then uses Flink SQL AI functions to perform a vector search against the MongoDB Atlas vector database using `VECTOR_SEARCH`, and saves the results (`messages_prospect_rag_results`).
- Generation: The system calls the Azure OpenAI `gpt-4o-mini` model via `ML_PREDICT`, using the `coaching_response_generator` model and a custom prompt that includes the prospect's most recent message and the document chunks retrieved via RAG (see the sketch after this list).
- Pipeline result storage: The original message, the relevant document chunks retrieved via RAG search, and the `coaching_response_generator`'s final meeting coaching output are saved to Azure Cosmos DB for later review and model retraining.
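A minimal sketch of the generation step, assuming `messages_prospect_rag_results` carries the prospect's message in a `message_text` column and the retrieved chunks as a `retrieved_chunks` string, and writing to a hypothetical `coaching_responses` sink (all of these names and the prompt assembly are assumptions; the real schema and statements come from the personalized setup commands):

```sql
-- Illustrative sketch: column names, sink table, and prompt format are assumptions.
-- The VECTOR_SEARCH step that populates messages_prospect_rag_results is defined in
-- personalized_setup_commands.md and is not reproduced here.
INSERT INTO coaching_responses
SELECT message_text, retrieved_chunks, response
FROM (
  SELECT
    message_text,
    retrieved_chunks,
    CONCAT('Prospect said: ', message_text,
           ' Relevant knowledge: ', retrieved_chunks) AS prompt
  FROM messages_prospect_rag_results
) AS prompts,
  LATERAL TABLE(ML_PREDICT('coaching_response_generator', prompt));
```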
Running the AI MeetingCoach Application UI
- Activate Virtual Environment:
Before running the application, activate the Python virtual environment. If you haven't created one, you can do so with `python -m venv .venv` or `uv venv`.
  - On macOS/Linux: `source .venv/bin/activate`
  - On Windows: `.\.venv\Scripts\activate`
- Install Dependencies:
Install the required Python packages:
pip install -r requirements.txt
Alternatively, if using `uv`, run:
uv sync
- Set Environment Variables:
Ensure you have fully filled out the `.env` file in the project root directory with the necessary credentials (see `.env.template` for required variables like Kafka/Schema Registry credentials and Azure OpenAI keys).
- Load Knowledge Base (one-time setup):
python publish_knowledge_documents.py
- Run the Flask App:
Start the Flask development server:
python app.py
The application should now be accessible at http://127.0.0.1:5000 (or the host/port specified in the output).
What Gets Created
The Terraform configuration and the Flink SQL statements you entered earlier create everything from scratch:
Confluent Cloud
- New environment
- Kafka cluster with all topics
- Flink compute pool
- Service accounts and API keys
- MongoDB sink connector
Microsoft Azure
- Resource group
- OpenAI service
- Text embedding model deployment
- GPT-4 completion model deployment
MongoDB Atlas
- New project
- M0 Free Tier cluster with vector search support
- Database user with proper permissions
- Vector search index (1536 dimensions for OpenAI)
- IP allowlist for access
Cleanup
To destroy all resources:
cd terraform
terraform destroy --auto-approve
Troubleshooting
"Resource already exists" errors:
- Change your `DEPLOYMENT_PREFIX` in `.env` to something unique
MongoDB connection issues:
- Wait 2-3 minutes after cluster creation for it to be fully ready
- Ensure your cluster is deployed in the `eastus2` (Virginia) region, the same as the rest of the project infrastructure
- Whitelist IP 0.0.0.0/0 to enable Confluent to connect
Azure permission errors:
- Ensure your service principal has "Contributor" role on the subscription
- Check that the Azure region is `eastus2` for this project; do not change the region
Confluent connection errors:
- Verify your Cloud API key is a Cloud Resource Management key, and that you've also attached `OrganizationAdmin` permissions to the key (!!)
- Ensure your cluster is deployed in the `eastus2` (Virginia) region, the same as the rest of the project infrastructure
- Use the exact resource IDs from the terraform outputs
Project Structure
├── README.md (this file)
├── .env.template (credential template)
├── requirements.txt (python dependencies)
├── publish_knowledge_documents.py (main utility)
├── app.py (main flask application)
├── terraform/ (infrastructure as code)
├── app/ (application code & assets)
│   ├── routes/ (flask routes)
│   ├── utils/ (utilities)
│   ├── templates/ (html templates)
│   ├── static/ (css, js, images)
│   └── scripts/ (utility scripts)
├── sample-data/ (demo knowledge base)
└── images/ (screenshots & diagrams)

