
基于SAML 2.0的企业级SSO
SAML 2.0架构
用户浏览器 <-> 服务提供商 (SP) <-> 身份提供商 (IdP)
(您的应用) (Okta, Azure AD, G Suite)
SP发起流程:
1. 用户访问SP(您的应用)
2. SP生成AuthnRequest,重定向到IdP
3. 用户在IdP进行身份验证
4. IdP通过浏览器向SP发送SAML响应(断言)
5. SP验证断言,创建会话

使用passport-saml的Node.js SAML
import passport from 'passport'
import { Strategy as SamlStrategy } from '@node-saml/passport-saml'
import express from 'express'
const samlConfig = {
callbackUrl: 'https://app.example.com/auth/saml/callback',
entryPoint: 'https://company.okta.com/app/example/sso/saml',
issuer: 'https://app.example.com',
// IdP证书(来自IdP元数据)
cert: process.env.IDP_CERT,
// 可选:对请求进行签名
privateKey: process.env.SP_PRIVATE_KEY,
signatureAlgorithm: 'sha256',
// 属性映射
wantAssertionsSigned: true,
wantAuthnResponseSigned: true,
}
passport.use(new SamlStrategy(samlConfig, async (profile, done) => {
try {
// 从SAML属性中提取用户信息
const user = {
id: profile.nameID,
email: profile['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress'],
name: profile['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name'],
groups: profile['http://schemas.microsoft.com/ws/2008/06/identity/claims/groups'] || [],
}
// 在数据库中更新或创建用户
const dbUser = await upsertUser(user)
return done(null, dbUser)
} catch (err) {
return done(err)
}
}))
const app = express()
// SP发起的登录
app.get('/auth/saml', passport.authenticate('saml', { failureRedirect: '/login' }))
// ACS(断言消费者服务)
app.post('/auth/saml/callback',
passport.authenticate('saml', { failureRedirect: '/login' }),
(req, res) => {
req.session.user = req.user
res.redirect('/dashboard')
}
)
// SP元数据端点(用于配置IdP)
app.get('/auth/saml/metadata', (req, res) => {
const strategy = passport._strategy('saml')
const metadata = strategy.generateServiceProviderMetadata(null, process.env.SP_CERT)
res.set('Content-Type', 'text/xml')
res.send(metadata)
})

LDAP认证
import ldap3
from ldap3 import Server, Connection, NTLM, SASL, KERBEROS
class LDAPAuthenticator:
def __init__(self, ldap_server: str, base_dn: str):
self.server = Server(ldap_server, get_info='ALL', use_ssl=True)
self.base_dn = base_dn
def authenticate(self, username: str, password: str) -> dict:
user_dn = f"uid={username},{self.base_dn}"
try:
conn = Connection(
self.server,
user=user_dn,
password=password,
authentication='SIMPLE',
auto_bind=True,
)
# 获取用户属性
conn.search(
user_dn,
'(objectclass=*)',
attributes=['cn', 'mail', 'memberOf', 'department']
)
if conn.entries:
entry = conn.entries[0]
return {
'id': username,
'name': str(entry.cn),
'email': str(entry.mail),
'groups': [g.split(',')[0].replace('CN=', '') for g in entry.memberOf],
'department': str(entry.department),
}
return None
except ldap3.core.exceptions.LDAPBindError:
return None # 无效凭据
finally:
try:
conn.unbind()
except Exception:
pass
def get_group_members(self, group_dn: str) -> list:
with Connection(self.server, auto_bind=True) as conn:
conn.search(
group_dn,
'(objectclass=group)',
attributes=['member']
)
return [m for m in conn.entries[0].member]

JIT(即时)用户预置
async function upsertUser(samlProfile) {
const { email, name, groups } = samlProfile
const existingUser = await db.users.findOne({ email })
if (existingUser) {
// 根据IdP属性更新用户
return db.users.update(existingUser.id, {
name,
saml_groups: groups,
last_sso_login: new Date(),
})
}
// 即时预置
const newUser = await db.users.create({
email,
name,
saml_groups: groups,
source: 'saml',
created_at: new Date(),
})
// 根据SAML组自动分配角色
const roles = await mapGroupsToRoles(groups)
await db.userRoles.bulkCreate(roles.map(role => ({
user_id: newUser.id,
role,
})))
return newUser
}
function mapGroupsToRoles(groups: string[]): string[] {
const groupRoleMap = {
'Engineering': ['developer'],
'DevOps': ['developer', 'deployer'],
'Management': ['manager'],
'IT-Admins': ['admin'],
}
return [...new Set(
groups.flatMap(g => groupRoleMap[g] || [])
)]
}
安全考虑
| 风险 |
缓解措施 |
| XML注入 |
使用经过验证的SAML库 |
| 签名包装 |
验证断言位置 |
| 开放重定向 |
验证RelayState/重定向URL |
| 会话固定 |
SSO后创建新会话 |
| 令牌重放 |
检查NotOnOrAfter,使用断言缓存 |