正在加载,请稍候…

LangChain 生产模式:智能体、记忆与 RAG 最佳实践

构建生产级 LangChain 应用:结构化智能体与工具调用、对话记忆管理、带重排序的 RAG、流式响应、错误处理及 LangSmith 可观测性。

LangChain 生产模式:智能体、记忆与 RAG 最佳实践

LangChain 生产模式:超越快速入门

LangChain 的快速入门教程展示了令人印象深刻的演示。但在生产环境中构建可靠的应用需要深入理解其模式和陷阱。

LLM 配置与回退

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI

primary = ChatAnthropic(
    model='claude-3-5-sonnet-20241022',
    temperature=0,
    max_tokens=2048,
    timeout=30,
    max_retries=2,
)
fallback = ChatOpenAI(model='gpt-4o', temperature=0)

# 主模型抛出异常时自动故障转移
llm_with_fallback = primary.with_fallbacks([fallback])

# 使用 Pydantic 进行结构化输出
from pydantic import BaseModel

class ExtractedData(BaseModel):
    company_name: str
    industry: str
    founded_year: int | None

structured_llm = primary.with_structured_output(ExtractedData)

LangChain 生产模式:智能体、记忆与 RAG 最佳实践 插图

生产级 RAG 链与重排序

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

def build_rag_chain(collection_name: str):
    vectorstore = QdrantVectorStore.from_existing_collection(
        embedding=OpenAIEmbeddings(model='text-embedding-3-small'),
        collection_name=collection_name
    )
    # 检索 20 个候选,重排序至前 5
    base_retriever = vectorstore.as_retriever(
        search_type='mmr',
        search_kwargs={'k': 20, 'fetch_k': 50}
    )
    cross_encoder = HuggingFaceCrossEncoder(
        model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'
    )
    retriever = ContextualCompressionRetriever(
        base_compressor=CrossEncoderReranker(model=cross_encoder, top_n=5),
        base_retriever=base_retriever
    )
    prompt = ChatPromptTemplate.from_messages([
        ('system', '仅根据上下文回答。如果信息不足,请说明。\n\n上下文:\n{context}'),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{question}')
    ])
    def format_docs(docs):
        return '\n\n---\n\n'.join([
            f'来源:{doc.metadata.get("source", "unknown")}\n{doc.page_content}'
            for doc in docs
        ])
    return (
        {'context': retriever | format_docs, 'question': RunnablePassthrough(),
         'chat_history': lambda x: x.get('chat_history', [])}
        | prompt | llm_with_fallback | StrOutputParser()
    )

LangChain 生产模式:智能体、记忆与 RAG 最佳实践 插图

对话记忆管理

记忆是最大的生产陷阱——上下文窗口会填满:

from langchain.memory import ConversationSummaryBufferMemory
from langchain_core.messages import HumanMessage, AIMessage
import json

class ProductionMemoryStore:
    def __init__(self, redis_client, llm, max_token_limit=2000):
        self.redis = redis_client
        self.llm = llm
        self.max_token_limit = max_token_limit

    def get_memory(self, session_id: str):
        memory = ConversationSummaryBufferMemory(
            llm=self.llm, max_token_limit=self.max_token_limit,
            return_messages=True, memory_key='chat_history'
        )
        stored = self.redis.get(f'memory:{session_id}')
        if stored:
            data = json.loads(stored)
            memory.moving_summary_buffer = data.get('summary', '')
            for msg in data.get('messages', []):
                cls = HumanMessage if msg['type'] == 'human' else AIMessage
                memory.chat_memory.add_message(cls(content=msg['content']))
        return memory

    def save_memory(self, session_id: str, memory):
        msgs = [{'type': 'human' if isinstance(m, HumanMessage) else 'ai',
                 'content': m.content} for m in memory.chat_memory.messages]
        self.redis.setex(f'memory:{session_id}', 86400 * 7,
            json.dumps({'summary': memory.moving_summary_buffer, 'messages': msgs}))

LangChain 生产模式:智能体、记忆与 RAG 最佳实践 插图

结构化智能体与工具调用

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.tools import tool

@tool
def get_order_status(order_id: str) -> str:
    '''通过订单 ID 获取客户订单的当前状态。'''
    order = fetch_order(order_id)
    if not order: return f'未找到 ID 为 {order_id} 的订单'
    return json.dumps({'status': order.status, 'tracking': order.tracking})

@tool
def create_support_ticket(customer_id: str, subject: str, priority: str = 'medium') -> str:
    '''创建支持工单。优先级:low, medium, high, urgent。'''
    if priority not in ['low', 'medium', 'high', 'urgent']:
        return '错误:无效优先级'
    ticket_id = create_ticket_in_system(customer_id, subject, priority)
    return f'已创建工单 #{ticket_id}'

agent_executor = AgentExecutor(
    agent=create_tool_calling_agent(llm=primary, tools=[get_order_status, create_support_ticket], prompt=prompt),
    tools=[get_order_status, create_support_ticket],
    max_iterations=5,
    handle_parsing_errors=True,
    return_intermediate_steps=True
)

LangSmith 可观测性

import os
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_PROJECT'] = 'production-rag'

from langchain_core.runnables.config import RunnableConfig
config = RunnableConfig(
    tags=['production', 'v2.3'],
    metadata={'session_id': session_id, 'user_tier': user.tier}
)
result = await chain.ainvoke({'question': query}, config=config)

错误处理

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def robust_invoke(chain, inputs: dict):
    try:
        return await chain.ainvoke(inputs)
    except Exception as e:
        logger.error(f'链错误:{e}')
        raise

生产级 LangChain 应用需要关注上下文窗口管理、检索质量(重排序至关重要)、智能体循环终止条件以及从第一天开始的可观测性。