
分布式事务是微服务中最棘手的问题。Saga模式用一个由事件连接的本地事务链替代了单一的ACID事务。
编排式Saga
from kafka import KafkaProducer, KafkaConsumer
import json, uuid
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode()
)
def create_order(customer_id: int, items: list, total: float) -> dict:
order_id = str(uuid.uuid4())
db.save_order({'order_id': order_id, 'status': 'pending'})
producer.send('order-events', {
'type': 'OrderCreated', 'order_id': order_id,
'customer_id': customer_id, 'items': items, 'total': total
})
return {'order_id': order_id}
def process_events():
consumer = KafkaConsumer(
'inventory-events', 'payment-events',
bootstrap_servers=['kafka:9092'],
group_id='order-service',
value_deserializer=lambda x: json.loads(x.decode())
)
for msg in consumer:
event = msg.value
if event['type'] == 'PaymentCharged':
db.update_order(event['order_id'], status='confirmed')
elif event['type'] == 'PaymentFailed':
db.update_order(event['order_id'], status='failed')
# 补偿:释放库存
producer.send('inventory-events', {
'type': 'ReleaseReservation', 'order_id': event['order_id']
})

幂等性
def handle_event(event: dict) -> None:
event_id = event['event_id']
if redis.sismember('processed_events', event_id):
return # 已处理
process_order_event(event)
redis.sadd('processed_events', event_id)
redis.expire('processed_events', 86400)

发件箱模式(保证投递)
def create_order_safely(order_data: dict) -> None:
with db.transaction():
order = db.save_order(order_data)
# 同一事务:要么都成功,要么都失败
db.save_outbox_event({
'event_type': 'OrderCreated',
'payload': json.dumps(order_data),
'published': False
})
# 单独的relay读取发件箱并发布到Kafka

事件溯源
class OrderEventStore:
def append(self, order_id: str, event: dict) -> None:
db.insert('order_events', {
'order_id': order_id, 'event_type': event['type'],
'event_data': json.dumps(event), 'created_at': datetime.utcnow()
})
def replay(self, order_id: str) -> dict:
events = db.query(
'SELECT * FROM order_events WHERE order_id = ? ORDER BY id',
order_id
)
state = {}
for e in events:
ev = json.loads(e.event_data)
if ev['type'] == 'OrderCreated':
state = {**ev, 'status': 'pending'}
elif ev['type'] == 'OrderConfirmed':
state = {**state, 'status': 'confirmed'}
return state
Saga思维:设计工作流而非事务。每一步都必须幂等并具备补偿操作。
→ 使用 JSON Viewer 工具验证事件模式。