Skip to content

DynamoDB Streams

DynamoDB Streams deliver a time-ordered sequence of item-level changes. effect-dynamodb provides utilities for decoding stream records into typed domain objects.

DynamoDB Stream records contain items in DynamoDB’s native AttributeValue format:

{
"pk": { "S": "$myapp#v1#employee#emp-alice" },
"sk": { "S": "$myapp#v1#employee" },
"__edd_e__": { "S": "Employee" },
"email": { "S": "alice@acme.com" },
"version": { "N": "1" }
}

Manually decoding this into your domain types is error-prone. effect-dynamodb provides Entity.decodeMarshalledItem for AttributeValue input and Entity.itemSchema for already-unmarshalled input.

The examples on this page use these models and entities:

class Employee extends Schema.Class<Employee>("Employee")({
employeeId: Schema.String,
email: Schema.String,
displayName: Schema.NonEmptyString,
department: Schema.String,
}) {}
const EmployeeModel = DynamoModel.configure(Employee, {
email: { immutable: true },
})
class Task extends Schema.Class<Task>("Task")({
taskId: Schema.String,
employeeId: Schema.String,
title: Schema.NonEmptyString,
status: Schema.String,
}) {}
const AppSchema = DynamoSchema.make({ name: "myapp", version: 1 })
const EmployeeEntity = Entity.make({
model: EmployeeModel,
entityType: "Employee",
primaryKey: {
pk: { field: "pk", composite: ["employeeId"] },
sk: { field: "sk", composite: [] },
},
indexes: {
byDepartment: {
name: "gsi1",
pk: { field: "gsi1pk", composite: ["department"] },
sk: { field: "gsi1sk", composite: ["displayName"] },
},
},
timestamps: true,
versioned: true,
})
const TaskEntity = Entity.make({
model: Task,
entityType: "Task",
primaryKey: {
pk: { field: "pk", composite: ["taskId"] },
sk: { field: "sk", composite: [] },
},
indexes: {
byEmployee: {
name: "gsi1",
pk: { field: "gsi1pk", composite: ["employeeId"] },
sk: { field: "gsi1sk", composite: ["status"] },
},
},
timestamps: true,
})
const _MainTable = Table.make({
schema: AppSchema,
entities: { EmployeeEntity, TaskEntity },
})

For Lambda functions consuming DynamoDB Stream events directly, use Entity.decodeMarshalledItem. It unmarshalls the AttributeValue map and decodes via the entity’s item schema in a single step, returning an Effect that fails with ValidationError on schema mismatch:

const handleEmployeeStream = (event: DynamoDBStreamEvent) =>
Effect.gen(function* () {
for (const record of event.Records) {
if (record.dynamodb?.NewImage) {
const employee = yield* Entity.decodeMarshalledItem(
EmployeeEntity,
record.dynamodb.NewImage,
)
yield* Console.log(`Updated: ${(employee as Employee).displayName}`)
}
}
})

If you’ve already unmarshalled the item (e.g., using Marshaller.fromAttributeMap or @aws-sdk/util-dynamodb), use Entity.itemSchema to get a Schema codec for direct decoding:

const employeeFromItem = Entity.itemSchema(EmployeeEntity)
const decodeUnmarshalledEmployee = (marshalledImage: Record<string, AttributeValue>) => {
const unmarshalled = Marshaller.fromAttributeMap(marshalledImage)
return Schema.decodeUnknownSync(employeeFromItem)(unmarshalled)
}

The Entity.Marshalled<E> type represents the shape of a DynamoDB item in AttributeValue format:

type EmployeeMarshalled = Entity.Marshalled<typeof EmployeeEntity>
// {
// readonly [x: string]: {
// readonly S?: string
// readonly N?: string
// readonly BOOL?: boolean
// readonly NULL?: boolean
// readonly L?: ReadonlyArray<unknown>
// readonly M?: Record<string, unknown>
// }
// }

When processing streams from a single-table design, use __edd_e__ to discriminate entity types and decode each with Entity.decodeMarshalledItem:

const handleStreamEvent = (event: DynamoDBStreamEvent) =>
Effect.gen(function* () {
for (const record of event.Records) {
const image = record.dynamodb?.NewImage
if (!image) continue
const entityType = image["__edd_e__"]?.S
switch (entityType) {
case "Employee": {
const emp = yield* Entity.decodeMarshalledItem(EmployeeEntity, image)
yield* handleEmployeeChange(emp as Entity.Record<typeof EmployeeEntity>)
break
}
case "Task": {
const task = yield* Entity.decodeMarshalledItem(TaskEntity, image)
yield* handleTaskChange(task as Entity.Record<typeof TaskEntity>)
break
}
}
}
})
const handleEmployeeChange = (emp: Entity.Record<typeof EmployeeEntity>) =>
Console.log(`Employee changed: ${emp.displayName} (${emp.email})`)
const handleTaskChange = (task: Entity.Record<typeof TaskEntity>) =>
Console.log(`Task changed: ${task.title} (${task.status})`)
  • Testing — Testing patterns with mocked DynamoClient
  • Modeling — Model, schema, table, entity definitions