Skip to content
On this page

Directus Cloud

Everything you need to start building. Provisioned in 90 seconds. Starting at $15/month.

Get Started

Post-Commit Queue для action хуков

Внутренний паттерн проекта Crosses. Помогает корректно обрабатывать action хуки, которые срабатывают внутри длинной ручной транзакции — типичный случай при синхронизации между внешним и внутренним порталами.

TL;DR

Если ты пишешь action хук на коллекцию, которая может создаваться/обновляться внутри синхронизации (extensions/.shared/src/endpoints/synchronization.js) — оборачивай работу в enqueuePostCommit(...) вместо того чтобы выполнять её прямо в обработчике. Иначе ты не увидишь свою же запись через ItemsService.readOne(), потому что она ещё не закоммичена, а action эмитится с фрешем из пула.

ts
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';

action('approval_requests.items.create', ({ key }, { database, schema }) => {
    enqueuePostCommit(async () => {
        const svc = new ItemsService('approval_requests', {
            knex: database, schema, accountability: { admin: true } as any,
        });
        const record = await svc.readOne(key, { fields: [...] });
        // ... вся работа с record
    });
});
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';

action('approval_requests.items.create', ({ key }, { database, schema }) => {
    enqueuePostCommit(async () => {
        const svc = new ItemsService('approval_requests', {
            knex: database, schema, accountability: { admin: true } as any,
        });
        const record = await svc.readOne(key, { fields: [...] });
        // ... вся работа с record
    });
});

Всё. Если ты НЕ внутри sync — задача выполнится сразу (fallback). Если внутри — выполнится после trx.commit() в synchronization.js.

Зачем это вообще нужно

Корень проблемы — три факта из исходников Directus

Факт 1. ItemsService.createOne эмитит action событие СРАЗУ после внутренней обёртки transaction(), до коммита внешней транзакции. См. api/src/services/items.ts (createOne):

ts
const primaryKey = await transaction(this.knex, async (trx) => {
    // ... insert
});

if (opts.emitEvents !== false) {
    const actionEvent = {
        event: ['items.create', `${this.collection}.items.create`],
        meta: { payload, key: primaryKey, collection: this.collection },
        context: {
            database: getDatabase(),    // ← СВЕЖИЙ knex из пула, НЕ trx!
            schema: this.schema,
            accountability: this.accountability,
        },
    };
    emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}
const primaryKey = await transaction(this.knex, async (trx) => {
    // ... insert
});

if (opts.emitEvents !== false) {
    const actionEvent = {
        event: ['items.create', `${this.collection}.items.create`],
        meta: { payload, key: primaryKey, collection: this.collection },
        context: {
            database: getDatabase(),    // ← СВЕЖИЙ knex из пула, НЕ trx!
            schema: this.schema,
            accountability: this.accountability,
        },
    };
    emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context);
}

Факт 2. Хелпер transaction() НЕ оборачивает в новую транзакцию, если переданный knex УЖЕ транзакция. См. api/src/utils/transaction.ts:

ts
export const transaction = async (knex, handler) => {
    if (knex.isTransaction) {
        return handler(knex);                          // ← inline, без savepoint
    } else {
        return await knex.transaction(trx => handler(trx));
    }
};
export const transaction = async (knex, handler) => {
    if (knex.isTransaction) {
        return handler(knex);                          // ← inline, без savepoint
    } else {
        return await knex.transaction(trx => handler(trx));
    }
};

Факт 3. emitAction — fire-and-forget. См. api/src/emitter.ts:

ts
public emitAction(event, meta, context): void {
    for (const event of events) {
        this.actionEmitter.emitAsync(event, ...).catch(err => logger.warn(...));
        //                            ^^^^^^^^^ не await!
    }
}
public emitAction(event, meta, context): void {
    for (const event of events) {
        this.actionEmitter.emitAsync(event, ...).catch(err => logger.warn(...));
        //                            ^^^^^^^^^ не await!
    }
}

Что из этого следует для синхронизации

extensions/.shared/src/endpoints/synchronization.js открывает одну ручную транзакцию на весь sync-пакет:

js
trx = await database.transaction()
for (const item of data) {
    await crudActivity(item, trx, ...)   // все ItemsService с knex: trx
}
await recordResults(null, trx, ItemsService, data, ...)
// ...
await trx.commit();   // ← много позже, после ВСЕГО цикла + feedback
trx = await database.transaction()
for (const item of data) {
    await crudActivity(item, trx, ...)   // все ItemsService с knex: trx
}
await recordResults(null, trx, ItemsService, data, ...)
// ...
await trx.commit();   // ← много позже, после ВСЕГО цикла + feedback

