Communication Queue Module
Overview
The communication queue module provides deferred and scheduled delivery of outbound communications (email, SMS, WhatsApp). It acts as a buffer between the communication creation layer and the actual send execution, providing retry logic, backoff, and operational visibility.
Architecture
interfaces/
  communication-queue.controller.ts          ← HTTP endpoints (admin/ops)
  dtos/
    queue-stats-response.dto.ts              ← Swagger response schema
    queue-job-action-response.dto.ts         ← Swagger action response schema
application/
  communication-queue.service.ts             ← Business logic + cron processor
domain/
  communication-queue-repository.domain.ts   ← Port interface
infrastructure/
  communication-queue.repository.ts          ← Kysely adapter
Dependency Flow
Controller → Service → Repository (interface in domain, impl in infrastructure)
                     → CommunicationsService.executeSend() (via forwardRef)
The circular dependency between CommunicationQueueService and CommunicationsService is resolved with forwardRef — the queue service delegates actual sending to CommunicationsService.executeSend().
Domain Concepts
Queue Job Lifecycle
PENDING → PROCESSING → COMPLETED
                     ↘ PENDING (retry) → ... → FAILED (max retries)
PENDING → CANCELLED (manual)
FAILED → PENDING (manual retry)
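The lifecycle above can be read as a small transition table. The following is a minimal sketch only: `QueueJobStatus`, `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names, not taken from the module, and the table encodes only the arrows drawn in the diagram. The endpoint table further down also mentions retrying cancelled jobs and cancelling processing jobs; those transitions are not encoded here.

```typescript
// Statuses as listed in this README.
type QueueJobStatus = "pending" | "processing" | "completed" | "failed" | "cancelled";

// Allowed transitions, copied from the lifecycle diagram.
// Hypothetical helper: the real service may enforce these rules differently.
const ALLOWED_TRANSITIONS: Record<QueueJobStatus, readonly QueueJobStatus[]> = {
  pending: ["processing", "cancelled"], // picked up by cron, or cancelled manually
  processing: ["completed", "pending", "failed"], // success, retry, or retries exhausted
  completed: [],
  failed: ["pending"], // manual retry
  cancelled: [],
};

// Returns true when the diagram contains an arrow from `from` to `to`.
function canTransition(from: QueueJobStatus, to: QueueJobStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

A guard like this could run before any status update so that, for example, a completed job is never reset to pending by mistake.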
Statuses
| Status | Description |
|---|---|
| pending | Waiting to be processed; eligible once scheduledAt <= now |
| processing | Currently being sent |
| completed | Successfully sent |
| failed | Exhausted all retry attempts |
| cancelled | Manually cancelled by operator |
Retry Strategy
Failed sends are retried on a fixed, escalating delay schedule (an approximation of exponential backoff):
| Attempt | Delay |
|---|---|
| 1st retry | 1 minute |
| 2nd retry | 5 minutes |
| 3rd retry | 15 minutes |
| 4th+ retry | 1 hour |
Default maxAttempts is 3.
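As an illustration, the delay table can be expressed as a lookup that clamps to the last entry. This is a sketch under assumptions: `RETRY_DELAYS_MS`, `getRetryDelayMs` and `nextScheduledAt` are hypothetical names, and the 1-based retry numbering is an assumption about how attempts are counted.

```typescript
const MINUTE_MS = 60 * 1000;

// Delay schedule from the table: 1 min, 5 min, 15 min, then 1 hour.
const RETRY_DELAYS_MS: readonly number[] = [
  1 * MINUTE_MS,
  5 * MINUTE_MS,
  15 * MINUTE_MS,
  60 * MINUTE_MS,
];

// retryNumber is 1-based (1 = first retry). Assumption: retries beyond
// the fourth reuse the final one-hour delay, matching the "4th+" row.
function getRetryDelayMs(retryNumber: number): number {
  if (!Number.isInteger(retryNumber) || retryNumber < 1) {
    throw new RangeError("retryNumber must be a positive integer");
  }
  const index = Math.min(retryNumber, RETRY_DELAYS_MS.length) - 1;
  return RETRY_DELAYS_MS[index] as number;
}

// Computes the scheduledAt value for the next attempt.
function nextScheduledAt(retryNumber: number, now: Date = new Date()): Date {
  return new Date(now.getTime() + getRetryDelayMs(retryNumber));
}
```

Keeping the schedule in a single array makes it easy to adjust the delays without touching the lookup logic.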
Priority
Jobs are processed in priority order (high > normal > low), then by scheduledAt ascending. The cron processor picks up 10 jobs per cycle.
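The ordering rule can be sketched in memory as follows. The actual module presumably performs this selection in SQL through the repository, so treat this purely as an illustration of the rule; `QueueJob`, `PRIORITY_RANK` and `selectBatch` are hypothetical names, and the input is assumed to contain only pending jobs.

```typescript
type Priority = "high" | "normal" | "low";

// Minimal job shape for the sketch; the real row has more columns.
interface QueueJob {
  id: string;
  priority: Priority;
  scheduledAt: Date;
}

// Lower rank is processed first: high > normal > low.
const PRIORITY_RANK: Record<Priority, number> = { high: 0, normal: 1, low: 2 };

// The README states that each cron cycle picks up 10 jobs.
const BATCH_SIZE = 10;

// Picks due jobs, ordered by priority and then by scheduledAt ascending.
function selectBatch(jobs: QueueJob[], now: Date, batchSize: number = BATCH_SIZE): QueueJob[] {
  return jobs
    .filter((job) => job.scheduledAt.getTime() <= now.getTime()) // due jobs only
    .sort(
      (a, b) =>
        PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
        a.scheduledAt.getTime() - b.scheduledAt.getTime(),
    )
    .slice(0, batchSize);
}
```

Because `filter` returns a new array, the sort does not mutate the caller's list.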
API Endpoints
All endpoints are scoped under /businesses/:businessId/communication-queue/. The paths in the table below are relative to that prefix.
| Method | Path | Description |
|---|---|---|
| GET | /stats | Aggregate status counts for the business |
| GET | /pending?limit=N | List pending jobs ready for processing |
| GET | /failed?limit=N | List failed jobs |
| GET | /by-communication/:communicationId | All jobs for a communication |
| GET | /:id | Single job details |
| POST | /:id/retry | Reset failed/cancelled job to pending |
| POST | /:id/cancel | Cancel pending/processing job |
| POST | /process-now | Trigger immediate queue processing (admin) |
| DELETE | /:id | Permanently delete a job (admin) |
Multi-Tenancy
The communication_queue table does not have a businessId column. Business scoping is achieved by joining to the communication table (which has businessId). This applies to the /stats endpoint. The listing endpoints (/pending, /failed) currently return global results, ignoring the businessId in the path; business-scoped filtering for these can be added if needed.
Design Decisions
- Cron-based processing (not BullMQ): The queue uses @nestjs/schedule cron (every 30 seconds) rather than BullMQ for simplicity. BullMQ is available in the project for other use cases but was not needed here given the low throughput of communications.
- Sequential processing: Jobs within a batch are processed sequentially to maintain order and avoid overwhelming external email/SMS providers.
- Concurrency guard: A simple isProcessing boolean prevents overlapping cron cycles. This works because the backend runs as a single instance; if horizontal scaling is needed, this should be replaced with a distributed lock.
- Stats via SQL aggregation: The /stats endpoint uses COUNT ... FILTER WHERE at the SQL level (via Kysely) rather than loading all rows into memory.
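The sequential-processing and concurrency-guard decisions can be sketched together. This is not the module's actual code: `QueueProcessorSketch`, `SendFn` and `processBatch` are hypothetical names, and the send function stands in for the call to CommunicationsService.executeSend(). Only the isProcessing flag is taken from this README.

```typescript
// Stand-in for the real send call; hypothetical signature.
type SendFn = (jobId: string) => Promise<void>;

class QueueProcessorSketch {
  // In-process guard; only safe while a single backend instance runs.
  private isProcessing = false;
  private readonly send: SendFn;

  constructor(send: SendFn) {
    this.send = send;
  }

  // Returns the number of jobs handled, or null when the cycle was
  // skipped because a previous cycle is still running.
  async processBatch(jobIds: string[]): Promise<number | null> {
    if (this.isProcessing) return null;
    this.isProcessing = true;
    try {
      let handled = 0;
      // Sequential on purpose: one send at a time, in batch order.
      for (const jobId of jobIds) {
        try {
          await this.send(jobId);
        } catch (error) {
          // Assumption: the real service would record the failure and
          // schedule a retry here; the sketch simply moves on.
        }
        handled++;
      }
      return handled;
    } finally {
      // Always release the guard, even if something unexpected throws.
      this.isProcessing = false;
    }
  }
}
```

Because the flag lives in process memory, two backend instances would each see their own copy, which is why a distributed lock would be needed once the service scales horizontally.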