
DynamoDB 思维
DynamoDB 要求预先围绕访问模式对数据进行建模。与 SQL 不同,你不能随意查询任何内容。

核心概念
- 分区键 (PK):决定项目存储在哪个分区
- 排序键 (SK):允许在分区内进行范围查询
- GSI:全局二级索引——不同的 PK/SK 组合
- LSI:本地二级索引——相同的 PK,不同的 SK
单表设计
// 所有实体在同一个表中
// PK 和 SK 编码实体类型和关系
// 用户实体
{ PK: 'USER#alice', SK: 'PROFILE', name: 'Alice', email: 'alice@example.com' }
// 用户的订单(查询 PK = 'USER#alice',SK begins_with 'ORDER#')
{ PK: 'USER#alice', SK: 'ORDER#2026-05-15#abc123', status: 'shipped', total: 89.99 }
// 按 ID 查询订单(查询 PK = 'ORDER#abc123')
{ PK: 'ORDER#abc123', SK: 'DETAILS', userId: 'alice', total: 89.99 }
// 产品
{ PK: 'PRODUCT#prod456', SK: 'DETAILS', name: 'Headphones', price: 79.99 }
// 用于邮箱查找的 GSI1
{ PK: 'USER#alice', SK: 'PROFILE', GSI1PK: 'EMAIL#alice@example.com', GSI1SK: 'USER#alice' }

表设计
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb')
const { DynamoDBDocumentClient, PutCommand, QueryCommand, GetCommand } = require('@aws-sdk/lib-dynamodb')
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}))
// 按 ID 获取用户
async function getUser(userId) {
const { Item } = await ddb.send(new GetCommand({
TableName: 'MainTable',
Key: { PK: `USER#${userId}`, SK: 'PROFILE' },
}))
return Item
}
// 获取用户的订单(按日期降序排列)
async function getUserOrders(userId, lastKey) {
const { Items, LastEvaluatedKey } = await ddb.send(new QueryCommand({
TableName: 'MainTable',
KeyConditionExpression: 'PK = :pk AND begins_with(SK, :prefix)',
ExpressionAttributeValues: {
':pk': `USER#${userId}`,
':prefix': 'ORDER#',
},
ScanIndexForward: false, // 降序
Limit: 20,
ExclusiveStartKey: lastKey,
}))
return { orders: Items, nextKey: LastEvaluatedKey }
}
// 通过邮箱获取用户(通过 GSI)
async function getUserByEmail(email) {
const { Items } = await ddb.send(new QueryCommand({
TableName: 'MainTable',
IndexName: 'GSI1',
KeyConditionExpression: 'GSI1PK = :email',
ExpressionAttributeValues: { ':email': `EMAIL#${email}` },
}))
return Items?.[0]
}
写入操作
// 创建用户(带唯一性检查)
async function createUser(userId, email, name) {
// 条件写入——如果 PK/SK 已存在则失败
await ddb.send(new PutCommand({
TableName: 'MainTable',
Item: {
PK: `USER#${userId}`,
SK: 'PROFILE',
GSI1PK: `EMAIL#${email}`,
GSI1SK: `USER#${userId}`,
name,
email,
createdAt: new Date().toISOString(),
entityType: 'USER',
},
ConditionExpression: 'attribute_not_exists(PK)',
}))
}
// 事务写入(原子性多项目操作)
const { TransactWriteCommand } = require('@aws-sdk/lib-dynamodb')
await ddb.send(new TransactWriteCommand({
TransactItems: [
{
Put: {
TableName: 'MainTable',
Item: { PK: `ORDER#${orderId}`, SK: 'DETAILS', ...orderData },
ConditionExpression: 'attribute_not_exists(PK)',
},
},
{
Update: {
TableName: 'MainTable',
Key: { PK: `USER#${userId}`, SK: 'PROFILE' },
UpdateExpression: 'SET orderCount = orderCount + :one',
ExpressionAttributeValues: { ':one': 1 },
},
},
],
}))

DynamoDB Streams + Lambda
// 实时处理变更
export const handler = async (event) => {
for (const record of event.Records) {
if (record.eventName === 'INSERT') {
const newItem = unmarshall(record.dynamodb.NewImage)
if (newItem.entityType === 'ORDER') {
await processNewOrder(newItem)
}
}
}
}
容量规划
// 按需模式:按请求付费(适合流量波动)
// 预置模式:为可预测流量预先分配
// 监控消耗的容量
const { ConsumedCapacity } = await ddb.send(new QueryCommand({
TableName: 'MainTable',
ReturnConsumedCapacity: 'TOTAL',
// ...
}))
console.log('RCUs consumed:', ConsumedCapacity.CapacityUnits)