
Communication Queue Module

Overview

The communication queue module provides deferred and scheduled delivery of outbound communications (email, SMS, WhatsApp). It acts as a buffer between the communication creation layer and the actual send execution, providing retry logic, backoff, and operational visibility.

Architecture

interfaces/
  communication-queue.controller.ts ← HTTP endpoints (admin/ops)
  dtos/
    queue-stats-response.dto.ts ← Swagger response schema
    queue-job-action-response.dto.ts ← Swagger action response schema
application/
  communication-queue.service.ts ← Business logic + cron processor
domain/
  communication-queue-repository.domain.ts ← Port interface
infrastructure/
  communication-queue.repository.ts ← Kysely adapter

Dependency Flow

Controller → Service → Repository (interface in domain, impl in infrastructure)
                     → CommunicationsService.executeSend() (via forwardRef)

The circular dependency between CommunicationQueueService and CommunicationsService is resolved with forwardRef — the queue service delegates actual sending to CommunicationsService.executeSend().

Domain Concepts

Queue Job Lifecycle

PENDING → PROCESSING → COMPLETED
                    ↘ PENDING (retry) → ... → FAILED (max retries)
PENDING → CANCELLED (manual)
FAILED → PENDING (manual retry)
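The lifecycle above can be encoded as an explicit transition map so that invalid status changes are rejected in one place. This is a sketch, not the module's actual code: the names `ALLOWED_TRANSITIONS`, `canTransition`, and `assertTransition` are hypothetical, and the map assumes the transitions implied by the retry and cancel endpoints (cancelling a processing job, retrying a cancelled one) in addition to the ones drawn in the diagram.

```typescript
type QueueJobStatus = "pending" | "processing" | "completed" | "failed" | "cancelled";

// Hypothetical transition map. Assumption: it merges the lifecycle diagram with
// the endpoint descriptions (cancel from processing, retry from cancelled).
const ALLOWED_TRANSITIONS: Record<QueueJobStatus, readonly QueueJobStatus[]> = {
  pending: ["processing", "cancelled"],
  // After a failed attempt the job returns to pending (retry) or becomes failed (max retries).
  processing: ["completed", "pending", "failed", "cancelled"],
  completed: [], // terminal
  failed: ["pending"], // manual retry
  cancelled: ["pending"], // manual retry
};

function canTransition(from: QueueJobStatus, to: QueueJobStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}

// Throws so callers cannot silently persist an invalid status change.
function assertTransition(from: QueueJobStatus, to: QueueJobStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`Invalid queue job transition: ${from} -> ${to}`);
  }
}
```

A repository update could call `assertTransition(current, next)` before writing the new status, which keeps the lifecycle rules out of the individual endpoints.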

Statuses

Status       Description
pending      Waiting to be processed; eligible once scheduledAt <= now
processing   Currently being sent
completed    Successfully sent
failed       Exhausted all retry attempts
cancelled    Manually cancelled by an operator

Retry Strategy

Increasing backoff driven by a fixed delay schedule (the delays come from a lookup table rather than an exponential formula):

Retry        Delay
1st retry    1 minute
2nd retry    5 minutes
3rd retry    15 minutes
4th+ retry   1 hour

The default maxAttempts is 3.
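A minimal sketch of how the delay table and maxAttempts could combine when a send fails. The helper names (`retryDelayMs`, `onAttemptFailed`) are hypothetical, and the sketch assumes `attemptsMade` counts every send attempt so far, including the first send and the one that just failed; the real service may count differently.

```typescript
const MINUTE_MS = 60 * 1000;

// Delay schedule from the table above; the last entry applies to every later retry.
const RETRY_DELAYS_MS: readonly number[] = [1 * MINUTE_MS, 5 * MINUTE_MS, 15 * MINUTE_MS, 60 * MINUTE_MS];
const DEFAULT_MAX_ATTEMPTS = 3;

/** Delay before the given retry (1-based), clamped to the schedule's bounds. */
function retryDelayMs(retryNumber: number): number {
  const index = Math.min(Math.max(retryNumber, 1), RETRY_DELAYS_MS.length) - 1;
  return RETRY_DELAYS_MS[index];
}

type FailureOutcome =
  | { status: "pending"; scheduledAt: Date }
  | { status: "failed" };

/**
 * Decides what happens after a send attempt fails.
 * Assumption: attemptsMade includes the attempt that just failed.
 */
function onAttemptFailed(
  attemptsMade: number,
  now: Date,
  maxAttempts: number = DEFAULT_MAX_ATTEMPTS,
): FailureOutcome {
  if (attemptsMade >= maxAttempts) {
    return { status: "failed" };
  }
  // The first failure schedules the 1st retry, the second failure the 2nd retry, and so on.
  return { status: "pending", scheduledAt: new Date(now.getTime() + retryDelayMs(attemptsMade)) };
}
```

Under that assumption, the default of 3 attempts only ever uses the 1-minute and 5-minute delays; the 15-minute and 1-hour entries come into play only when maxAttempts is raised.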

Priority

Jobs are processed in priority order (high > normal > low), then by scheduledAt ascending. The cron processor picks up at most 10 jobs per cycle.
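The ordering rule can be illustrated in memory. In the repository this would presumably be an ORDER BY with a limit of 10 in SQL; the sketch below, with hypothetical names `selectBatch` and `PRIORITY_RANK`, only demonstrates the due-date filter and the sort order.

```typescript
type Priority = "high" | "normal" | "low";

interface QueueJob {
  id: string;
  status: "pending" | "processing" | "completed" | "failed" | "cancelled";
  priority: Priority;
  scheduledAt: Date;
}

// Explicit rank so that high sorts before normal, and normal before low.
const PRIORITY_RANK: Record<Priority, number> = { high: 0, normal: 1, low: 2 };
const BATCH_SIZE = 10;

/** Picks the next batch: due pending jobs, by priority, then oldest scheduledAt first. */
function selectBatch(jobs: readonly QueueJob[], now: Date, batchSize: number = BATCH_SIZE): QueueJob[] {
  return jobs
    // filter() returns a new array, so the sort below does not mutate the input.
    .filter((job) => job.status === "pending" && job.scheduledAt.getTime() <= now.getTime())
    .sort(
      (a, b) =>
        PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
        a.scheduledAt.getTime() - b.scheduledAt.getTime(),
    )
    .slice(0, batchSize);
}
```

The explicit rank matters: if priority were stored as plain text, sorting on the raw strings would order them high, low, normal.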

API Endpoints

All endpoints are mounted under /businesses/:businessId/communication-queue/.

Method   Path                                  Description
GET      /stats                                Aggregate status counts for the business
GET      /pending?limit=N                      List pending jobs ready for processing
GET      /failed?limit=N                       List failed jobs
GET      /by-communication/:communicationId    All jobs for a communication
GET      /:id                                  Single job details
POST     /:id/retry                            Reset failed/cancelled job to pending
POST     /:id/cancel                           Cancel pending/processing job
POST     /process-now                          Trigger immediate queue processing (admin)
DELETE   /:id                                  Permanently delete a job (admin)

Multi-Tenancy

The communication_queue table has no businessId column. Business scoping is achieved by a JOIN to the communication table, which does have businessId. Currently only the /stats endpoint is scoped this way. The listing endpoints (/pending, /failed) return jobs across all businesses even though they are mounted under /businesses/:businessId; business-scoped filtering for them can be added with the same JOIN if needed.
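If the listing endpoints need business scoping, they could reuse the JOIN that /stats relies on. This is a sketch that builds a parameterized query with node-postgres style `$n` placeholders; the function name `buildScopedListQuery`, the table aliases, and the snake_case column names (`communication_id`, `business_id`, `scheduled_at`) are assumptions. In the Kysely adapter the same shape would be written with `innerJoin` and `where` instead of raw text.

```typescript
type ListableStatus = "pending" | "failed";

interface SqlQuery {
  text: string;
  values: unknown[];
}

// Hypothetical helper: scopes a queue listing to one business through the communication table.
function buildScopedListQuery(businessId: string, status: ListableStatus, limit: number, now: Date): SqlQuery {
  const values: unknown[] = [businessId, status];
  let text =
    "SELECT q.* FROM communication_queue q " +
    "JOIN communication c ON c.id = q.communication_id " +
    "WHERE c.business_id = $1 AND q.status = $2";
  if (status === "pending") {
    // "Ready for processing" means the scheduled time has already passed.
    values.push(now);
    text += ` AND q.scheduled_at <= $${values.length}`;
  }
  values.push(limit);
  text += ` ORDER BY q.scheduled_at ASC LIMIT $${values.length}`;
  return { text, values };
}
```

Keeping every user-supplied value in `values` rather than in the SQL text leaves escaping to the driver.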

Design Decisions

  1. Cron-based processing (not BullMQ) — The queue uses @nestjs/schedule cron (every 30 seconds) rather than BullMQ for simplicity. BullMQ is available in the project for other use cases but was not needed here given the low throughput of communications.

  2. Sequential processing — Jobs within a batch are processed sequentially to maintain order and avoid overwhelming external email/SMS providers.

  3. Concurrency guard — A simple isProcessing boolean prevents overlapping cron cycles. This works because the backend runs as a single instance; if horizontal scaling is needed, this should be replaced with a distributed lock.

  4. Stats via SQL aggregation — The /stats endpoint computes counts with COUNT(*) FILTER (WHERE ...) at the SQL level (via Kysely) rather than loading all rows into memory.
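Design decisions 1 to 3 combine into a small processing loop. The following is a dependency-free sketch: `QueueStore`, `SendFn`, and `QueueProcessor` are hypothetical stand-ins for the repository, CommunicationsService.executeSend(), and the service's cron method. In the real service the method would carry an @nestjs/schedule `@Cron(...)` decorator; it is omitted here so the code runs without NestJS.

```typescript
interface DueJob {
  id: string;
  communicationId: string;
}

// Hypothetical repository port.
interface QueueStore {
  // Assumed to return up to `limit` due pending jobs and mark them processing.
  fetchBatch(limit: number): Promise<DueJob[]>;
  markCompleted(id: string): Promise<void>;
  markAttemptFailed(id: string, error: unknown): Promise<void>;
}

type SendFn = (communicationId: string) => Promise<void>;

class QueueProcessor {
  private isProcessing = false;

  constructor(
    private readonly store: QueueStore,
    private readonly send: SendFn,
    private readonly batchSize: number = 10,
  ) {}

  /** Runs one cycle; resolves to the number of jobs handled, or null if skipped. */
  async processQueue(): Promise<number | null> {
    // Concurrency guard: only valid while a single backend instance runs the cron.
    if (this.isProcessing) return null;
    this.isProcessing = true;
    try {
      const jobs = await this.store.fetchBatch(this.batchSize);
      // Sequential processing: each send finishes before the next one starts.
      for (const job of jobs) {
        let sent = false;
        let sendError: unknown = undefined;
        try {
          await this.send(job.communicationId);
          sent = true;
        } catch (error) {
          sendError = error;
        }
        // Bookkeeping happens outside the send try-block so a database error is not
        // misreported as a failed send (which could lead to a duplicate delivery).
        if (sent) {
          await this.store.markCompleted(job.id);
        } else {
          await this.store.markAttemptFailed(job.id, sendError);
        }
      }
      return jobs.length;
    } finally {
      // Always release the guard, even if the cycle throws.
      this.isProcessing = false;
    }
  }
}
```

Because the guard is an in-memory field, two backend instances would each hold their own flag and could process the same jobs concurrently, which is why the single-instance caveat matters.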
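A sketch of the kind of statement the /stats endpoint can issue: PostgreSQL's `COUNT(*) FILTER (WHERE ...)` computes every status count in a single pass, and the JOIN to communication provides the business scoping. The constant and function names and the column names are assumptions; in the Kysely adapter the query could go through Kysely's `sql` template tag, but it is kept as plain text here so the sketch has no dependencies.

```typescript
// Assumed schema: communication_queue(status, communication_id) and
// communication(id, business_id). These column names are hypothetical.
const QUEUE_STATS_SQL = `
SELECT
  COUNT(*) FILTER (WHERE q.status = 'pending')    AS pending,
  COUNT(*) FILTER (WHERE q.status = 'processing') AS processing,
  COUNT(*) FILTER (WHERE q.status = 'completed')  AS completed,
  COUNT(*) FILTER (WHERE q.status = 'failed')     AS failed,
  COUNT(*) FILTER (WHERE q.status = 'cancelled')  AS cancelled
FROM communication_queue q
JOIN communication c ON c.id = q.communication_id
WHERE c.business_id = $1`;

// PostgreSQL COUNT returns bigint, which node-postgres delivers as a string by default.
type CountValue = string | number;

interface QueueStatsRow {
  pending: CountValue;
  processing: CountValue;
  completed: CountValue;
  failed: CountValue;
  cancelled: CountValue;
}

interface QueueStats {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  cancelled: number;
}

/** Normalizes the single aggregate row into plain numbers for a stats response. */
function toQueueStats(row: QueueStatsRow): QueueStats {
  return {
    pending: Number(row.pending),
    processing: Number(row.processing),
    completed: Number(row.completed),
    failed: Number(row.failed),
    cancelled: Number(row.cancelled),
  };
}
```

An aggregate without GROUP BY always returns exactly one row, so a business with no jobs gets zeros rather than an empty result.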