Раскручиваем по слоям:

  1. crudActivityItemsService.createOne с knex: trx
  2. Внутри createOne хелпер transaction() видит knex.isTransaction === true → inline, никакого нового trx/savepoint
  3. После await transaction(...) (которая просто отработала inline) → emitter.emitAction(...) с database: getDatabase() (свежий пул)
  4. Action хендлеры (наши action(...) в хуках) запускаются в микротаск-очереди — параллельно с продолжением for-цикла в synchronization.js
  5. Внешняя trx ещё открыта, commit() будет позже
  6. Action хендлер делает readOne(key) через getDatabase() → свежий коннект из пула → READ COMMITTED isolation → запись в trx не виднаnull

Race condition. Хук проигрывает гонку в коммит транзакции, если только не подождёт.

Почему filter тоже не помогает

Filter эмитится до INSERT, см. items.ts createOne (lines 159-175). В момент filter'а записи ещё не существует физически. Так что переключение action → filter решает проблему «нельзя прочитать после insert», но создаёт новую — «нечего читать до insert».

Почему Flow operation работает (и почему это не наш путь)

Flow с триггером action (см. api/src/flows.ts:194-208) подписывается через тот же emitter.onAction, с тем же database: getDatabase() — то есть структурно ничем не отличается от extension hook. Но Flow проходит через executeFlow → executeOperation → keyedData → handler, что добавляет несколько микротасков и await'ов. Обычно за это время synchronization.js успевает дойти до trx.commit(), и Flow operation побеждает в гонке.

Это случайность, а не гарантия. Под нагрузкой Flow тоже может проиграть. Плюс минусы Flow: нужно создавать operation в коде + регистрировать в админке + поддерживать в БД. Гораздо проще — обычный hook с правильным паттерном.

Решение — post-commit-queue

Идея: action хук не делает работу сразу, а складывает её в очередь, привязанную к sync-контексту через AsyncLocalStorage. После успешного trx.commit() synchronization.js сливает очередь — задачи выполняются и видят закоммиченные данные.

Архитектура

┌─────────────────────────────────────────────────────────────┐
│ POST /synchronization (synchronization.js)                  │
│                                                              │
│   syncContextStorage.run({ tasks: [] }, async () => {       │
│                                                              │
│     trx = await database.transaction()                      │
│                                                              │
│     for (item of data) {                                    │
│       await crudActivity(item, trx, ...)                    │
│       │                                                      │
│       └→ ItemsService.createOne                             │
│            │                                                 │
│            └→ emitter.emitAction('items.create')            │
│                 │                                            │
│                 └→ ┌──────────────────────────┐             │
│                    │ action hook              │             │
│                    │   enqueuePostCommit(fn)  │ ──┐         │
│                    │ // сохраняет fn в       │   │         │
│                    │ // ctx.tasks            │   │         │
│                    └──────────────────────────┘   │         │
│     }                                              │         │
│                                                    │         │
│     await trx.commit()       ✅ только теперь    │         │
│                                                    │         │
│     await drainPostCommit()  ───────────────────┐ │         │
│       ↓                                          │ │         │
│       выполняет ctx.tasks ───────────────────────┘ │         │
│                                                    │         │
│   })                                               │         │
└──────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ POST /synchronization (synchronization.js)                  │
│                                                              │
│   syncContextStorage.run({ tasks: [] }, async () => {       │
│                                                              │
│     trx = await database.transaction()                      │
│                                                              │
│     for (item of data) {                                    │
│       await crudActivity(item, trx, ...)                    │
│       │                                                      │
│       └→ ItemsService.createOne                             │
│            │                                                 │
│            └→ emitter.emitAction('items.create')            │
│                 │                                            │
│                 └→ ┌──────────────────────────┐             │
│                    │ action hook              │             │
│                    │   enqueuePostCommit(fn)  │ ──┐         │
│                    │ // сохраняет fn в       │   │         │
│                    │ // ctx.tasks            │   │         │
│                    └──────────────────────────┘   │         │
│     }                                              │         │
│                                                    │         │
│     await trx.commit()       ✅ только теперь    │         │
│                                                    │         │
│     await drainPostCommit()  ───────────────────┐ │         │
│       ↓                                          │ │         │
│       выполняет ctx.tasks ───────────────────────┘ │         │
│                                                    │         │
│   })                                               │         │
└──────────────────────────────────────────────────────────────┘

AsyncLocalStorage пробрасывает ctx через всю асинхронную цепочку вызовов, включая микротаски от emitter.emitAction — Node поддерживает это нативно.

API хелпера

Файл: extensions/.shared/src/utils/post-commit-queue.ts

