正在加载,请稍候…

事件驱动微服务:Kafka、Saga模式与最终一致性

使用Kafka设计弹性事件驱动微服务。涵盖Saga模式处理分布式事务、事件溯源、CQRS、发件箱模式及最终一致性。

事件驱动微服务:Kafka、Saga模式与最终一致性

分布式事务是微服务中最棘手的问题。Saga模式用一个由事件连接的本地事务链替代了单一的ACID事务。

编排式Saga

from kafka import KafkaProducer, KafkaConsumer
import json, uuid

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode()
)

def create_order(customer_id: int, items: list, total: float) -> dict:
    order_id = str(uuid.uuid4())
    db.save_order({'order_id': order_id, 'status': 'pending'})
    producer.send('order-events', {
        'type': 'OrderCreated', 'order_id': order_id,
        'customer_id': customer_id, 'items': items, 'total': total
    })
    return {'order_id': order_id}

def process_events():
    consumer = KafkaConsumer(
        'inventory-events', 'payment-events',
        bootstrap_servers=['kafka:9092'],
        group_id='order-service',
        value_deserializer=lambda x: json.loads(x.decode())
    )
    for msg in consumer:
        event = msg.value
        if event['type'] == 'PaymentCharged':
            db.update_order(event['order_id'], status='confirmed')
        elif event['type'] == 'PaymentFailed':
            db.update_order(event['order_id'], status='failed')
            # 补偿:释放库存
            producer.send('inventory-events', {
                'type': 'ReleaseReservation', 'order_id': event['order_id']
            })

事件驱动微服务:Kafka、Saga模式与最终一致性示意图

幂等性

def handle_event(event: dict) -> None:
    event_id = event['event_id']
    if redis.sismember('processed_events', event_id):
        return  # 已处理
    process_order_event(event)
    redis.sadd('processed_events', event_id)
    redis.expire('processed_events', 86400)

事件驱动微服务:Kafka、Saga模式与最终一致性示意图

发件箱模式(保证投递)

def create_order_safely(order_data: dict) -> None:
    with db.transaction():
        order = db.save_order(order_data)
        # 同一事务:要么都成功,要么都失败
        db.save_outbox_event({
            'event_type': 'OrderCreated',
            'payload': json.dumps(order_data),
            'published': False
        })
    # 单独的relay读取发件箱并发布到Kafka

事件驱动微服务:Kafka、Saga模式与最终一致性示意图

事件溯源

class OrderEventStore:
    def append(self, order_id: str, event: dict) -> None:
        db.insert('order_events', {
            'order_id': order_id, 'event_type': event['type'],
            'event_data': json.dumps(event), 'created_at': datetime.utcnow()
        })

    def replay(self, order_id: str) -> dict:
        events = db.query(
            'SELECT * FROM order_events WHERE order_id = ? ORDER BY id',
            order_id
        )
        state = {}
        for e in events:
            ev = json.loads(e.event_data)
            if ev['type'] == 'OrderCreated':
                state = {**ev, 'status': 'pending'}
            elif ev['type'] == 'OrderConfirmed':
                state = {**state, 'status': 'confirmed'}
        return state

Saga思维:设计工作流而非事务。每一步都必须幂等并具备补偿操作。

→ 使用 JSON Viewer 工具验证事件模式。