Skip to content

Example: Event Sourcing

An event-sourced cricket match tracker using effect-dynamodb’s EventStore module. Events are appended to a DynamoDB stream, state is reconstructed by folding over the event history, and commands are validated against current state before producing new events.

What you’ll learn:

  • Defining event streams with EventStore.makeStream
  • The decider pattern: commands, events, and state evolution
  • Command handlers with optimistic concurrency
  • Reading events (read, readFrom, currentVersion)
  • Reconstructing state with EventStore.fold
  • Query combinators on event streams (reverse, limit)

Events are pure domain Schema.Class definitions. Each event captures something that happened in the domain — no DynamoDB concepts:

events.ts
class MatchStarted extends Schema.Class<MatchStarted>("MatchStarted")({
venue: Schema.String,
homeTeam: Schema.String,
awayTeam: Schema.String,
}) {}
class InningsCompleted extends Schema.Class<InningsCompleted>("InningsCompleted")({
innings: Schema.Number,
runs: Schema.Number,
wickets: Schema.Number,
}) {}
class MatchEnded extends Schema.Class<MatchEnded>("MatchEnded")({
result: Schema.String,
}) {}
type MatchEvent = MatchStarted | InningsCompleted | MatchEnded

Each class name doubles as the event type discriminator stored in DynamoDB.


Define the application schema and a table for the event store. Events live in their own physical table — no entity registration is needed because the stream itself manages the items:

infrastructure.ts
const AppSchema = DynamoSchema.make({ name: "cricket", version: 1 })
const EventsTable = Table.make({ schema: AppSchema })

An event stream binds events to a table, names the stream, and declares which attributes compose the stream ID (analogous to an aggregate ID):

stream.ts
const MatchEvents = EventStore.makeStream({
table: EventsTable,
streamName: "Match",
events: [MatchStarted, InningsCompleted, MatchEnded],
streamId: { composite: ["matchId"] },
})

Under the hood, each event is stored as a DynamoDB item with:

  • PK composed from the stream ID (matchId)
  • SK containing a zero-padded version number for ordered retrieval
  • eventType discriminator for decoding back to the correct Schema.Class

The decider is the core pattern for event sourcing. It defines three things:

  1. initialState — the starting state before any events
  2. decide — validates a command against current state, returning events or an error
  3. evolve — applies an event to state, producing the next state
decider.ts
interface MatchState {
readonly status: "pending" | "in-progress" | "completed"
readonly venue?: string
readonly innings: ReadonlyArray<{ runs: number; wickets: number }>
readonly result?: string
}
type MatchCommand =
| {
readonly _tag: "StartMatch"
readonly venue: string
readonly homeTeam: string
readonly awayTeam: string
}
| {
readonly _tag: "CompleteInnings"
readonly innings: number
readonly runs: number
readonly wickets: number
}
| { readonly _tag: "EndMatch"; readonly result: string }
class AlreadyStarted extends Data.TaggedError("AlreadyStarted") {}
class NotStarted extends Data.TaggedError("NotStarted") {}
class AlreadyEnded extends Data.TaggedError("AlreadyEnded") {}
const matchDecider: EventStore.Decider<
MatchState,
MatchCommand,
MatchEvent,
AlreadyStarted | NotStarted | AlreadyEnded
> = {
initialState: { status: "pending", innings: [] },
decide: (command, state) =>
Effect.gen(function* () {
if (command._tag === "StartMatch") {
if (state.status !== "pending") return yield* new AlreadyStarted()
return [
new MatchStarted({
venue: command.venue,
homeTeam: command.homeTeam,
awayTeam: command.awayTeam,
}),
]
}
if (command._tag === "CompleteInnings") {
if (state.status !== "in-progress") return yield* new NotStarted()
return [
new InningsCompleted({
innings: command.innings,
runs: command.runs,
wickets: command.wickets,
}),
]
}
if (command._tag === "EndMatch") {
if (state.status === "completed") return yield* new AlreadyEnded()
if (state.status !== "in-progress") return yield* new NotStarted()
return [new MatchEnded({ result: command.result })]
}
return []
}),
evolve: (state, event) => {
if (event instanceof MatchStarted) {
return { ...state, status: "in-progress" as const, venue: event.venue }
}
if (event instanceof InningsCompleted) {
return {
...state,
innings: [...state.innings, { runs: event.runs, wickets: event.wickets }],
}
}
if (event instanceof MatchEnded) {
return { ...state, status: "completed" as const, result: event.result }
}
return state
},
}

The decide function is effectful — it can fail with domain errors. The evolve function is pure — it simply transforms state. This separation keeps business rules in decide and state transitions in evolve.


