Adapter Transactions
Implement transactional consistency in your adapter
The Contract
When Ventyd calls commitEvents(), it hands your adapter a batch of events that must be persisted atomically. Either every event is saved, or none of them are. This is the single most important guarantee your adapter must uphold — without it, an entity's event stream can become inconsistent, and no amount of application-level retry logic can fix a half-written history.
But atomicity alone is often not enough. In production, you typically need to do more than just append events — you need to update a read model, maybe save a snapshot, and ensure no one else has written to the same entity in the meantime. All of this should happen inside one transaction.
Why One Transaction Matters
Consider what happens without a transaction:
1. Adapter inserts events ✅
2. Adapter updates view table ❌ (crashes here)
3. Adapter saves snapshot (never reached)

Now your event store says the user's name is "Alice", but your view table still says "Bob". The read model is silently wrong, and you might not notice until a user reports it. Worse, the next commit will succeed normally, burying the inconsistency deeper.
A database transaction turns this into an all-or-nothing operation. If step 2 fails, step 1 is rolled back. The world stays consistent.
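To make the all-or-nothing behavior concrete, here is a minimal in-memory sketch. `InMemoryDb` and its `transaction` and `read` methods are hypothetical stand-ins for a real database client and are not part of Ventyd. The sketch stages writes on a copy and publishes the copy only if the callback finishes without throwing.

```typescript
// Hypothetical in-memory stand-in for a database; not a Ventyd or driver API.
type Tables = { events: string[]; views: Map<string, string> };

class InMemoryDb {
  private tables: Tables = { events: [], views: new Map() };

  // Run `work` against a staged copy and publish it only if `work` returns.
  transaction(work: (tx: Tables) => void): void {
    const staged: Tables = {
      events: [...this.tables.events],
      views: new Map(this.tables.views),
    };
    work(staged); // if this throws, `staged` is discarded (rollback)
    this.tables = staged; // commit
  }

  read(): Tables {
    return this.tables;
  }
}

const db = new InMemoryDb();

// A first commit that succeeds: event and view are written together.
db.transaction((tx) => {
  tx.events.push("UserCreated:Bob");
  tx.views.set("user-1", "Bob");
});

// A second commit that crashes between the event insert and the view update.
let failed = false;
try {
  db.transaction((tx) => {
    tx.events.push("NameChanged:Alice"); // step 1 succeeds
    throw new Error("view update crashed"); // step 2 fails
  });
} catch {
  failed = true;
}
```

After the failed commit, `db.read()` still holds exactly one event and the view still says "Bob", so the event log and the read model agree with each other.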
Implementing Transactions
The Minimal Case
At the simplest level, wrap your event inserts in a database transaction:
const adapter: Adapter = {
  async commitEvents({ events }) {
    if (events.length === 0) return;
    await db.transaction(async (tx) => {
      for (const event of events) {
        await tx.insert('events', event);
      }
    });
  },
  // ...
};

This is sufficient for getting started. But as your system grows, you'll want to do more inside that same transaction boundary.
The Full Pattern: Events + View + Snapshot
The Prisma adapter demonstrates what a production transaction typically looks like — three operations bundled into a single atomic commit:
async commitEvents({ events, entityId, entityName, state }) {
  const lastEvent = events[events.length - 1];
  const lastVersion = lastEvent?.version ?? 0;

  // 1. Build the view row from current state
  const viewRow = entityToViewRow({ entityId, entityName, state, version: lastVersion });

  // 2. Start building the transaction
  const commands = [
    // Append events (write model)
    tables.event.createMany({ data: events.map(eventToRow) }),
    // Update the view (read model)
    tables.view.upsert({
      where: { entityId },
      update: viewRow,
      create: viewRow,
    }),
  ];

  // 3. Conditionally add a snapshot
  if (snapshotEvery && lastVersion > 0 && lastVersion % snapshotEvery === 0) {
    commands.push(
      tables.snapshot.upsert({
        where: { entityId },
        update: { entityId, entityName, state, version: lastVersion },
        create: { entityId, entityName, state, version: lastVersion },
      }),
    );
  }

  // 4. Execute everything atomically
  await prisma.$transaction(commands);
}

The key insight: the view update and the event append happen in the same transaction. This means your read model is always exactly consistent with your event log: not eventually consistent, not "probably consistent", but transactionally consistent.
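The snapshot condition in step 3 can be isolated as a pure function to see when it fires. Both helper names below are hypothetical and not part of the Prisma adapter. Because the check looks only at the last version in the batch, a multi-event batch that jumps past a multiple of `snapshotEvery` does not trigger a snapshot. The second helper sketches one possible alternative, assuming the versions in a batch are contiguous.

```typescript
// Mirrors the adapter's condition: snapshot only when the batch's last
// version lands exactly on a multiple of `snapshotEvery`.
function shouldSnapshot(lastVersion: number, snapshotEvery?: number): boolean {
  if (!snapshotEvery) return false; // snapshots disabled
  return lastVersion > 0 && lastVersion % snapshotEvery === 0;
}

// Hypothetical alternative: snapshot when the batch reaches or crosses a
// multiple of `snapshotEvery`. Assumes versions firstVersion..lastVersion
// are contiguous.
function crossesSnapshotBoundary(
  firstVersion: number,
  lastVersion: number,
  snapshotEvery?: number,
): boolean {
  if (!snapshotEvery || lastVersion <= 0) return false;
  const bucketsBefore = Math.floor((firstVersion - 1) / snapshotEvery);
  const bucketsAfter = Math.floor(lastVersion / snapshotEvery);
  return bucketsAfter > bucketsBefore;
}

// With snapshotEvery = 10, a single event at version 10 triggers a snapshot.
const exactHit = shouldSnapshot(10, 10);
// A batch covering versions 9..11 ends on 11, so the modulo check skips it...
const skippedByModulo = shouldSnapshot(11, 10);
// ...while the boundary check notices that version 10 was passed.
const caughtByBoundary = crossesSnapshotBoundary(9, 11, 10);
```

Whether the skipped snapshot matters depends on how often batches contain more than one event. A missed snapshot only makes the next load replay more events; it does not affect correctness.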
Concurrency Control
When two processes load the same entity and try to commit changes simultaneously, you have a conflict. Ventyd provides an expectedVersion parameter to help you detect this:
Process A: loads user at version 3, dispatches event → version 4
Process B: loads user at version 3, dispatches event → version 4
Process A: commits (expectedVersion: 3) → succeeds, version is now 4
Process B: commits (expectedVersion: 3) → should fail, version is no longer 3

To implement this, add a unique constraint on (entityId, version) in your event table. When Process B tries to insert an event with version: 4, the database will reject it because Process A already wrote that version.
async commitEvents({ events, expectedVersion }) {
  // expectedVersion is not read in this sketch: in the scenario above, the first
  // new event carries version expectedVersion + 1, so the constraint enforces it
  await db.transaction(async (tx) => {
    // The unique constraint on (entityId, version) will cause
    // this to fail if another process already wrote these versions
    for (const event of events) {
      await tx.insert('events', event);
    }
  });
}

The beauty of this approach is that the database does the conflict detection for you. No distributed locks, no compare-and-swap: just a unique constraint that turns concurrent writes into a database error you can catch and retry.
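The Process A / Process B scenario can be reproduced with a small in-memory sketch. `EventLog`, `VersionConflictError`, and the `entityId:version` key format are assumptions made for illustration. A real adapter would rely on the database's unique constraint and translate the driver-specific error instead.

```typescript
type StoredEvent = { entityId: string; version: number; name: string };

// Hypothetical error type; real drivers raise their own constraint errors.
class VersionConflictError extends Error {
  constructor(entityId: string, version: number) {
    super(`Version ${version} of ${entityId} already exists`);
    this.name = "VersionConflictError";
  }
}

// In-memory stand-in for an events table with a unique (entityId, version) key.
class EventLog {
  private rows = new Map<string, StoredEvent>();

  // Append a batch atomically: verify every key first, then write them all.
  append(events: StoredEvent[]): void {
    for (const e of events) {
      if (this.rows.has(`${e.entityId}:${e.version}`)) {
        throw new VersionConflictError(e.entityId, e.version);
      }
    }
    for (const e of events) {
      this.rows.set(`${e.entityId}:${e.version}`, e);
    }
  }

  get size(): number {
    return this.rows.size;
  }
}

const log = new EventLog();
log.append([
  { entityId: "user-1", version: 1, name: "created" },
  { entityId: "user-1", version: 2, name: "email-changed" },
  { entityId: "user-1", version: 3, name: "renamed" },
]);

// Process A and Process B both loaded version 3 and each produced version 4.
log.append([{ entityId: "user-1", version: 4, name: "renamed-by-A" }]); // succeeds

let conflictDetected = false;
try {
  log.append([{ entityId: "user-1", version: 4, name: "renamed-by-B" }]);
} catch (error) {
  conflictDetected = error instanceof Error && error.name === "VersionConflictError";
}
```

Process B's append is rejected and the log still contains four events, all from a single consistent history.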
What Belongs Inside the Transaction
A useful rule of thumb: include everything that must be consistent with the event log.
| Inside the transaction | Outside the transaction |
|---|---|
| Event inserts | Sending notifications |
| View/projection updates | Triggering webhooks |
| Snapshot saves | Updating search indexes |
| Version conflict checks | Analytics tracking |
Plugins run after the transaction commits, which is the right place for side effects that can tolerate eventual consistency. The transaction boundary is reserved for things that must be immediately consistent.
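The split in the table above can be sketched as a two-phase function. `commitThenRunSideEffects` and the `SideEffect` type are hypothetical and do not represent Ventyd's plugin API. Collecting side-effect failures instead of throwing them is one possible design choice, made here because the commit has already happened by that point.

```typescript
type SideEffect = () => void;

// Run the transactional work first; run side effects only if it committed.
// Side-effect failures are collected, not thrown, since the committed
// transaction can no longer be rolled back.
function commitThenRunSideEffects(
  commit: () => void,
  sideEffects: SideEffect[],
): unknown[] {
  commit(); // throws on failure, so no side effect runs for an aborted commit
  const failures: unknown[] = [];
  for (const effect of sideEffects) {
    try {
      effect();
    } catch (error) {
      failures.push(error);
    }
  }
  return failures;
}

const delivered: string[] = [];

// Successful commit: one side effect fails, the others still run.
const failures = commitThenRunSideEffects(
  () => {
    /* events, view, and snapshot are written inside one transaction */
  },
  [
    () => { delivered.push("notification"); },
    () => { throw new Error("webhook endpoint down"); },
    () => { delivered.push("analytics"); },
  ],
);

// Failed commit: the error propagates and no side effect fires.
let commitFailed = false;
try {
  commitThenRunSideEffects(
    () => { throw new Error("version conflict"); },
    [() => { delivered.push("should-not-run"); }],
  );
} catch {
  commitFailed = true;
}
```

A failed webhook leaves the event log untouched and can be retried later, whereas a failed view update inside the transaction aborts the whole commit.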
Error Handling
When a transaction fails, the events are not persisted and the entity's in-memory state will be ahead of the database. This is by design — the caller should catch the error and either retry with a fresh load or propagate the failure.
try {
  await repository.commit(user);
} catch (error) {
  // Transaction failed: the entity's events were not saved
  // Reload from the database to get the actual state
  const freshUser = await repository.findOne({ entityId: user.entityId });
  // Retry the operation on the fresh entity
}

Don't try to "fix" a failed transaction by partially applying changes. The whole point of the transaction is that partial application doesn't happen.
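A retry loop built on the reload-and-retry pattern above might look like the following sketch. `commitWithRetry`, its parameters, and the default attempt limit are hypothetical and not part of Ventyd. The `load` callback stands in for `repository.findOne` and `commit` for `repository.commit`.

```typescript
// Hypothetical helper: reload, re-apply the change, and commit until the
// commit succeeds or the attempts run out.
async function commitWithRetry<T>(
  load: () => Promise<T>,
  mutate: (entity: T) => void,
  commit: (entity: T) => Promise<void>,
  maxAttempts = 3,
): Promise<T> {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const entity = await load(); // always start from the persisted state
    mutate(entity); // re-dispatch the change on the fresh entity
    try {
      await commit(entity);
      return entity;
    } catch (error) {
      // Nothing was persisted, so it is safe to reload and try again.
      lastError = error;
    }
  }
  throw lastError;
}
```

This sketch retries on every error for brevity. A real implementation would likely retry only on version conflicts and rethrow anything else immediately, since retrying a connection failure or a validation error rarely helps.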
