Grading
Purpose
grading-svc is the production OMR grading worker. It grades one scanned answer-sheet image per message, using OpenCV and NumPy to normalize the sheet, detect selected bubbles, calculate scores from backend-supplied answer keys, and publish a completion callback.
This page is a reference document. It is written for developers changing the worker, backend exam integration, queue contracts, or OMR operations. For the section-wide worker model, see Workers Overview.
Source Paths
| Path | Role |
|---|---|
| lumie-worker/services/grading/main.py | FastAPI app, lifespan wiring, /health, /metrics, and consumer task startup |
| lumie-worker/services/grading/src/schema.py | Pydantic message and callback payload contracts |
| lumie-worker/services/grading/src/usecase.py | GradeOMRUseCase, idempotency check, image download, exam metadata fetch, and grading orchestration |
| lumie-worker/services/grading/src/domain/omr.py | Pure OpenCV/NumPy OMR processing and scoring logic |
| lumie-worker/services/grading/src/mq/consumer.py | RabbitMQ consumer adapter and grading-specific callback payload builders |
| lumie-worker/libs/common/mq.py | Shared ack, nack, reject, retry, and callback lifecycle |
| lumie-worker/contracts/mq-schemas-v1.yaml | Hand-maintained async MQ contract reference |
| lumie-backend/modules/exam/src/main/java/com/lumie/exam/adapter/out/messaging/JobRequestForwarder.java | Backend publisher for OMR grading jobs |
| lumie-backend/modules/exam/src/main/java/com/lumie/exam/adapter/in/messaging/OmrGradingCallbackListener.java | Backend RabbitMQ callback consumer |
Public Surface
grading-svc itself exposes only operational HTTP endpoints:
| Endpoint | Purpose |
|---|---|
| GET /health | Liveness/readiness probe; returns {"status":"healthy"} |
| GET /metrics | Prometheus scrape endpoint mounted from the shared metrics app |
The production grading workflow is RabbitMQ-driven:
| Direction | Queue or routing key | Owner |
|---|---|---|
| Backend to worker | queue grading.omr-request, routing key grading.omr.request | JobRequestForwarder publishes the command |
| Worker to backend | routing key grading.omr.callback | grading-svc publishes the callback |
| Backend callback consume | queue grading.omr-callback | OmrGradingCallbackListener consumes completion payloads |
The callback queue name and routing key are intentionally different. Treat the backend listener queue, worker routing key, and broker binding as one contract when changing this flow.
Runtime Flow
Startup happens inside FastAPI lifespan, not at import time. main.py builds the adapter container, connects the callback publisher, and starts run_consumer(...) as a background task. Shutdown cancels the consumer, closes the callback connection, and closes the shared httpx.AsyncClient.
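The startup and shutdown ordering described above can be sketched with the standard library alone. This is a minimal illustration, not the code in main.py: FakeCallbackPublisher, FakeHttpClient, the events list, and the body of run_consumer are hypothetical stand-ins, and the real worker passes a similar async context manager to FastAPI as its lifespan.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []  # records lifecycle order for illustration only


class FakeCallbackPublisher:
    """Hypothetical stand-in for the callback publisher adapter."""

    async def connect(self) -> None:
        events.append("publisher.connect")

    async def close(self) -> None:
        events.append("publisher.close")


class FakeHttpClient:
    """Hypothetical stand-in for the shared httpx.AsyncClient."""

    async def aclose(self) -> None:
        events.append("http.aclose")


async def run_consumer(publisher: FakeCallbackPublisher) -> None:
    """Placeholder consumer loop; the real one consumes from RabbitMQ."""
    events.append("consumer.start")
    try:
        await asyncio.Event().wait()  # block until the task is cancelled
    except asyncio.CancelledError:
        events.append("consumer.cancelled")
        raise


@asynccontextmanager
async def lifespan(app: object):
    # Startup: build adapters, connect the publisher, start the consumer task.
    publisher = FakeCallbackPublisher()
    http_client = FakeHttpClient()
    await publisher.connect()
    consumer_task = asyncio.create_task(run_consumer(publisher))
    try:
        yield
    finally:
        # Shutdown: cancel the consumer first, then close connections.
        consumer_task.cancel()
        try:
            await consumer_task
        except asyncio.CancelledError:
            pass
        await publisher.close()
        await http_client.aclose()


async def demo() -> list[str]:
    events.clear()
    async with lifespan(app=None):
        await asyncio.sleep(0)  # give the consumer task a chance to start
    return list(events)
```

Because nothing runs at import time, importing the module in a test does not open broker or HTTP connections; the side effects only happen while the context manager is active.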
Message Contract
Incoming messages are validated as OMRCommand from services/grading/src/schema.py.
from pydantic import BaseModel, ConfigDict, Field


class OMRCommand(BaseModel):
    # Accept field names or camelCase aliases; ignore unknown keys.
    model_config = ConfigDict(populate_by_name=True, extra="ignore")

    job_id: int = Field(alias="jobId")
    exam_id: int = Field(alias="examId")
    tenant_slug: str = Field(alias="tenantSlug", min_length=1)
    image_key: str = Field(alias="imageKey", min_length=1)
    image_index: int = Field(alias="imageIndex", ge=0)
    total_images: int = Field(alias="totalImages", ge=1)
    schema_version: int = Field(alias="schemaVersion", default=1)
Successful callbacks include jobId, examId, tenantSlug, imageKey, imageIndex, totalImages, success=true, phoneNumber, totalScore, grade, and per-question results. Failure callbacks preserve the same job fields when available, set success=false, carry error, and set grading result fields to null or zero defaults.
Example inbound command:
{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "schemaVersion": 1
}
Example success callback:
{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "success": true,
  "error": null,
  "phoneNumber": "01012345678",
  "totalScore": 88,
  "grade": 2,
  "results": [
    {
      "questionNumber": 1,
      "studentAnswer": "A",
      "correctAnswer": "A",
      "score": 5,
      "earnedScore": 5,
      "questionType": "blank"
    }
  ]
}
Example failure callback:
{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "success": false,
  "error": "download failed",
  "phoneNumber": null,
  "totalScore": 0,
  "grade": 0,
  "results": null
}
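The two callback shapes above can be sketched as plain dictionary builders. This is an illustrative sketch, not the builders in mq/consumer.py: the function names build_success_callback and build_failure_callback are hypothetical, and raising on a missing phone number is a design choice made here because the backend treats success=true without phoneNumber as a failure.

```python
from typing import Any, Optional

# Job fields echoed back by both success and failure callbacks.
JOB_KEYS = ("jobId", "examId", "tenantSlug", "imageKey", "imageIndex", "totalImages")


def _job_fields(command: dict[str, Any]) -> dict[str, Any]:
    """Copy the job identification fields; missing ones become None."""
    return {key: command.get(key) for key in JOB_KEYS}


def build_success_callback(
    command: dict[str, Any],
    phone_number: str,
    total_score: int,
    grade: int,
    results: list[dict[str, Any]],
) -> dict[str, Any]:
    """Build a success payload (hypothetical helper name)."""
    if not phone_number:
        # Assumption: refuse to emit success without a phone number.
        raise ValueError("success callback requires phoneNumber")
    return {
        **_job_fields(command),
        "success": True,
        "error": None,
        "phoneNumber": phone_number,
        "totalScore": total_score,
        "grade": grade,
        "results": results,
    }


def build_failure_callback(
    command: dict[str, Any], error: Optional[str]
) -> dict[str, Any]:
    """Build a failure payload with null or zero grading fields."""
    return {
        **_job_fields(command),
        "success": False,
        "error": error,
        "phoneNumber": None,
        "totalScore": 0,
        "grade": 0,
        "results": None,
    }
```

Keeping the job fields in one helper means a failure callback built from a partially parsed command still carries whatever identifiers were available.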
The backend publisher constructs the command from OmrGradingRequestedEvent and sends it to RabbitMqConstants.LUMIE_COMMANDS_EXCHANGE with RabbitMqConstants.GRADING_OMR_REQUEST_ROUTING_KEY.
rabbitTemplate.convertAndSend(
        RabbitMqConstants.LUMIE_COMMANDS_EXCHANGE,
        RabbitMqConstants.GRADING_OMR_REQUEST_ROUTING_KEY,
        message);
Processing Model
GradeOMRUseCase.execute(...) is the orchestration boundary. It does not parse MQ messages, ack/nack broker messages, read environment variables, or build callback payloads. Those responsibilities live in mq/consumer.py, config.py, and shared MQ utilities.
The use case flow is:
- Check Redis dedupe cache by (jobId, imageIndex) when Redis is enabled.
- Download the image bytes from MinIO using imageKey.
- Fetch exam metadata from the backend internal API at /internal/omr/exams/{examId}/full.
- Run grade_omr_from_bytes(...) in asyncio.to_thread(...) because OpenCV and NumPy are CPU-bound.
- Store the result in Redis for future redelivery.
- Return the raw grading result to the MQ adapter, which builds the callback payload.
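The steps above can be sketched as one coroutine. This is a simplified illustration under stated assumptions, not GradeOMRUseCase itself: InMemoryDeduper stands in for the Redis cache, grade_bytes stands in for grade_omr_from_bytes(...), and the download and fetch_exam callables are hypothetical ports for MinIO and the backend API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Result = dict[str, Any]


class InMemoryDeduper:
    """Hypothetical stand-in for the Redis dedupe cache."""

    def __init__(self) -> None:
        self._store: dict[tuple[int, int], Result] = {}

    async def get(self, job_id: int, image_index: int) -> Optional[Result]:
        return self._store.get((job_id, image_index))

    async def put(self, job_id: int, image_index: int, result: Result) -> None:
        self._store[(job_id, image_index)] = result


def grade_bytes(image: bytes, exam: dict[str, Any]) -> Result:
    """Placeholder for the CPU-bound OpenCV/NumPy grading function."""
    return {"totalScore": len(image), "examId": exam["id"]}


async def execute(
    command: dict[str, Any],
    *,
    download: Callable[[str], Awaitable[bytes]],
    fetch_exam: Callable[[int], Awaitable[dict[str, Any]]],
    deduper: Optional[InMemoryDeduper] = None,
) -> Result:
    job_id, image_index = command["jobId"], command["imageIndex"]

    # 1. Return the cached result on redelivery when a deduper is wired.
    if deduper is not None:
        cached = await deduper.get(job_id, image_index)
        if cached is not None:
            return cached

    # 2-3. Fetch the inputs: image bytes and exam metadata.
    image = await download(command["imageKey"])
    exam = await fetch_exam(command["examId"])

    # 4. Run CPU-bound grading off the event loop thread.
    result = await asyncio.to_thread(grade_bytes, image, exam)

    # 5. Remember the result so a replay of the same image is cheap.
    if deduper is not None:
        await deduper.put(job_id, image_index, result)

    # 6. Hand the raw result back; the MQ adapter builds the callback.
    return result
```

Passing the ports as arguments keeps the coroutine free of broker, environment, and payload concerns, which matches the boundary described for the use case.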
The domain function normalizes the image to an A4-sized canvas of 2480 x 3508 pixels and deskews it using the top marker rectangles. It then preprocesses the sheet with blur, gamma correction, CLAHE, and sharpening, estimates marked-bubble density, extracts the phone number, calculates per-question results, and derives the absolute grade from totalScore.
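To make the bubble-density step concrete, here is a small pure-Python sketch. It is not the logic in domain/omr.py, which uses OpenCV and NumPy: the nested-list pixel representation, the function names, and both thresholds (dark_below=128, min_density=0.5) are assumptions chosen only for illustration.

```python
from typing import Optional, Sequence

# A bubble cell as rows of grayscale pixels: 0 is black, 255 is white.
Cell = Sequence[Sequence[int]]


def fill_density(cell: Cell, dark_below: int = 128) -> float:
    """Return the fraction of pixels darker than the threshold."""
    total = sum(len(row) for row in cell)
    if total == 0:
        return 0.0
    dark = sum(1 for row in cell for pixel in row if pixel < dark_below)
    return dark / total


def pick_marked(cells: Sequence[Cell], min_density: float = 0.5) -> Optional[int]:
    """Return the index of the densest cell, or None if none looks marked."""
    densities = [fill_density(cell) for cell in cells]
    if not densities:
        return None
    best = max(range(len(densities)), key=densities.__getitem__)
    return best if densities[best] >= min_density else None


# Usage: three candidate bubbles for one question, the second one filled in.
blank = [[255, 255], [255, 255]]
filled = [[0, 0], [0, 255]]
choice = pick_marked([blank, filled, blank])
```

A real sheet also needs a rule for multiple marked bubbles in one row; this sketch simply takes the densest one.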
Configuration
services/grading/src/config.py is the configuration source for this worker.
| Setting | Default | Notes |
|---|---|---|
| RABBITMQ_QUEUE | grading.omr-request | Queue consumed by run_consumer(...) |
| RABBITMQ_PREFETCH_COUNT | 3 | Concurrent in-flight messages per consumer pod |
| MINIO_ENDPOINT | localhost:9000 | Image object store endpoint |
| MINIO_BUCKET | lumie | Bucket for OMR image objects |
| LUMIE_BACKEND_URL | required | Backend base URL for internal exam metadata |
| LUMIE_INTERNAL_HMAC_SECRET | required | Shared secret for signed /internal/** calls |
| REDIS_ENABLED | true | Disable only for local runs without Redis |
| OTEL_ENABLED | true | Enables OTLP export |
The worker intentionally fails fast when MinIO credentials, the backend URL, or the HMAC secret are missing. Redis is fail-open at the use-case boundary: when no deduper is wired, grading still proceeds.
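The defaults and the fail-fast rule can be sketched with a small loader over an environment mapping. This is a hedged illustration rather than the contents of config.py: the Settings dataclass, the helper names, and the accepted truthy strings are assumptions, and the MinIO credential variables are left out because this page does not name them.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _require(env: Mapping[str, str], name: str) -> str:
    """Fail fast when a required setting is absent or blank."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"missing required setting: {name}")
    return value


def _flag(env: Mapping[str, str], name: str, default: bool) -> bool:
    """Parse a boolean flag; the accepted spellings are an assumption."""
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    rabbitmq_queue: str
    prefetch_count: int
    minio_endpoint: str
    minio_bucket: str
    backend_url: str
    hmac_secret: str
    redis_enabled: bool
    otel_enabled: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    return Settings(
        rabbitmq_queue=env.get("RABBITMQ_QUEUE", "grading.omr-request"),
        prefetch_count=int(env.get("RABBITMQ_PREFETCH_COUNT", "3")),
        minio_endpoint=env.get("MINIO_ENDPOINT", "localhost:9000"),
        minio_bucket=env.get("MINIO_BUCKET", "lumie"),
        backend_url=_require(env, "LUMIE_BACKEND_URL"),
        hmac_secret=_require(env, "LUMIE_INTERNAL_HMAC_SECRET"),
        redis_enabled=_flag(env, "REDIS_ENABLED", True),
        otel_enabled=_flag(env, "OTEL_ENABLED", True),
    )
```

Taking the environment as a parameter makes the loader easy to exercise in tests without mutating os.environ.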
Failure, Retry, And Idempotency
libs/common/mq.py::universal_process owns broker lifecycle for this and other MQ workers.
| Failure point | Behavior |
|---|---|
| Malformed message body | Reject without requeue, allowing DLQ routing |
| Handler failure after retries | Build failure callback, publish it, then ack the original message |
| Callback publish failure after retries | nack(requeue=True) so broker delivery-limit policy can move it to DLQ |
| Unexpected lifecycle error | nack(requeue=True) when the message has not already been processed |
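The first three rows of the table can be read as a single decision path. The sketch below is a synchronous simplification under stated assumptions, not universal_process: FakeMessage, process, and the handler and publish callables are hypothetical, retries are omitted, and the real lifecycle is asynchronous.

```python
import json
from typing import Any, Callable, Optional


class FakeMessage:
    """Hypothetical broker message that records the final action."""

    def __init__(self, body: bytes) -> None:
        self.body = body
        self.action: Optional[str] = None

    def ack(self) -> None:
        self.action = "ack"

    def nack(self, requeue: bool) -> None:
        self.action = f"nack(requeue={requeue})"

    def reject(self, requeue: bool) -> None:
        self.action = f"reject(requeue={requeue})"


def process(
    message: FakeMessage,
    handler: Callable[[dict[str, Any]], dict[str, Any]],
    publish: Callable[[dict[str, Any]], None],
) -> None:
    # Malformed body: reject without requeue so the broker can dead-letter it.
    try:
        command = json.loads(message.body)
    except ValueError:
        message.reject(requeue=False)
        return
    if not isinstance(command, dict):
        message.reject(requeue=False)
        return

    # Handler failure: turn the error into a failure callback payload.
    try:
        payload = handler(command)
    except Exception as exc:
        payload = {"success": False, "error": str(exc)}

    # Callback publish failure: requeue and let the delivery limit apply.
    try:
        publish(payload)
    except Exception:
        message.nack(requeue=True)
        return

    # The callback is out, so the original message is done either way.
    message.ack()
```

Note that a handler failure still ends in ack: the failure has been reported through the callback, so redelivering the command would only repeat it.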
The retry primitive uses three attempts by default with linear backoff. Worker-side idempotency is per (jobId, imageIndex), so RabbitMQ redelivery and Spring Modulith outbox republish can safely replay the same image command.
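A retry helper with three attempts and linear backoff might look like the sketch below. It is an illustration, not the primitive in libs/common/mq.py: the name retry, the base_delay value of 0.5 seconds, and the injectable sleep parameter are assumptions.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry an async operation, waiting base_delay * attempt between tries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Linear backoff: the delay grows by base_delay each attempt.
            await sleep(base_delay * attempt)
    raise AssertionError("unreachable")
```

Retries only make sense because the command is idempotent per (jobId, imageIndex); replaying a non-idempotent operation this way would duplicate its effects.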
Observability
The worker emits structured background-job logs through the shared MQ lifecycle and service metrics through services/grading/src/observability/metrics.py.
| Metric | Labels | Meaning |
|---|---|---|
| omr_grading_total | result, tenant | Count of success, cache hit, and handler failure outcomes |
| omr_grading_duration_seconds | result | End-to-end grading use-case duration |
| omr_grading_inflight | none | Number of messages currently inside the use case |
Tracing adds grade_omr spans with omr.job_id, omr.exam_id, omr.image_index, omr.total_images, and tenant.slug. The shared observability helper does not auto-instrument aio-pika; keep MQ lifecycle visibility in structured logs and explicit spans.
Synchronous HTTP Contract Drift
There is a separate temporary HTTP service, temp-omr-grading, for the AcademyWebsite migration path. It is not the production grading-svc worker.
Current inspected sources disagree:
| Source | Claim |
|---|---|
| lumie-worker/services/temp-omr-grading/main.py | Exposes POST /api/temp/grading and rejects non-health requests on or after 2026-07-01 |
| lumie-worker/contracts/omr-grading-v1.yaml | Documents POST /api/temp/grading |
| lumie-backend/modules/exam/.../OmrServiceClient.java | Calls GRADING_SVC_URL + /api/omr/grade |
| lumie-infra/applications/lumie/backend/common-values.yaml | Sets GRADING_SVC_URL to http://grading-svc.lumie-worker.svc:8000 |
Do not describe the synchronous path as a stable grading-svc endpoint until this drift is resolved. Changes to the sync path must update the backend client, worker route, contract YAML, and infra values together.
Verification
Use the narrowest check that covers the changed layer:
cd lumie-worker
pytest services/grading/tests
Expected success signals:
- pytest exits 0
- services/grading/tests/test_grade_omr_usecase.py passes the cache-hit and fresh-grading paths
- services/grading/tests/test_worker_event_logging.py still sees the queue name grading.omr-request
For contract changes, also inspect these integration points:
rg -n "grading\\.omr|OmrServiceClient|/api/(temp/grading|omr/grade)" \
lumie-worker lumie-backend lumie-infra
Exact verification targets for this contract:
- grading.omr-request appears in worker config and backend JobRequestForwarder
- grading.omr.callback appears in services/grading/src/adapters/callback_mq.py
- backend consumes the callback on queue grading.omr-callback in OmrGradingCallbackListener
- backend OmrGradingJobProcessor treats success=true without phoneNumber as a failure, so callback examples must preserve that field
For documentation-only edits in lumie-document, run the Docusaurus build from lumie-document/docusaurus and review warnings for broken links or stale localized pages.