
Zero Trust Security Architecture: Never Trust, Always Verify

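A practical guide for modern enterprises to implementing a zero trust security architecture through identity-centric access control, microsegmentation, continuous verification, and device trust.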


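Zero Trust Principles

  1. Never trust, always verify - authenticate and authorize every request
  2. Least-privilege access - grant each identity only the minimum permissions it needs
  3. Assume breach - design as if an attacker is already inside the internal network
  4. Verify explicitly - use all available data points (identity, location, device, service)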


Identity-Centric Access Control

from dataclasses import dataclass
from datetime import datetime
import geoip2.database

@dataclass
class AccessContext:
    user_id: str
    device_id: str
    ip_address: str
    user_agent: str
    timestamp: datetime
    requested_resource: str
    requested_action: str

@dataclass
class TrustScore:
    score: float  # 0.0 - 1.0
    factors: dict
    decision: str  # "allow", "deny", "step-up-auth"

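class ZeroTrustEngine:
    # The lookup helpers called below (_get_device_trust, _check_behavior,
    # _get_resource_sensitivity, _get_user_auth_methods, _get_usual_country)
    # are not shown; they are left to your own identity, device, and asset stores.
    def __init__(self):
        self.geo_reader = geoip2.database.Reader('/data/GeoLite2-City.mmdb')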

    def evaluate_trust(self, ctx: AccessContext) -> TrustScore:
        factors = {}
        score = 0.0

        # Factor 1: Identity verification strength
        auth_strength = self._get_auth_strength(ctx.user_id)
        factors['auth'] = auth_strength
        score += auth_strength * 0.3

        # Factor 2: Device trust
        device_trust = self._get_device_trust(ctx.device_id)
        factors['device'] = device_trust
        score += device_trust * 0.25

        # Factor 3: Network location
        location_trust = self._assess_location(ctx.ip_address, ctx.user_id)
        factors['location'] = location_trust
        score += location_trust * 0.2

        # Factor 4: Behavioral baseline
        behavior_score = self._check_behavior(ctx.user_id, ctx.timestamp, ctx.ip_address)
        factors['behavior'] = behavior_score
        score += behavior_score * 0.15

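        # Resource sensitivity sets the threshold; it is not added to the score.
        # Note: the four weights above sum to 0.9, so the score tops out at 0.9
        # and a resource requiring more than 0.9 can never reach "allow".
        resource_sensitivity = self._get_resource_sensitivity(ctx.requested_resource)
        required_trust = resource_sensitivity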

        # Decision
        if score >= required_trust:
            decision = "allow"
        elif score >= required_trust * 0.7:
            decision = "step-up-auth"  # Require MFA
        else:
            decision = "deny"

        return TrustScore(score=score, factors=factors, decision=decision)

    def _get_auth_strength(self, user_id: str) -> float:
        auth_methods = self._get_user_auth_methods(user_id)
        if 'hardware_key' in auth_methods:
            return 1.0
        if 'totp_mfa' in auth_methods:
            return 0.8
        if 'sms_mfa' in auth_methods:
            return 0.6
        return 0.3  # Password only

    def _assess_location(self, ip: str, user_id: str) -> float:
        try:
            response = self.geo_reader.city(ip)
            current_country = response.country.iso_code
            usual_country = self._get_usual_country(user_id)
            return 1.0 if current_country == usual_country else 0.2
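        except Exception:
            # Lookup failed (e.g. the address is not in the database):
            # treat the location as very low trust rather than raising
            return 0.1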
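
To make the scoring concrete, here is a minimal, self-contained sketch of the weighted sum and the three-way decision from evaluate_trust. The weights and thresholds are copied from the code above; the helper name score_request, the sample factor values, and the 0.8 sensitivity are assumptions chosen purely for illustration.

```python
# Weights copied from ZeroTrustEngine.evaluate_trust; they sum to 0.9.
WEIGHTS = {"auth": 0.3, "device": 0.25, "location": 0.2, "behavior": 0.15}


def score_request(factors: dict, required_trust: float) -> tuple:
    """Return (score, decision) for per-factor trust values in [0.0, 1.0]."""
    score = sum(WEIGHTS[name] * factors[name] for name in WEIGHTS)
    if score >= required_trust:
        decision = "allow"
    elif score >= required_trust * 0.7:
        decision = "step-up-auth"
    else:
        decision = "deny"
    return score, decision


# Hypothetical request: TOTP MFA, registered device, usual country, normal behavior.
factors = {"auth": 0.8, "device": 0.7, "location": 1.0, "behavior": 0.9}
score, decision = score_request(factors, required_trust=0.8)
print(round(score, 3), decision)  # 0.75 step-up-auth
```

Here 0.24 + 0.175 + 0.2 + 0.135 = 0.75, which is below the 0.8 threshold but above 0.8 × 0.7 = 0.56, so the request is asked for step-up authentication. Because the weights sum to 0.9, even a request with perfect factors would not reach "allow" for a resource whose sensitivity is above 0.9.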


Microsegmentation with Kubernetes Network Policies

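# Default deny all ingress/egress.
# Egress is denied too, so every allowed flow needs an egress rule on the source
# pods (and DNS egress) in addition to the ingress rules shown in this section.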
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: production
spec:
  podSelector: {}  # All pods
  policyTypes:
    - Ingress
    - Egress

---
# Allow frontend -> backend API only
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-frontend-to-api
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend-api
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: frontend
      ports:
        - protocol: TCP
          port: 8080

---
# Allow backend -> database only
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-api-to-db
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: postgres
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: backend-api
      ports:
        - protocol: TCP
          port: 5432
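
With a default-deny policy that covers both directions, a connection is only allowed when the destination admits it (ingress) and the source is permitted to open it (egress). The sketch below builds both manifests for one flow as plain dictionaries and prints them as a JSON list, a format kubectl can also apply. The helper name allow_flow and the policy naming scheme are assumptions for illustration; DNS egress would still need a rule of its own.

```python
import json


def allow_flow(namespace: str, src_app: str, dst_app: str, port: int) -> list:
    """Build the ingress (on dst) and egress (on src) policies for one TCP flow."""

    def policy(name, selected_app, policy_type, rule_key, peer_key, peer_app):
        return {
            "apiVersion": "networking.k8s.io/v1",
            "kind": "NetworkPolicy",
            "metadata": {"name": name, "namespace": namespace},
            "spec": {
                "podSelector": {"matchLabels": {"app": selected_app}},
                "policyTypes": [policy_type],
                rule_key: [{
                    peer_key: [{"podSelector": {"matchLabels": {"app": peer_app}}}],
                    # For egress rules the port is the destination port.
                    "ports": [{"protocol": "TCP", "port": port}],
                }],
            },
        }

    base = f"allow-{src_app}-to-{dst_app}"
    return [
        policy(f"{base}-ingress", dst_app, "Ingress", "ingress", "from", src_app),
        policy(f"{base}-egress", src_app, "Egress", "egress", "to", dst_app),
    ]


# The two flows from the policies above: frontend -> API, API -> database.
items = (allow_flow("production", "frontend", "backend-api", 8080)
         + allow_flow("production", "backend-api", "postgres", 5432))
print(json.dumps({"apiVersion": "v1", "kind": "List", "items": items}, indent=2))
```

Generating the pair from a single flow description keeps the two directions from drifting apart when a port or label changes.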


Continuous Authorization (Policy Enforcement)

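from datetime import datetime

import casbin
from starlette.responses import Response  # assumption: a Starlette/FastAPI app

# AccessContext and ZeroTrustEngine come from the identity-centric example above.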

class PolicyEnforcer:
    def __init__(self, model_path: str, policy_path: str):
        self.enforcer = casbin.Enforcer(model_path, policy_path)

    def can_access(self, user: str, resource: str, action: str) -> bool:
        return self.enforcer.enforce(user, resource, action)

    def get_permissions(self, user: str) -> list:
        return self.enforcer.get_permissions_for_user(user)

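# Policy file (casbin policy.csv)
# Wildcard paths such as /data/project1/* only match if the model's matcher
# uses keyMatch (or a similar function) on the resource.
# p, alice, /data/project1/*, read
# p, bob, /data/project2/*, write
# g, carol, admin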

# Request interceptor
class ZeroTrustMiddleware:
    def __init__(self, policy_enforcer: PolicyEnforcer, trust_engine: ZeroTrustEngine):
        self.policy = policy_enforcer
        self.trust = trust_engine

    async def __call__(self, request, call_next):
        ctx = AccessContext(
            user_id=request.state.user.id,
            # Client-supplied header: it can be spoofed unless bound to a verified device identity
            device_id=request.headers.get('X-Device-ID', 'unknown'),
            ip_address=request.client.host,
            user_agent=request.headers.get('User-Agent', ''),
            timestamp=datetime.utcnow(),
            requested_resource=str(request.url.path),
            requested_action=request.method,
        )

        trust = self.trust.evaluate_trust(ctx)

        if trust.decision == "deny":
            return Response(status_code=403, content="Access denied by Zero Trust policy")

        if trust.decision == "step-up-auth":
            return Response(status_code=401, headers={"X-Require-MFA": "true"})

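        # Check policy. The sample policy uses "read"/"write" actions, so map the
        # HTTP method onto them first (this mapping is an assumption; adapt it to your policy).
        action = "read" if ctx.requested_action in ("GET", "HEAD", "OPTIONS") else "write"
        if not self.policy.can_access(ctx.user_id, ctx.requested_resource, action):
            return Response(status_code=403, content="Insufficient permissions")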

        return await call_next(request)
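
The policy check itself can be illustrated without Casbin. The sketch below is a plain-Python stand-in, not Casbin's implementation: it reuses the two sample p rules from the policy comments, applies a simplified trailing-wildcard match, and assumes a mapping from HTTP methods to the read/write actions. All names in it are hypothetical.

```python
# Sample rules from the policy comments: (subject, resource pattern, action).
POLICY = [
    ("alice", "/data/project1/*", "read"),
    ("bob", "/data/project2/*", "write"),
]

# Assumed mapping from HTTP methods to policy actions.
METHOD_TO_ACTION = {
    "GET": "read", "HEAD": "read",
    "POST": "write", "PUT": "write", "PATCH": "write", "DELETE": "write",
}


def path_matches(path: str, pattern: str) -> bool:
    """Exact match, or prefix match when the pattern ends with '*'."""
    if pattern.endswith("*"):
        return path.startswith(pattern[:-1])
    return path == pattern


def can_access(user: str, path: str, method: str) -> bool:
    """Default deny: allow only if some rule matches user, path, and action."""
    action = METHOD_TO_ACTION.get(method.upper())
    if action is None:
        return False  # unknown methods are never allowed
    return any(
        user == subject and path_matches(path, pattern) and action == allowed
        for subject, pattern, allowed in POLICY
    )


print(can_access("alice", "/data/project1/report.csv", "GET"))  # True
print(can_access("alice", "/data/project2/report.csv", "GET"))  # False
print(can_access("bob", "/data/project2/report.csv", "GET"))    # False
```

The last call is denied because bob holds only the write action on project2; nothing is granted unless a rule says so, which is the least-privilege default the principles call for.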

Device Trust and MDM Integration

class DeviceTrustService:
    TRUST_LEVELS = {
        'fully_managed': 1.0,    # Corporate MDM enrolled
        'registered': 0.7,       # Registered, not fully managed
        'compliant': 0.5,        # Meets policy but unmanaged
        'unknown': 0.1,          # Unknown device
    }

    def get_trust_level(self, device_id: str) -> float:
        device = self._lookup_device(device_id)
        if not device:
            return self.TRUST_LEVELS['unknown']

        trust_level = self.TRUST_LEVELS.get(device['management_state'], 0.1)

        # Additional checks
        if not device.get('disk_encrypted'):
            trust_level *= 0.5
        if device.get('os_outdated'):
            trust_level *= 0.7
        if not device.get('av_updated'):
            trust_level *= 0.8

        return trust_level
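
As a worked example of how the penalties compound, the sketch below repeats the same base levels and multipliers in a standalone function and applies them to a made-up device record. The function name device_trust and the sample record are assumptions for illustration only.

```python
TRUST_LEVELS = {
    "fully_managed": 1.0,
    "registered": 0.7,
    "compliant": 0.5,
    "unknown": 0.1,
}


def device_trust(device: dict) -> float:
    """Apply the same base level and multiplicative penalties as get_trust_level."""
    level = TRUST_LEVELS.get(device.get("management_state"), 0.1)
    if not device.get("disk_encrypted"):
        level *= 0.5
    if device.get("os_outdated"):
        level *= 0.7
    if not device.get("av_updated"):
        level *= 0.8
    return level


# Hypothetical record: registered, unencrypted disk, outdated OS, current antivirus.
laptop = {
    "management_state": "registered",
    "disk_encrypted": False,
    "os_outdated": True,
    "av_updated": True,
}
print(round(device_trust(laptop), 3))  # 0.245
```

The result is 0.7 × 0.5 × 0.7 = 0.245. Note that a missing disk_encrypted or av_updated field is treated the same as a failed check, so incomplete MDM records lower the score rather than raise it.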

Zero Trust Implementation Roadmap

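Phase  Focus        Key actions
1      Identity     SSO, MFA, privileged access
2      Device       MDM, device compliance
3      Network      Microsegmentation, SD-WAN
4      Application  RBAC, API gateway
5      Data         Classification, DLP, encryption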