enqueuePostCommit(task, options?)

ts
function enqueuePostCommit(
    task: () => Promise<void>,
    options?: { runImmediatelyIfNoContext?: boolean }
): void
function enqueuePostCommit(
    task: () => Promise<void>,
    options?: { runImmediatelyIfNoContext?: boolean }
): void
  • Внутри sync-контекста → задача попадает в очередь, выполнится после drainPostCommit()
  • Вне sync-контекста → задача запускается немедленно через fire-and-forget Promise (CRUD из админки и т.п.)
  • runImmediatelyIfNoContext: false отключает fallback — задача просто игнорируется если контекста нет (полезно если ты ТОЛЬКО хочешь обработать sync-events и не хочешь дублировать на CRUD)

drainPostCommit()

ts
async function drainPostCommit(): Promise<void>
async function drainPostCommit(): Promise<void>

Слив очереди. Вызывается только из synchronization.js после успешного trx.commit(). Свойства:

  • задачи выполняются последовательно (одна за другой)
  • ошибка одной задачи не прерывает остальные
  • исключения наружу не пробрасывает
  • очередь после drain пустая (повторный вызов — no-op)

syncContextStorage

ts
const syncContextStorage: AsyncLocalStorage<{ tasks: Task[]; label?: string }>
const syncContextStorage: AsyncLocalStorage<{ tasks: Task[]; label?: string }>

AsyncLocalStorage инстанс. Используется напрямую только в synchronization.js:

js
await syncContextStorage.run({ tasks: [], label: `sync-${feedbackId}` }, async () => {
    // вся работа с trx
});
await syncContextStorage.run({ tasks: [], label: `sync-${feedbackId}` }, async () => {
    // вся работа с trx
});

В хуках работать с ним напрямую не нужно — enqueuePostCommit сам читает контекст.

Когда использовать

✅ Используй enqueuePostCommit, если

  • Твой action хук подписан на коллекцию, в которую может писать synchronization.js
  • Внутри хука нужно прочитать ТОЛЬКО ЧТО созданную/обновлённую запись (для отправки уведомления, генерации файла, вызова внешнего API и т.п.)
  • Логика хука не должна откатываться при rollback внешней trx (например, уведомление надо посылать только если sync реально завершился)

