
生产环境下的 GraphQL 最佳实践
GraphQL 提供了灵活性,但也带来了独特的挑战。本指南涵盖了生产就绪的模式。
Schema 设计原则
# 好:使用 connections 表示列表(支持分页)
type Query {
users(first: Int, after: String, filter: UserFilter): UserConnection!
user(id: ID!): User
}
type UserConnection {
edges: [UserEdge!]!
pageInfo: PageInfo!
totalCount: Int!
}
type UserEdge {
node: User!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
hasPreviousPage: Boolean!
startCursor: String
endCursor: String
}
# 好:使用 input types 表示变更
type Mutation {
createUser(input: CreateUserInput!): CreateUserPayload!
updateUser(id: ID!, input: UpdateUserInput!): UpdateUserPayload!
}
input CreateUserInput {
name: String!
email: String!
role: UserRole = USER
}
type CreateUserPayload {
user: User
errors: [UserError!]!
}
type UserError {
field: String
message: String!
code: UserErrorCode!
}

使用 DataLoader 解决 N+1 问题
import DataLoader from 'dataloader';
// 没有 DataLoader - N+1 问题
// 查询 100 篇文章 = 100 次独立的用户查询!
// 使用 DataLoader - 批量查询
const userLoader = new DataLoader<string, User>(async (userIds) => {
// 一次查询所有 ID
const users = await db.users.findMany({
where: { id: { in: userIds as string[] } },
});
// 按输入 ID 顺序返回
const userMap = new Map(users.map(u => [u.id, u]));
return userIds.map(id => userMap.get(id) ?? new Error(`User ${id} not found`));
});
// 在 resolver 中
const resolvers = {
Post: {
author: (post, _, { loaders }) => {
return loaders.user.load(post.authorId);
},
tags: (post, _, { loaders }) => {
return loaders.tagsByPost.load(post.id);
},
},
};
// Context 设置 - 每个请求新建 DataLoader!
function createContext({ req }) {
return {
db,
loaders: {
user: new DataLoader(batchUsers),
tagsByPost: new DataLoader(batchTagsByPost),
},
};
}
基于游标的分页
async function resolveUsersConnection(
parent: unknown,
args: { first?: number; after?: string; filter?: UserFilter },
ctx: Context,
) {
const limit = Math.min(args.first ?? 20, 100);
const cursor = args.after ? decodeCursor(args.after) : null;
const users = await ctx.db.users.findMany({
take: limit + 1, // 多取一条以检查 hasNextPage
where: {
...(cursor && { id: { gt: cursor } }),
...buildFilterWhere(args.filter),
},
orderBy: { id: 'asc' },
});
const hasNextPage = users.length > limit;
const edges = users.slice(0, limit).map(user => ({
node: user,
cursor: encodeCursor(user.id),
}));
return {
edges,
pageInfo: {
hasNextPage,
hasPreviousPage: !!cursor,
startCursor: edges[0]?.cursor,
endCursor: edges[edges.length - 1]?.cursor,
},
totalCount: () => ctx.db.users.count({ where: buildFilterWhere(args.filter) }),
};
}
function encodeCursor(id: string): string {
return Buffer.from(`cursor:${id}`).toString('base64');
}
function decodeCursor(cursor: string): string {
const decoded = Buffer.from(cursor, 'base64').toString();
return decoded.replace('cursor:', '');
}
认证与授权
// Schema 指令用于授权
const typeDefs = gql`
directive @auth(requires: Role = USER) on FIELD_DEFINITION
enum Role { USER ADMIN }
type Query {
me: User @auth
adminStats: Stats @auth(requires: ADMIN)
}
`;
// 使用 graphql-shield 进行字段级授权
import { shield, rule, and, not } from 'graphql-shield';
const isAuthenticated = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return !!ctx.user;
}
);
const isAdmin = rule({ cache: 'contextual' })(
async (parent, args, ctx) => {
return ctx.user?.role === 'ADMIN';
}
);
const isOwner = rule({ cache: 'strict' })(
async (parent, args, ctx) => {
const post = await ctx.loaders.post.load(args.id);
return post.authorId === ctx.user?.id;
}
);
export const permissions = shield({
Query: {
me: isAuthenticated,
adminStats: isAdmin,
},
Mutation: {
createPost: isAuthenticated,
updatePost: and(isAuthenticated, isOwner),
deletePost: and(isAuthenticated, or(isOwner, isAdmin)),
},
});
查询复杂度与深度限制
import { createComplexityLimitRule } from 'graphql-validation-complexity';
import depthLimit from 'graphql-depth-limit';
const server = new ApolloServer({
typeDefs,
resolvers,
validationRules: [
depthLimit(10), // 最大查询深度
createComplexityLimitRule(1000, {
scalarCost: 1,
objectCost: 2,
listFactor: 10,
introspectionListFactor: 2,
}),
],
});
// 自定义复杂度计算器
function calculateComplexity(args: Record<string, any>): number {
const limit = args.first || args.limit || 10;
return limit * 2; // 每个条目成本为 2
}
持久化查询(安全 + 性能)
import { createPersistedQueryLink } from '@apollo/client/link/persisted-queries';
import { sha256 } from 'crypto-hash';
// 客户端:自动哈希并发送查询 ID
const link = createPersistedQueryLink({ sha256 });
const client = new ApolloClient({ link, cache: new InMemoryCache() });
// 服务器:验证并查找查询
const server = new ApolloServer({
cache: new InMemoryLRUCache(),
plugins: [
ApolloServerPluginCacheControl({ defaultMaxAge: 5 }),
{
async requestDidStart({ request }) {
if (request.extensions?.persistedQuery) {
const { sha256Hash } = request.extensions.persistedQuery;
// 在生产环境中对照白名单验证
}
},
},
],
});
总结
GraphQL 生产环境检查清单:
- 对所有列表字段使用 connections(支持分页)
- 对所有变更使用 input types
- 使用 DataLoader 进行批处理(解决 N+1)
- 使用基于游标的分页(而非偏移量)
- 设置查询深度和复杂度限制
- 使用持久化查询防止恶意查询
- 使用 graphql-shield 进行字段级授权