Three layers: a short note at the top, the key lines with our take in the middle, the full source at the bottom.
Scheduled job
retention-reaper.ts
The scheduled job that deletes expired demo extractions after the retention window and records each deletion in the audit log.
Repo path: apps/api/src/scheduled/retention-reaper.ts
Language: TypeScript
What this is
A scheduled job that runs in the background and deletes invoice scans after their retention window. By default that window is 24 hours after the scan was read; a locked scan stays until you delete it yourself.
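A minimal sketch of the window arithmetic. It assumes a row records when the scan was read and that a locked scan has no expiry at all. The names retentionUntil and isExpired are hypothetical, and so is the choice to treat the exact deadline as expired. The real file only reads a precomputed retention_until value through the documents store.

```typescript
// Hypothetical helpers illustrating the 24-hour retention window.
// The real reaper only reads a precomputed retention_until column.
const RETENTION_MS = 24 * 60 * 60 * 1000; // 24 hours in milliseconds

// Deadline after which an unlocked scan may be reaped; null for a
// locked scan, which never expires on its own.
function retentionUntil(readAt: Date, locked: boolean): Date | null {
  if (locked) return null;
  return new Date(readAt.getTime() + RETENTION_MS);
}

// Assumes a scan becomes eligible the moment its deadline is reached.
function isExpired(until: Date | null, asOf: Date): boolean {
  return until !== null && asOf.getTime() >= until.getTime();
}

const readAt = new Date("2024-03-01T09:30:00Z");
const until = retentionUntil(readAt, false);
console.log(until?.toISOString()); // 2024-03-02T09:30:00.000Z
console.log(isExpired(until, new Date("2024-03-02T09:29:59Z"))); // false
console.log(isExpired(retentionUntil(readAt, true), new Date("2030-01-01T00:00:00Z"))); // false
```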
What it proves
Backs the promise that unlocked scans are kept for 24 hours and no longer. The reaper enforces that promise on a clock (an hourly Cloudflare Cron trigger), so it is more than a written commitment.
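An expired scan is deleted on a cron tick, not at the exact second its window ends. This sketch works out which tick that is. It assumes the hourly "0 * * * *" schedule named in the file's header comment. It also assumes the expired row fits inside that tick's batch (the file handles up to 200 rows per tick). firstTickAtOrAfter and lagMinutes are hypothetical helpers, not part of the reaper.

```typescript
// Hypothetical helpers: when does an hourly "0 * * * *" cron (UTC)
// first see a row whose deadline has passed? Assumes the backlog fits
// in one batch, so the row is handled on that first tick.
const HOUR_MS = 60 * 60 * 1000;

function firstTickAtOrAfter(deadline: Date): Date {
  // Epoch milliseconds count from midnight UTC, so rounding up to a
  // multiple of one hour lands on the next top-of-hour; an exact hour
  // stays where it is.
  return new Date(Math.ceil(deadline.getTime() / HOUR_MS) * HOUR_MS);
}

// Minutes a scan outlives its deadline before that tick deletes it.
function lagMinutes(deadline: Date): number {
  return (firstTickAtOrAfter(deadline).getTime() - deadline.getTime()) / 60000;
}

const deadline = new Date("2024-03-02T09:30:00Z");
console.log(firstTickAtOrAfter(deadline).toISOString()); // 2024-03-02T10:00:00.000Z
console.log(lagMinutes(deadline)); // 30
```

Whether a row whose deadline falls exactly on a tick is picked up on that tick depends on how the store compares retention_until with asOf. That comparison is not shown in this file.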
What to look for in the source below
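- A note on the schedule: the job runs on a Cloudflare Cron trigger (hourly, configured in wrangler.toml), not on demand.
- A safety check that refuses to delete a scan whose lock half-completed, meaning an encrypted copy exists but no key to open it was saved. If something went wrong while locking, we keep the file rather than risk losing it.
- An audit event (document.r2_purged) for every deletion. The audit log records that the file was deleted, but the file content itself is gone.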
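The file deletes the storage object first and then stamps the row as purged. It writes the audit event only when that stamp is new. A duplicate pass short-circuits because markR2Purged returns false for a row that is already stamped. Below is a minimal in-memory sketch of the stamp-then-audit step, with the storage delete left out. A Map stands in for the documents table and an array for the audit log, and markPurged and reapOnce are hypothetical names.

```typescript
// In-memory stand-ins for the documents table and the audit log.
const purgedAt = new Map<string, string>();
const auditLog: { action: string; target_ref: string }[] = [];

// Idempotent stamp: true only the first time a row is marked.
function markPurged(id: string, at: string): boolean {
  if (purgedAt.has(id)) return false;
  purgedAt.set(id, at);
  return true;
}

// One pass over a row: stamp it, and audit only when newly stamped,
// so each deletion produces exactly one audit event.
function reapOnce(id: string, at: string): "purged" | "skipped_already_purged" {
  if (!markPurged(id, at)) return "skipped_already_purged";
  auditLog.push({ action: "document.r2_purged", target_ref: id });
  return "purged";
}

console.log(reapOnce("doc_1", "2024-03-02T10:00:00.000Z")); // purged
console.log(reapOnce("doc_1", "2024-03-02T11:00:00.000Z")); // skipped_already_purged
console.log(auditLog.length); // 1
```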
The lines that carry the weight
The safety check that never deletes a half-locked file
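From runRetentionReaperOnStore, the per-row recoverability check
// Wave C2.3 audit BLOCKER fix. NEVER delete a plaintext whose
// E2EE encryption half-completed but is NOT recoverable.
// deks row present -> recoverable; deleting the
// leftover plaintext IS the
// desired end state. Self-heal
// encryption_state below.
// no deks + ciphertext (.enc) -> UNRECOVERABLE if deleted
// (no wrapped DEK exists, the
// DEK was zeroed in-process).
// Retain plaintext + loud log.
// no deks + no ciphertext -> genuinely never encrypted
// (opt-out / pre-enrol). Legacy
// 24h delete (unchanged).
// Fail SAFE: if recoverability cannot be determined, retain.
let recoverable = false;
try {
const dek = await defaultDocumentDeksStore(env).findForOrg(
row.org_id,
row.id,
);
if (dek) {
recoverable = true;
} else {
const encKey = r2KeyForOrg(row.org_id, row.id, "enc");
const cipher = await r2GetForOrg(env, row.org_id, encKey);
if (cipher) {
result.encrypt_incomplete_retained += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "encrypt_incomplete_retained",
document_id: row.id,
org_id: row.org_id,
},
});
continue;
}
}
} catch (err) {
result.r2_errors += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "recoverability_check_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
continue;
}
Plain English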
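If a scan's lock half-completed, the encrypted copy exists but the key needed to open it was never saved. This guard catches that case and skips the delete. Deleting the original at that point would leave only a copy nobody can open, so the reaper keeps the file, counts it, and logs an error for an operator to follow up. If the check itself fails, the reaper also keeps the file rather than risk losing a document the visitor wanted to keep.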
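The guard makes a three-way decision and falls back to keeping the file when it cannot tell. The sketch below passes the two lookups in as plain functions: is a wrapped key saved, and does an encrypted copy exist? decideBeforeDelete and its outcome names are hypothetical; they mirror the reasons the file logs. The real code is async and queries D1 and R2. This version is synchronous so the logic stands on its own.

```typescript
type Decision =
  | "delete_recoverable"     // wrapped key saved: the plaintext is a leftover
  | "retain_incomplete"      // encrypted copy but no key: deleting loses the doc
  | "delete_never_encrypted" // no key, no encrypted copy: ordinary 24h delete
  | "retain_check_failed";   // could not tell: fail safe and keep the file

function decideBeforeDelete(
  hasWrappedKey: () => boolean,
  hasCiphertext: () => boolean,
): Decision {
  try {
    if (hasWrappedKey()) return "delete_recoverable";
    if (hasCiphertext()) return "retain_incomplete";
    return "delete_never_encrypted";
  } catch {
    // Any lookup failure means recoverability is unknown, so retain.
    return "retain_check_failed";
  }
}

const yes = () => true;
const no = () => false;
const broken = (): boolean => {
  throw new Error("R2 unavailable");
};

console.log(decideBeforeDelete(no, yes)); // retain_incomplete
console.log(decideBeforeDelete(broken, no)); // retain_check_failed
```

In the file, the never-encrypted branch does not delete straight away. It first tries to finish the encryption and then repeats this check, so the sketch covers only the first pass.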
The full file (426 lines)
import type { Env } from "../env";
import {
defaultDocumentsStore,
type DocumentsStore,
} from "../lib/documents-store";
import { defaultDocumentDeksStore } from "../lib/document-deks-store";
import { finishDocumentEncrypt } from "../lib/finish-encrypt";
import { recordAuditEvent } from "../lib/audit";
import { D1AuditStore } from "../lib/audit-d1";
import { log } from "../lib/logger";
import {
assertOrgOwnsKey,
r2DeleteForOrg,
r2GetForOrg,
r2KeyForOrg,
TenantBoundaryViolation,
} from "../lib/r2-tenant";
/**
* R2 retention reaper (B-priv-7).
*
* Runs on a Cloudflare Cron trigger (see wrangler.toml `[triggers]`).
* Picks up documents past their `retention_until`, deletes the R2
* object via the per-org tenant-scoped helper, stamps r2_purged_at
* on the row, and records a `document.r2_purged` audit event so the
* customer can see the deletion in their audit log.
*
* Safety invariants:
* - The reaper iterates documents BUT every R2 deletion is routed
* through r2DeleteForOrg, which asserts the key starts with
* `org_<row.org_id>/`. A tampered row (or a future migration that
* accidentally changed the key shape) raises rather than deleting
* the wrong tenant's object.
* - Marking is idempotent: a duplicate reaper pass on the same row
* short-circuits at markR2Purged (returns false). The R2 delete
* is also tolerated when the object is already gone.
* - Audit failures are logged but do not stop the sweep; the row is
* stamped as purged either way because the R2 object is gone.
*
* Tunables:
* - DEFAULT_BATCH: how many rows to process per cron tick. Sized
* so a single cron invocation completes well under the Workers
* 30-second CPU budget even if every row needs an R2 round-trip.
* - HOURLY_CRON is "0 * * * *"; documented in wrangler.toml.
*
* Production observability: returns a counts object the cron
* handler logs. When BetterStack monitors land (Lane 2 SRE), the
* caller wires this to a synthetic-success heartbeat.
*/
const DEFAULT_BATCH = 200;
export interface ReaperResult {
scanned: number;
purged: number;
skipped_already_purged: number;
r2_errors: number;
audit_errors: number;
boundary_violations: number;
/** True when RETENTION_REAPER_DRYRUN=true: NOTHING is deleted,
* stamped, or audited — the sweep is read-only and `would_purge`
* reports what a real run would delete. The safe way to verify
* the reaper against a live Neon before enabling destruction. */
dry_run: boolean;
would_purge: number;
/** Wave C2.3 audit BLOCKER fix: rows whose encryption
* half-completed (ciphertext exists but NO wrapped DEK) — the
* plaintext is DELIBERATELY NOT deleted (deleting it would orphan
* an undecryptable document = permanent data loss). Retained +
* loudly logged for operator/cron remediation. A non-zero value
* here is an alertable condition. */
encrypt_incomplete_retained: number;
/** C2 FU#2 finish-encrypt backstop: expired `plaintext` rows
* whose owner HAS an enrolled identity key but were never
* encrypted (the consumer's best-effort attempt failed). The
* reaper finished the encryption → the document is now
* `encrypted` + RETAINED instead of silently deleted. Closes the
* "opted-in customer loses a document" data-loss path. */
encrypt_retried: number;
}
export async function runRetentionReaper(
env: Env,
asOf: Date = new Date(),
batch: number = DEFAULT_BATCH,
): Promise<ReaperResult> {
const store = defaultDocumentsStore(env);
return runRetentionReaperOnStore(
env,
store,
asOf,
batch,
env.RETENTION_REAPER_DRYRUN === "true",
);
}
/**
* The pure form of the reaper: takes the documents-store as a
* parameter so tests can seed a Stub and exercise the same body
* the cron handler runs. Production calls runRetentionReaper above,
* which resolves the store via the standard factory.
*/
export async function runRetentionReaperOnStore(
env: Env,
store: DocumentsStore,
asOf: Date,
batch: number = DEFAULT_BATCH,
dryRun: boolean = false,
): Promise<ReaperResult> {
const result: ReaperResult = {
scanned: 0,
purged: 0,
skipped_already_purged: 0,
r2_errors: 0,
audit_errors: 0,
boundary_violations: 0,
dry_run: dryRun,
would_purge: 0,
encrypt_incomplete_retained: 0,
encrypt_retried: 0,
};
let expired;
try {
expired = await store.listExpiredForReaper(asOf.toISOString(), batch);
} catch (err) {
// Fetch failure: the documents-store could not enumerate expired
// rows (D1 transient, binding misconfigured, etc.). Surface to
// the observability channel and return zero-scan; the next cron
// tick will retry.
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "list_expired_failed",
document_id: null,
org_id: null,
error: err instanceof Error ? err.message : String(err),
},
});
return result;
}
result.scanned = expired.length;
for (const row of expired) {
if (dryRun) {
// Read-only verification. Validate the tenant-prefix invariant
// WITHOUT deleting, stamping, or auditing anything. Lets an
// operator confirm the live-Neon documents path enumerates
// expired rows correctly before arming destruction.
try {
assertOrgOwnsKey(row.org_id, row.r2_key);
result.would_purge += 1;
log(env, {
event: "retention_reaper.dry_run",
fields: {
document_id: row.id,
org_id: row.org_id,
retention_until: row.retention_until,
},
});
} catch (err) {
if (err instanceof TenantBoundaryViolation) {
result.boundary_violations += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "tenant_boundary_violation",
document_id: row.id,
org_id: row.org_id,
error: err.message,
},
});
} else {
throw err;
}
}
continue;
}
// Wave C2.3 audit BLOCKER fix. NEVER delete a plaintext whose
// E2EE encryption half-completed but is NOT recoverable.
// deks row present -> recoverable; deleting the
// leftover plaintext IS the
// desired end state. Self-heal
// encryption_state below.
// no deks + ciphertext (.enc) -> UNRECOVERABLE if deleted
// (no wrapped DEK exists, the
// DEK was zeroed in-process).
// Retain plaintext + loud log.
// no deks + no ciphertext -> genuinely never encrypted
// (opt-out / pre-enrol). Legacy
// 24h delete (unchanged).
// Fail SAFE: if recoverability cannot be determined, retain.
let recoverable = false;
try {
const dek = await defaultDocumentDeksStore(env).findForOrg(
row.org_id,
row.id,
);
if (dek) {
recoverable = true;
} else {
const encKey = r2KeyForOrg(row.org_id, row.id, "enc");
const cipher = await r2GetForOrg(env, row.org_id, encKey);
if (cipher) {
result.encrypt_incomplete_retained += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "encrypt_incomplete_retained",
document_id: row.id,
org_id: row.org_id,
},
});
continue;
}
}
} catch (err) {
result.r2_errors += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "recoverability_check_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
continue;
}
// C2 FU#2 finish-encrypt backstop. Reached only when the row is
// genuinely never-encrypted (no wrapped DEK AND no ciphertext —
// `recoverable` is false). If the owner enrolled an identity
// key, the consumer's post-extraction encrypt SHOULD have run
// but failed; deleting now would silently lose an opted-in
// customer's document. Try to finish the encryption HERE,
// before the legacy delete. Only the "encrypted" outcome
// changes behaviour (retain instead of delete); every other
// outcome (no identity key = not opted in, no plaintext,
// error) falls through to the unchanged legacy delete — this
// can only ever RETAIN a doc that would otherwise be deleted,
// never the reverse. Best-effort: never throws to the caller.
if (!recoverable) {
const outcome = await finishDocumentEncrypt(env, {
document_id: row.id,
org_id: row.org_id,
r2_key: row.r2_key,
});
if (outcome === "encrypted" || outcome === "already_encrypted") {
result.encrypt_retried += 1;
log(env, {
event: "retention_reaper.encrypt_retried",
fields: { document_id: row.id, org_id: row.org_id, outcome },
});
continue; // RETAINED — do not delete or mark purged.
}
// FU#2-audit BLOCKER fix. A non-success outcome must NOT fall
// straight to the legacy delete: finishDocumentEncrypt may
// have partially run (wrote `.enc`, then `deks.insert` threw)
// and, if its own rollback also failed, left a `.enc` orphan
// — OR a concurrent at-least-once consumer delivery may have
// just written a deks row. Deleting the plaintext now would
// be the exact C2.3 permanent-undecryptable-loss class. So
// RE-CHECK recoverability here; if a wrapped DEK OR a `.enc`
// ciphertext now exists, RETAIN (mirror the C2.3 net's
// retain-and-loud-log). Only a genuinely clean
// never-encrypted row (no deks, no `.enc`) reaches the
// unchanged legacy delete below.
try {
const dekNow = await defaultDocumentDeksStore(env).findForOrg(
row.org_id,
row.id,
);
let cipherNow = false;
if (!dekNow) {
const encKey = r2KeyForOrg(row.org_id, row.id, "enc");
cipherNow = !!(await r2GetForOrg(env, row.org_id, encKey));
}
if (dekNow || cipherNow) {
result.encrypt_incomplete_retained += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "encrypt_incomplete_retained",
document_id: row.id,
org_id: row.org_id,
after_outcome: outcome,
},
});
continue; // RETAINED — never delete a (maybe) recoverable doc.
}
} catch (err) {
// Recoverability re-check itself failed → fail SAFE: retain.
result.r2_errors += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "post_finish_recheck_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
continue;
}
}
try {
await r2DeleteForOrg(env, row.org_id, row.r2_key);
} catch (err) {
if (err instanceof TenantBoundaryViolation) {
// Loud failure: a row's r2_key escaped the tenant prefix.
// Refuse to delete; ops must investigate.
result.boundary_violations += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "tenant_boundary_violation",
document_id: row.id,
org_id: row.org_id,
error: err.message,
},
});
continue;
}
result.r2_errors += 1;
// Best-effort: tolerate "already gone" + transient errors and
// still mark purged below. R2 .delete() resolves on 404
// already, so this branch is reserved for binding failures.
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "r2_delete_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
}
const purgedAt = asOf.toISOString();
let newlyMarked: boolean;
try {
newlyMarked = await store.markR2Purged(row.id, purgedAt);
} catch (err) {
// DB update failure: the R2 object may already be gone but we
// could not stamp r2_purged_at. Count it against r2_errors so
// the cron heartbeat surfaces the drift; the next sweep will
// retry idempotently.
result.r2_errors += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "db_update_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
continue;
}
if (!newlyMarked) {
result.skipped_already_purged += 1;
continue;
}
result.purged += 1;
// Self-heal: a recoverable row (wrapped DEK durable) whose
// plaintext we just discarded IS the fully-encrypted end state
// — reflect that so it stops being scanned and the account
// shows E2EE-complete. The prior delivery crashed between
// deks.insert and its own plaintext delete; the reaper finished
// the job. setEncryptionState is org-scoped (self-contained
// set_config); best-effort — a miss only re-scans next sweep.
if (recoverable) {
try {
await store.setEncryptionState(row.id, row.org_id, "encrypted");
} catch {
/* cosmetic; the plaintext is already gone + recoverable */
}
}
try {
// Audit batch 2 P-F5: namespace system-triggered audit events
// with a `sys_` prefix so the eventual audit-log UI can render
// them distinctly from user-actor events, and a future
// human-attacker cannot register a username "system" that
// would visually collide.
await recordAuditEvent(new D1AuditStore(env.DB), {
org_id: row.org_id,
actor_id: "sys_retention_reaper",
action: "document.r2_purged",
target_kind: "document",
target_ref: row.id,
});
} catch (err) {
// Audit chain hiccup; the row is already marked purged so the
// customer sees the storage state correctly. Loud-fail audit
// gating lands with H-priv-6.
result.audit_errors += 1;
log(env, {
event: "retention_reaper.error",
level: "error",
fields: {
reason: "audit_write_failed",
document_id: row.id,
org_id: row.org_id,
error: err instanceof Error ? err.message : String(err),
},
});
}
}
return result;
}
This is the file as it lives at the moment of this build. The canonical history lives in git. If you want the full history or a specific commit, write to hello@muntin.digital.
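The file's header comment says every deletion goes through r2DeleteForOrg. That helper asserts the object key starts with org_<org_id>/ and raises TenantBoundaryViolation otherwise. The real helpers live in ../lib/r2-tenant and are not shown on this page. Below is a minimal sketch of a prefix check with that shape. It defines its own error class and uses a hypothetical assertKeyInOrg name.

```typescript
// Locally defined stand-in for the error the real helper raises.
class TenantBoundaryViolation extends Error {
  constructor(message: string) {
    super(message);
    this.name = "TenantBoundaryViolation";
    // Keeps instanceof working when compiled to an ES5 target.
    Object.setPrototypeOf(this, TenantBoundaryViolation.prototype);
  }
}

// Throws unless `key` lives under the org's own prefix, `org_<orgId>/`.
function assertKeyInOrg(orgId: string, key: string): void {
  const prefix = `org_${orgId}/`;
  if (!key.startsWith(prefix)) {
    throw new TenantBoundaryViolation(
      `key ${JSON.stringify(key)} is outside ${JSON.stringify(prefix)}`,
    );
  }
}

assertKeyInOrg("acme", "org_acme/doc_1.pdf"); // passes silently
try {
  assertKeyInOrg("acme", "org_acme2/doc_1.pdf");
} catch (err) {
  console.log(err instanceof TenantBoundaryViolation); // true
}
```

The trailing slash in the prefix is what stops org_acme from matching a key that belongs to org_acme2.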