Merchant Activity Log
Module path: apps/backend/src/activity-log/
Feature branch: 038-merchant-activity-log
Overview
The Activity Log module is FlowPOS's immutable audit trail for all business-level operations. It records who did what, to which entity, and when — and exposes that history to authorized users via three query lenses and an async CSV export capability.
Key functional boundaries:
- Retention: 13 months of live data in PostgreSQL; older records are archived monthly to GCS as compressed JSONL.
- Redaction: PIN fields are always stripped server-side before any response leaves the API. Salary and other compensation fields are also redacted for roles without HR access (StoreManager, etc.).
- Scope isolation: All queries are scoped to businessId. StoreManagers are additionally restricted to their own locationId.
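The scope rule can be sketched as a small pure function. This is only an illustration: `CallerContext`, `ScopeFilter`, and `buildScopeFilter` are hypothetical names, and failing closed when a StoreManager has no location is an assumption, not documented behavior.

```typescript
// Hypothetical caller context, as a controller might derive it from the token.
interface CallerContext {
  businessId: string;
  role: string;
  locationId: string | null;
}

// Hypothetical filter shape that every query would be constrained by.
interface ScopeFilter {
  businessId: string;
  locationId?: string;
}

function buildScopeFilter(caller: CallerContext): ScopeFilter {
  // Every query is scoped to the caller's business.
  const filter: ScopeFilter = { businessId: caller.businessId };

  // StoreManagers additionally see only their own location.
  if (caller.role === 'StoreManager') {
    if (caller.locationId === null) {
      // Assumption: fail closed rather than silently widen the scope.
      throw new Error('StoreManager without a locationId cannot query the log');
    }
    filter.locationId = caller.locationId;
  }
  return filter;
}
```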
Architecture
The module follows Hexagonal Architecture (ports & adapters):
activity-log/
├── activity-log.constants.ts ← Shared queue/job name constants
├── activity-log.module.ts ← NestJS module, registers cron archive job
│
├── domain/
│   ├── activity-log.domain.ts ← Actors, entry types, query params
│   ├── activity-log-repository.domain.ts ← IActivityLogRepository (port)
│   └── activity-log-export.domain.ts ← IActivityLogExportRepository (port) + export types
│
├── application/
│   ├── activity-log.service.ts ← record(), queryByEntity/User/Event()
│   ├── activity-log-export.service.ts ← initiate(), getStatus(), cancel()
│   ├── employee-activity.handler.ts ← EventEmitter2 listeners for employee lifecycle
│   └── redaction.service.ts ← Field-level redaction based on caller role
│
├── infrastructure/
│   ├── activity-log.repository.ts ← Kysely implementation of IActivityLogRepository
│   ├── activity-log-export.repository.ts ← Kysely implementation of IActivityLogExportRepository
│   ├── activity-log-export.processor.ts ← BullMQ: generates CSV, uploads to GCS
│   ├── activity-log-export-cleanup.processor.ts ← BullMQ: deletes GCS file after 24h
│   ├── activity-log-archive.processor.ts ← BullMQ: monthly JSONL.gz archive to GCS
│   └── csv-export.utils.ts ← CSV building utilities
│
└── interfaces/
    ├── activity-log.controller.ts
    ├── activity-log-export.controller.ts
    └── dtos/
        ├── activity-log-entry.dto.ts ← Response DTO classes with Swagger decorators
        ├── query-by-entity.dto.ts
        ├── query-by-user.dto.ts
        ├── query-by-event.dto.ts
        ├── create-export.dto.ts
        └── export-status.dto.ts
Dependency rules
interfaces → application → domain ← infrastructure
The domain layer has no framework dependencies. Infrastructure implements the domain ports. Controllers are thin — they extract context from the Firebase token and delegate to the service.
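A minimal sketch of the dependency direction, using simplified stand-ins rather than the module's real types. `AuditEntry`, `AuditRepositoryPort`, `AuditQueryService`, and `InMemoryAuditRepository` are hypothetical; the real IActivityLogRepository has its own signature, and the real adapter uses Kysely.

```typescript
// Domain: plain types and a port, with no framework imports.
interface AuditEntry {
  id: string;
  businessId: string;
  entityType: string;
  entityId: string;
}

interface AuditRepositoryPort {
  insert(entry: AuditEntry): Promise<void>;
  findByEntityType(businessId: string, entityType: string): Promise<AuditEntry[]>;
}

// Application: depends only on the port, never on a concrete adapter.
class AuditQueryService {
  private readonly repo: AuditRepositoryPort;

  constructor(repo: AuditRepositoryPort) {
    this.repo = repo;
  }

  queryByEntityType(businessId: string, entityType: string): Promise<AuditEntry[]> {
    return this.repo.findByEntityType(businessId, entityType);
  }
}

// Infrastructure: an adapter implementing the port (in-memory stand-in here).
class InMemoryAuditRepository implements AuditRepositoryPort {
  private readonly rows: AuditEntry[] = [];

  async insert(entry: AuditEntry): Promise<void> {
    this.rows.push({ ...entry });
  }

  async findByEntityType(businessId: string, entityType: string): Promise<AuditEntry[]> {
    return this.rows.filter(
      (row) => row.businessId === businessId && row.entityType === entityType,
    );
  }
}
```

Because the service only sees the port, a test can pass the in-memory adapter while production wiring passes the database-backed one.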
Domain Concepts
Actor
Every log entry is attributed to one of two actor types:
| Type | Fields |
|---|---|
| employee | employeeId, displayName, role |
| system | jobName (e.g. "employee.created") |
The actor is captured as an immutable snapshot at record time so the audit entry remains accurate even if the employee is later renamed or deleted.
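One way to model the two actor types and the snapshot step is a discriminated union plus a copy-and-freeze helper. This is a sketch under assumptions: `EmployeeRecord` and `snapshotEmployee` are hypothetical, and the real domain types in activity-log.domain.ts may differ.

```typescript
type EmployeeActor = {
  type: 'employee';
  employeeId: string;
  displayName: string;
  role: string;
};

type SystemActor = {
  type: 'system';
  jobName: string;
};

type ActorSnapshot = Readonly<EmployeeActor> | Readonly<SystemActor>;

// Hypothetical shape of a live employee record.
interface EmployeeRecord {
  id: string;
  displayName: string;
  role: string;
}

function snapshotEmployee(employee: EmployeeRecord): ActorSnapshot {
  // Copy the fields by value so later renames of the employee do not leak
  // into the audit entry, then freeze the copy against accidental mutation.
  return Object.freeze({
    type: 'employee' as const,
    employeeId: employee.id,
    displayName: employee.displayName,
    role: employee.role,
  });
}
```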
Event Classification
| Classification | Meaning |
|---|---|
| standard | Routine CRUD (create, update, delete) |
| sensitive | High-stakes operations requiring closer scrutiny |
Sensitive events have an additional sensitiveEventType discriminator:
- discount_applied
- void_cancellation
- register_open / register_close
- permission_role_change
- data_export
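The relationship between the classification and its discriminator can be expressed as a union type with a small parser. This sketch assumes that register_open and register_close are two separate values, and that sensitiveEventType is required for sensitive events and absent for standard ones; `parseClassification` is a hypothetical helper.

```typescript
const SENSITIVE_EVENT_TYPES = [
  'discount_applied',
  'void_cancellation',
  'register_open',
  'register_close',
  'permission_role_change',
  'data_export',
] as const;

type SensitiveEventType = (typeof SENSITIVE_EVENT_TYPES)[number];

type Classification =
  | { eventClassification: 'standard' }
  | { eventClassification: 'sensitive'; sensitiveEventType: SensitiveEventType };

function parseClassification(input: {
  eventClassification: string;
  sensitiveEventType?: string;
}): Classification {
  if (input.eventClassification === 'standard') {
    // Assumption: a standard event must not carry a sensitive discriminator.
    if (input.sensitiveEventType !== undefined) {
      throw new Error('standard events must not set sensitiveEventType');
    }
    return { eventClassification: 'standard' };
  }
  if (input.eventClassification === 'sensitive') {
    const matched = SENSITIVE_EVENT_TYPES.find((t) => t === input.sensitiveEventType);
    if (matched === undefined) {
      throw new Error('sensitive events need a known sensitiveEventType');
    }
    return { eventClassification: 'sensitive', sensitiveEventType: matched };
  }
  throw new Error(`unknown classification: ${input.eventClassification}`);
}
```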
Entity Types
user · employee · customer · supplier · product · price_list · business_settings · role · permission_assignment
Queues
Three BullMQ queues are registered:
| Queue | Purpose | Schedule |
|---|---|---|
| activity-log-export | Generates CSV file, uploads to GCS | On demand |
| activity-log-export-cleanup | Deletes GCS file, marks export as expired | 24h after export completes |
| activity-log-archive | Archives rows >13 months old to GCS JSONL.gz | Monthly cron: 1st day of month at 02:00 |
The archive job is registered idempotently in ActivityLogModule.onModuleInit().
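For reference, the monthly schedule corresponds to the standard five-field cron expression below, and the 13-month boundary can be computed with plain date arithmetic. This is a sketch only: `ARCHIVE_CRON`, `RETENTION_MONTHS`, and `archiveCutoff` are hypothetical names, and working in UTC is an assumption because the time zone of the 02:00 run is not stated.

```typescript
// Minute 0, hour 2, day-of-month 1, every month, any weekday.
const ARCHIVE_CRON = '0 2 1 * *';
const RETENTION_MONTHS = 13;

// Returns the instant `months` calendar months before `now` (UTC).
// Rows created before this instant would be candidates for archiving.
function archiveCutoff(now: Date, months: number = RETENTION_MONTHS): Date {
  const year = now.getUTCFullYear();
  const month = now.getUTCMonth() - months; // may be negative; Date.UTC normalizes it

  // Day 0 of the following month is the last day of the target month.
  const lastDayOfTarget = new Date(Date.UTC(year, month + 1, 0)).getUTCDate();
  // Clamp so that e.g. the 31st does not overflow into the next month.
  const day = Math.min(now.getUTCDate(), lastDayOfTarget);

  return new Date(
    Date.UTC(
      year,
      month,
      day,
      now.getUTCHours(),
      now.getUTCMinutes(),
      now.getUTCSeconds(),
      now.getUTCMilliseconds(),
    ),
  );
}
```

For a run at 2025-03-01T02:00:00Z this yields 2024-02-01T02:00:00Z.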
Instrumentation — How to Record an Event
Inject ActivityLogService and call record(). Always pass the Kysely transaction (trx) to ensure atomicity:
import { ActivityLogService } from '@/activity-log/application/activity-log.service';
// Inside a transactional service method:
await this.database.transaction().execute(async (trx) => {
  const updated = await this.myRepository.update(id, data, trx);
  await this.activityLogService.record({
    businessId: updated.businessId,
    locationId: updated.locationId ?? null,
    actor: {
      type: 'employee',
      employeeId: actorId,
      displayName: actorName,
      role: actorRole,
    },
    actionType: 'update', // 'create' | 'update' | 'delete' | 'event'
    entityType: 'product', // see EntityType enum
    entityId: updated.id,
    entityDisplayName: updated.name,
    eventClassification: 'standard',
    changePayload: { before: oldValues, after: newValues },
  }, trx);
});
For sensitive events, add:
eventClassification: 'sensitive',
sensitiveEventType: 'discount_applied',
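The changePayload in the update example takes `before` and `after` values. Whether callers pass full snapshots or only the changed fields is not stated here; if only changed fields are wanted, a helper along these lines could build the payload. `buildChangePayload` is hypothetical, and the JSON-based comparison is a simplification that is sensitive to key order inside nested objects.

```typescript
type PlainRecord = Record<string, unknown>;

// Keeps only the keys whose values differ between `before` and `after`.
function buildChangePayload(
  before: PlainRecord,
  after: PlainRecord,
): { before: PlainRecord; after: PlainRecord } {
  const payload = { before: {} as PlainRecord, after: {} as PlainRecord };

  // Union of keys from both sides, without duplicates.
  const keys = Object.keys({ ...before, ...after });

  for (const key of keys) {
    if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
      // A key that was added or removed appears on one side only.
      if (key in before) payload.before[key] = before[key];
      if (key in after) payload.after[key] = after[key];
    }
  }
  return payload;
}
```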
Note: record() throws on failure. Let the error propagate; this is intentional. The requirement is zero-silent-loss (FR-031): if activity capture fails, the parent transaction must roll back.
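The effect of letting the error propagate can be illustrated with a toy transaction. `FakeDb` and `updateWithAudit` are hypothetical stand-ins and not Kysely; they only mimic the commit-on-success behavior that a real transaction provides.

```typescript
// Toy transaction: writes are staged and committed only if the callback resolves.
class FakeDb {
  committed: string[] = [];

  async transaction(work: (staged: string[]) => Promise<void>): Promise<void> {
    const staged: string[] = [];
    await work(staged); // if this rejects, the commit below never runs
    this.committed.push(...staged);
  }
}

// The business write and the audit write share one transaction.
// The audit call is deliberately NOT wrapped in try/catch.
async function updateWithAudit(
  db: FakeDb,
  recordActivity: (staged: string[]) => Promise<void>,
): Promise<void> {
  await db.transaction(async (staged) => {
    staged.push('product:update');
    await recordActivity(staged);
  });
}
```

If the audit callback rejects, the staged product update is discarded along with it, so no business change exists without its log entry.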
API Endpoints
All endpoints require a valid Firebase ID token via cookie flowpos-id-token or Authorization: Bearer. Business context is derived from the token's current_business_id claim.
Query endpoints
| Method | Path | Permission | Description |
|---|---|---|---|
| GET | /activity-log/by-entity | ActivityLog.Read | Change history for an entity type |
| GET | /activity-log/by-user | ActivityLog.Read | Entries by actor (employee or system) |
| GET | /activity-log/by-event | ActivityLog.Read | Sensitive events by type |
All query endpoints support:
- from / to: ISO 8601 date range
- page / pageSize (25 | 50 | 100)
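A sketch of how these shared parameters might be validated. The real DTOs in interfaces/dtos/ are the source of truth; `parseRangeAndPage`, the defaults (page 1, pageSize 25), and treating from/to as required are all assumptions made for illustration.

```typescript
const PAGE_SIZES = [25, 50, 100] as const;
type PageSize = (typeof PAGE_SIZES)[number];

interface RangeAndPage {
  from: Date;
  to: Date;
  page: number;
  pageSize: PageSize;
}

function parseRangeAndPage(query: {
  from: string;
  to: string;
  page?: string;
  pageSize?: string;
}): RangeAndPage {
  // Note: the Date constructor also accepts some non-ISO strings;
  // a stricter format check may be wanted in practice.
  const from = new Date(query.from);
  const to = new Date(query.to);
  if (Number.isNaN(from.getTime()) || Number.isNaN(to.getTime())) {
    throw new Error('from and to must be valid ISO 8601 dates');
  }
  if (from.getTime() > to.getTime()) {
    throw new Error('from must not be after to');
  }

  const page = query.page === undefined ? 1 : Number(query.page);
  if (!Number.isInteger(page) || page < 1) {
    throw new Error('page must be a positive integer');
  }

  const requestedSize = query.pageSize === undefined ? 25 : Number(query.pageSize);
  const pageSize = PAGE_SIZES.find((size) => size === requestedSize);
  if (pageSize === undefined) {
    throw new Error('pageSize must be 25, 50, or 100');
  }

  return { from, to, page, pageSize };
}
```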
See Swagger UI (/api) for full parameter documentation.
Export endpoints
| Method | Path | Permission | Description |
|---|---|---|---|
| POST | /activity-log/export | ActivityLogExport.Create | Initiate async CSV export |
| GET | /activity-log/export/:id | ActivityLogExport.Read | Poll export status / get download URL |
| DELETE | /activity-log/export/:id | ActivityLogExport.Delete | Cancel a pending export |
Export flow:
- POST /activity-log/export → returns { exportId, status: "pending" }
- Poll GET /activity-log/export/:id until status === "completed"
- Use downloadUrl (24h pre-signed GCS URL) to download the CSV
- After 24h the URL expires, status transitions to expired, and the GCS file is deleted
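On the client side, the polling step might look like the sketch below. The status call is injected so the loop does not depend on a particular HTTP client. `ExportStatusView` is an assumed response shape built from the field names mentioned here, and `waitForExport`, `intervalMs`, and `maxAttempts` are hypothetical.

```typescript
type ExportState =
  | 'pending'
  | 'processing'
  | 'completed'
  | 'failed'
  | 'cancelled'
  | 'expired';

// Assumed shape of the GET /activity-log/export/:id response.
interface ExportStatusView {
  exportId: string;
  status: ExportState;
  downloadUrl?: string;
}

async function waitForExport(
  fetchStatus: () => Promise<ExportStatusView>,
  options: { intervalMs: number; maxAttempts: number },
): Promise<string> {
  for (let attempt = 0; attempt < options.maxAttempts; attempt++) {
    const view = await fetchStatus();

    if (view.status === 'completed') {
      if (!view.downloadUrl) {
        throw new Error('completed export has no downloadUrl');
      }
      return view.downloadUrl;
    }
    // failed, cancelled, and expired are terminal: stop polling.
    if (view.status !== 'pending' && view.status !== 'processing') {
      throw new Error(`export ended with status "${view.status}"`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, options.intervalMs));
  }
  throw new Error('export did not complete within the allowed attempts');
}
```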
CSV columns: ID · Business ID · Location ID · Actor Type · Actor Employee ID · Actor Name · Actor Role · Action · Entity Type · Entity ID · Entity Name · Classification · Sensitive Event Type · Timestamp
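A minimal sketch of building one CSV line with RFC 4180 style quoting. It is not the contents of csv-export.utils.ts; `CSV_HEADER`, `escapeCsvField`, and `toCsvLine` are hypothetical names, and rendering null as an empty field is an assumption.

```typescript
const CSV_HEADER = [
  'ID', 'Business ID', 'Location ID', 'Actor Type', 'Actor Employee ID',
  'Actor Name', 'Actor Role', 'Action', 'Entity Type', 'Entity ID',
  'Entity Name', 'Classification', 'Sensitive Event Type', 'Timestamp',
];

function escapeCsvField(value: string | null): string {
  // Assumption: nullable columns are written as empty fields.
  if (value === null) return '';
  // Quote fields containing a quote, comma, or line break; double inner quotes.
  return /[",\r\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;
}

function toCsvLine(fields: (string | null)[]): string {
  return fields.map(escapeCsvField).join(',');
}
```

For example, `toCsvLine(['a', 'b,c', null])` produces `a,"b,c",`.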
Redaction Rules
The RedactionService processes every entry before it leaves the application layer:
| Field pattern | Condition |
|---|---|
| pin, pinHash, pinCode | Always redacted |
| salary, compensation, wage, hourlyRate, baseSalary | Redacted unless caller role is owner, administrator, or admin |
Redaction applies recursively to nested objects in changePayload.
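The recursive rule can be sketched as follows. This is not the RedactionService itself: `redact` is a hypothetical function, exact key matching stands in for whatever pattern matching the real service uses, and replacing values with a `[REDACTED]` marker (rather than removing the keys) is an assumption.

```typescript
const ALWAYS_REDACTED = new Set(['pin', 'pinHash', 'pinCode']);
const COMPENSATION_FIELDS = new Set([
  'salary', 'compensation', 'wage', 'hourlyRate', 'baseSalary',
]);
const HR_ROLES = new Set(['owner', 'administrator', 'admin']);
const MASK = '[REDACTED]'; // assumption: mask in place instead of deleting the key

// Returns a redacted deep copy; the input is left untouched.
function redact(value: unknown, callerRole: string): unknown {
  if (Array.isArray(value)) {
    return value.map((item) => redact(item, callerRole));
  }
  if (value !== null && typeof value === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      const hidden =
        ALWAYS_REDACTED.has(key) ||
        (COMPENSATION_FIELDS.has(key) && !HR_ROLES.has(callerRole));
      // Hidden keys are masked wholesale; other values are walked recursively.
      out[key] = hidden ? MASK : redact(inner, callerRole);
    }
    return out;
  }
  return value; // primitives pass through unchanged
}
```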
Data Model
activity_log
| Column | Type | Notes |
|---|---|---|
| id | uuid PK | Generated |
| business_id | uuid FK | Multi-tenancy scope |
| location_id | uuid FK nullable | Location scope |
| actor_type | enum employee\|system | |
| actor_employee_id | uuid nullable | FK to employee |
| actor_snapshot | jsonb | Immutable snapshot at record time |
| action_type | enum create\|update\|delete\|event | |
| entity_type | enum | |
| entity_id | uuid | |
| entity_display_name | text | |
| event_classification | enum standard\|sensitive | |
| sensitive_event_type | enum nullable | |
| change_payload | jsonb nullable | Before/after diff |
| is_complete | boolean | Reserved for multi-step transactions |
| created_at | timestamptz | |
activity_log_export
| Column | Type | Notes |
|---|---|---|
| id | uuid PK | |
| business_id | uuid FK | |
| requested_by_employee_id | uuid FK | |
| status | enum | pending\|processing\|completed\|failed\|cancelled\|expired |
| filter_snapshot | jsonb | { lens, filters, requestedByRole } |
| file_path | text nullable | GCS object path |
| download_url | text nullable | Pre-signed URL |
| expires_at | timestamptz nullable | |
| row_count | integer nullable | |
| was_truncated | boolean nullable | True if result exceeded 100k rows |
| completed_at | timestamptz nullable | |
| created_at | timestamptz | |
Environment Variables
| Variable | Required | Description |
|---|---|---|
| GCS_ACTIVITY_LOG_EXPORT_BUCKET | Yes (export feature) | GCS bucket for CSV export files |
| GCS_ACTIVITY_LOG_ARCHIVE_BUCKET | Yes (archive job) | GCS bucket for monthly JSONL.gz archives |
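A fail-fast check at startup can surface a missing bucket name early. This is a sketch and not how the module currently loads configuration: `readBucketConfig` and `ActivityLogBuckets` are hypothetical, and requiring both variables at once is a simplification, since each is only needed by its own feature.

```typescript
interface ActivityLogBuckets {
  exportBucket: string;
  archiveBucket: string;
}

// Takes the environment as an argument (e.g. process.env) so it is easy to test.
function readBucketConfig(env: Record<string, string | undefined>): ActivityLogBuckets {
  const required = [
    'GCS_ACTIVITY_LOG_EXPORT_BUCKET',
    'GCS_ACTIVITY_LOG_ARCHIVE_BUCKET',
  ];
  // Unset and empty values are both treated as missing.
  const missing = required.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  return {
    exportBucket: env['GCS_ACTIVITY_LOG_EXPORT_BUCKET'] as string,
    archiveBucket: env['GCS_ACTIVITY_LOG_ARCHIVE_BUCKET'] as string,
  };
}
```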
Known Architecture Notes
- Archive processor queries the database directly (bypasses repository layer) because it operates on raw rows for serialization to JSONL. This is intentional for performance on large batches — the repository abstraction is not suited for bulk streaming.
- Storage client (@google-cloud/storage) is instantiated with new Storage() inside each processor. A future refactor should extract this as an injectable provider to improve testability.
- BullMQ @InjectQueue appears in ActivityLogExportService (application layer). Strictly this is an infrastructure concern; the trade-off was accepted to avoid introducing a queue port abstraction for a single use case.
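The storage refactor mentioned in these notes could take roughly the following shape, shown in plain TypeScript without NestJS wiring. Everything here is hypothetical: `ObjectStore`, `ExportCleanupJob`, and `FakeObjectStore` are illustrative names, and a real adapter would wrap the @google-cloud/storage client behind the same interface.

```typescript
// Hypothetical port covering only what the processors need from storage.
interface ObjectStore {
  upload(bucket: string, path: string, body: string): Promise<void>;
  remove(bucket: string, path: string): Promise<void>;
}

// The job receives the store instead of constructing a client itself.
class ExportCleanupJob {
  private readonly store: ObjectStore;
  private readonly bucket: string;

  constructor(store: ObjectStore, bucket: string) {
    this.store = store;
    this.bucket = bucket;
  }

  async run(filePath: string): Promise<void> {
    await this.store.remove(this.bucket, filePath);
  }
}

// In-memory fake for unit tests: no network and no credentials.
class FakeObjectStore implements ObjectStore {
  files = new Map<string, string>();

  async upload(bucket: string, path: string, body: string): Promise<void> {
    this.files.set(`${bucket}/${path}`, body);
  }

  async remove(bucket: string, path: string): Promise<void> {
    this.files.delete(`${bucket}/${path}`);
  }
}
```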