Chatbot
Purpose
chatbot-svc is the LangGraph worker behind Lumie's academy assistant. The backend proxies chat requests to this worker, the worker streams assistant output over Server-Sent Events (SSE), and all tenant data access is delegated back to backend /internal/chatbot/** endpoints.
This page is a reference document for developers changing the worker, backend chat proxy, tool contracts, LangGraph persistence, or chatbot operations. For section-wide worker conventions, see Workers Overview.
Source Paths
| Path | Role |
|---|---|
| lumie-worker/services/chatbot/main.py | FastAPI app, lifespan setup, /health, /metrics, /api/chatbot/stream, and /api/chatbot/confirm |
| lumie-worker/services/chatbot/src/schema.py | Pydantic request and response envelopes for stream and confirm calls |
| lumie-worker/services/chatbot/src/usecase.py | SSE streaming, detached graph runs, pending-action persistence, confirmation resume, and error mapping |
| lumie-worker/services/chatbot/src/graph/agent.py | LangGraph topology, scope guard, LLM node, tool node, retry routing, and cancellation routing |
| lumie-worker/services/chatbot/src/graph/tools.py | Tool specifications passed to the model and human-readable action descriptions |
| lumie-worker/services/chatbot/src/adapters/backend_api.py | HMAC-signed HTTP adapter for backend /internal/chatbot/** endpoints |
| lumie-worker/services/chatbot/src/config.py | Environment-backed settings for LLM, backend, LangGraph DB, and observability |
| lumie-backend/modules/ai/src/main/java/com/lumie/ai/adapter/out/external/ChatbotClient.java | Backend client that calls the worker and proxies SSE frames |
| lumie-backend/modules/ai/src/main/java/com/lumie/ai/adapter/in/web/InternalChatbotController.java | Backend callback surface used by the worker for SQL, tools, schema, and persistence |
| lumie-infra/applications/lumie/worker/chatbot-svc/common-values.yaml | Production deployment, service, env, secrets, and ServiceMonitor wiring |
Public Surface
chatbot-svc is an internal ClusterIP service. Browsers do not call it directly; frontend chat requests go through backend /v1/chat/**, then backend ChatbotClient calls the worker.
| Endpoint | Purpose |
|---|---|
| GET /health | Probe endpoint; returns {"status":"ok"} without requiring the lifespan container |
| GET /metrics | Prometheus endpoint mounted from metrics_app |
| POST /api/chatbot/stream | Starts or continues a chat turn and returns an SSE stream |
| POST /api/chatbot/confirm | Resumes a paused write-tool action after user approval or rejection |
Both worker POST routes require X-Tenant-Slug. main.py::_require_tenant_match(...) rejects the request with HTTP 400 when the header does not match the body tenantSlug. If the lifespan container has not finished initializing, both POST routes return HTTP 503.
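The two rejections above can be sketched as a pure function. This is a minimal illustration rather than the code in main.py: the name precheck_status, its parameters, and the order of the two checks are assumptions made for the example.

```python
from typing import Optional


def precheck_status(header_slug: str, body_slug: str, container_ready: bool) -> Optional[int]:
    """Return the HTTP status to reject with, or None to proceed.

    Hypothetical helper: the real route may run these checks in a different order.
    """
    if header_slug != body_slug:
        return 400  # X-Tenant-Slug does not match the body tenantSlug
    if not container_ready:
        return 503  # lifespan container has not finished initializing
    return None
```

A route handler would call such a check before handing the request to the use case, so neither failure reaches the graph.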
Runtime Flow
ChatUseCase.stream_chat(...) starts the graph in a detached background task. The HTTP response only drains a bounded queue of SSE frames. If the client disconnects, the graph task keeps running and persists the final assistant message through the backend.
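A minimal asyncio sketch of the detached-run pattern described above. The names run_graph, demo, and DONE are hypothetical, and the persisted list stands in for the backend save call; the real ChatUseCase may frame, bound, and drain its queue differently.

```python
import asyncio

DONE = object()  # sentinel marking the end of the stream


async def run_graph(queue: asyncio.Queue, persisted: list) -> None:
    """Stand-in for the graph run; it never depends on a consumer being present."""
    text = ""
    for chunk in ("Hel", "lo"):
        text += chunk
        try:
            queue.put_nowait(chunk)
        except asyncio.QueueFull:
            pass  # nobody is draining; keep the run alive anyway
        await asyncio.sleep(0)
    persisted.append(text)  # stands in for persisting the final assistant message
    try:
        queue.put_nowait(DONE)
    except asyncio.QueueFull:
        pass


async def demo(disconnect_early: bool) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)  # bounded queue of frames
    persisted: list = []
    task = asyncio.create_task(run_graph(queue, persisted))
    if not disconnect_early:
        # The HTTP response side only drains frames until the sentinel arrives.
        while (await queue.get()) is not DONE:
            pass
    # After an early disconnect nothing drains the queue, yet the task completes.
    await task
    return persisted
```

In both cases demo returns the persisted message, which is the property the section relies on: persistence is tied to the graph task, not to the lifetime of the HTTP response.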
Stream Contract
POST /api/chatbot/stream accepts ChatStreamRequest.
from pydantic import BaseModel, Field


class ChatStreamRequest(BaseModel):
    tenant_slug: str = Field(..., alias="tenantSlug", min_length=1)
    tenant_id: int = Field(..., alias="tenantId")
    user_id: int = Field(..., alias="userId")
    conversation_id: int | None = Field(None, alias="conversationId")
    message: str = Field(..., min_length=1, max_length=4000)
When conversationId is null, the worker asks the backend to create a conversation and returns the new id in the final done event.
Example request body:
{
  "tenantSlug": "demo",
  "tenantId": 7,
  "userId": 101,
  "conversationId": null,
  "message": "이번 주 공지사항을 만들어 줘"
}
The stream emits these SSE events:
| Event | Data shape | Meaning |
|---|---|---|
| token | {"content": "..."} | Assistant text chunk. Tool messages and raw SQL results are filtered out. |
| pending | {"messageId": 1, "toolName": "...", "description": "...", "arguments": {...}} | A write tool needs user confirmation before execution. |
| done | {"conversationId": 1, "messageId": 2} | The turn finished, or a pending action was persisted. |
| error | {"message": "..."} | The detached graph run failed after startup. |
Backend ChatbotClient preserves event names and data while proxying the worker stream to the browser-facing SseEmitter.
Short transcript from the current worker contract:
event: token
data: {"content":"안녕"}

event: token
data: {"content":"하세요"}

event: done
data: {"conversationId":10,"messageId":52}
That transcript matches the ChatUseCase._sse(...) framing and the test_stream_emits_tokens_and_persists_both_messages coverage in services/chatbot/tests/test_usecase.py.
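SSE framing of this kind can be sketched as follows. The helper names sse_frame and parse_sse are hypothetical, and the compact JSON separators and ensure_ascii=False are assumptions chosen to reproduce the transcript above; the body of ChatUseCase._sse(...) is not shown on this page. The blank line that terminates each frame is part of the SSE format itself.

```python
import json


def sse_frame(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame: event line, data line, blank line."""
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


def parse_sse(stream: str) -> list:
    """Parse frames produced by sse_frame back into (event, data) pairs."""
    frames = []
    for block in stream.split("\n\n"):
        if not block.strip():
            continue  # skip the empty tail after the last frame
        event, data = None, None
        for line in block.splitlines():
            if line.startswith("event: "):
                event = line[len("event: "):]
            elif line.startswith("data: "):
                data = json.loads(line[len("data: "):])
        frames.append((event, data))
    return frames
```

A proxy that preserves event names and data, as ChatbotClient is described to do, only needs to forward each parsed pair unchanged.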
Confirm Contract
POST /api/chatbot/confirm accepts ConfirmRequest and returns ChatReplyResponse.
class ConfirmRequest(BaseModel):
    tenant_slug: str = Field(..., alias="tenantSlug", min_length=1)
    tenant_id: int = Field(..., alias="tenantId")
    user_id: int = Field(..., alias="userId")
    conversation_id: int = Field(..., alias="conversationId")
    message_id: int = Field(..., alias="messageId")
    confirmed: bool
Example confirm request:
{
  "tenantSlug": "demo",
  "tenantId": 7,
  "userId": 101,
  "conversationId": 10,
  "messageId": 52,
  "confirmed": true
}
The backend public /v1/chat/confirm route receives only messageId and confirmed; it resolves conversationId from the pending message before calling the worker. Backend idempotency is handled at /v1/chat/confirm with the optional Idempotency-Key header, not inside the worker.
When confirmation resumes the graph and another write action is requested, the response contains a new pendingAction. Otherwise it returns the final assistant message and pendingAction=null.
Example response when the pending action finishes:
{
  "conversationId": 10,
  "message": "완료했습니다.",
  "pendingAction": null
}
If LangGraph pauses again on another write tool, confirm_tool(...) returns message: "" and a non-null pendingAction instead of a final assistant reply.
Backend Callback Surface
The worker never opens a tenant data connection. It uses BackendApiClient to call backend /internal/chatbot/** endpoints with HMAC-signed JSON bodies and X-Tenant-Slug.
| Backend endpoint | Worker method | Responsibility |
|---|---|---|
| POST /internal/chatbot/query | execute_query(...) | Execute read-only SQL through GeneralQueryTool, SqlValidator, RLS, and the read-only pool |
| POST /internal/chatbot/tools/{toolName} | execute_tool(...) | Execute an allowlisted write tool after user confirmation |
| GET /internal/chatbot/schema | get_schema(...) | Return tenant schema description for the system prompt |
| GET /internal/chatbot/history | get_history(...) | Return recent conversation messages; currently reserved for history hydration |
| POST /internal/chatbot/pending-action | save_pending_action(...) | Persist a LangGraph interrupt as a pending action message |
| POST /internal/chatbot/save-message | save_message(...) | Persist user, assistant, or tool messages |
| POST /internal/chatbot/conversations | create_conversation(...) | Create a new conversation and return its id |
InternalChatbotController re-establishes tenant and user context before executing SQL or tools, so RLS and existing backend tool implementations remain the control point.
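Signing a JSON body with a shared secret can be sketched with the standard library. This is an illustration of the general technique only: the choice of HMAC-SHA256, the hex encoding, the decision to sign just the raw body bytes, and the function names are all assumptions, and the header that carries the signature is not documented here. The actual scheme lives in backend_api.py and the backend verifier.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: dict) -> tuple:
    """Serialize the body once and sign exactly those bytes."""
    raw = json.dumps(body, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    signature = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    return raw, signature


def verify_body(secret: str, raw: bytes, signature: str) -> bool:
    """Recompute the signature over the received bytes and compare in constant time."""
    expected = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```

Signing the exact bytes that go on the wire matters because re-serializing the JSON on the receiving side can change key order or whitespace and invalidate the signature.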
LangGraph Model
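src/graph/agent.py compiles the graph from a scope guard, an LLM node, and a tool node, connected by retry routing and cancellation routing.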
The model is built with langchain_openai.ChatOpenAI, defaulting to Gemini through the OpenAI-compatible endpoint:
| Setting | Default |
|---|---|
| LLM_BASE_URL | https://generativelanguage.googleapis.com/v1beta/openai |
| LLM_MODEL | gemini-2.5-flash |
| LLM_API_KEY | required |
The system prompt is loaded from services/chatbot/prompts/chat-system.txt and cached for the process lifetime, so a prompt edit only takes effect after a pod restart.
Tool And HITL Model
The graph exposes one read tool and four write tools to the model.
| Tool | Type | Execution path |
|---|---|---|
| general_query | Read | Executes immediately through POST /internal/chatbot/query |
| create_announcement | Write | Pauses with interrupt(...), then executes through backend tools after confirmation |
| send_sms | Write | Pauses with interrupt(...), then executes after confirmation |
| send_telegram | Write | Pauses with interrupt(...), then executes after confirmation |
| schedule_task | Write | Pauses with interrupt(...), then executes after confirmation |
Read failures increment sql_retry_count, emit chatbot_sql_retries_total, and route back to the model until SQL_MAX_RETRIES is reached. Write tools never execute during the initial stream turn. They first persist a pending action, emit a pending event, and wait for /api/chatbot/confirm.
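The dispatch and retry rules above can be sketched in plain Python. The function names, the route labels execute, interrupt, unknown, and llm, and the use of >= for the retry ceiling are assumptions; give_up is the route named elsewhere on this page. The real graph expresses this through LangGraph nodes and conditional edges rather than direct state mutation.

```python
READ_TOOLS = {"general_query"}
WRITE_TOOLS = {"create_announcement", "send_sms", "send_telegram", "schedule_task"}
SQL_MAX_RETRIES = 3  # documented default


def dispatch(tool_name: str) -> str:
    """Decide how a model-requested tool call is handled."""
    if tool_name in READ_TOOLS:
        return "execute"    # runs immediately through the backend query endpoint
    if tool_name in WRITE_TOOLS:
        return "interrupt"  # pauses for human confirmation first
    return "unknown"        # recorded as a tool message; backend is not called


def route_after_read_failure(state: dict, max_retries: int = SQL_MAX_RETRIES) -> str:
    """Count the failure, then send control back to the model or give up."""
    state["sql_retry_count"] = state.get("sql_retry_count", 0) + 1
    if state["sql_retry_count"] >= max_retries:
        return "give_up"
    return "llm"
```

With the default ceiling, two failures route back to the model for a repaired query and the third routes to give_up.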
Persistence And Startup
The worker uses two persistence paths:
| Persistence path | Owner | Purpose |
|---|---|---|
| Backend ai_conversations and messages | Backend ChatService | Conversation id, user messages, assistant messages, and pending actions |
| LangGraph checkpointer tables | AsyncPostgresSaver | Graph state for thread_id = "{tenantId}:{conversationId}" |
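The thread id format in the table can be built with a small helper. The function name is hypothetical; the configurable.thread_id key is the standard LangGraph way to select a checkpoint thread when invoking a compiled graph.

```python
def checkpoint_config(tenant_id: int, conversation_id: int) -> dict:
    """Build the LangGraph run config that selects one conversation's checkpoint thread."""
    return {"configurable": {"thread_id": f"{tenant_id}:{conversation_id}"}}
```

Prefixing the conversation id with the tenant id keeps checkpoint threads distinct across tenants even if conversation ids were ever to collide.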
Lifespan startup in main.py:
- Builds synchronous adapters and the shared httpx.AsyncClient.
- Opens an AsyncConnectionPool from LANGGRAPH_DB_DSN.
- Creates AsyncPostgresSaver(pool) and runs setup().
- Builds the chat model.
- Compiles the LangGraph with the live checkpointer.
- Attaches the graph to ChatUseCase.
The Postgres pool is configured for PgBouncer transaction-mode compatibility: autocommit=True, prepare_threshold=0, row_factory=dict_row, and max_idle=180.0.
Configuration And Deployment
src/config.py and chatbot-svc/common-values.yaml define the runtime contract.
| Setting | Required | Notes |
|---|---|---|
| LUMIE_BACKEND_URL | Yes | Backend base URL for /internal/chatbot/** callbacks |
| LUMIE_INTERNAL_HMAC_SECRET | Yes | Shared secret for signed internal calls |
| LLM_API_KEY | Yes | Gemini API key from Vault |
| LLM_BASE_URL | No | OpenAI-compatible Gemini endpoint by default |
| LLM_MODEL | No | gemini-2.5-flash by default |
| LANGGRAPH_DB_DSN | Yes | DSN for the dedicated lumie_langgraph role through the CNPG pooler |
| OTEL_ENABLED | No | Defaults to true; disable for local tests |
| OTEL_ENDPOINT | No | OTLP gRPC endpoint for the cluster collector |
| RECURSION_LIMIT | No | Maximum graph recursion steps; defaults to 10 |
| SQL_MAX_RETRIES | No | Maximum read-query repair attempts; defaults to 3 |
Production deployment is ClusterIP-only with ingress disabled. Prometheus scrapes /metrics/ through the ServiceMonitor.
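The settings contract in the table can be sketched with a dataclass and a plain environment mapping. This is an illustration, not the contents of src/config.py: the class, field, and function names are hypothetical, the boolean parsing of OTEL_ENABLED is an assumption, and OTEL_ENDPOINT is omitted because its default is not stated here.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = (
    "LUMIE_BACKEND_URL",
    "LUMIE_INTERNAL_HMAC_SECRET",
    "LLM_API_KEY",
    "LANGGRAPH_DB_DSN",
)


@dataclass(frozen=True)
class Settings:
    backend_url: str
    hmac_secret: str
    llm_api_key: str
    langgraph_db_dsn: str
    llm_base_url: str = "https://generativelanguage.googleapis.com/v1beta/openai"
    llm_model: str = "gemini-2.5-flash"
    otel_enabled: bool = True
    recursion_limit: int = 10
    sql_max_retries: int = 3


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read settings from a mapping, failing fast on missing required keys."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    return Settings(
        backend_url=env["LUMIE_BACKEND_URL"],
        hmac_secret=env["LUMIE_INTERNAL_HMAC_SECRET"],
        llm_api_key=env["LLM_API_KEY"],
        langgraph_db_dsn=env["LANGGRAPH_DB_DSN"],
        llm_base_url=env.get("LLM_BASE_URL") or Settings.llm_base_url,
        llm_model=env.get("LLM_MODEL") or Settings.llm_model,
        # Assumption: only the literal "true" (any case) enables tracing.
        otel_enabled=env.get("OTEL_ENABLED", "true").strip().lower() == "true",
        recursion_limit=int(env.get("RECURSION_LIMIT") or Settings.recursion_limit),
        sql_max_retries=int(env.get("SQL_MAX_RETRIES") or Settings.sql_max_retries),
    )
```

Passing the mapping explicitly makes it easy to exercise defaults and overrides in tests without touching the process environment.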
Failure And Backpressure Behavior
| Failure point | Behavior |
|---|---|
| X-Tenant-Slug mismatch | Worker returns HTTP 400 before entering the use case |
| Lifespan container not ready | Worker returns HTTP 503 for stream and confirm routes |
| Client disconnect during stream | Detached graph task keeps running and persists the final assistant message |
| Slow SSE consumer | _emit(...) waits up to 5 seconds, then drops the live frame while keeping the graph run alive |
| Graph recursion limit | Emits a fallback assistant message, persists it, and records chatbot_errors_total{error_type="recursion"} |
| Unexpected graph exception | Emits an error SSE frame and records chatbot_errors_total{error_type="exception"} |
| Backend SQL failure | Tool node retries through the model until SQL_MAX_RETRIES, then routes to give_up |
| User rejects a write tool | Graph routes to deterministic cancellation text; the backend tool is not executed |
| Unknown model-requested tool | Tool node records a tool message saying the tool is unknown; the backend is not called for that tool |
Shutdown cancels in-flight detached graph tasks, closes the shared HTTP client, then closes the Postgres pool last so checkpoint writes can finish first.
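The slow-consumer row in the table above describes a put with a deadline. A minimal sketch of that technique follows, assuming an asyncio.Queue and asyncio.wait_for; the function name emit and its boolean return value are hypothetical, and the real _emit(...) may be implemented differently.

```python
import asyncio


async def emit(queue: asyncio.Queue, frame: str, timeout: float = 5.0) -> bool:
    """Try to enqueue a frame; drop it if the queue stays full for `timeout` seconds.

    Returns True if the frame was queued and False if it was dropped.
    """
    try:
        await asyncio.wait_for(queue.put(frame), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False  # the caller keeps running; only the live frame is lost
```

Because the timeout cancels only the pending put, the producer continues with the next step of the run instead of blocking indefinitely on a stalled reader.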
Observability
Metrics are exported from src/observability/metrics.py.
| Metric | Labels | Meaning |
|---|---|---|
| chatbot_stream_duration_seconds | outcome | End-to-end duration of a stream run; outcome is completed, interrupted, or error |
| chatbot_sql_retries_total | tenant | Count of read-query retry attempts |
| chatbot_hitl_total | confirmed | Count of human confirmation decisions |
| chatbot_errors_total | error_type | Graph failures by recursion or exception |
Tracing uses OpenTelemetry for FastAPI requests and OpenInference LangChain instrumentation for LLM, tool, and chain spans. httpx is intentionally not instrumented because global httpx instrumentation breaks ChatOpenAI construction in this service.
Verification
Use the chatbot test slice for worker changes:
cd lumie-worker
pytest services/chatbot/tests
Expected success signals:
- pytest exits 0
- services/chatbot/tests/test_health.py proves GET /health returns {"status":"ok"}
- the same test file also proves POST /api/chatbot/stream returns 503 when lifespan startup has not attached the container yet
- services/chatbot/tests/test_usecase.py covers token, pending, done, disconnect durability, and confirm-path persistence
For contract changes, inspect the worker, backend proxy, and backend internal callback surface together:
rg -n "api/chatbot|internal/chatbot|ChatbotClient|InternalChatbotController" \
  lumie-worker/services/chatbot lumie-backend/modules/ai
For deployment changes, inspect the worker Helm values and backend service URL:
rg -n "CHATBOT_SVC_URL|chatbot-svc|LANGGRAPH_DB_DSN|LLM_MODEL" \
  lumie-infra/applications/lumie lumie-backend/Tiltfile lumie-worker/Tiltfile