
零信任安全架构
零信任原则
- 永不信任,始终验证 - 对每个请求进行身份验证和授权
- 最小权限访问 - 每个身份仅授予最小权限
- 假设已被攻破 - 设计时假设攻击者已在内网
- 显式验证 - 使用所有可用数据点(身份、位置、设备、服务)

身份中心化访问控制
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import geoip2.database
@dataclass
class AccessContext:
user_id: str
device_id: str
ip_address: str
user_agent: str
timestamp: datetime
requested_resource: str
requested_action: str
@dataclass
class TrustScore:
score: float # 0.0 - 1.0
factors: dict
decision: str # "allow", "deny", "step-up-auth"
class ZeroTrustEngine:
def __init__(self):
self.geo_reader = geoip2.database.Reader('/data/GeoLite2-City.mmdb')
def evaluate_trust(self, ctx: AccessContext) -> TrustScore:
factors = {}
score = 0.0
# Factor 1: Identity verification strength
auth_strength = self._get_auth_strength(ctx.user_id)
factors['auth'] = auth_strength
score += auth_strength * 0.3
# Factor 2: Device trust
device_trust = self._get_device_trust(ctx.device_id)
factors['device'] = device_trust
score += device_trust * 0.25
# Factor 3: Network location
location_trust = self._assess_location(ctx.ip_address, ctx.user_id)
factors['location'] = location_trust
score += location_trust * 0.2
# Factor 4: Behavioral baseline
behavior_score = self._check_behavior(ctx.user_id, ctx.timestamp, ctx.ip_address)
factors['behavior'] = behavior_score
score += behavior_score * 0.15
# Factor 5: Resource sensitivity
resource_sensitivity = self._get_resource_sensitivity(ctx.requested_resource)
required_trust = resource_sensitivity
# Decision
if score >= required_trust:
decision = "allow"
elif score >= required_trust * 0.7:
decision = "step-up-auth" # Require MFA
else:
decision = "deny"
return TrustScore(score=score, factors=factors, decision=decision)
def _get_auth_strength(self, user_id: str) -> float:
auth_methods = self._get_user_auth_methods(user_id)
if 'hardware_key' in auth_methods:
return 1.0
if 'totp_mfa' in auth_methods:
return 0.8
if 'sms_mfa' in auth_methods:
return 0.6
return 0.3 # Password only
def _assess_location(self, ip: str, user_id: str) -> float:
try:
response = self.geo_reader.city(ip)
current_country = response.country.iso_code
usual_country = self._get_usual_country(user_id)
return 1.0 if current_country == usual_country else 0.2
except Exception:
return 0.1

使用 Kubernetes 网络策略实现微隔离
# Default deny all ingress/egress
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: production
spec:
podSelector: {} # All pods
policyTypes:
- Ingress
- Egress
---
# Allow frontend -> backend API only
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-frontend-to-api
namespace: production
spec:
podSelector:
matchLabels:
app: backend-api
ingress:
- from:
- podSelector:
matchLabels:
app: frontend
ports:
- protocol: TCP
port: 8080
---
# Allow backend -> database only
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-api-to-db
namespace: production
spec:
podSelector:
matchLabels:
app: postgres
ingress:
- from:
- podSelector:
matchLabels:
app: backend-api
ports:
- protocol: TCP
port: 5432

持续授权(策略执行)
from abc import ABC, abstractmethod
import casbin
class PolicyEnforcer:
def __init__(self, model_path: str, policy_path: str):
self.enforcer = casbin.Enforcer(model_path, policy_path)
def can_access(self, user: str, resource: str, action: str) -> bool:
return self.enforcer.enforce(user, resource, action)
def get_permissions(self, user: str) -> list:
return self.enforcer.get_permissions_for_user(user)
# Policy file (casbin policy.csv)
# p, alice, /data/project1/*, read
# p, bob, /data/project2/*, write
# g, carol, admin
# Request interceptor
class ZeroTrustMiddleware:
def __init__(self, policy_enforcer: PolicyEnforcer, trust_engine: ZeroTrustEngine):
self.policy = policy_enforcer
self.trust = trust_engine
async def __call__(self, request, call_next):
ctx = AccessContext(
user_id=request.state.user.id,
device_id=request.headers.get('X-Device-ID', 'unknown'),
ip_address=request.client.host,
user_agent=request.headers.get('User-Agent', ''),
timestamp=datetime.utcnow(),
requested_resource=str(request.url.path),
requested_action=request.method,
)
trust = self.trust.evaluate_trust(ctx)
if trust.decision == "deny":
return Response(status_code=403, content="Access denied by Zero Trust policy")
if trust.decision == "step-up-auth":
return Response(status_code=401, headers={"X-Require-MFA": "true"})
# Check policy
if not self.policy.can_access(ctx.user_id, ctx.requested_resource, ctx.requested_action):
return Response(status_code=403, content="Insufficient permissions")
return await call_next(request)
设备信任与 MDM 集成
class DeviceTrustService:
TRUST_LEVELS = {
'fully_managed': 1.0, # Corporate MDM enrolled
'registered': 0.7, # Registered, not fully managed
'compliant': 0.5, # Meets policy but unmanaged
'unknown': 0.1, # Unknown device
}
def get_trust_level(self, device_id: str) -> float:
device = self._lookup_device(device_id)
if not device:
return self.TRUST_LEVELS['unknown']
trust_level = self.TRUST_LEVELS.get(device['management_state'], 0.1)
# Additional checks
if not device.get('disk_encrypted'):
trust_level *= 0.5
if device.get('os_outdated'):
trust_level *= 0.7
if not device.get('av_updated'):
trust_level *= 0.8
return trust_level
零信任实施路线图
| 阶段 |
重点 |
关键行动 |
| 1 |
身份 |
SSO、MFA、特权访问 |
| 2 |
设备 |
MDM、设备合规 |
| 3 |
网络 |
微隔离、SD-WAN |
| 4 |
应用 |
RBAC、API 网关 |
| 5 |
数据 |
分类、DLP、加密 |