Appearance
Post-Commit Queue для action хуков
Внутренний паттерн проекта Crosses. Помогает корректно обрабатывать
actionхуки, которые срабатывают внутри длинной ручной транзакции — типичный случай при синхронизации между внешним и внутренним порталами.
TL;DR
Если ты пишешь action хук на коллекцию, которая может создаваться/обновляться внутри синхронизации (extensions/.shared/src/endpoints/synchronization.js) — оборачивай работу в enqueuePostCommit(...) вместо того чтобы выполнять её прямо в обработчике. Иначе ты не увидишь свою же запись через ItemsService.readOne(), потому что она ещё не закоммичена, а action эмитится с фрешем из пула.
ts
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';
action('approval_requests.items.create', ({ key }, { database, schema }) => {
enqueuePostCommit(async () => {
const svc = new ItemsService('approval_requests', {
knex: database, schema, accountability: { admin: true } as any,
});
const record = await svc.readOne(key, { fields: [...] });
// ... вся работа с record
});
});import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';
action('approval_requests.items.create', ({ key }, { database, schema }) => {
enqueuePostCommit(async () => {
const svc = new ItemsService('approval_requests', {
knex: database, schema, accountability: { admin: true } as any,
});
const record = await svc.readOne(key, { fields: [...] });
// ... вся работа с record
});
});Всё. Если ты НЕ внутри sync — задача выполнится сразу (fallback). Если внутри — выполнится после trx.commit() в synchronization.js.
Зачем это вообще нужно
Корень проблемы — три факта из исходников Directus
Факт 1. ItemsService.createOne эмитит action событие СРАЗУ после внутренней обёртки transaction(), до коммита внешней транзакции. См. api/src/services/items.ts (createOne):
ts
const primaryKey = await transaction(this.knex, async (trx) => {
// ... insert
});
if (opts.emitEvents !== false) {
const actionEvent = {
event: ['items.create', `${this.collection}.items.create`],
meta: { payload, key: primaryKey, collection: this.collection },
context: {
database: getDatabase(), // ← СВЕЖИЙ knex из пула, НЕ trx!
schema: this.schema,
accountability: this.accountability,
},
};
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}const primaryKey = await transaction(this.knex, async (trx) => {
// ... insert
});
if (opts.emitEvents !== false) {
const actionEvent = {
event: ['items.create', `${this.collection}.items.create`],
meta: { payload, key: primaryKey, collection: this.collection },
context: {
database: getDatabase(), // ← СВЕЖИЙ knex из пула, НЕ trx!
schema: this.schema,
accountability: this.accountability,
},
};
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}Факт 2. Хелпер transaction() НЕ оборачивает в новую транзакцию, если переданный knex УЖЕ транзакция. См. api/src/utils/transaction.ts:
ts
export const transaction = async (knex, handler) => {
if (knex.isTransaction) {
return handler(knex); // ← inline, без savepoint
} else {
return await knex.transaction(trx => handler(trx));
}
};export const transaction = async (knex, handler) => {
if (knex.isTransaction) {
return handler(knex); // ← inline, без savepoint
} else {
return await knex.transaction(trx => handler(trx));
}
};Факт 3. emitAction — fire-and-forget. См. api/src/emitter.ts:
ts
public emitAction(event, meta, context): void {
for (const event of events) {
this.actionEmitter.emitAsync(event, ...).catch(err => logger.warn(...));
// ^^^^^^^^^ не await!
}
}public emitAction(event, meta, context): void {
for (const event of events) {
this.actionEmitter.emitAsync(event, ...).catch(err => logger.warn(...));
// ^^^^^^^^^ не await!
}
}Что из этого следует для синхронизации
extensions/.shared/src/endpoints/synchronization.js открывает одну ручную транзакцию на весь sync-пакет:
js
trx = await database.transaction()
for (const item of data) {
await crudActivity(item, trx, ...) // все ItemsService с knex: trx
}
await recordResults(null, trx, ItemsService, data, ...)
// ...
await trx.commit(); // ← много позже, после ВСЕГО цикла + feedbacktrx = await database.transaction()
for (const item of data) {
await crudActivity(item, trx, ...) // все ItemsService с knex: trx
}
await recordResults(null, trx, ItemsService, data, ...)
// ...
await trx.commit(); // ← много позже, после ВСЕГО цикла + feedbackРаскручиваем по слоям:
crudActivity→ItemsService.createOneсknex: trx- Внутри
createOneхелперtransaction()видитknex.isTransaction === true→ inline, никакого нового trx/savepoint - После
await transaction(...)(которая просто отработала inline) →emitter.emitAction(...)сdatabase: getDatabase()(свежий пул) - Action хендлеры (наши
action(...)в хуках) запускаются в микротаск-очереди — параллельно с продолжениемfor-цикла вsynchronization.js - Внешняя
trxещё открыта,commit()будет позже - Action хендлер делает
readOne(key)черезgetDatabase()→ свежий коннект из пула → READ COMMITTED isolation → запись в trx не видна →null
Race condition. Хук проигрывает гонку в коммит транзакции, если только не подождёт.
Почему filter тоже не помогает
Filter эмитится до INSERT, см. items.ts createOne (lines 159-175). В момент filter'а записи ещё не существует физически. Так что переключение action → filter решает проблему «нельзя прочитать после insert», но создаёт новую — «нечего читать до insert».
Почему Flow operation работает (и почему это не наш путь)
Flow с триггером action (см. api/src/flows.ts:194-208) подписывается через тот же emitter.onAction, с тем же database: getDatabase() — то есть структурно ничем не отличается от extension hook. Но Flow проходит через executeFlow → executeOperation → keyedData → handler, что добавляет несколько микротасков и await'ов. Обычно за это время synchronization.js успевает дойти до trx.commit(), и Flow operation побеждает в гонке.
Это случайность, а не гарантия. Под нагрузкой Flow тоже может проиграть. Плюс минусы Flow: нужно создавать operation в коде + регистрировать в админке + поддерживать в БД. Гораздо проще — обычный hook с правильным паттерном.
Решение — post-commit-queue
Идея: action хук не делает работу сразу, а складывает её в очередь, привязанную к sync-контексту через AsyncLocalStorage. После успешного trx.commit() synchronization.js сливает очередь — задачи выполняются и видят закоммиченные данные.
Архитектура
┌─────────────────────────────────────────────────────────────┐
│ POST /synchronization (synchronization.js) │
│ │
│ syncContextStorage.run({ tasks: [] }, async () => { │
│ │
│ trx = await database.transaction() │
│ │
│ for (item of data) { │
│ await crudActivity(item, trx, ...) │
│ │ │
│ └→ ItemsService.createOne │
│ │ │
│ └→ emitter.emitAction('items.create') │
│ │ │
│ └→ ┌──────────────────────────┐ │
│ │ action hook │ │
│ │ enqueuePostCommit(fn) │ ──┐ │
│ │ // сохраняет fn в │ │ │
│ │ // ctx.tasks │ │ │
│ └──────────────────────────┘ │ │
│ } │ │
│ │ │
│ await trx.commit() ✅ только теперь │ │
│ │ │
│ await drainPostCommit() ───────────────────┐ │ │
│ ↓ │ │ │
│ выполняет ctx.tasks ───────────────────────┘ │ │
│ │ │
│ }) │ │
└──────────────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────────────┐
│ POST /synchronization (synchronization.js) │
│ │
│ syncContextStorage.run({ tasks: [] }, async () => { │
│ │
│ trx = await database.transaction() │
│ │
│ for (item of data) { │
│ await crudActivity(item, trx, ...) │
│ │ │
│ └→ ItemsService.createOne │
│ │ │
│ └→ emitter.emitAction('items.create') │
│ │ │
│ └→ ┌──────────────────────────┐ │
│ │ action hook │ │
│ │ enqueuePostCommit(fn) │ ──┐ │
│ │ // сохраняет fn в │ │ │
│ │ // ctx.tasks │ │ │
│ └──────────────────────────┘ │ │
│ } │ │
│ │ │
│ await trx.commit() ✅ только теперь │ │
│ │ │
│ await drainPostCommit() ───────────────────┐ │ │
│ ↓ │ │ │
│ выполняет ctx.tasks ───────────────────────┘ │ │
│ │ │
│ }) │ │
└──────────────────────────────────────────────────────────────┘AsyncLocalStorage пробрасывает ctx через всю асинхронную цепочку вызовов, включая микротаски от emitter.emitAction — Node поддерживает это нативно.
API хелпера
Файл: extensions/.shared/src/utils/post-commit-queue.ts
enqueuePostCommit(task, options?)
ts
function enqueuePostCommit(
task: () => Promise<void>,
options?: { runImmediatelyIfNoContext?: boolean }
): voidfunction enqueuePostCommit(
task: () => Promise<void>,
options?: { runImmediatelyIfNoContext?: boolean }
): void- Внутри sync-контекста → задача попадает в очередь, выполнится после
drainPostCommit() - Вне sync-контекста → задача запускается немедленно через fire-and-forget Promise (CRUD из админки и т.п.)
runImmediatelyIfNoContext: falseотключает fallback — задача просто игнорируется если контекста нет (полезно если ты ТОЛЬКО хочешь обработать sync-events и не хочешь дублировать на CRUD)
drainPostCommit()
ts
async function drainPostCommit(): Promise<void>async function drainPostCommit(): Promise<void>Слив очереди. Вызывается только из synchronization.js после успешного trx.commit(). Свойства:
- задачи выполняются последовательно (одна за другой)
- ошибка одной задачи не прерывает остальные
- исключения наружу не пробрасывает
- очередь после drain пустая (повторный вызов — no-op)
syncContextStorage
ts
const syncContextStorage: AsyncLocalStorage<{ tasks: Task[]; label?: string }>const syncContextStorage: AsyncLocalStorage<{ tasks: Task[]; label?: string }>AsyncLocalStorage инстанс. Используется напрямую только в synchronization.js:
js
await syncContextStorage.run({ tasks: [], label: `sync-${feedbackId}` }, async () => {
// вся работа с trx
});await syncContextStorage.run({ tasks: [], label: `sync-${feedbackId}` }, async () => {
// вся работа с trx
});В хуках работать с ним напрямую не нужно — enqueuePostCommit сам читает контекст.
Когда использовать
✅ Используй enqueuePostCommit, если
- Твой
actionхук подписан на коллекцию, в которую может писатьsynchronization.js - Внутри хука нужно прочитать ТОЛЬКО ЧТО созданную/обновлённую запись (для отправки уведомления, генерации файла, вызова внешнего API и т.п.)
- Логика хука не должна откатываться при rollback внешней trx (например, уведомление надо посылать только если sync реально завершился)
❌ НЕ используй enqueuePostCommit, если
- Твой хук —
filter, неaction(filter имеет свою семантику и работает с payload до insert'а) - Тебе нужно остановить операцию (filter может вернуть payload или throw — action не может, action асинхронный)
- Тебе нужно прочитать запись в той же транзакции (например, ты делаешь
filterи тебе нужно проверить связанные записи) — для этого юзайfilterиcontext.database(он будет = trx) - Логика должна выполняться до commit (валидация, отмена insert) — это уровень
filter, а неaction
Примеры
Action hook — стандартный паттерн
ts
// extensions/outer-extensions/src/hooks/notifications.hook.ts
import { defineHook } from '@directus/extensions-sdk';
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';
export default defineHook(({ action }, { logger, services }) => {
const { ItemsService, NotificationsService } = services;
action('approval_requests.items.create', ({ key, keys }, { database, schema }) => {
const approvalRequestId = key ?? keys?.[0];
if (!approvalRequestId) return;
enqueuePostCommit(async () => {
try {
const svc = new ItemsService('approval_requests', {
knex: database,
schema,
accountability: { admin: true } as any,
});
const record = await svc.readOne(approvalRequestId, {
fields: ['id', 'request_to.id', 'process.type', /* ... */],
});
if (!record || record.process?.type !== 'document_approval') return;
const notifications = new NotificationsService({
knex: database, schema, accountability: null,
});
await notifications.createOne({
subject: 'Требуется согласование',
message: '...',
recipient: record.request_to.id,
});
logger.info(`Notification sent for ${approvalRequestId}`);
} catch (err) {
logger.error(`Notification failed: ${(err as Error).message}`);
}
});
});
});// extensions/outer-extensions/src/hooks/notifications.hook.ts
import { defineHook } from '@directus/extensions-sdk';
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';
export default defineHook(({ action }, { logger, services }) => {
const { ItemsService, NotificationsService } = services;
action('approval_requests.items.create', ({ key, keys }, { database, schema }) => {
const approvalRequestId = key ?? keys?.[0];
if (!approvalRequestId) return;
enqueuePostCommit(async () => {
try {
const svc = new ItemsService('approval_requests', {
knex: database,
schema,
accountability: { admin: true } as any,
});
const record = await svc.readOne(approvalRequestId, {
fields: ['id', 'request_to.id', 'process.type', /* ... */],
});
if (!record || record.process?.type !== 'document_approval') return;
const notifications = new NotificationsService({
knex: database, schema, accountability: null,
});
await notifications.createOne({
subject: 'Требуется согласование',
message: '...',
recipient: record.request_to.id,
});
logger.info(`Notification sent for ${approvalRequestId}`);
} catch (err) {
logger.error(`Notification failed: ${(err as Error).message}`);
}
});
});
});Важные моменты:
- Сам хук остаётся синхронным (без
asyncв callback).enqueuePostCommitпринимаетasync () => {}функцию — она будет вызвана позже. - Try/catch внутри задачи обязателен.
drainPostCommitловит ошибки сам, но ты не получишь информацию о том, что именно сломалось — лучше залогируй. accountability: { admin: true }— обязательно. По умолчаниюaccountabilityхука может быть от sync-сервиса с ограниченными правами.databaseиз контекста хука =getDatabase()(свежий пул, см.items.ts:380). Используем его — после commit он точно увидит данные.- Не используй
setImmediate/setTimeout/process.nextTickвнутриenqueuePostCommit-задачи — они оторвут задачу от sync-контекста и от транзакционных гарантий, и Mox AsyncLocalStorage потеряется.
Action hook — отключить fallback вне sync
Если хук должен срабатывать только в рамках синхронизации (например, для зеркалирования изменений на другой портал, и при ручном CRUD это не нужно):
ts
action('some_collection.items.update', ({ key }, { database, schema }) => {
enqueuePostCommit(
async () => { /* ... */ },
{ runImmediatelyIfNoContext: false } // ← вне sync — игнорируем
);
});action('some_collection.items.update', ({ key }, { database, schema }) => {
enqueuePostCommit(
async () => { /* ... */ },
{ runImmediatelyIfNoContext: false } // ← вне sync — игнорируем
);
});Long-transaction endpoint — добавить новый sync-эндпоинт
Если ты пишешь свой endpoint, который держит длинную ручную транзакцию (по аналогии с synchronization.js), и хочешь чтобы action хуки внутри корректно работали:
js
import { syncContextStorage, drainPostCommit } from '../utils/post-commit-queue.ts'
export default {
id: "my-bulk-endpoint",
handler: (router, { database, services, getSchema }) => {
router.post("/", async (req, res) => {
await syncContextStorage.run(
{ tasks: [], label: `bulk-${Date.now()}` },
async () => {
let trx;
try {
trx = await database.transaction();
// ... твоя работа с ItemsService через knex: trx
await trx.commit();
// Сразу после commit — слив очереди
try {
await drainPostCommit();
} catch (err) {
req.log.warn(`Post-commit drain error: ${err.message}`);
}
res.status(200).json({ ok: true });
} catch (err) {
if (trx) await trx.rollback();
// tasks тихо отбрасываются вместе с контекстом
res.status(500).json({ error: err.message });
}
}
);
});
}
}import { syncContextStorage, drainPostCommit } from '../utils/post-commit-queue.ts'
export default {
id: "my-bulk-endpoint",
handler: (router, { database, services, getSchema }) => {
router.post("/", async (req, res) => {
await syncContextStorage.run(
{ tasks: [], label: `bulk-${Date.now()}` },
async () => {
let trx;
try {
trx = await database.transaction();
// ... твоя работа с ItemsService через knex: trx
await trx.commit();
// Сразу после commit — слив очереди
try {
await drainPostCommit();
} catch (err) {
req.log.warn(`Post-commit drain error: ${err.message}`);
}
res.status(200).json({ ok: true });
} catch (err) {
if (trx) await trx.rollback();
// tasks тихо отбрасываются вместе с контекстом
res.status(500).json({ error: err.message });
}
}
);
});
}
}Ключевые правила для эндпоинтов:
syncContextStorage.run— самый внешний слой обработчика. Всё, включая открытие trx, должно быть внутри callback.drainPostCommit()вызывай ТОЛЬКО после успешногоtrx.commit(). При rollback не вызывай — задачи отбросятся вместе с контекстом по выходу изrun.- Один endpoint = один контекст. Не вкладывай
syncContextStorage.runдруг в друга — внутреннийenqueuePostCommitпопадёт во внутренний контекст, а не во внешний. drainPostCommitлови в try/catch на всякий случай. Helper ловит ошибки задач, но баги в самом хелпере или в коде раннера лучше обработать.
Поведение при rollback
При вызове trx.rollback() (например, sync-пакет провалился, fallback ошибки и т.п.):
drainPostCommit()НЕ вызывается- Задачи в очереди контекста тихо отбрасываются
- Контекст уничтожается по выходу из
syncContextStorage.run - Уведомления, файлы, внешние вызовы — не происходят
Это правильное поведение — задачи семантически зависят от факта коммита. Если данных в БД нет, отправлять уведомление о них некорректно.
Если тебе нужны задачи, которые должны выполниться независимо от исхода транзакции (например, метрики или audit log) — не используй post-commit queue, делай fire-and-forget напрямую через void doIt().
Логи
В synchronization.js после успешного коммита и слива:
[Main stream] Transaction commit ✅✅✅
[Main stream] Post-commit queue drained ✅[Main stream] Transaction commit ✅✅✅
[Main stream] Post-commit queue drained ✅Если drainPostCommit сам кинул исключение (не должно происходить, helper ловит ошибки задач):
[Main stream] Post-commit drain unexpected error: <message>[Main stream] Post-commit drain unexpected error: <message>В хуках — твоя задача залогировать сама. Рекомендуется префикс [NotificationsHook] или аналогичный, чтобы было удобно grep-ать.
Известные ограничения и подводные камни
1. AsyncLocalStorage и сторонние библиотеки
Node.js пробрасывает ALS-контекст через async/await нативно. Но если задача попадает в сторонний event loop (например, очередь BullMQ, worker thread, native addon callback) — контекст может потеряться. В нашем случае всё работает в основном Node-процессе, проблем нет, но если будешь интегрировать что-то экзотическое — проверяй.
2. Несколько action хуков на одно событие
Если на approval_requests.items.create подписано несколько хуков, каждый должен использовать enqueuePostCommit отдельно. Очередь общая на контекст — все задачи попадут в неё и выполнятся в порядке регистрации.
3. Отложенные задачи и очень длинный sync
drainPostCommit сливает задачи последовательно. Если задач много и каждая делает сетевой вызов или PDF-генерацию — общее время drain может вырасти. На текущей нагрузке это не проблема, но если будешь видеть таймауты — рассмотри параллельный drain (Promise.allSettled) или вынос задач в реальную очередь (BullMQ + Redis).
4. Не зови enqueuePostCommit из самой задачи
ts
enqueuePostCommit(async () => {
enqueuePostCommit(async () => { /* ... */ }); // ← НЕТ!
});enqueuePostCommit(async () => {
enqueuePostCommit(async () => { /* ... */ }); // ← НЕТ!
});К моменту выполнения первой задачи drainPostCommit уже сделал splice(0) и контекст пуст. Вторая задача попадёт в новый пустой массив, который никто не сольёт. Если нужна цепочка — вызывай вторую задачу прямо изнутри первой через await.
5. Rollback после частичного drain — невозможен
Как только drainPostCommit начал выполнять задачи, обратной дороги нет. Если задача отправила HTTP-запрос в сторонний сервис, а потом ты по какой-то причине решил откатить trx — поздно, запрос уже ушёл. Поэтому drainPostCommit строго после commit() и никак иначе.
История
- 2026-04-07 — обнаружен race condition между
actionхуком наapproval_requestsи длинной trx вsynchronization.js. Подтверждён через лог[setImmediate-test] approval_request <id> visible after 7 attempt(s)— setImmediate+полинг с backoff увидел запись только через ~1.3 секунды - 2026-04-07 — реализован post-commit-queue (этот документ), setImmediate+полинг убран, работает в проде на тесте
- weeek-596 — связанная задача в трекере
Связанные файлы
extensions/.shared/src/utils/post-commit-queue.ts— хелперextensions/.shared/src/endpoints/synchronization.js— sync эндпоинт, пример использованияsyncContextStorage.run+drainPostCommitextensions/outer-extensions/src/hooks/notifications.hook.ts—approval_requests.items.createпример использованияenqueuePostCommitв hookapi/src/services/items.ts— исходник Directus, объясняющий семантику emitActionapi/src/utils/transaction.ts— исходникtransaction()хелпераapi/src/emitter.ts— исходник fire-and-forgetemitActionapi/src/flows.ts— для сравнения: как Flow подписывается на action события