Example: Event Sourcing
An event-sourced cricket match tracker using effect-dynamodb’s EventStore module. Events are appended to a DynamoDB stream, state is reconstructed by folding over the event history, and commands are validated against current state before producing new events.
What you’ll learn:
- Defining event streams with EventStore.makeStream
- The decider pattern: commands, events, and state evolution
- Command handlers with optimistic concurrency
- Reading events (read, readFrom, currentVersion)
- Reconstructing state with EventStore.fold
- Query combinators on event streams (reverse, limit)
Step 1: Events
Events are pure domain Schema.Class definitions. Each event captures something that happened in the domain and carries no DynamoDB concepts:
```typescript
class MatchStarted extends Schema.Class<MatchStarted>("MatchStarted")({
  venue: Schema.String,
  homeTeam: Schema.String,
  awayTeam: Schema.String,
}) {}

class InningsCompleted extends Schema.Class<InningsCompleted>("InningsCompleted")({
  innings: Schema.Number,
  runs: Schema.Number,
  wickets: Schema.Number,
}) {}

class MatchEnded extends Schema.Class<MatchEnded>("MatchEnded")({
  result: Schema.String,
}) {}

type MatchEvent = MatchStarted | InningsCompleted | MatchEnded
```
Each class name doubles as the event type discriminator stored in DynamoDB.
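The discriminator idea can be sketched in plain TypeScript, independent of the library. The `StoredEvent` shape, the `registry` map, and the `encode` and `decodeStored` helpers below are hypothetical names chosen for illustration; the real storage format and decoding logic in effect-dynamodb may differ.

```typescript
// Plain-TypeScript sketch: class names used as event type discriminators.
// StoredEvent, registry, encode, and decodeStored are illustrative, not library API.

class MatchStarted {
  constructor(
    readonly venue: string,
    readonly homeTeam: string,
    readonly awayTeam: string,
  ) {}
}

class MatchEnded {
  constructor(readonly result: string) {}
}

type MatchEvent = MatchStarted | MatchEnded

// What a persisted item might carry: a discriminator plus a plain payload.
interface StoredEvent {
  readonly eventType: string
  readonly payload: Record<string, unknown>
}

// Map each discriminator to a function that rebuilds the class instance.
const registry: Record<string, (p: Record<string, unknown>) => MatchEvent> = {
  MatchStarted: (p) =>
    new MatchStarted(String(p.venue), String(p.homeTeam), String(p.awayTeam)),
  MatchEnded: (p) => new MatchEnded(String(p.result)),
}

// The class name becomes the stored discriminator.
const encode = (event: MatchEvent): StoredEvent => ({
  eventType: event.constructor.name,
  payload: { ...event },
})

// Look up the builder by discriminator and rebuild the typed event.
const decodeStored = (item: StoredEvent): MatchEvent => {
  const build = registry[item.eventType]
  if (build === undefined) throw new Error(`Unknown event type: ${item.eventType}`)
  return build(item.payload)
}

const stored = encode(new MatchEnded("AUS won by 70 runs"))
const roundTripped = decodeStored(stored)
```

Because the discriminator is derived from the class name, renaming an event class in a scheme like this would change what gets written, so stored names are best treated as part of the persisted contract.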
Step 2: Schema and Table
Define the application schema and a table for the event store. Events live in their own physical table; no entity registration is needed because the stream itself manages the items:
```typescript
const AppSchema = DynamoSchema.make({ name: "cricket", version: 1 })
const EventsTable = Table.make({ schema: AppSchema })
```
Step 3: Event Stream
An event stream binds events to a table, names the stream, and declares which attributes compose the stream ID (analogous to an aggregate ID):
```typescript
const MatchEvents = EventStore.makeStream({
  table: EventsTable,
  streamName: "Match",
  events: [MatchStarted, InningsCompleted, MatchEnded],
  streamId: { composite: ["matchId"] },
})
```
Under the hood, each event is stored as a DynamoDB item with:
- PK composed from the stream ID (matchId)
- SK containing a zero-padded version number for ordered retrieval
- eventType discriminator for decoding back to the correct Schema.Class
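To see why the version in the sort key is zero-padded, here is a small standalone sketch. DynamoDB orders string sort keys by their UTF-8 bytes, so unpadded numbers would sort as text rather than numerically. The padding width of 10 and the `v#` prefix are assumptions made for this illustration, not the key format the library actually writes.

```typescript
// Sketch: zero-padding keeps versions in numeric order under string sorting.
// VERSION_WIDTH and the "v#" prefix are assumptions, not the library's real key format.

const VERSION_WIDTH = 10

const versionSortKey = (version: number): string =>
  `v#${String(version).padStart(VERSION_WIDTH, "0")}`

// For ASCII digits, plain string comparison matches byte-wise ordering.
const byString = (a: string, b: string): number => (a < b ? -1 : a > b ? 1 : 0)

const versions = [1, 2, 10, 9, 100]

// Unpadded: "1", "10", "100", "2", "9" (version 10 sorts before version 2).
const unpadded = versions.map(String).sort(byString)

// Padded: string order now agrees with numeric order.
const padded = versions.map(versionSortKey).sort(byString)

// Recover the numeric versions from the sorted keys.
const recovered = padded.map((key) => Number(key.slice(2)))
```

A fixed width also caps the highest representable version, so a real key scheme needs a width large enough for the longest stream it expects to store.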
Step 4: Decider
The decider is the core pattern for event sourcing. It defines three things:
- initialState — the starting state before any events
- decide — validates a command against current state, returning events or an error
- evolve — applies an event to state, producing the next state
```typescript
interface MatchState {
  readonly status: "pending" | "in-progress" | "completed"
  readonly venue?: string
  readonly innings: ReadonlyArray<{ runs: number; wickets: number }>
  readonly result?: string
}

type MatchCommand =
  | {
      readonly _tag: "StartMatch"
      readonly venue: string
      readonly homeTeam: string
      readonly awayTeam: string
    }
  | {
      readonly _tag: "CompleteInnings"
      readonly innings: number
      readonly runs: number
      readonly wickets: number
    }
  | { readonly _tag: "EndMatch"; readonly result: string }

class AlreadyStarted extends Data.TaggedError("AlreadyStarted") {}
class NotStarted extends Data.TaggedError("NotStarted") {}
class AlreadyEnded extends Data.TaggedError("AlreadyEnded") {}

const matchDecider: EventStore.Decider<
  MatchState,
  MatchCommand,
  MatchEvent,
  AlreadyStarted | NotStarted | AlreadyEnded
> = {
  initialState: { status: "pending", innings: [] },

  decide: (command, state) =>
    Effect.gen(function* () {
      if (command._tag === "StartMatch") {
        if (state.status !== "pending") return yield* new AlreadyStarted()
        return [
          new MatchStarted({
            venue: command.venue,
            homeTeam: command.homeTeam,
            awayTeam: command.awayTeam,
          }),
        ]
      }
      if (command._tag === "CompleteInnings") {
        if (state.status !== "in-progress") return yield* new NotStarted()
        return [
          new InningsCompleted({
            innings: command.innings,
            runs: command.runs,
            wickets: command.wickets,
          }),
        ]
      }
      if (command._tag === "EndMatch") {
        if (state.status === "completed") return yield* new AlreadyEnded()
        if (state.status !== "in-progress") return yield* new NotStarted()
        return [new MatchEnded({ result: command.result })]
      }
      return []
    }),

  evolve: (state, event) => {
    if (event instanceof MatchStarted) {
      return { ...state, status: "in-progress" as const, venue: event.venue }
    }
    if (event instanceof InningsCompleted) {
      return {
        ...state,
        innings: [...state.innings, { runs: event.runs, wickets: event.wickets }],
      }
    }
    if (event instanceof MatchEnded) {
      return { ...state, status: "completed" as const, result: event.result }
    }
    return state
  },
}
```
The decide function is effectful: it can fail with domain errors. The evolve function is pure: it only transforms state. This separation keeps business rules in decide and state transitions in evolve.
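One consequence of this separation is that the rules can be exercised without a database: build a state from past events, issue a command, and inspect the outcome. The sketch below is library-free and deliberately simplified. It assumes a `Decision` result union in place of an Effect, and a reduced two-command model; the `given` helper is a hypothetical name.

```typescript
// Library-free sketch of a decider, checked in a given/when/then style.
// Decision and given are illustrative; the real decide returns an Effect.

type State = { readonly status: "pending" | "in-progress" | "completed" }
type Command = { readonly _tag: "StartMatch" } | { readonly _tag: "EndMatch" }
type Event = { readonly _tag: "MatchStarted" } | { readonly _tag: "MatchEnded" }
type Failure = "AlreadyStarted" | "NotStarted" | "AlreadyEnded"

type Decision =
  | { readonly ok: true; readonly events: ReadonlyArray<Event> }
  | { readonly ok: false; readonly error: Failure }

const initialState: State = { status: "pending" }

// Business rules: validate the command against the current state.
const decide = (command: Command, state: State): Decision => {
  switch (command._tag) {
    case "StartMatch":
      return state.status === "pending"
        ? { ok: true, events: [{ _tag: "MatchStarted" }] }
        : { ok: false, error: "AlreadyStarted" }
    case "EndMatch":
      if (state.status === "completed") return { ok: false, error: "AlreadyEnded" }
      if (state.status === "pending") return { ok: false, error: "NotStarted" }
      return { ok: true, events: [{ _tag: "MatchEnded" }] }
  }
}

// State transitions: no validation, just apply the event.
const evolve = (state: State, event: Event): State => {
  switch (event._tag) {
    case "MatchStarted":
      return { status: "in-progress" }
    case "MatchEnded":
      return { status: "completed" }
  }
}

// Given past events, rebuild the state they imply.
const given = (events: ReadonlyArray<Event>): State =>
  events.reduce(evolve, initialState)

// When a command arrives, then inspect the decision.
const startTwice = decide({ _tag: "StartMatch" }, given([{ _tag: "MatchStarted" }]))
const endEarly = decide({ _tag: "EndMatch" }, given([]))
const endOk = decide({ _tag: "EndMatch" }, given([{ _tag: "MatchStarted" }]))
```

Here `startTwice` and `endEarly` are rejections and `endOk` carries a single `MatchEnded` event, mirroring the rules of the full decider.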
Step 5: Command Handler
EventStore.commandHandler wires the decider to an event stream. It handles the read-decide-append cycle with optimistic concurrency:
```typescript
const matchEvents = yield* EventStore.bind(MatchEvents)
const handleMatch = EventStore.commandHandler(matchDecider, matchEvents)
```
Each call to handleMatch:
- Reads all events for the stream ID
- Folds them through evolve to reconstruct current state
- Runs decide with the command and current state
- Appends the resulting events with an expected version check
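The four steps above can be sketched against an in-memory list to show what the expected version check protects against. Everything here is illustrative: `InMemoryStream`, `VersionConflict`, `handle`, and the counter domain are hypothetical stand-ins, and the sketch says nothing about how effect-dynamodb performs the check against DynamoDB.

```typescript
// In-memory sketch of the read, fold, decide, append cycle with a version check.
// InMemoryStream, VersionConflict, and handle are illustrative, not library API.

type CounterEvent = { readonly _tag: "Incremented"; readonly by: number }
type CounterCommand = { readonly _tag: "Increment"; readonly by: number }

class VersionConflict extends Error {
  constructor(readonly expected: number, readonly actual: number) {
    super(`Expected version ${expected}, but stream is at ${actual}`)
  }
}

class InMemoryStream {
  private readonly events: Array<CounterEvent> = []

  read(): ReadonlyArray<CounterEvent> {
    return [...this.events]
  }

  // Append only if nobody else has written since the caller read the stream.
  append(newEvents: ReadonlyArray<CounterEvent>, expectedVersion: number): number {
    if (this.events.length !== expectedVersion) {
      throw new VersionConflict(expectedVersion, this.events.length)
    }
    this.events.push(...newEvents)
    return this.events.length
  }
}

const MAX_TOTAL = 10

const evolve = (total: number, event: CounterEvent): number => total + event.by

const decide = (command: CounterCommand, total: number): ReadonlyArray<CounterEvent> => {
  if (total + command.by > MAX_TOTAL) throw new Error("Total would exceed the cap")
  return [{ _tag: "Incremented", by: command.by }]
}

const handle = (stream: InMemoryStream, command: CounterCommand) => {
  const history = stream.read() // 1. read all events
  const state = history.reduce(evolve, 0) // 2. fold into current state
  const produced = decide(command, state) // 3. decide
  const version = stream.append(produced, history.length) // 4. append with expected version
  return { state: produced.reduce(evolve, state), version, events: produced }
}

const stream = new InMemoryStream()
const first = handle(stream, { _tag: "Increment", by: 2 })

// A writer holding a stale version (0) is rejected instead of silently interleaving.
let conflict: unknown
try {
  stream.append([{ _tag: "Incremented", by: 1 }], 0)
} catch (error) {
  conflict = error
}
```

In this sketch a rejected writer would simply call `handle` again, which re-reads the stream and re-runs `decide` against the newer state.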
Step 6: Running Commands
Start Match
```typescript
const r1 = yield* handleMatch(
  { matchId: "m-1" },
  { _tag: "StartMatch", venue: "MCG", homeTeam: "AUS", awayTeam: "ENG" },
)
// State: in-progress, Version: 1, Events: 1
```
The return value includes the new state, the version after append, and the events that were produced.
Complete Innings
```typescript
const r2 = yield* handleMatch(
  { matchId: "m-1" },
  { _tag: "CompleteInnings", innings: 1, runs: 250, wickets: 10 },
)

const r3 = yield* handleMatch(
  { matchId: "m-1" },
  { _tag: "CompleteInnings", innings: 2, runs: 180, wickets: 10 },
)
// State: in-progress, Innings: 2, Version: 3
```
End Match
```typescript
const r4 = yield* handleMatch(
  { matchId: "m-1" },
  { _tag: "EndMatch", result: "AUS won by 70 runs" },
)
// State: completed, Result: AUS won by 70 runs, Version: 4
```
Step 7: Reading Events
Section titled “Step 7: Reading Events”Read All Events
```typescript
const allEvents = yield* matchEvents.read({ matchId: "m-1" })
// v1: MatchStarted at 2026-03-15T...
// v2: InningsCompleted at 2026-03-15T...
// v3: InningsCompleted at 2026-03-15T...
// v4: MatchEnded at 2026-03-15T...
```
Each event envelope includes version, eventType, timestamp, and the decoded event payload.
Read From a Specific Version
Useful for catching up from a known checkpoint:
```typescript
const laterEvents = yield* matchEvents.readFrom({ matchId: "m-1" }, 2)
// v2: InningsCompleted
// v3: InningsCompleted
// v4: MatchEnded
```
Current Version
```typescript
const version = yield* matchEvents.currentVersion({ matchId: "m-1" })
// Current version: 4
```
Step 8: Fold and Query
Section titled “Step 8: Fold and Query”Reconstruct State from Events
EventStore.fold applies the decider’s evolve function over a list of events, starting from initialState:
```typescript
const state = EventStore.fold(matchDecider, allEvents)
// Reconstructed: status=completed, innings=2
```
This is a pure function that makes no DynamoDB calls. It takes the events you already have and replays them through evolve.
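Conceptually, a fold like this is a left reduction, which also means a previously computed state can be resumed by applying only the newer events. The sketch below illustrates both ideas in plain TypeScript. `Envelope`, `Snapshot`, `fold`, and `resume` are hypothetical names and shapes, not the library's types. Since the readFrom example returned the event at the requested version itself, a snapshot taken at version n would be resumed by reading from n + 1.

```typescript
// Sketch: fold as a left reduction, plus resuming from a saved snapshot.
// Envelope, Snapshot, fold, and resume are illustrative names, not library API.

interface Envelope<E> {
  readonly version: number
  readonly event: E
}

interface Snapshot<S> {
  readonly version: number // version of the last event already applied
  readonly state: S
}

type InningsCompleted = { readonly runs: number; readonly wickets: number }
type Totals = { readonly innings: number; readonly runs: number }

const initial: Totals = { innings: 0, runs: 0 }

const evolve = (state: Totals, event: InningsCompleted): Totals => ({
  innings: state.innings + 1,
  runs: state.runs + event.runs,
})

// Replay every event through evolveFn, starting from the given state.
const fold = <S, E>(
  evolveFn: (state: S, event: E) => S,
  start: S,
  envelopes: ReadonlyArray<Envelope<E>>,
): S => envelopes.reduce((state, envelope) => evolveFn(state, envelope.event), start)

// Apply only events newer than the snapshot and report the new version.
const resume = <S, E>(
  evolveFn: (state: S, event: E) => S,
  snapshot: Snapshot<S>,
  envelopes: ReadonlyArray<Envelope<E>>,
): Snapshot<S> => {
  const newer = envelopes.filter((e) => e.version > snapshot.version)
  return {
    version: newer.reduce((max, e) => Math.max(max, e.version), snapshot.version),
    state: fold(evolveFn, snapshot.state, newer),
  }
}

const history: ReadonlyArray<Envelope<InningsCompleted>> = [
  { version: 1, event: { runs: 250, wickets: 10 } },
  { version: 2, event: { runs: 180, wickets: 10 } },
]

// Full replay and snapshot resumption arrive at the same totals.
const full = fold(evolve, initial, history)
const resumed = resume(evolve, { version: 1, state: { innings: 1, runs: 250 } }, history)
```

The version filter inside `resume` makes it safe to pass events the snapshot has already seen, which guards against an off-by-one when choosing the starting version.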
Query Combinators
Event streams expose a query API with the same combinators as entity queries. For example, get the latest event using reverse and limit:
```typescript
const latest = yield* matchEvents.provide(
  matchEvents.query.events({ matchId: "m-1" }).pipe(Query.reverse, Query.limit(1), Query.collect),
)
const [latestEvent] = latest
// Latest: v4 MatchEnded
```
Step 9: Domain Error Handling
The decider enforces business rules. Sending an invalid command produces a tagged error:
```typescript
const error = yield* handleMatch(
  { matchId: "m-1" },
  { _tag: "StartMatch", venue: "SCG", homeTeam: "AUS", awayTeam: "IND" },
).pipe(Effect.flip)
// Error: AlreadyStarted
```
Because errors use Data.TaggedError, you can handle specific cases with Effect.catchTag:
```typescript
yield* handleMatch(streamId, command)
  .pipe(
    Effect.catchTag("AlreadyStarted", () => ...),
    Effect.catchTag("NotStarted", () => ...),
    Effect.catchTag("AlreadyEnded", () => ...),
  )
```
Running the Example
The complete runnable example is at examples/event-sourcing.ts in the repository.
```sh
docker run -d -p 8000:8000 amazon/dynamodb-local
npx tsx examples/event-sourcing.ts
```
Layer Setup
```typescript
const AppLayer = Layer.mergeAll(
  DynamoClient.layer({
    region: "us-east-1",
    endpoint: "http://localhost:8000",
    credentials: { accessKeyId: "local", secretAccessKey: "local" },
  }),
  EventsTable.layer({ name: "event-sourcing-example" }),
)

const main = program.pipe(Effect.provide(AppLayer))

Effect.runPromise(main).then(
  () => console.log("\nDone."),
  (err) => console.error("Failed:", err),
)
```
Key Takeaways
| Concept | How it’s used |
|---|---|
| Event stream | EventStore.makeStream binds event schemas to a table with stream ID composites |
| Decider pattern | initialState + decide (command validation) + evolve (state transition) |
| Command handler | EventStore.commandHandler wires decider to stream with optimistic concurrency |
| Read operations | read (all events), readFrom (from version), currentVersion (latest version number) |
| Fold | EventStore.fold reconstructs state from events using the decider’s evolve function |
| Query combinators | Query.reverse, Query.limit, Query.collect work on event stream queries |
| Domain errors | Tagged errors (AlreadyStarted, NotStarted, AlreadyEnded) for precise error handling |
What’s Next?
- Modeling Guide: Deep dive into models, schemas, tables, and entities
- Queries Guide: Query combinators, pagination, and filtering
- Example: Unique Constraints: Globally unique fields and idempotency keys
- Example: Human Resources: Single-table design with collections and transactions