
Merchant Activity Log

Module path: apps/backend/src/activity-log/
Feature branch: 038-merchant-activity-log


Overview

The Activity Log module is FlowPOS's immutable audit trail for all business-level operations. It records who did what, to which entity, and when — and exposes that history to authorized users via three query lenses and an async CSV export capability.

Key functional boundaries:

  • Retention: 13 months of live data in PostgreSQL; older records are archived monthly to GCS as compressed JSONL.
  • Redaction: PIN fields are always stripped server-side before any response leaves the API. Compensation fields (salary, wage, and similar) are additionally redacted for roles without HR access (StoreManager, etc.).
  • Scope isolation: All queries are scoped to businessId. StoreManagers are additionally restricted to their own locationId.
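The scope isolation rule can be sketched as a small pure function. This is only an illustration: the `Caller` and `QueryScope` shapes and the `resolveScope` name are hypothetical, and rejecting a StoreManager who has no location is an assumption rather than documented behavior.

```typescript
// Hypothetical caller context; in the real module it is derived from the Firebase token.
interface Caller {
  businessId: string;
  role: string;              // e.g. 'Owner', 'StoreManager'
  locationId: string | null; // the caller's own location, if any
}

interface QueryScope {
  businessId: string;
  locationId?: string; // set only when the caller is pinned to one location
}

// Every query is scoped to the business. StoreManagers are additionally
// pinned to their own location and cannot widen that scope.
function resolveScope(caller: Caller): QueryScope {
  if (caller.role === 'StoreManager') {
    if (caller.locationId === null) {
      // Assumption: fail closed rather than fall back to business-wide scope.
      throw new Error('StoreManager without a locationId cannot query the log');
    }
    return { businessId: caller.businessId, locationId: caller.locationId };
  }
  return { businessId: caller.businessId };
}
```

Deriving the scope from the caller, instead of trusting a locationId sent by the client, keeps the restriction enforceable in one place.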

Architecture

The module follows Hexagonal Architecture (ports & adapters):

activity-log/
├── activity-log.constants.ts ← Shared queue/job name constants
├── activity-log.module.ts ← NestJS module, registers cron archive job

├── domain/
│ ├── activity-log.domain.ts ← Actors, entry types, query params
│ ├── activity-log-repository.domain.ts ← IActivityLogRepository (port)
│ └── activity-log-export.domain.ts ← IActivityLogExportRepository (port) + export types

├── application/
│ ├── activity-log.service.ts ← record(), queryByEntity/User/Event()
│ ├── activity-log-export.service.ts ← initiate(), getStatus(), cancel()
│ ├── employee-activity.handler.ts ← EventEmitter2 listeners for employee lifecycle
│ └── redaction.service.ts ← Field-level redaction based on caller role

├── infrastructure/
│ ├── activity-log.repository.ts ← Kysely implementation of IActivityLogRepository
│ ├── activity-log-export.repository.ts ← Kysely implementation of IActivityLogExportRepository
│ ├── activity-log-export.processor.ts ← BullMQ: generates CSV, uploads to GCS
│ ├── activity-log-export-cleanup.processor.ts ← BullMQ: deletes GCS file after 24h
│ ├── activity-log-archive.processor.ts ← BullMQ: monthly JSONL.gz archive to GCS
│ └── csv-export.utils.ts ← CSV building utilities

└── interfaces/
  ├── activity-log.controller.ts
  ├── activity-log-export.controller.ts
  └── dtos/
    ├── activity-log-entry.dto.ts ← Response DTO classes with Swagger decorators
    ├── query-by-entity.dto.ts
    ├── query-by-user.dto.ts
    ├── query-by-event.dto.ts
    ├── create-export.dto.ts
    └── export-status.dto.ts

Dependency rules

interfaces → application → domain ← infrastructure

The domain layer has no framework dependencies. Infrastructure implements the domain ports. Controllers are thin — they extract context from the Firebase token and delegate to the service.
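The dependency direction can be illustrated with a reduced port and adapter pair. This is a minimal sketch: the `LogEntry` shape, the method names, and the in-memory adapter are assumptions made for illustration and do not mirror the actual IActivityLogRepository signature.

```typescript
// Domain: plain TypeScript, no framework imports.
interface LogEntry {
  id: string;
  businessId: string;
  entityId: string;
}

// Port (hypothetical, reduced to two methods).
interface ActivityLogRepositoryPort {
  insert(entry: LogEntry): Promise<void>;
  findByEntity(businessId: string, entityId: string): Promise<LogEntry[]>;
}

// Infrastructure: an in-memory stand-in for the Kysely-backed adapter.
class InMemoryActivityLogRepository implements ActivityLogRepositoryPort {
  private readonly rows: LogEntry[] = [];

  async insert(entry: LogEntry): Promise<void> {
    this.rows.push(entry);
  }

  async findByEntity(businessId: string, entityId: string): Promise<LogEntry[]> {
    return this.rows.filter(
      (row) => row.businessId === businessId && row.entityId === entityId,
    );
  }
}

// Application: depends on the port only, never on a concrete adapter.
class EntityHistoryService {
  private readonly repo: ActivityLogRepositoryPort;

  constructor(repo: ActivityLogRepositoryPort) {
    this.repo = repo;
  }

  historyFor(businessId: string, entityId: string): Promise<LogEntry[]> {
    return this.repo.findByEntity(businessId, entityId);
  }
}
```

Because the service sees only the port, a unit test can pass the in-memory adapter while production wiring supplies the database-backed one.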


Domain Concepts

Actor

Every log entry is attributed to one of two actor types:

Type        Fields
employee    employeeId, displayName, role
system      jobName (e.g. "employee.created")

The actor is captured as an immutable snapshot at record time so the audit entry remains accurate even if the employee is later renamed or deleted.
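One way to model the two actor types is a discriminated union, with the snapshot taken as a frozen copy. The field names come from the table above; the `snapshotActor` and `describeActor` helpers are hypothetical and shown only to illustrate the idea.

```typescript
type Actor =
  | { type: 'employee'; employeeId: string; displayName: string; role: string }
  | { type: 'system'; jobName: string };

// Copy and freeze, so later changes to the source object cannot alter the snapshot.
function snapshotActor(actor: Actor): Readonly<Actor> {
  return Object.freeze({ ...actor });
}

// The `type` discriminator lets the compiler narrow to the right fields.
function describeActor(actor: Readonly<Actor>): string {
  switch (actor.type) {
    case 'employee':
      return `${actor.displayName} (${actor.role})`;
    case 'system':
      return `system:${actor.jobName}`;
  }
}
```

If an employee is renamed after the snapshot is taken, the snapshot still carries the name that was current when the action happened.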

Event Classification

Classification    Meaning
standard          Routine CRUD (create, update, delete)
sensitive         High-stakes operations requiring closer scrutiny

Sensitive events have an additional sensitiveEventType discriminator:

  • discount_applied
  • void_cancellation
  • register_open / register_close
  • permission_role_change
  • data_export

Entity Types

user · employee · customer · supplier · product · price_list · business_settings · role · permission_assignment


Queues

Three BullMQ queues are registered:

Queue                          Purpose                                         Schedule
activity-log-export            Generates CSV file, uploads to GCS              On demand
activity-log-export-cleanup    Deletes GCS file, marks export as expired       24h after export completes
activity-log-archive           Archives rows >13 months old to GCS JSONL.gz    Monthly cron, 1st day of month at 02:00

The archive job is registered idempotently in ActivityLogModule.onModuleInit().
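Idempotent registration means that restarting the application must not create a second monthly schedule. The sketch below shows the idea against a deliberately tiny queue interface. `RepeatableQueue`, its two methods, and the job key are hypothetical stand-ins rather than the BullMQ API, and the cron string is simply the standard five-field encoding of "02:00 on day 1 of every month".

```typescript
// Minimal stand-in for the subset of a queue API this sketch needs (not BullMQ's real API).
interface RepeatableQueue {
  listRepeatKeys(): Promise<string[]>;
  addRepeatable(key: string, cronPattern: string): Promise<void>;
}

const ARCHIVE_JOB_KEY = 'activity-log-archive:monthly'; // hypothetical key
const ARCHIVE_CRON = '0 2 1 * *'; // minute 0, hour 2, day-of-month 1, every month

// Returns true if the schedule was created, false if it already existed.
async function registerArchiveJob(queue: RepeatableQueue): Promise<boolean> {
  const existing = await queue.listRepeatKeys();
  if (existing.indexOf(ARCHIVE_JOB_KEY) !== -1) {
    return false; // already scheduled, nothing to do
  }
  await queue.addRepeatable(ARCHIVE_JOB_KEY, ARCHIVE_CRON);
  return true;
}
```

The check-then-add sequence is not atomic across several application instances, so a production setup would more likely rely on the queue library's own deduplication of repeatable jobs.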


Instrumentation — How to Record an Event

Inject ActivityLogService and call record(). Always pass the Kysely transaction (trx) to ensure atomicity:

import { ActivityLogService } from '@/activity-log/application/activity-log.service';

// Inside a transactional service method:
await this.database.transaction().execute(async (trx) => {
  const updated = await this.myRepository.update(id, data, trx);

  await this.activityLogService.record({
    businessId: updated.businessId,
    locationId: updated.locationId ?? null,
    actor: {
      type: 'employee',
      employeeId: actorId,
      displayName: actorName,
      role: actorRole,
    },
    actionType: 'update', // 'create' | 'update' | 'delete' | 'event'
    entityType: 'product', // see EntityType enum
    entityId: updated.id,
    entityDisplayName: updated.name,
    eventClassification: 'standard',
    changePayload: { before: oldValues, after: newValues },
  }, trx);
});

For sensitive events, add:

eventClassification: 'sensitive',
sensitiveEventType: 'discount_applied',

Note: record() throws on failure. Let the error propagate — this is intentional. The requirement is zero-silent-loss (FR-031): if activity capture fails, the parent transaction must roll back.
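The effect of letting the error propagate can be shown with a toy transaction. This sketch does not use Kysely: `runInTransaction` is a hypothetical helper that commits staged writes only when the callback resolves, which is the behavior the note above relies on.

```typescript
// Toy transaction: staged writes reach `committed` only if `work` resolves.
async function runInTransaction<T>(
  committed: string[],
  work: (staged: string[]) => Promise<T>,
): Promise<T> {
  const staged: string[] = [];
  const result = await work(staged); // a throw here skips the commit below
  for (const write of staged) {
    committed.push(write);
  }
  return result;
}

async function updateProduct(committed: string[], recordFails: boolean): Promise<void> {
  await runInTransaction(committed, async (staged) => {
    staged.push('product updated');
    if (recordFails) {
      // Stands in for activityLogService.record() throwing.
      throw new Error('activity capture failed');
    }
    staged.push('activity recorded');
  });
}
```

Wrapping the record() call in a try/catch that swallows the error would let 'product updated' commit without its audit entry, which is the silent loss FR-031 rules out.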


API Endpoints

All endpoints require a valid Firebase ID token, sent either in the flowpos-id-token cookie or in an Authorization: Bearer header. Business context is derived from the token's current_business_id claim.

Query endpoints

Method    Path                       Permission          Description
GET       /activity-log/by-entity    ActivityLog.Read    Change history for an entity type
GET       /activity-log/by-user      ActivityLog.Read    Entries by actor (employee or system)
GET       /activity-log/by-event     ActivityLog.Read    Sensitive events by type

All query endpoints support:

  • from / to — ISO 8601 date range
  • page / pageSize (25 | 50 | 100)

See Swagger UI (/api) for full parameter documentation.
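A client-side helper for building query URLs might look like the sketch below. Only from, to, page, and pageSize come from the list above. The `filters` bag and any lens-specific parameter name passed through it (for example entityType) are assumptions, so check Swagger for the real names.

```typescript
type PageSize = 25 | 50 | 100;
type Lens = 'by-entity' | 'by-user' | 'by-event';

interface ActivityLogQuery {
  from?: Date;
  to?: Date;
  page?: number;
  pageSize?: PageSize;
  // Lens-specific filters; the keys are assumptions, not documented names.
  filters?: Record<string, string>;
}

function buildActivityLogUrl(lens: Lens, query: ActivityLogQuery): string {
  const params: Array<[string, string]> = [];
  if (query.from) params.push(['from', query.from.toISOString()]); // ISO 8601
  if (query.to) params.push(['to', query.to.toISOString()]);
  if (query.page !== undefined) params.push(['page', String(query.page)]);
  if (query.pageSize !== undefined) params.push(['pageSize', String(query.pageSize)]);

  const filters = query.filters ?? {};
  for (const key of Object.keys(filters)) {
    params.push([key, filters[key]]);
  }

  const queryString = params
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join('&');
  return `/activity-log/${lens}` + (queryString ? `?${queryString}` : '');
}
```

Typing pageSize as a union of the three allowed values lets the compiler reject sizes the API would not accept.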

Export endpoints

Method    Path                        Permission                 Description
POST      /activity-log/export        ActivityLogExport.Create   Initiate async CSV export
GET       /activity-log/export/:id    ActivityLogExport.Read     Poll export status / get download URL
DELETE    /activity-log/export/:id    ActivityLogExport.Delete   Cancel a pending export

Export flow:

  1. POST /activity-log/export → returns { exportId, status: "pending" }
  2. Poll GET /activity-log/export/:id until status === "completed"
  3. Use downloadUrl (24h pre-signed GCS URL) to download the CSV
  4. After 24h the URL expires, status transitions to expired, and the GCS file is deleted
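The polling step of the flow above can be sketched as follows. The status values match the activity_log_export status column, and downloadUrl is the field named in step 3. The `waitForExport` name, the injected `getStatus` callback, and the interval and attempt limits are assumptions chosen for illustration.

```typescript
type ExportStatus =
  | 'pending' | 'processing' | 'completed'
  | 'failed' | 'cancelled' | 'expired';

interface ExportState {
  status: ExportStatus;
  downloadUrl?: string;
}

// Polls until the export completes, reaches a terminal failure state, or the
// attempt budget runs out. `getStatus` would wrap GET /activity-log/export/:id.
async function waitForExport(
  getStatus: () => Promise<ExportState>,
  intervalMs = 2000,
  maxAttempts = 30,
): Promise<string> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const state = await getStatus();
    if (state.status === 'completed' && state.downloadUrl) {
      return state.downloadUrl;
    }
    if (state.status === 'failed' || state.status === 'cancelled' || state.status === 'expired') {
      throw new Error(`Export ended with status "${state.status}"`);
    }
    // Still pending or processing: wait before the next poll.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error('Timed out waiting for export');
}
```

Injecting `getStatus` keeps the loop independent of any HTTP client and makes it easy to exercise with canned responses.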

CSV columns: ID · Business ID · Location ID · Actor Type · Actor Employee ID · Actor Name · Actor Role · Action · Entity Type · Entity ID · Entity Name · Classification · Sensitive Event Type · Timestamp


Redaction Rules

The RedactionService processes every entry before it leaves the application layer:

Field pattern                                         Condition
pin, pinHash, pinCode                                 Always redacted
salary, compensation, wage, hourlyRate, baseSalary    Redacted unless caller role is owner, administrator, or admin

Redaction applies recursively to nested objects in changePayload.
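A recursive redaction pass over a change payload could look like the sketch below. The field names and privileged roles are taken from the table above. Exact-key matching, the '[REDACTED]' placeholder, and the `redact` function name are assumptions, since the real service may match patterns or drop the keys entirely.

```typescript
const ALWAYS_REDACTED = ['pin', 'pinHash', 'pinCode'];
const COMPENSATION_FIELDS = ['salary', 'compensation', 'wage', 'hourlyRate', 'baseSalary'];
const COMPENSATION_ROLES = ['owner', 'administrator', 'admin'];
const REDACTED = '[REDACTED]'; // placeholder value is an assumption

// Returns a redacted deep copy; the input is never mutated.
function redact(value: unknown, callerRole: string): unknown {
  if (Array.isArray(value)) {
    return value.map((item) => redact(item, callerRole));
  }
  if (value === null || typeof value !== 'object') {
    return value; // primitives pass through unchanged
  }
  const source = value as Record<string, unknown>;
  const canSeeCompensation = COMPENSATION_ROLES.indexOf(callerRole) !== -1;
  const result: Record<string, unknown> = {};
  for (const key of Object.keys(source)) {
    const hidden =
      ALWAYS_REDACTED.indexOf(key) !== -1 ||
      (!canSeeCompensation && COMPENSATION_FIELDS.indexOf(key) !== -1);
    result[key] = hidden ? REDACTED : redact(source[key], callerRole);
  }
  return result;
}
```

Returning a copy rather than editing in place means the stored entry stays intact while each caller receives a view suited to its role.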


Data Model

activity_log

Column                  Type                               Notes
id                      uuid PK                            Generated
business_id             uuid FK                            Multi-tenancy scope
location_id             uuid FK nullable                   Location scope
actor_type              enum employee|system
actor_employee_id       uuid nullable                      FK to employee
actor_snapshot          jsonb                              Immutable snapshot at record time
action_type             enum create|update|delete|event
entity_type             enum
entity_id               uuid
entity_display_name     text
event_classification    enum standard|sensitive
sensitive_event_type    enum nullable
change_payload          jsonb nullable                     Before/after diff
is_complete             boolean                            Reserved for multi-step transactions
created_at              timestamptz

activity_log_export

Column                      Type                    Notes
id                          uuid PK
business_id                 uuid FK
requested_by_employee_id    uuid FK
status                      enum                    pending|processing|completed|failed|cancelled|expired
filter_snapshot             jsonb                   { lens, filters, requestedByRole }
file_path                   text nullable           GCS object path
download_url                text nullable           Pre-signed URL
expires_at                  timestamptz nullable
row_count                   integer nullable
was_truncated               boolean nullable        True if result exceeded 100k rows
completed_at                timestamptz nullable
created_at                  timestamptz

Environment Variables

Variable                           Required                Description
GCS_ACTIVITY_LOG_EXPORT_BUCKET     Yes (export feature)    GCS bucket for CSV export files
GCS_ACTIVITY_LOG_ARCHIVE_BUCKET    Yes (archive job)       GCS bucket for monthly JSONL.gz archives
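Failing fast on missing configuration is one way to surface a misconfigured deployment before the first export or archive run. The sketch below is hypothetical: `loadStorageConfig` and its return shape are not part of the module, and treating both variables as mandatory at startup is an assumption, since the table marks each one as required only for its own feature.

```typescript
interface ActivityLogStorageConfig {
  exportBucket: string;
  archiveBucket: string;
}

// Takes the environment as a plain record so it can be tested without globals.
function loadStorageConfig(env: Record<string, string | undefined>): ActivityLogStorageConfig {
  const missing: string[] = [];
  const read = (name: string): string => {
    const value = env[name];
    if (!value) missing.push(name);
    return value ?? '';
  };

  const config: ActivityLogStorageConfig = {
    exportBucket: read('GCS_ACTIVITY_LOG_EXPORT_BUCKET'),
    archiveBucket: read('GCS_ACTIVITY_LOG_ARCHIVE_BUCKET'),
  };

  // Report every missing variable at once instead of failing on the first.
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  return config;
}
```

In a Node process this would be called as loadStorageConfig(process.env) during startup.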

Known Architecture Notes

  • Archive processor queries the database directly (bypasses repository layer) because it operates on raw rows for serialization to JSONL. This is intentional for performance on large batches — the repository abstraction is not suited for bulk streaming.
  • Storage client (@google-cloud/storage) is instantiated with new Storage() inside each processor. A future refactor should extract this as an injectable provider to improve testability.
  • BullMQ @InjectQueue appears in ActivityLogExportService (application layer). Strictly this is an infrastructure concern; the trade-off was accepted to avoid introducing a queue port abstraction for a single use case.