Grading

Purpose

grading-svc is the production OMR grading worker. It grades one scanned answer-sheet image per message, using OpenCV and NumPy to normalize the sheet, detect selected bubbles, calculate scores from backend-supplied answer keys, and publish a completion callback.

This page is a reference document. It is written for developers changing the worker, backend exam integration, queue contracts, or OMR operations. For the section-wide worker model, see Workers Overview.

Source Paths

| Path | Role |
| --- | --- |
| lumie-worker/services/grading/main.py | FastAPI app, lifespan wiring, /health, /metrics, and consumer task startup |
| lumie-worker/services/grading/src/schema.py | Pydantic message and callback payload contracts |
| lumie-worker/services/grading/src/usecase.py | GradeOMRUseCase, idempotency check, image download, exam metadata fetch, and grading orchestration |
| lumie-worker/services/grading/src/domain/omr.py | Pure OpenCV/NumPy OMR processing and scoring logic |
| lumie-worker/services/grading/src/mq/consumer.py | RabbitMQ consumer adapter and grading-specific callback payload builders |
| lumie-worker/libs/common/mq.py | Shared ack, nack, reject, retry, and callback lifecycle |
| lumie-worker/contracts/mq-schemas-v1.yaml | Hand-maintained async MQ contract reference |
| lumie-backend/modules/exam/src/main/java/com/lumie/exam/adapter/out/messaging/JobRequestForwarder.java | Backend publisher for OMR grading jobs |
| lumie-backend/modules/exam/src/main/java/com/lumie/exam/adapter/in/messaging/OmrGradingCallbackListener.java | Backend RabbitMQ callback consumer |

Public Surface

grading-svc itself exposes only operational HTTP endpoints:

| Endpoint | Purpose |
| --- | --- |
| GET /health | Liveness/readiness probe; returns {"status":"healthy"} |
| GET /metrics | Prometheus scrape endpoint mounted from the shared metrics app |

The production grading workflow is RabbitMQ-driven:

| Direction | Queue or routing key | Owner |
| --- | --- | --- |
| Backend to worker | queue grading.omr-request, routing key grading.omr.request | JobRequestForwarder publishes the command |
| Worker to backend | routing key grading.omr.callback | grading-svc publishes the callback |
| Backend callback consume | queue grading.omr-callback | OmrGradingCallbackListener consumes completion payloads |

The callback queue name and routing key are intentionally different. Treat the backend listener queue, worker routing key, and broker binding as one contract when changing this flow.

Runtime Flow

Startup happens inside FastAPI lifespan, not at import time. main.py builds the adapter container, connects the callback publisher, and starts run_consumer(...) as a background task. Shutdown cancels the consumer, closes the callback connection, and closes the shared httpx.AsyncClient.
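
The startup and shutdown ordering described above can be sketched with the standard library alone. This is an illustration of the lifespan pattern, not the code in main.py: the run_consumer body, the stop_log parameter, and the two "closed" placeholders are assumptions made for the demo, and a real FastAPI lifespan takes only the app argument.

```python
import asyncio
from contextlib import asynccontextmanager, suppress


async def run_consumer(stop_log: list) -> None:
    """Stand-in for the real consumer loop (hypothetical body)."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        stop_log.append("consumer cancelled")
        raise


@asynccontextmanager
async def lifespan(app, stop_log: list):
    # Startup: nothing runs at import time; the task is created only here.
    task = asyncio.create_task(run_consumer(stop_log))
    try:
        yield
    finally:
        # Shutdown: cancel the consumer first, then release connections.
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
        stop_log.append("callback connection closed")  # placeholder step
        stop_log.append("http client closed")  # placeholder step


async def demo() -> list:
    log: list = []
    async with lifespan(app=None, stop_log=log):
        await asyncio.sleep(0.03)  # the app would serve requests here
    return log
```

Cancelling the consumer before closing the callback connection means no in-flight handler tries to publish on a connection that is already gone.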

Message Contract

Incoming messages are validated as OMRCommand from services/grading/src/schema.py.

from pydantic import BaseModel, ConfigDict, Field


class OMRCommand(BaseModel):
    # Accept camelCase aliases or snake_case field names; ignore unknown keys.
    model_config = ConfigDict(populate_by_name=True, extra="ignore")

    job_id: int = Field(alias="jobId")
    exam_id: int = Field(alias="examId")
    tenant_slug: str = Field(alias="tenantSlug", min_length=1)
    image_key: str = Field(alias="imageKey", min_length=1)
    image_index: int = Field(alias="imageIndex", ge=0)
    total_images: int = Field(alias="totalImages", ge=1)
    schema_version: int = Field(alias="schemaVersion", default=1)

Successful callbacks include jobId, examId, tenantSlug, imageKey, imageIndex, totalImages, success=true, phoneNumber, totalScore, grade, and per-question results. Failure callbacks preserve the same job fields when available, set success=false, carry error, and set grading result fields to null or zero defaults.

Example inbound command:

{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "schemaVersion": 1
}

Example success callback:

{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "success": true,
  "error": null,
  "phoneNumber": "01012345678",
  "totalScore": 88,
  "grade": 2,
  "results": [
    {
      "questionNumber": 1,
      "studentAnswer": "A",
      "correctAnswer": "A",
      "score": 5,
      "earnedScore": 5,
      "questionType": "blank"
    }
  ]
}

Example failure callback:

{
  "jobId": 7,
  "examId": 42,
  "tenantSlug": "acme",
  "imageKey": "tmp/acme/omr/abc/sheet1.png",
  "imageIndex": 0,
  "totalImages": 3,
  "success": false,
  "error": "download failed",
  "phoneNumber": null,
  "totalScore": 0,
  "grade": 0,
  "results": null
}
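
A failure payload of this shape can be derived mechanically from the inbound command. The helper below is a minimal sketch; build_failure_callback is a hypothetical name, and the real builders in mq/consumer.py may differ in structure and defaults.

```python
def build_failure_callback(command: dict, error: str) -> dict:
    """Echo the job fields from the command and reset the grading fields."""
    job_fields = (
        "jobId", "examId", "tenantSlug",
        "imageKey", "imageIndex", "totalImages",
    )
    # Use .get so a partially parsed command still yields a payload.
    payload = {name: command.get(name) for name in job_fields}
    payload.update(
        success=False,
        error=error,
        phoneNumber=None,
        totalScore=0,
        grade=0,
        results=None,
    )
    return payload
```

Applied to the example inbound command with error "download failed", this reproduces the failure callback shown above; schemaVersion is dropped because it is not one of the callback fields listed for this contract.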

The backend publisher constructs the command from OmrGradingRequestedEvent and sends it to RabbitMqConstants.LUMIE_COMMANDS_EXCHANGE with RabbitMqConstants.GRADING_OMR_REQUEST_ROUTING_KEY.

rabbitTemplate.convertAndSend(
        RabbitMqConstants.LUMIE_COMMANDS_EXCHANGE,
        RabbitMqConstants.GRADING_OMR_REQUEST_ROUTING_KEY,
        message);

Processing Model

GradeOMRUseCase.execute(...) is the orchestration boundary. It does not parse MQ messages, ack/nack broker messages, read environment variables, or build callback payloads. Those responsibilities live in mq/consumer.py, config.py, and shared MQ utilities.

The use case flow is:

  1. Check Redis dedupe cache by (jobId, imageIndex) when Redis is enabled.
  2. Download the image bytes from MinIO using imageKey.
  3. Fetch exam metadata from the backend internal API at /internal/omr/exams/{examId}/full.
  4. Run grade_omr_from_bytes(...) in asyncio.to_thread(...) because OpenCV and NumPy are CPU-bound.
  5. Store the result in Redis for future redelivery.
  6. Return the raw grading result to the MQ adapter, which builds the callback payload.
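
The six steps above can be sketched as a single coroutine. This is a simplified illustration under stated assumptions: InMemoryDeduper stands in for the Redis cache, fake_grade stands in for grade_omr_from_bytes(...), the download and fetch_exam callables replace the MinIO and backend adapters, and the execute signature here is invented for the demo rather than copied from GradeOMRUseCase.

```python
import asyncio


class InMemoryDeduper:
    """Hypothetical stand-in for the Redis dedupe cache."""

    def __init__(self) -> None:
        self._results = {}

    def get(self, job_id: int, image_index: int):
        return self._results.get((job_id, image_index))

    def put(self, job_id: int, image_index: int, result: dict) -> None:
        self._results[(job_id, image_index)] = result


def fake_grade(image: bytes, exam: dict) -> dict:
    """Placeholder for the CPU-bound domain function; does no real OMR work."""
    return {"examId": exam["examId"], "imageBytes": len(image)}


async def execute(cmd: dict, deduper, download, fetch_exam, grade=fake_grade) -> dict:
    key = (cmd["jobId"], cmd["imageIndex"])
    # Step 1: return the cached result on redelivery (skipped when no deduper is wired).
    if deduper is not None:
        cached = deduper.get(*key)
        if cached is not None:
            return cached
    image = await download(cmd["imageKey"])  # step 2
    exam = await fetch_exam(cmd["examId"])  # step 3
    # Step 4: run the CPU-bound grading off the event loop.
    result = await asyncio.to_thread(grade, image, exam)
    if deduper is not None:
        deduper.put(*key, result)  # step 5
    return result  # step 6: the MQ adapter builds the callback


async def demo() -> tuple:
    downloads = []

    async def download(image_key: str) -> bytes:
        downloads.append(image_key)
        return b"fake-image"

    async def fetch_exam(exam_id: int) -> dict:
        return {"examId": exam_id}

    cmd = {"jobId": 7, "examId": 42, "imageKey": "tmp/acme/omr/abc/sheet1.png", "imageIndex": 0}
    deduper = InMemoryDeduper()
    first = await execute(cmd, deduper, download, fetch_exam)
    second = await execute(cmd, deduper, download, fetch_exam)  # simulated redelivery
    return first, second, len(downloads)
```

In the demo the second call is served from the cache, so the image is downloaded only once. Passing deduper=None exercises the path where grading proceeds without a cache.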

The domain function normalizes the image to an A4-sized canvas of 2480 x 3508 pixels and deskews it using the top marker rectangles. It then preprocesses the sheet with blur, gamma correction, CLAHE, and sharpening; estimates marked bubble density; extracts the phone number; calculates per-question results; and derives the absolute grade from totalScore.

Configuration

services/grading/src/config.py is the configuration source for this worker.

| Setting | Default | Notes |
| --- | --- | --- |
| RABBITMQ_QUEUE | grading.omr-request | Queue consumed by run_consumer(...) |
| RABBITMQ_PREFETCH_COUNT | 3 | Concurrent in-flight messages per consumer pod |
| MINIO_ENDPOINT | localhost:9000 | Image object store endpoint |
| MINIO_BUCKET | lumie | Bucket for OMR image objects |
| LUMIE_BACKEND_URL | required | Backend base URL for internal exam metadata |
| LUMIE_INTERNAL_HMAC_SECRET | required | Shared secret for signed /internal/** calls |
| REDIS_ENABLED | true | Disable only for local runs without Redis |
| OTEL_ENABLED | true | Enables OTLP export |

MinIO credentials, backend URL, and HMAC secret intentionally fail fast when missing. Redis is fail-open at the use-case boundary: when no deduper is wired, grading still proceeds.
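
The fail-fast behaviour can be illustrated with a small loader that takes a mapping such as os.environ. This is a sketch, not the contents of config.py: the Settings class, MissingSettingError, and the boolean parsing rule are assumptions. MinIO credentials are left out because this page does not name their variables.

```python
from dataclasses import dataclass
from typing import Mapping


class MissingSettingError(RuntimeError):
    """Raised at startup when a required setting is absent or empty."""


def _required(env: Mapping[str, str], name: str) -> str:
    value = env.get(name, "").strip()
    if not value:
        raise MissingSettingError(f"{name} must be set")
    return value


def _flag(env: Mapping[str, str], name: str, default: bool) -> bool:
    # Assumed parsing rule: only the literal "true" (any case) enables the flag.
    return env.get(name, str(default)).strip().lower() == "true"


@dataclass(frozen=True)
class Settings:
    rabbitmq_queue: str
    prefetch_count: int
    minio_endpoint: str
    minio_bucket: str
    backend_url: str
    hmac_secret: str
    redis_enabled: bool
    otel_enabled: bool


def load_settings(env: Mapping[str, str]) -> Settings:
    return Settings(
        rabbitmq_queue=env.get("RABBITMQ_QUEUE", "grading.omr-request"),
        prefetch_count=int(env.get("RABBITMQ_PREFETCH_COUNT", "3")),
        minio_endpoint=env.get("MINIO_ENDPOINT", "localhost:9000"),
        minio_bucket=env.get("MINIO_BUCKET", "lumie"),
        backend_url=_required(env, "LUMIE_BACKEND_URL"),  # no default: fail fast
        hmac_secret=_required(env, "LUMIE_INTERNAL_HMAC_SECRET"),  # no default
        redis_enabled=_flag(env, "REDIS_ENABLED", True),
        otel_enabled=_flag(env, "OTEL_ENABLED", True),
    )
```

Calling load_settings(os.environ) once during startup surfaces a missing backend URL or secret before the consumer accepts its first message, rather than on the first metadata fetch.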

Failure, Retry, And Idempotency

libs/common/mq.py::universal_process owns broker lifecycle for this and other MQ workers.

| Failure point | Behavior |
| --- | --- |
| Malformed message body | Reject without requeue, allowing DLQ routing |
| Handler failure after retries | Build failure callback, publish it, then ack the original message |
| Callback publish failure after retries | nack(requeue=True) so broker delivery-limit policy can move it to DLQ |
| Unexpected lifecycle error | nack(requeue=True) when the message has not already been processed |
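
One way to read the table above is as a decision function that maps each failure point to a broker action. The sketch below is synchronous and omits retries for brevity; settle, BrokerAction, and the handler and publish callables are hypothetical names, not the API of universal_process.

```python
import json
from enum import Enum


class BrokerAction(Enum):
    ACK = "ack"
    NACK_REQUEUE = "nack(requeue=True)"
    REJECT_NO_REQUEUE = "reject(requeue=False)"


def settle(body: bytes, handler, publish) -> BrokerAction:
    """Decide how to settle one message, given a handler and a callback publisher."""
    try:
        command = json.loads(body)
    except ValueError:
        # Malformed body: redelivery cannot help, so let the DLQ take it.
        return BrokerAction.REJECT_NO_REQUEUE
    try:
        try:
            payload = handler(command)
        except Exception as exc:
            # Handler failure: report it to the backend via a failure callback.
            payload = {"success": False, "error": str(exc)}
        publish(payload)
    except Exception:
        # Callback publish failure or unexpected error: hand back to the broker.
        return BrokerAction.NACK_REQUEUE
    return BrokerAction.ACK
```

The key asymmetry is that a handler failure still ends in an ack, because the backend has been told about it, while a publish failure does not, because the backend would otherwise never learn the outcome.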

The retry primitive uses three attempts by default with linear backoff. Worker-side idempotency is per (jobId, imageIndex), so RabbitMQ redelivery and Spring Modulith outbox republish can safely replay the same image command.
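
A retry helper with three attempts and linear backoff might look like the following. It is a sketch of the technique only: retry_linear, the base_delay value, and the injectable sleep parameter are assumptions, and the shared primitive in libs/common/mq.py may use different names and delays.

```python
import asyncio


async def retry_linear(operation, attempts: int = 3, base_delay: float = 0.5,
                       sleep=asyncio.sleep):
    """Await operation() up to `attempts` times, sleeping base_delay * n after failure n."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            last_error = exc
            if attempt < attempts:
                # Linear backoff: the delay grows by a constant step each time.
                await sleep(base_delay * attempt)
    # All attempts failed: surface the last error to the caller.
    raise last_error
```

Because the worker deduplicates by (jobId, imageIndex), retrying or replaying the same command is safe: a repeat either hits the cache or regrades the same image.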

Observability

The worker emits structured background-job logs through the shared MQ lifecycle and service metrics through services/grading/src/observability/metrics.py.

| Metric | Labels | Meaning |
| --- | --- | --- |
| omr_grading_total | result, tenant | Count of success, cache hit, and handler failure outcomes |
| omr_grading_duration_seconds | result | End-to-end grading use-case duration |
| omr_grading_inflight | none | Number of messages currently inside the use case |

Tracing adds grade_omr spans with omr.job_id, omr.exam_id, omr.image_index, omr.total_images, and tenant.slug. The shared observability helper does not auto-instrument aio-pika; keep MQ lifecycle visibility in structured logs and explicit spans.

Synchronous HTTP Contract Drift

There is a separate temporary HTTP service, temp-omr-grading, for the AcademyWebsite migration path. It is not the production grading-svc worker.

The sources inspected so far disagree on the synchronous endpoint path and on which service hosts it:

| Source | Claim |
| --- | --- |
| lumie-worker/services/temp-omr-grading/main.py | Exposes POST /api/temp/grading and rejects non-health requests on or after 2026-07-01 |
| lumie-worker/contracts/omr-grading-v1.yaml | Documents POST /api/temp/grading |
| lumie-backend/modules/exam/.../OmrServiceClient.java | Calls GRADING_SVC_URL + /api/omr/grade |
| lumie-infra/applications/lumie/backend/common-values.yaml | Sets GRADING_SVC_URL to http://grading-svc.lumie-worker.svc:8000 |

Do not describe the synchronous path as a stable grading-svc endpoint until this drift is resolved. Changes to the sync path must update the backend client, worker route, contract YAML, and infra values together.

Verification

Use the narrowest check that covers the changed layer:

cd lumie-worker
pytest services/grading/tests

Expected success signals:

  • pytest exits 0
  • services/grading/tests/test_grade_omr_usecase.py passes the cache-hit and fresh-grading paths
  • services/grading/tests/test_worker_event_logging.py still sees the queue name grading.omr-request

For contract changes, also inspect these integration points:

rg -n "grading\\.omr|OmrServiceClient|/api/(temp/grading|omr/grade)" \
  lumie-worker lumie-backend lumie-infra

Exact verification targets for this contract:

  • grading.omr-request appears in worker config and backend JobRequestForwarder
  • grading.omr.callback appears in services/grading/src/adapters/callback_mq.py
  • backend consumes the callback on queue grading.omr-callback in OmrGradingCallbackListener
  • backend OmrGradingJobProcessor treats success=true without phoneNumber as a failure, so callback examples must preserve that field
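
The last target can be guarded with a small payload check in worker-side tests. The function below is a hypothetical helper, not part of the repository; it encodes only the invariants stated on this page (required job fields, phoneNumber on success, error on failure).

```python
def callback_problems(payload: dict) -> list:
    """Return human-readable contract violations for a callback payload."""
    problems = []
    required = (
        "jobId", "examId", "tenantSlug", "imageKey",
        "imageIndex", "totalImages", "success",
    )
    for name in required:
        if name not in payload:
            problems.append(f"missing {name}")
    if payload.get("success") is True and not payload.get("phoneNumber"):
        # The backend treats this combination as a failure.
        problems.append("success=true requires phoneNumber")
    if payload.get("success") is False and not payload.get("error"):
        problems.append("success=false requires error")
    return problems
```

An empty list means the payload satisfies these checks; it does not prove the payload matches the full schema in mq-schemas-v1.yaml.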

For documentation-only edits in lumie-document, run the Docusaurus build from lumie-document/docusaurus and review warnings for broken links or stale localized pages.