DynamoDB Streams
DynamoDB Streams deliver a time-ordered sequence of item-level changes. effect-dynamodb provides utilities for decoding stream records into typed domain objects.
Understanding the Problem
DynamoDB Stream records contain items in DynamoDB’s native AttributeValue format:
{
  "pk": { "S": "$myapp#v1#employee#emp-alice" },
  "sk": { "S": "$myapp#v1#employee" },
  "__edd_e__": { "S": "Employee" },
  "email": { "S": "alice@acme.com" },
  "version": { "N": "1" }
}

Manually decoding this into your domain types is error-prone. effect-dynamodb provides Entity.decodeMarshalledItem for AttributeValue input and Entity.itemSchema for already-unmarshalled input.
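To make the manual-decoding problem concrete, here is a minimal hand-rolled sketch that handles only the S, N, BOOL, and NULL tags. The `AttributeValueLike` type and the `unmarshallScalar` and `unmarshallFlat` helpers are hypothetical names used for illustration; they are not part of effect-dynamodb or the AWS SDK.

```typescript
// Hypothetical, simplified AttributeValue shape: scalar tags only.
type AttributeValueLike =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }

type PlainValue = string | number | boolean | null

// Convert one tagged value into a plain JavaScript value.
const unmarshallScalar = (av: AttributeValueLike): PlainValue => {
  if ("S" in av) return av.S
  if ("N" in av) return Number(av.N) // DynamoDB sends numbers as strings
  if ("BOOL" in av) return av.BOOL
  return null // the only remaining tag is NULL
}

// Convert a flat item (no nested L or M values) attribute by attribute.
const unmarshallFlat = (
  item: Record<string, AttributeValueLike>,
): Record<string, PlainValue> => {
  const out: Record<string, PlainValue> = {}
  for (const [key, av] of Object.entries(item)) {
    out[key] = unmarshallScalar(av)
  }
  return out
}

const plain = unmarshallFlat({
  pk: { S: "$myapp#v1#employee#emp-alice" },
  email: { S: "alice@acme.com" },
  version: { N: "1" },
})
```

This sketch ignores nested L and M values, sets, and binary data, and it performs no validation against the domain schema, which is the gap the entity-based decoders are meant to close.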
The examples on this page use these models and entities:
class Employee extends Schema.Class<Employee>("Employee")({
  employeeId: Schema.String,
  email: Schema.String,
  displayName: Schema.NonEmptyString,
  department: Schema.String,
}) {}

const EmployeeModel = DynamoModel.configure(Employee, {
  email: { immutable: true },
})

class Task extends Schema.Class<Task>("Task")({
  taskId: Schema.String,
  employeeId: Schema.String,
  title: Schema.NonEmptyString,
  status: Schema.String,
}) {}

const AppSchema = DynamoSchema.make({ name: "myapp", version: 1 })
const EmployeeEntity = Entity.make({
  model: EmployeeModel,
  entityType: "Employee",
  primaryKey: {
    pk: { field: "pk", composite: ["employeeId"] },
    sk: { field: "sk", composite: [] },
  },
  indexes: {
    byDepartment: {
      name: "gsi1",
      pk: { field: "gsi1pk", composite: ["department"] },
      sk: { field: "gsi1sk", composite: ["displayName"] },
    },
  },
  timestamps: true,
  versioned: true,
})

const TaskEntity = Entity.make({
  model: Task,
  entityType: "Task",
  primaryKey: {
    pk: { field: "pk", composite: ["taskId"] },
    sk: { field: "sk", composite: [] },
  },
  indexes: {
    byEmployee: {
      name: "gsi1",
      pk: { field: "gsi1pk", composite: ["employeeId"] },
      sk: { field: "gsi1sk", composite: ["status"] },
    },
  },
  timestamps: true,
})
const _MainTable = Table.make({
  schema: AppSchema,
  entities: { EmployeeEntity, TaskEntity },
})

Decoding from AttributeValue Format
For Lambda functions consuming DynamoDB Stream events directly, use Entity.decodeMarshalledItem. It unmarshalls the AttributeValue map and decodes via the entity’s item schema in a single step, returning an Effect that fails with ValidationError on schema mismatch:
const handleEmployeeStream = (event: DynamoDBStreamEvent) =>
  Effect.gen(function* () {
    for (const record of event.Records) {
      if (record.dynamodb?.NewImage) {
        const employee = yield* Entity.decodeMarshalledItem(
          EmployeeEntity,
          record.dynamodb.NewImage,
        )
        yield* Console.log(`Updated: ${(employee as Employee).displayName}`)
      }
    }
  })

Decoding from Unmarshalled Format
If you’ve already unmarshalled the item (e.g., using Marshaller.fromAttributeMap or @aws-sdk/util-dynamodb), use Entity.itemSchema to get a Schema codec for direct decoding:
const employeeFromItem = Entity.itemSchema(EmployeeEntity)
const decodeUnmarshalledEmployee = (marshalledImage: Record<string, AttributeValue>) => {
  const unmarshalled = Marshaller.fromAttributeMap(marshalledImage)
  return Schema.decodeUnknownSync(employeeFromItem)(unmarshalled)
}

Typing the Marshalled Shape
The Entity.Marshalled<E> type represents the shape of a DynamoDB item in AttributeValue format:
type EmployeeMarshalled = Entity.Marshalled<typeof EmployeeEntity>
// {
//   readonly [x: string]: {
//     readonly S?: string
//     readonly N?: string
//     readonly BOOL?: boolean
//     readonly NULL?: boolean
//     readonly L?: ReadonlyArray<unknown>
//     readonly M?: Record<string, unknown>
//   }
// }

Processing Multiple Entity Types
When processing streams from a single-table design, use __edd_e__ to discriminate entity types and decode each with Entity.decodeMarshalledItem:
const handleStreamEvent = (event: DynamoDBStreamEvent) =>
  Effect.gen(function* () {
    for (const record of event.Records) {
      const image = record.dynamodb?.NewImage
      if (!image) continue

      const entityType = image["__edd_e__"]?.S
      switch (entityType) {
        case "Employee": {
          const emp = yield* Entity.decodeMarshalledItem(EmployeeEntity, image)
          yield* handleEmployeeChange(emp as Entity.Record<typeof EmployeeEntity>)
          break
        }
        case "Task": {
          const task = yield* Entity.decodeMarshalledItem(TaskEntity, image)
          yield* handleTaskChange(task as Entity.Record<typeof TaskEntity>)
          break
        }
      }
    }
  })
const handleEmployeeChange = (emp: Entity.Record<typeof EmployeeEntity>) => Console.log(`Employee changed: ${emp.displayName} (${emp.email})`)
const handleTaskChange = (task: Entity.Record<typeof TaskEntity>) => Console.log(`Task changed: ${task.title} (${task.status})`)
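
The handlers on this page read only NewImage, and the switch silently ignores entity types it does not recognise. As a minimal sketch of the routing step in isolation, the helper below picks an image per record and reads the `__edd_e__` discriminator. The `StreamRecordLike` type and `routeRecord` function are hypothetical names, the sketch assumes the stream is configured to include both new and old images, and falling back to OldImage for REMOVE events is an illustrative choice rather than behaviour documented for effect-dynamodb.

```typescript
// Hypothetical, trimmed-down record types; real handlers would use the
// DynamoDBStreamEvent types from their Lambda typings instead.
type AttrLike = { S?: string; N?: string }
type ImageLike = Record<string, AttrLike>

interface StreamRecordLike {
  eventName?: "INSERT" | "MODIFY" | "REMOVE"
  dynamodb?: { NewImage?: ImageLike; OldImage?: ImageLike }
}

type Routed =
  | { kind: "Employee" | "Task"; image: ImageLike }
  | { kind: "skip"; reason: string }

const routeRecord = (record: StreamRecordLike): Routed => {
  // REMOVE records carry no NewImage, so use OldImage for them (assumption).
  const image =
    record.eventName === "REMOVE"
      ? record.dynamodb?.OldImage
      : record.dynamodb?.NewImage
  if (!image) return { kind: "skip", reason: "no image on record" }

  // Read the entity-type discriminator stored on the item.
  const entityType = image["__edd_e__"]?.S
  if (entityType === "Employee" || entityType === "Task") {
    return { kind: entityType, image }
  }
  return { kind: "skip", reason: `unknown entity type: ${String(entityType)}` }
}

const routed = routeRecord({
  eventName: "REMOVE",
  dynamodb: { OldImage: { __edd_e__: { S: "Task" } } },
})
const skipped = routeRecord({
  eventName: "INSERT",
  dynamodb: { NewImage: { __edd_e__: { S: "Invoice" } } },
})
```

In a real handler, each non-skip result would then be passed to Entity.decodeMarshalledItem with the matching entity. Returning an explicit skip value makes unrecognised records visible to logging or metrics instead of dropping them without a trace.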