EventStore.commandHandler wires the decider to an event stream. It handles the read-decide-append cycle with optimistic concurrency:

handler.ts
const matchEvents = yield* EventStore.bind(MatchEvents)
const handleMatch = EventStore.commandHandler(matchDecider, matchEvents)

Each call to handleMatch:

  1. Reads all events for the stream ID
  2. Folds them through evolve to reconstruct current state
  3. Runs decide with the command and current state
  4. Appends the resulting events with an expected version check

const r1 = yield* handleMatch(
{ matchId: "m-1" },
{ _tag: "StartMatch", venue: "MCG", homeTeam: "AUS", awayTeam: "ENG" },
)
// State: in-progress, Version: 1, Events: 1

The return value includes the new state, the version after append, and the events that were produced.

const r2 = yield* handleMatch(
{ matchId: "m-1" },
{ _tag: "CompleteInnings", innings: 1, runs: 250, wickets: 10 },
)
const r3 = yield* handleMatch(
{ matchId: "m-1" },
{ _tag: "CompleteInnings", innings: 2, runs: 180, wickets: 10 },
)
// State: in-progress, Innings: 2, Version: 3
const r4 = yield* handleMatch(
{ matchId: "m-1" },
{ _tag: "EndMatch", result: "AUS won by 70 runs" },
)
// State: completed, Result: AUS won by 70 runs, Version: 4

const allEvents = yield* matchEvents.read({ matchId: "m-1" })
// v1: MatchStarted at 2026-03-15T...
// v2: InningsCompleted at 2026-03-15T...
// v3: InningsCompleted at 2026-03-15T...
// v4: MatchEnded at 2026-03-15T...

Each event envelope includes version, eventType, timestamp, and the decoded event payload.

Useful for catching up from a known checkpoint:

const laterEvents = yield* matchEvents.readFrom({ matchId: "m-1" }, 2)
// v2: InningsCompleted
// v3: InningsCompleted
// v4: MatchEnded
const version = yield* matchEvents.currentVersion({ matchId: "m-1" })
// Current version: 4

EventStore.fold applies the decider’s evolve function over a list of events, starting from initialState:

const state = EventStore.fold(matchDecider, allEvents)
// Reconstructed: status=completed, innings=2

This is a pure function — no DynamoDB calls. It takes the events you already have and replays them through evolve.

Event streams expose a query API with the same combinators as entity queries. For example, get the latest event using reverse and limit:

const latest = yield* matchEvents.provide(
matchEvents.query.events({ matchId: "m-1" }).pipe(Query.reverse, Query.limit(1), Query.collect),
)
const [latestEvent] = latest
// Latest: v4 MatchEnded

The decider enforces business rules. Sending an invalid command produces a tagged error:

const error = yield* handleMatch(
{ matchId: "m-1" },
{ _tag: "StartMatch", venue: "SCG", homeTeam: "AUS", awayTeam: "IND" },
).pipe(Effect.flip)
// Error: AlreadyStarted

Because errors use Data.TaggedError, you can handle specific cases with Effect.catchTag:

yield* handleMatch(streamId, command)
.pipe(
Effect.catchTag("AlreadyStarted", () => ...),
Effect.catchTag("NotStarted", () => ...),
Effect.catchTag("AlreadyEnded", () => ...),
)

The complete runnable example is at examples/event-sourcing.ts in the repository.

Terminal window
docker run -d -p 8000:8000 amazon/dynamodb-local
Terminal window
npx tsx examples/event-sourcing.ts
main.ts
const AppLayer = Layer.mergeAll(
DynamoClient.layer({
region: "us-east-1",
endpoint: "http://localhost:8000",
credentials: { accessKeyId: "local", secretAccessKey: "local" },
}),
EventsTable.layer({ name: "event-sourcing-example" }),
)
const main = program.pipe(Effect.provide(AppLayer))
Effect.runPromise(main).then(
() => console.log("\nDone."),
(err) => console.error("Failed:", err),
)

ConceptHow it’s used
Event streamEventStore.makeStream binds event schemas to a table with stream ID composites
Decider patterninitialState + decide (command validation) + evolve (state transition)
Command handlerEventStore.commandHandler wires decider to stream with optimistic concurrency
Read operationsread (all events), readFrom (from version), currentVersion (latest version number)
FoldEventStore.fold reconstructs state from events using the decider’s evolve function
Query combinatorsQuery.reverse, Query.limit, Query.collect work on event stream queries
Domain errorsTagged errors (AlreadyStarted, NotStarted, AlreadyEnded) for precise error handling