❌ НЕ используй enqueuePostCommit, если

  • Твой хук — filter, не action (filter имеет свою семантику и работает с payload до insert'а)
  • Тебе нужно остановить операцию (filter может вернуть payload или throw — action не может, action асинхронный)
  • Тебе нужно прочитать запись в той же транзакции (например, ты делаешь filter и тебе нужно проверить связанные записи) — для этого юзай filter и context.database (он будет = trx)
  • Логика должна выполняться до commit (валидация, отмена insert) — это уровень filter, а не action

Примеры

Action hook — стандартный паттерн

ts
// extensions/outer-extensions/src/hooks/notifications.hook.ts
import { defineHook } from '@directus/extensions-sdk';
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';

export default defineHook(({ action }, { logger, services }) => {
    const { ItemsService, NotificationsService } = services;

    action('approval_requests.items.create', ({ key, keys }, { database, schema }) => {
        const approvalRequestId = key ?? keys?.[0];
        if (!approvalRequestId) return;

        enqueuePostCommit(async () => {
            try {
                const svc = new ItemsService('approval_requests', {
                    knex: database,
                    schema,
                    accountability: { admin: true } as any,
                });

                const record = await svc.readOne(approvalRequestId, {
                    fields: ['id', 'request_to.id', 'process.type', /* ... */],
                });

                if (!record || record.process?.type !== 'document_approval') return;

                const notifications = new NotificationsService({
                    knex: database, schema, accountability: null,
                });

                await notifications.createOne({
                    subject: 'Требуется согласование',
                    message: '...',
                    recipient: record.request_to.id,
                });

                logger.info(`Notification sent for ${approvalRequestId}`);
            } catch (err) {
                logger.error(`Notification failed: ${(err as Error).message}`);
            }
        });
    });
});
// extensions/outer-extensions/src/hooks/notifications.hook.ts
import { defineHook } from '@directus/extensions-sdk';
import { enqueuePostCommit } from '@extensions/shared/utils/post-commit-queue';

export default defineHook(({ action }, { logger, services }) => {
    const { ItemsService, NotificationsService } = services;

    action('approval_requests.items.create', ({ key, keys }, { database, schema }) => {
        const approvalRequestId = key ?? keys?.[0];
        if (!approvalRequestId) return;

        enqueuePostCommit(async () => {
            try {
                const svc = new ItemsService('approval_requests', {
                    knex: database,
                    schema,
                    accountability: { admin: true } as any,
                });

                const record = await svc.readOne(approvalRequestId, {
                    fields: ['id', 'request_to.id', 'process.type', /* ... */],
                });

                if (!record || record.process?.type !== 'document_approval') return;

                const notifications = new NotificationsService({
                    knex: database, schema, accountability: null,
                });

                await notifications.createOne({
                    subject: 'Требуется согласование',
                    message: '...',
                    recipient: record.request_to.id,
                });

                logger.info(`Notification sent for ${approvalRequestId}`);
            } catch (err) {
                logger.error(`Notification failed: ${(err as Error).message}`);
            }
        });
    });
});

Важные моменты:

  1. Сам хук остаётся синхронным (без async в callback). enqueuePostCommit принимает async () => {} функцию — она будет вызвана позже.
  2. Try/catch внутри задачи обязателен. drainPostCommit ловит ошибки сам, но ты не получишь информацию о том, что именно сломалось — лучше залогируй.
  3. accountability: { admin: true } — обязательно. По умолчанию accountability хука может быть от sync-сервиса с ограниченными правами.
  4. database из контекста хука = getDatabase() (свежий пул, см. items.ts:380). Используем его — после commit он точно увидит данные.
  5. Не используй setImmediate/setTimeout/process.nextTick внутри enqueuePostCommit-задачи — они оторвут задачу от sync-контекста и от транзакционных гарантий, и Mox AsyncLocalStorage потеряется.

Action hook — отключить fallback вне sync

Если хук должен срабатывать только в рамках синхронизации (например, для зеркалирования изменений на другой портал, и при ручном CRUD это не нужно):

ts
action('some_collection.items.update', ({ key }, { database, schema }) => {
    enqueuePostCommit(
        async () => { /* ... */ },
        { runImmediatelyIfNoContext: false }   // ← вне sync — игнорируем
    );
});
action('some_collection.items.update', ({ key }, { database, schema }) => {
    enqueuePostCommit(
        async () => { /* ... */ },
        { runImmediatelyIfNoContext: false }   // ← вне sync — игнорируем
    );
});

Long-transaction endpoint — добавить новый sync-эндпоинт

Если ты пишешь свой endpoint, который держит длинную ручную транзакцию (по аналогии с synchronization.js), и хочешь чтобы action хуки внутри корректно работали:

js
import { syncContextStorage, drainPostCommit } from '../utils/post-commit-queue.ts'

export default {
    id: "my-bulk-endpoint",
    handler: (router, { database, services, getSchema }) => {
        router.post("/", async (req, res) => {
            await syncContextStorage.run(
                { tasks: [], label: `bulk-${Date.now()}` },
                async () => {
                    let trx;
                    try {
                        trx = await database.transaction();

                        // ... твоя работа с ItemsService через knex: trx

                        await trx.commit();

                        // Сразу после commit — слив очереди
                        try {
                            await drainPostCommit();
                        } catch (err) {
                            req.log.warn(`Post-commit drain error: ${err.message}`);
                        }

                        res.status(200).json({ ok: true });
                    } catch (err) {
                        if (trx) await trx.rollback();
                        // tasks тихо отбрасываются вместе с контекстом
                        res.status(500).json({ error: err.message });
                    }
                }
            );
        });
    }
}
import { syncContextStorage, drainPostCommit } from '../utils/post-commit-queue.ts'

export default {
    id: "my-bulk-endpoint",
    handler: (router, { database, services, getSchema }) => {
        router.post("/", async (req, res) => {
            await syncContextStorage.run(
                { tasks: [], label: `bulk-${Date.now()}` },
                async () => {
                    let trx;
                    try {
                        trx = await database.transaction();

                        // ... твоя работа с ItemsService через knex: trx

                        await trx.commit();

                        // Сразу после commit — слив очереди
                        try {
                            await drainPostCommit();
                        } catch (err) {
                            req.log.warn(`Post-commit drain error: ${err.message}`);
                        }

                        res.status(200).json({ ok: true });
                    } catch (err) {
                        if (trx) await trx.rollback();
                        // tasks тихо отбрасываются вместе с контекстом
                        res.status(500).json({ error: err.message });
                    }
                }
            );
        });
    }
}

Ключевые правила для эндпоинтов:

  1. syncContextStorage.run — самый внешний слой обработчика. Всё, включая открытие trx, должно быть внутри callback.
  2. drainPostCommit() вызывай ТОЛЬКО после успешного trx.commit(). При rollback не вызывай — задачи отбросятся вместе с контекстом по выходу из run.
  3. Один endpoint = один контекст. Не вкладывай syncContextStorage.run друг в друга — внутренний enqueuePostCommit попадёт во внутренний контекст, а не во внешний.
  4. drainPostCommit лови в try/catch на всякий случай. Helper ловит ошибки задач, но баги в самом хелпере или в коде раннера лучше обработать.

Поведение при rollback

При вызове trx.rollback() (например, sync-пакет провалился, fallback ошибки и т.п.):

  • drainPostCommit() НЕ вызывается
  • Задачи в очереди контекста тихо отбрасываются
  • Контекст уничтожается по выходу из syncContextStorage.run
  • Уведомления, файлы, внешние вызовы — не происходят

Это правильное поведение — задачи семантически зависят от факта коммита. Если данных в БД нет, отправлять уведомление о них некорректно.

Если тебе нужны задачи, которые должны выполниться независимо от исхода транзакции (например, метрики или audit log) — не используй post-commit queue, делай fire-and-forget напрямую через void doIt().

Логи

В synchronization.js после успешного коммита и слива:

[Main stream] Transaction commit ✅✅✅
[Main stream] Post-commit queue drained ✅
[Main stream] Transaction commit ✅✅✅
[Main stream] Post-commit queue drained ✅

Если drainPostCommit сам кинул исключение (не должно происходить, helper ловит ошибки задач):

[Main stream] Post-commit drain unexpected error: <message>
[Main stream] Post-commit drain unexpected error: <message>

В хуках — твоя задача залогировать сама. Рекомендуется префикс [NotificationsHook] или аналогичный, чтобы было удобно grep-ать.

Известные ограничения и подводные камни

1. AsyncLocalStorage и сторонние библиотеки

Node.js пробрасывает ALS-контекст через async/await нативно. Но если задача попадает в сторонний event loop (например, очередь BullMQ, worker thread, native addon callback) — контекст может потеряться. В нашем случае всё работает в основном Node-процессе, проблем нет, но если будешь интегрировать что-то экзотическое — проверяй.

2. Несколько action хуков на одно событие

Если на approval_requests.items.create подписано несколько хуков, каждый должен использовать enqueuePostCommit отдельно. Очередь общая на контекст — все задачи попадут в неё и выполнятся в порядке регистрации.

3. Отложенные задачи и очень длинный sync

drainPostCommit сливает задачи последовательно. Если задач много и каждая делает сетевой вызов или PDF-генерацию — общее время drain может вырасти. На текущей нагрузке это не проблема, но если будешь видеть таймауты — рассмотри параллельный drain (Promise.allSettled) или вынос задач в реальную очередь (BullMQ + Redis).

4. Не зови enqueuePostCommit из самой задачи

ts
enqueuePostCommit(async () => {
    enqueuePostCommit(async () => { /* ... */ });   // ← НЕТ!
});
enqueuePostCommit(async () => {
    enqueuePostCommit(async () => { /* ... */ });   // ← НЕТ!
});

К моменту выполнения первой задачи drainPostCommit уже сделал splice(0) и контекст пуст. Вторая задача попадёт в новый пустой массив, который никто не сольёт. Если нужна цепочка — вызывай вторую задачу прямо изнутри первой через await.

5. Rollback после частичного drain — невозможен

Как только drainPostCommit начал выполнять задачи, обратной дороги нет. Если задача отправила HTTP-запрос в сторонний сервис, а потом ты по какой-то причине решил откатить trx — поздно, запрос уже ушёл. Поэтому drainPostCommit строго после commit() и никак иначе.

История

  • 2026-04-07 — обнаружен race condition между action хуком на approval_requests и длинной trx в synchronization.js. Подтверждён через лог [setImmediate-test] approval_request <id> visible after 7 attempt(s) — setImmediate+полинг с backoff увидел запись только через ~1.3 секунды
  • 2026-04-07 — реализован post-commit-queue (этот документ), setImmediate+полинг убран, работает в проде на тесте
  • weeek-596 — связанная задача в трекере

Связанные файлы

  • extensions/.shared/src/utils/post-commit-queue.ts — хелпер
  • extensions/.shared/src/endpoints/synchronization.js — sync эндпоинт, пример использования syncContextStorage.run + drainPostCommit
  • extensions/outer-extensions/src/hooks/notifications.hook.tsapproval_requests.items.create пример использования enqueuePostCommit в hook
  • api/src/services/items.ts — исходник Directus, объясняющий семантику emitAction
  • api/src/utils/transaction.ts — исходник transaction() хелпера
  • api/src/emitter.ts — исходник fire-and-forget emitAction
  • api/src/flows.ts — для сравнения: как Flow подписывается на action события