Atomicity and Concurrency

Event sourcing makes strong consistency simple -- events append to a stream in order. But when multiple clients write concurrently, you need a strategy.

The problem

Two users update the same order simultaneously:

User A: GET order/123 (sees length=5)
User B: GET order/123 (sees length=5)

User A: POST was_shipped (expects length=5, creates event 6)
User B: POST was_delivered (expects length=5, creates event 6)

Without protection, both succeed. The order has two events at position 6. Data loss.
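The lost update is easy to reproduce with an in-memory sketch (no j17 API involved; `appendAt` is a hypothetical unchecked append):

```javascript
// Five existing events; both clients read length = 5 before writing.
const stream = Array(5).fill({});

// No check that the stream is still the length the client saw.
function appendAt(stream, staleLength, event) {
  stream[staleLength] = event;
}

appendAt(stream, 5, { type: 'was_shipped' });   // User A writes event 6
appendAt(stream, 5, { type: 'was_delivered' }); // User B silently overwrites it
```

User A's event is gone without either client seeing an error.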

Optimistic Concurrency Control (OCC)

j17 uses OCC to prevent this. There are three modes: internal (default), external, and disabled.

Internal OCC (default)

When you don't provide previous_length, j17 uses internal OCC:

  1. Read current stream length
  2. Validate and prepare your event
  3. Atomically write if length hasn't changed

For example, this request omits previous_length, so internal OCC applies:

{
  "data": { "status": "shipped" },
  "metadata": {
    "actor": { "type": "user", "id": "user-456" }
  }
}

Internal OCC catches races during request processing (another write arriving in the same millisecond window). On conflict, j17 automatically retries once with a fresh length. If the retry also fails (sustained contention), you get a 409.

This is the right default for most workloads. You don't need to think about concurrency unless you have read-modify-write patterns.
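The three steps above can be sketched as a compare-and-set loop with one retry. This is an illustration, not j17's actual implementation; `store` is a hypothetical in-memory stand-in for stream storage:

```javascript
// Sketch of internal OCC: compare-and-set on the stream length,
// with one automatic retry on conflict.
function casAppend(store, expectedLength, event) {
  if (store.events.length !== expectedLength) return false; // length moved: conflict
  store.events.push(event);
  return true;
}

function internalOccWrite(store, event) {
  for (let attempt = 0; attempt < 2; attempt++) { // initial attempt + one retry
    const length = store.events.length;           // 1. read current length
    // 2. validate and prepare the event here
    if (casAppend(store, length, event)) {        // 3. write only if length unchanged
      return { ok: true, length: store.events.length };
    }
  }
  return { ok: false, error: { code: 'conflict' } }; // sustained contention: 409
}
```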

External OCC

When you provide previous_length, j17 uses external OCC:

{
  "data": { "status": "shipped" },
  "metadata": {
    "actor": { "type": "user", "id": "user-456" },
    "previous_length": 5
  }
}

j17 checks: does this aggregate currently have 5 events?

  • Yes: Write succeeds, returns new length (6)
  • No: Write fails with 409 Conflict

External OCC catches everything internal OCC catches, plus modifications since your read. No automatic retry -- the conflict is meaningful because your client's view was stale.
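The check itself amounts to one comparison. A sketch (again not j17's implementation), matching the 409 response shape shown later in this page:

```javascript
// Sketch of the external OCC check: the write succeeds only if the
// client's previous_length still matches the aggregate's current length.
function externalOccWrite(store, event, previousLength) {
  if (store.events.length !== previousLength) {
    // No automatic retry: the client's view was stale.
    return {
      ok: false,
      error: {
        code: 'conflict',
        details: { expected: previousLength, actual: store.events.length }
      }
    };
  }
  store.events.push(event);
  return { ok: true, length: store.events.length };
}
```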

Use external OCC for:

  • Read-modify-write patterns (read aggregate, decide, write)
  • Long-running user sessions where data might change
  • Critical operations where you need guaranteed consistency

Disabling OCC (skip_occ)

For append-only patterns where conflicts don't matter:

{
  "data": { "entry": "user logged in" },
  "metadata": {
    "actor": { "type": "system", "id": "auth-service" },
    "skip_occ": true
  }
}

Requires spec opt-in per event type:

{
  "aggregate_types": {
    "audit": {
      "events": {
        "entry_was_added": {
          "allow_skip_occ": true,
          "schema": { "..." : "..." }
        }
      }
    }
  }
}

Safe for append-only logs, analytics, activity feeds. Dangerous for state machines, balances, or anything with check-then-act logic.

OCC summary

Mode      previous_length   Catches                        Auto-retry   Use case
Internal  Not provided      Request-time races             Yes (once)   Default safety net
External  Provided          All modifications since read   No           Read-modify-write
Disabled  skip_occ: true    Nothing                        N/A          Append-only logs

Handling conflicts

When you get a 409:

{
  "ok": false,
  "error": {
    "code": "conflict",
    "message": "Optimistic concurrency check failed",
    "details": {
      "expected": 5,
      "actual": 6
    }
  }
}

Options:

  1. Refetch and retry: Get current state, reapply your change, try again
  2. Reject: Tell the user someone else modified the data
  3. Merge: Combine your change with the new state

Refetch and retry

// Retry loop using external OCC: refetch the aggregate on each attempt
// so previous_length always reflects the latest state.
async function writeWithRetry(aggregateType, id, eventType, data, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    // Get current state
    const current = await fetchAggregate(aggregateType, id);

    try {
      return await writeEvent(aggregateType, id, eventType, {
        data,
        metadata: {
          actor: { type: 'user', id: 'user-456' },
          previous_length: current.length
        }
      });
    } catch (err) {
      if (err.code === 'conflict' && attempt < maxRetries - 1) {
        continue; // Retry
      }
      throw err;
    }
  }
}

Reject

Show the user a message: "Someone else updated this. Please refresh and try again."

Safer than silent retries that might overwrite important changes.
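A minimal client-side handler for the reject strategy. `writeEvent` and `notifyUser` are hypothetical helpers standing in for your API client and UI layer:

```javascript
// Reject strategy: surface the conflict to the user instead of retrying.
async function saveOrRefresh(writeEvent, notifyUser, payload) {
  try {
    return await writeEvent(payload);
  } catch (err) {
    if (err.code === 'conflict') {
      notifyUser('Someone else updated this. Please refresh and try again.');
      return null; // caller decides whether to refetch
    }
    throw err; // not a conflict: propagate
  }
}
```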

Merge (advanced)

For non-conflicting changes, merge automatically:

// User A changes shipping_address
// User B changes status
// These don't conflict -- apply both

const current = await fetchAggregate('order', id);
const newData = mergeChanges(current.state, userInput);

await writeEvent('order', id, 'was_updated', {
  data: newData,
  metadata: { previous_length: current.length }
});

Only safe for orthogonal changes. Don't merge conflicting status updates.
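`mergeChanges` above is not a j17 API. One safe way to sketch it is a three-way merge that only combines edits touching disjoint fields; note this version takes the base state the user originally read as an extra argument, unlike the two-argument call above:

```javascript
// Three-way merge: `base` is the state the user originally read,
// `current` is the latest state, `edits` is the user's changed fields.
// Throws on overlap so conflicting updates are never silently combined.
function mergeChanges(base, current, edits) {
  const merged = { ...current };
  for (const [key, value] of Object.entries(edits)) {
    const otherSideChanged = current[key] !== base[key];
    const weChanged = value !== base[key];
    if (otherSideChanged && weChanged && current[key] !== value) {
      throw new Error(`conflicting change on "${key}"`); // not orthogonal: give up
    }
    merged[key] = value;
  }
  return merged;
}
```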

Batch writes (same aggregate)

j17 provides atomic batch writes for multiple events targeting the same aggregate. All events succeed or none are written:

POST /order/550e8400-e29b-41d4-a716-446655440000
Content-Type: application/json
Authorization: Bearer $J17_API_KEY

{
  "events": [
    {"type": "was_created", "data": {"customer_id": "abc"}},
    {"type": "had_item_added", "data": {"sku": "WIDGET-1", "qty": 2}},
    {"type": "had_item_added", "data": {"sku": "GADGET-3", "qty": 1}}
  ],
  "metadata": {
    "actor": {"type": "admin", "id": "system"},
    "previous_length": 0
  }
}

Response:

{
  "ok": true,
  "stream_ids": ["1706789012345-0", "1706789012345-1", "1706789012345-2"],
  "count": 3,
  "implied_count": 5
}

The previous_length applies to the batch as a whole. Each event in the batch can trigger implications; all implications see the pre-batch state.

Good use cases for batch writes:

  • Initial entity creation with multiple setup events
  • Complex state transitions that are logically atomic
  • Bulk imports or data migrations
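The batch request above, sketched from JavaScript with fetch. The endpoint and payload shape follow the HTTP example; `baseUrl` and the API key handling are assumptions to adapt to your deployment:

```javascript
// Sketch: submit an atomic batch of events to a new order aggregate.
async function createOrderBatch(baseUrl, apiKey, orderId, events) {
  const res = await fetch(`${baseUrl}/order/${orderId}`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${apiKey}`
    },
    body: JSON.stringify({
      events,
      metadata: {
        actor: { type: 'admin', id: 'system' },
        previous_length: 0 // aggregate must not exist yet
      }
    })
  });
  if (!res.ok) throw new Error(`batch write failed: ${res.status}`);
  return res.json(); // { ok, stream_ids, count, implied_count }
}
```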

Cross-aggregate consistency

OCC protects single aggregates. What about operations across multiple?

Example: Place an order AND reserve inventory.

order:123  -> was_placed
inventory:456 -> was_reserved

If the first succeeds but the second fails, you have an order without reserved inventory. With N independent writes, you have 2^N possible success/failure combinations, and every partial one leaves the system inconsistent.

The solution: implications

Instead of submitting multiple events from the client, submit ONE trigger event. j17 automatically emits derived events atomically:

{
  "aggregate_types": {
    "order": {
      "events": {
        "was_placed": {
          "schema": { "..." : "..." },
          "handler": [
            { "set": { "target": "", "value": "$.data" } }
          ],
          "implications": [
            {
              "emit": {
                "aggregate_type": "inventory",
                "id": "$.data.product_id",
                "event_type": "was_reserved",
                "data": {
                  "quantity": "$.data.qty",
                  "order_id": "$.key"
                }
              }
            }
          ]
        }
      }
    }
  }
}

Now your client code is simple:

const result = await writeEvent('order', orderId, 'was_placed', {
  data: orderData,
  metadata: { actor: { type: 'user', id: userId } }
});
// Either everything succeeded, or nothing was written
// result.implied_count tells you how many derived events were created

The trigger event contains all the context needed for derived events. Placing an order is the business fact; inventory reservation follows from that fact.

When implications don't fit

Implications work when derived events follow deterministically from the trigger event data. They don't fit when:

  • You need external data at decision time (inventory check from external system)
  • Steps take time or can fail externally (payment processing, shipping)
  • Different authorization contexts are required

For these cases, use the saga pattern: trigger event starts the workflow, external processes respond asynchronously, confirmation/failure events drive next steps, and compensation events handle rollback.
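One saga step can be sketched as a worker that consumes the trigger event and records an outcome event. The event names, `writeEvent` helper, and `reserveStock` call are illustrative assumptions; `reserveStock` stands in for any external system that may fail:

```javascript
// Saga sketch: an inventory worker reacts to order events and responds with
// a confirmation or failure event; the failure event drives compensation.
async function handleOrderPlaced(event, { reserveStock, writeEvent }) {
  try {
    await reserveStock(event.data.product_id, event.data.qty); // external call: may fail
    await writeEvent('inventory', event.data.product_id, 'was_reserved', {
      data: { order_id: event.key, qty: event.data.qty }
    });
  } catch (err) {
    // Compensation path: record the failure so the order workflow can roll back.
    await writeEvent('order', event.key, 'reservation_failed', {
      data: { reason: err.message }
    });
  }
}
```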

Read-modify-write cycles

Common pattern:

  1. GET aggregate
  2. Modify state in memory
  3. POST event with previous_length

This is safe with external OCC. If someone else writes between 1 and 3, your POST fails and you retry.

Alternative: Calculated events

Instead of sending the new state, send the operation:

{
  "data": { "amount": 50 },
  "metadata": {
    "actor": { "type": "user", "id": "user-456" },
    "previous_length": 5
  }
}

The handler computes the new balance: state.balance + event.data.amount

This is naturally mergeable. If the balance starts at $100 and someone else also adds $50, both events apply and the balance becomes $200 -- neither increment is lost.
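A sketch of such a handler in plain JavaScript (the real handler would be spec-driven, as in the implications example above):

```javascript
// Calculated event: the handler derives the new balance from a delta,
// so concurrent deposits compose instead of overwriting each other.
function applyAmountAdded(state, event) {
  return { ...state, balance: state.balance + event.data.amount };
}

let state = { balance: 100 };
state = applyAmountAdded(state, { data: { amount: 50 } }); // user A
state = applyAmountAdded(state, { data: { amount: 50 } }); // user B
// balance is now 200: both increments applied
```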

Performance trade-offs

Mode      Latency    Consistency            Use case
Internal  Baseline   Request-time safety    Default for most writes
External  Baseline   Strong                 Read-modify-write, critical ops
Disabled  Baseline   None                   Append-only logs

The overhead is minimal. OCC adds a single integer comparison.

Debugging conflicts

High conflict rates indicate:

  • Hotspots (everyone updating the same aggregate)
  • Long gaps between GET and POST
  • Missing calculated events (sending absolute values instead of deltas)

Fix hotspots by sharding (add sub-aggregates) or using calculated events.
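One way to shard a hotspot, as an illustrative sketch; the shard count, naming scheme, and hash are assumptions, not a j17 feature:

```javascript
// Spread writes for one logical aggregate across N sub-aggregates.
// Reads sum the shards; each write targets one shard, so OCC conflicts
// drop roughly N-fold.
const SHARD_COUNT = 8; // assumption: tune to your contention level

function shardId(logicalId, actorId) {
  // Deterministic per actor, so one client's writes stay ordered on one shard.
  let hash = 0;
  for (const ch of actorId) hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  return `${logicalId}-shard-${hash % SHARD_COUNT}`;
}
```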

See also

Can't find what you need? support@j17.app