Chatbot

Purpose

chatbot-svc is the LangGraph worker behind Lumie's academy assistant. The backend proxies chat requests to this worker, the worker streams assistant output over Server-Sent Events (SSE), and all tenant data access is delegated back to backend /internal/chatbot/** endpoints.

This page is a reference document for developers changing the worker, backend chat proxy, tool contracts, LangGraph persistence, or chatbot operations. For section-wide worker conventions, see Workers Overview.

Source Paths

| Path | Role |
| --- | --- |
| lumie-worker/services/chatbot/main.py | FastAPI app, lifespan setup, /health, /metrics, /api/chatbot/stream, and /api/chatbot/confirm |
| lumie-worker/services/chatbot/src/schema.py | Pydantic request and response envelopes for stream and confirm calls |
| lumie-worker/services/chatbot/src/usecase.py | SSE streaming, detached graph runs, pending-action persistence, confirmation resume, and error mapping |
| lumie-worker/services/chatbot/src/graph/agent.py | LangGraph topology, scope guard, LLM node, tool node, retry routing, and cancellation routing |
| lumie-worker/services/chatbot/src/graph/tools.py | Tool specifications passed to the model and human-readable action descriptions |
| lumie-worker/services/chatbot/src/adapters/backend_api.py | HMAC-signed HTTP adapter for backend /internal/chatbot/** endpoints |
| lumie-worker/services/chatbot/src/config.py | Environment-backed settings for LLM, backend, LangGraph DB, and observability |
| lumie-backend/modules/ai/src/main/java/com/lumie/ai/adapter/out/external/ChatbotClient.java | Backend client that calls the worker and proxies SSE frames |
| lumie-backend/modules/ai/src/main/java/com/lumie/ai/adapter/in/web/InternalChatbotController.java | Backend callback surface used by the worker for SQL, tools, schema, and persistence |
| lumie-infra/applications/lumie/worker/chatbot-svc/common-values.yaml | Production deployment, service, env, secrets, and ServiceMonitor wiring |

Public Surface

chatbot-svc is an internal ClusterIP service. Browsers do not call it directly; frontend chat requests go through backend /v1/chat/**, then backend ChatbotClient calls the worker.

| Endpoint | Purpose |
| --- | --- |
| GET /health | Probe endpoint; returns {"status":"ok"} without requiring the lifespan container |
| GET /metrics | Prometheus endpoint mounted from metrics_app |
| POST /api/chatbot/stream | Starts or continues a chat turn and returns an SSE stream |
| POST /api/chatbot/confirm | Resumes a paused write-tool action after user approval or rejection |

Both worker POST routes require X-Tenant-Slug. main.py::_require_tenant_match(...) rejects the request with HTTP 400 when the header does not match the body tenantSlug. If the lifespan container has not finished initializing, both POST routes return HTTP 503.

Runtime Flow

ChatUseCase.stream_chat(...) starts the graph in a detached background task. The HTTP response only drains a bounded queue of SSE frames. If the client disconnects, the graph task keeps running and persists the final assistant message through the backend.
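The detached-run pattern can be illustrated with plain asyncio. The sketch below is an assumption-laden model, not the code in usecase.py: emit, run_graph, and demo are hypothetical names, the queue size is 1 for demonstration, and the put timeout is shortened from the 5 seconds listed under Failure And Backpressure Behavior.

```python
import asyncio

DONE = object()  # sentinel marking the end of the stream


async def emit(queue: asyncio.Queue, frame: object, timeout: float) -> bool:
    """Try to enqueue a frame; drop it if the consumer is too slow."""
    try:
        await asyncio.wait_for(queue.put(frame), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False  # the live frame is dropped, the run continues


async def run_graph(queue: asyncio.Queue, chunks: list[str],
                    persisted: list[str], timeout: float) -> None:
    """Stand-in for the detached graph run."""
    for chunk in chunks:
        await emit(queue, chunk, timeout)
    # Persistence does not depend on anyone reading the queue.
    persisted.append("".join(chunks))
    await emit(queue, DONE, timeout)


async def demo() -> tuple[list[str], list[str]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    persisted: list[str] = []
    task = asyncio.create_task(run_graph(queue, ["a", "b", "c"], persisted, 0.01))
    received = [await queue.get()]  # the client reads one frame, then disconnects
    await task  # the run still finishes without a reader
    return received, persisted
```

Running asyncio.run(demo()) shows the client receiving only the first frame while the full message is still persisted.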

Stream Contract

POST /api/chatbot/stream accepts ChatStreamRequest.

class ChatStreamRequest(BaseModel):
    tenant_slug: str = Field(..., alias="tenantSlug", min_length=1)
    tenant_id: int = Field(..., alias="tenantId")
    user_id: int = Field(..., alias="userId")
    conversation_id: int | None = Field(None, alias="conversationId")
    message: str = Field(..., min_length=1, max_length=4000)

When conversationId is null, the worker asks the backend to create a conversation and returns the new id in the final done event.

Example request body (the Korean message reads "Create this week's announcement"):

{
  "tenantSlug": "demo",
  "tenantId": 7,
  "userId": 101,
  "conversationId": null,
  "message": "이번 주 공지사항을 만들어 줘"
}

The stream emits these SSE events:

| Event | Data shape | Meaning |
| --- | --- | --- |
| token | {"content": "..."} | Assistant text chunk. Tool messages and raw SQL results are filtered out. |
| pending | {"messageId": 1, "toolName": "...", "description": "...", "arguments": {...}} | A write tool needs user confirmation before execution. |
| done | {"conversationId": 1, "messageId": 2} | The turn finished, or a pending action was persisted. |
| error | {"message": "..."} | The detached graph run failed after startup. |

Backend ChatbotClient preserves event names and data while proxying the worker stream to the browser-facing SseEmitter.

Short transcript from the current worker contract (the two tokens together form the Korean greeting "안녕하세요", "hello"):

event: token
data: {"content":"안녕"}

event: token
data: {"content":"하세요"}

event: done
data: {"conversationId":10,"messageId":52}

That transcript matches the ChatUseCase._sse(...) framing and the test_stream_emits_tokens_and_persists_both_messages coverage in services/chatbot/tests/test_usecase.py.
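The framing in that transcript can be reproduced with a few lines of Python. This is a sketch under assumptions: sse and parse_sse are hypothetical helpers, and the compact JSON separators and ensure_ascii=False are chosen to match the transcript rather than taken from ChatUseCase._sse(...).

```python
import json


def sse(event: str, data: dict) -> str:
    """Format one SSE frame: event line, data line, blank-line terminator."""
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    return f"event: {event}\ndata: {payload}\n\n"


def parse_sse(stream: str) -> list[tuple[str, dict]]:
    """Parse frames produced by sse() back into (event, data) pairs."""
    events = []
    for block in stream.split("\n\n"):
        if not block.strip():
            continue
        # Each line is "field: value"; split only on the first separator.
        fields = dict(line.split(": ", 1) for line in block.split("\n"))
        events.append((fields["event"], json.loads(fields["data"])))
    return events
```

parse_sse only handles the single-line data frames shown here; a general SSE client also has to handle multi-line data fields, comments, and retry hints.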

Confirm Contract

POST /api/chatbot/confirm accepts ConfirmRequest and returns ChatReplyResponse.

class ConfirmRequest(BaseModel):
    tenant_slug: str = Field(..., alias="tenantSlug", min_length=1)
    tenant_id: int = Field(..., alias="tenantId")
    user_id: int = Field(..., alias="userId")
    conversation_id: int = Field(..., alias="conversationId")
    message_id: int = Field(..., alias="messageId")
    confirmed: bool

Example confirm request:

{
  "tenantSlug": "demo",
  "tenantId": 7,
  "userId": 101,
  "conversationId": 10,
  "messageId": 52,
  "confirmed": true
}

The backend public /v1/chat/confirm route receives only messageId and confirmed; it resolves conversationId from the pending message before calling the worker. Backend idempotency is handled at /v1/chat/confirm with the optional Idempotency-Key header, not inside the worker.

When confirmation resumes the graph and another write action is requested, the response contains a new pendingAction. Otherwise it returns the final assistant message and pendingAction=null.

Example response when the pending action finishes (the Korean message reads "Completed."):

{
  "conversationId": 10,
  "message": "완료했습니다.",
  "pendingAction": null
}

If LangGraph pauses again on another write tool, confirm_tool(...) returns message: "" and a non-null pendingAction instead of a final assistant reply.
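A caller can branch on the two response shapes as follows. The sketch is illustrative only: next_step and its return labels are hypothetical, and the fields inside pendingAction are not specified on this page, so the function passes the object through untouched.

```python
def next_step(reply: dict) -> tuple[str, object]:
    """Classify a ChatReplyResponse-shaped dict.

    Returns ("confirm_again", pendingAction) when the graph paused on another
    write tool, otherwise ("show_message", message) for a final reply.
    """
    pending = reply.get("pendingAction")
    if pending is not None:
        return ("confirm_again", pending)
    return ("show_message", reply["message"])
```

For the example response above, next_step returns the show_message label with the final assistant text.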

Backend Callback Surface

The worker never opens a tenant data connection. It uses BackendApiClient to call backend /internal/chatbot/** endpoints with HMAC-signed JSON bodies and X-Tenant-Slug.

| Backend endpoint | Worker method | Responsibility |
| --- | --- | --- |
| POST /internal/chatbot/query | execute_query(...) | Execute read-only SQL through GeneralQueryTool, SqlValidator, RLS, and the read-only pool |
| POST /internal/chatbot/tools/{toolName} | execute_tool(...) | Execute an allowlisted write tool after user confirmation |
| GET /internal/chatbot/schema | get_schema(...) | Return tenant schema description for the system prompt |
| GET /internal/chatbot/history | get_history(...) | Return recent conversation messages; currently reserved for history hydration |
| POST /internal/chatbot/pending-action | save_pending_action(...) | Persist a LangGraph interrupt as a pending action message |
| POST /internal/chatbot/save-message | save_message(...) | Persist user, assistant, or tool messages |
| POST /internal/chatbot/conversations | create_conversation(...) | Create a new conversation and return its id |

InternalChatbotController re-establishes tenant and user context before executing SQL or tools, so RLS and existing backend tool implementations remain the control point.
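The HMAC-signed callbacks described in this section can be sketched with the standard library. Everything beyond "HMAC-signed JSON body plus X-Tenant-Slug" is an assumption here: the SHA-256 digest, hex encoding, the X-Signature header name, and signing only the raw body are illustrative choices, not the scheme BackendApiClient actually uses.

```python
import hashlib
import hmac
import json


def sign_body(secret: bytes, body: bytes) -> str:
    """Hex HMAC-SHA256 over the raw request body (assumed scheme)."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def build_request(secret: bytes, tenant_slug: str, payload: dict) -> tuple[dict, bytes]:
    """Serialize the payload once and sign exactly the bytes that are sent."""
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Tenant-Slug": tenant_slug,
        "X-Signature": sign_body(secret, body),  # hypothetical header name
    }
    return headers, body


def verify(secret: bytes, headers: dict, body: bytes) -> bool:
    """Receiver side: recompute the signature and compare in constant time."""
    expected = sign_body(secret, body)
    return hmac.compare_digest(expected, headers.get("X-Signature", ""))
```

Signing the serialized bytes rather than the dict avoids mismatches caused by key ordering or whitespace differences between sender and receiver.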

LangGraph Model

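src/graph/agent.py compiles the graph from a scope guard, an LLM node, and a tool node, connected by retry routing and cancellation routing.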

The model is built with langchain_openai.ChatOpenAI, defaulting to Gemini through the OpenAI-compatible endpoint:

| Setting | Default |
| --- | --- |
| LLM_BASE_URL | https://generativelanguage.googleapis.com/v1beta/openai |
| LLM_MODEL | gemini-2.5-flash |
| LLM_API_KEY | required |

The system prompt is loaded from services/chatbot/prompts/chat-system.txt and cached for the process lifetime, so editing the prompt requires a pod restart.

Tool And HITL Model

The graph exposes one read tool and four write tools to the model.

| Tool | Type | Execution path |
| --- | --- | --- |
| general_query | Read | Executes immediately through POST /internal/chatbot/query |
| create_announcement | Write | Pauses with interrupt(...), then executes through backend tools after confirmation |
| send_sms | Write | Pauses with interrupt(...), then executes after confirmation |
| send_telegram | Write | Pauses with interrupt(...), then executes after confirmation |
| schedule_task | Write | Pauses with interrupt(...), then executes after confirmation |

Read failures increment sql_retry_count, emit chatbot_sql_retries_total, and route back to the model until SQL_MAX_RETRIES is reached. Write tools never execute during the initial stream turn. They first persist a pending action, emit a pending event, and wait for /api/chatbot/confirm.
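The routing rules in this section reduce to two small decisions. The sketch below is a simplified model, not the code in agent.py: classify_tool and route_after_read_failure are hypothetical names, "llm" is an assumed node name, and giving up exactly when the counter reaches SQL_MAX_RETRIES is one reading of "until SQL_MAX_RETRIES is reached".

```python
READ_TOOLS = {"general_query"}
WRITE_TOOLS = {"create_announcement", "send_sms", "send_telegram", "schedule_task"}


def classify_tool(name: str) -> str:
    """Decide how the tool node treats a model-requested tool."""
    if name in READ_TOOLS:
        return "execute"    # runs immediately through the backend query endpoint
    if name in WRITE_TOOLS:
        return "interrupt"  # pauses for user confirmation first
    return "unknown"        # recorded as a tool message, backend not called


def route_after_read_failure(sql_retry_count: int, max_retries: int = 3) -> tuple[int, str]:
    """Increment the retry counter and pick the next node."""
    count = sql_retry_count + 1
    next_node = "llm" if count < max_retries else "give_up"
    return count, next_node
```

With the default of 3, the first two failures route back to the model and the third routes to give_up.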

Persistence And Startup

The worker uses two persistence paths:

| Persistence path | Owner | Purpose |
| --- | --- | --- |
| Backend ai_conversations and messages | Backend ChatService | Conversation id, user messages, assistant messages, and pending actions |
| LangGraph checkpointer tables | AsyncPostgresSaver | Graph state for thread_id = "{tenantId}:{conversationId}" |
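The thread_id format above maps onto the config dict that LangGraph checkpointers read. A minimal sketch, assuming the worker passes the id through the standard configurable key; thread_id and run_config are hypothetical helper names, and combining the id with recursion_limit in one dict is an illustration rather than the worker's confirmed call shape.

```python
def thread_id(tenant_id: int, conversation_id: int) -> str:
    """Checkpoint key scoped to one tenant and one conversation."""
    return f"{tenant_id}:{conversation_id}"


def run_config(tenant_id: int, conversation_id: int, recursion_limit: int = 10) -> dict:
    """Build a LangGraph-style run config for a conversation."""
    return {
        "configurable": {"thread_id": thread_id(tenant_id, conversation_id)},
        "recursion_limit": recursion_limit,
    }
```

Including the tenant id in the key keeps checkpoints for equal conversation ids in different tenants from colliding.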

Lifespan startup in main.py:

  1. Builds synchronous adapters and the shared httpx.AsyncClient.
  2. Opens an AsyncConnectionPool from LANGGRAPH_DB_DSN.
  3. Creates AsyncPostgresSaver(pool) and runs setup().
  4. Builds the chat model.
  5. Compiles the LangGraph with the live checkpointer.
  6. Attaches the graph to ChatUseCase.

The Postgres pool is configured for PgBouncer transaction-mode compatibility: autocommit=True, prepare_threshold=0, row_factory=dict_row, and max_idle=180.0.

Configuration And Deployment

src/config.py and chatbot-svc/common-values.yaml define the runtime contract.

| Setting | Required | Notes |
| --- | --- | --- |
| LUMIE_BACKEND_URL | Yes | Backend base URL for /internal/chatbot/** callbacks |
| LUMIE_INTERNAL_HMAC_SECRET | Yes | Shared secret for signed internal calls |
| LLM_API_KEY | Yes | Gemini API key from Vault |
| LLM_BASE_URL | No | OpenAI-compatible Gemini endpoint by default |
| LLM_MODEL | No | gemini-2.5-flash by default |
| LANGGRAPH_DB_DSN | Yes | DSN for the dedicated lumie_langgraph role through the CNPG pooler |
| OTEL_ENABLED | No | Defaults to true; disable for local tests |
| OTEL_ENDPOINT | No | OTLP gRPC endpoint for the cluster collector |
| RECURSION_LIMIT | No | Maximum graph recursion steps; defaults to 10 |
| SQL_MAX_RETRIES | No | Maximum read-query repair attempts; defaults to 3 |

Production deployment is ClusterIP-only with ingress disabled. Prometheus scrapes /metrics/ through the ServiceMonitor.

Failure And Backpressure Behavior

| Failure point | Behavior |
| --- | --- |
| X-Tenant-Slug mismatch | Worker returns HTTP 400 before entering the use case |
| Lifespan container not ready | Worker returns HTTP 503 for stream and confirm routes |
| Client disconnect during stream | Detached graph task keeps running and persists the final assistant message |
| Slow SSE consumer | _emit(...) waits up to 5 seconds, then drops the live frame while keeping the graph run alive |
| Graph recursion limit | Emits a fallback assistant message, persists it, and records chatbot_errors_total{error_type="recursion"} |
| Unexpected graph exception | Emits an error SSE frame and records chatbot_errors_total{error_type="exception"} |
| Backend SQL failure | Tool node retries through the model until SQL_MAX_RETRIES, then routes to give_up |
| User rejects a write tool | Graph routes to deterministic cancellation text; the backend tool is not executed |
| Unknown model-requested tool | Tool node records a tool message saying the tool is unknown; the backend is not called for that tool |

Shutdown cancels in-flight detached graph tasks, closes the shared HTTP client, then closes the Postgres pool last so checkpoint writes can finish first.
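The shutdown order can be modeled with asyncio and stand-in resources. This is a sketch, not the lifespan code in main.py: FakeResource, shutdown, and demo are hypothetical, and the real HTTP client and pool expose their own close methods.

```python
import asyncio


class FakeResource:
    """Stand-in for the shared HTTP client or the Postgres pool."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def close(self) -> None:
        self.log.append(f"close {self.name}")


async def shutdown(tasks: set, http_client: FakeResource,
                   pool: FakeResource, log: list[str]) -> None:
    """Cancel detached runs first, then close the client, then the pool."""
    for task in tasks:
        task.cancel()
    # Wait for cancellation to settle; CancelledError is returned, not raised.
    await asyncio.gather(*tasks, return_exceptions=True)
    log.append("tasks cancelled")
    await http_client.close()
    await pool.close()  # last, so pending checkpoint writes still have a pool


async def demo() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(asyncio.sleep(60))  # stand-in for a graph run
    await asyncio.sleep(0)  # let the task start
    await shutdown({task}, FakeResource("http", log), FakeResource("pool", log), log)
    return log, task.cancelled()
```

Awaiting the cancelled tasks before closing anything is the step that gives in-flight work a chance to unwind while its dependencies are still open.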

Observability

Metrics are exported from src/observability/metrics.py.

| Metric | Labels | Meaning |
| --- | --- | --- |
| chatbot_stream_duration_seconds | outcome | End-to-end duration of a stream run; outcome is completed, interrupted, or error |
| chatbot_sql_retries_total | tenant | Count of read-query retry attempts |
| chatbot_hitl_total | confirmed | Count of human confirmation decisions |
| chatbot_errors_total | error_type | Graph failures by recursion or exception |

Tracing uses OpenTelemetry for FastAPI requests and OpenInference LangChain instrumentation for LLM, tool, and chain spans. httpx is intentionally not instrumented because global httpx instrumentation breaks ChatOpenAI construction in this service.

Verification

Use the chatbot test slice for worker changes:

cd lumie-worker
pytest services/chatbot/tests

Expected success signals:

  • pytest exits 0
  • services/chatbot/tests/test_health.py proves GET /health returns {"status":"ok"}
  • the same test file also proves POST /api/chatbot/stream returns 503 when lifespan startup has not attached the container yet
  • services/chatbot/tests/test_usecase.py covers token, pending, done, disconnect durability, and confirm-path persistence

For contract changes, inspect the worker, backend proxy, and backend internal callback surface together:

rg -n "api/chatbot|internal/chatbot|ChatbotClient|InternalChatbotController" \
lumie-worker/services/chatbot lumie-backend/modules/ai

For deployment changes, inspect the worker Helm values and backend service URL:

rg -n "CHATBOT_SVC_URL|chatbot-svc|LANGGRAPH_DB_DSN|LLM_MODEL" \
lumie-infra/applications/lumie lumie-backend/Tiltfile lumie-worker/Tiltfile