
Secrets Management: HashiCorp Vault, Kubernetes Secrets, and AWS Secrets Manager

Secure secrets management for production systems. This guide covers HashiCorp Vault, Kubernetes Secrets encryption, AWS Secrets Manager, and secret rotation strategies.


Secrets Management in Production

HashiCorp Vault Setup

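# Start the Vault server with a config file
vault server -config=vault-config.hcl

# Initialize: split the unseal key into 5 shares, any 3 of which can unseal
vault operator init -key-shares=5 -key-threshold=3

# Unseal (requires 3 of the 5 key shares)
vault operator unseal KEY1
vault operator unseal KEY2
vault operator unseal KEY3

# Log in with the initial root token
vault login ROOT_TOKEN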
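
The `-key-shares=5 -key-threshold=3` flags rely on Shamir's Secret Sharing: the key is split into 5 shares and any 3 of them reconstruct it. The following is a toy sketch of that idea over a prime field, for intuition only. The function names and the choice of field are assumptions made for this example, and Vault's own implementation differs in detail.

```python
import secrets

# Toy Shamir secret sharing over a prime field, for illustration only.
PRIME = 2**127 - 1  # a Mersenne prime, large enough for a small demo secret


def split_secret(secret: int, shares: int, threshold: int) -> list[tuple[int, int]]:
    """Split `secret` into `shares` points; any `threshold` of them recover it."""
    if not 0 <= secret < PRIME:
        raise ValueError("secret must fit in the field")
    # Random polynomial of degree threshold-1 whose constant term is the secret.
    coeffs = [secret] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]

    def evaluate(x: int) -> int:
        result = 0
        for c in reversed(coeffs):  # Horner's rule
            result = (result * x + c) % PRIME
        return result

    return [(x, evaluate(x)) for x in range(1, shares + 1)]


def recover_secret(points: list[tuple[int, int]]) -> int:
    """Lagrange interpolation at x = 0 recovers the constant term."""
    total = 0
    for i, (xi, yi) in enumerate(points):
        num, den = 1, 1
        for j, (xj, _) in enumerate(points):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        total = (total + yi * num * pow(den, -1, PRIME)) % PRIME
    return total


root_key = secrets.randbelow(PRIME)
key_shares = split_secret(root_key, shares=5, threshold=3)
print(recover_secret(key_shares[:3]) == root_key)   # first three shares
print(recover_secret(key_shares[2:5]) == root_key)  # a different three shares
```

Fewer than three shares reveal nothing useful about the key, which is why the shares are normally handed to different operators.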


Vault KV Secrets

# Enable the KV secrets engine, version 2
vault secrets enable -path=secret kv-v2

# Store a secret
vault kv put secret/myapp/prod \
  DATABASE_URL="postgresql://user:pass@host/db" \
  JWT_SECRET="$(openssl rand -base64 32)"

# Read the secret
vault kv get -format=json secret/myapp/prod

# Show metadata, including the version history
vault kv metadata get secret/myapp/prod
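
With KV v2, the JSON returned by `vault kv get -format=json` nests the stored fields under `data.data` and the version information under `data.metadata`. A minimal sketch of unpacking that shape follows. `SAMPLE_OUTPUT` is a hypothetical, trimmed-down payload with placeholder values, and `parse_kv2` is a name invented for this example.

```python
import json

# Hypothetical sample shaped like KV v2 JSON output; values are placeholders
# and real output carries additional top-level fields.
SAMPLE_OUTPUT = """
{
  "data": {
    "data": {"DATABASE_URL": "postgresql://user:pass@host/db", "JWT_SECRET": "example"},
    "metadata": {"version": 3, "created_time": "2024-01-01T00:00:00Z"}
  }
}
"""


def parse_kv2(raw: str) -> tuple[dict, int]:
    """Return (secret fields, version) from KV v2 JSON output."""
    payload = json.loads(raw)["data"]
    return payload["data"], payload["metadata"]["version"]


fields, version = parse_kv2(SAMPLE_OUTPUT)
print(sorted(fields), version)
```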

Dynamic Database Credentials

# Enable the database secrets engine
vault secrets enable database

# Configure the PostgreSQL connection
vault write database/config/mydb \
  plugin_name=postgresql-database-plugin \
  connection_url="postgresql://{{username}}:{{password}}@localhost/mydb" \
  allowed_roles="readonly,readwrite" \
  username="vault" \
  password="vaultpass"

# Create a role that issues short-lived, read-only users
vault write database/roles/readonly \
  db_name=mydb \
  creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
  default_ttl="1h" \
  max_ttl="24h"

# Fetch dynamic credentials (a new database user on every read)
vault read database/creds/readonly
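
Because these credentials expire (`default_ttl="1h"`, capped by `max_ttl="24h"`), a client has to decide when to renew a lease and when to request a fresh credential instead. The sketch below shows one possible policy: renew at 75% of the current lease, and fetch new credentials once a renewal would run into the maximum TTL. The `Lease` class, its field names, and the 75% figure are assumptions for illustration, not Vault behavior.

```python
from dataclasses import dataclass


@dataclass
class Lease:
    issued_at: float      # epoch seconds when the credential was first issued
    renewed_at: float     # epoch seconds of the latest issue or renewal
    lease_duration: int   # seconds granted by the latest issue or renewal
    max_ttl: int          # hard cap from the role (24h in the role above)

    def renew_at(self, fraction: float = 0.75) -> float:
        """Time to attempt renewal: partway through the current lease."""
        return self.renewed_at + self.lease_duration * fraction

    def hard_expiry(self) -> float:
        """Point after which the credential cannot be extended any further."""
        return self.issued_at + self.max_ttl

    def should_replace(self, now: float, increment: int) -> bool:
        """True when a full renewal would cross max_ttl, so fetch new credentials."""
        return now + increment > self.hard_expiry()


lease = Lease(issued_at=0, renewed_at=0, lease_duration=3600, max_ttl=86400)
print(lease.renew_at())                                   # seconds after issue
print(lease.should_replace(now=84000, increment=3600))    # near the 24h cap
```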


Python Vault Client

import os
from typing import Optional

import hvac

class VaultClient:
    def __init__(self, url: str, token: Optional[str] = None):
        self.client = hvac.Client(url=url, token=token)

        if not token:
            # No token supplied: fall back to Kubernetes auth using the
            # pod's service account JWT
            with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
                k8s_token = f.read().strip()

            self.client.auth.kubernetes.login(
                role='myapp',
                jwt=k8s_token,
            )

    def get_secret(self, path: str) -> dict:
        resp = self.client.secrets.kv.v2.read_secret_version(
            path=path,
            mount_point='secret',
        )
        return resp['data']['data']

    def get_db_credentials(self, role: str) -> dict:
        resp = self.client.read(f'database/creds/{role}')
        return {
            'username': resp['data']['username'],
            'password': resp['data']['password'],
            # lease_id is what renew_lease() needs later
            'lease_id': resp['lease_id'],
            'lease_duration': resp['lease_duration'],
        }

    def renew_lease(self, lease_id: str, increment: int = 3600):
        self.client.sys.renew_lease(lease_id=lease_id, increment=increment)

vault = VaultClient(url=os.getenv('VAULT_ADDR'))
secrets = vault.get_secret('myapp/prod')

Kubernetes Secrets Encryption

# Enable encryption at rest (encryption-config.yaml)
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
  - resources:
      - secrets
    providers:
      - aescbc:
          keys:
            - name: key1
              secret: BASE64_ENCODED_32_BYTE_KEY
      - identity: {}  # fallback: lets the API server read secrets that are not yet encrypted

# External Secrets Operator: sync Vault secrets into Kubernetes Secrets
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: myapp-secrets
spec:
  refreshInterval: 1h
  secretStoreRef:
    kind: ClusterSecretStore
    name: vault-backend
  target:
    name: myapp-secrets
    creationPolicy: Owner
  data:
    - secretKey: DATABASE_URL
      remoteRef:
        key: secret/myapp/prod
        property: DATABASE_URL
    - secretKey: JWT_SECRET
      remoteRef:
        key: secret/myapp/prod
        property: JWT_SECRET
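
The `secret` field in the EncryptionConfiguration above expects a base64-encoded 32-byte key in place of `BASE64_ENCODED_32_BYTE_KEY`. A minimal sketch of generating one with the Python standard library follows; `generate_encryption_key` is a name chosen for this example.

```python
import base64
import os


def generate_encryption_key(num_bytes: int = 32) -> str:
    """Return a base64-encoded random key for the `secret` field."""
    return base64.b64encode(os.urandom(num_bytes)).decode("ascii")


key = generate_encryption_key()
# Check the decoded length rather than printing the key itself.
print(len(base64.b64decode(key)))
```

Treat the generated value like any other secret: keep it out of version control and out of logs.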


AWS Secrets Manager

import json
import time

import boto3
from botocore.exceptions import ClientError

class AWSSecretsManager:
    def __init__(self, region: str = 'us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region)
        self._cache = {}
        self._cache_ttl = 300  # 5 minutes

    def get_secret(self, secret_name: str) -> dict:
        # Serve from the cache while the entry is still fresh
        cached = self._cache.get(secret_name)
        if cached and time.time() - cached['time'] < self._cache_ttl:
            return cached['value']

        try:
            resp = self.client.get_secret_value(SecretId=secret_name)
            if 'SecretString' in resp:
                value = json.loads(resp['SecretString'])
            else:
                value = json.loads(resp['SecretBinary'].decode('utf-8'))

            self._cache[secret_name] = {'value': value, 'time': time.time()}
            return value

        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                raise ValueError(f"Secret {secret_name} not found")
            raise

    def rotate_secret(self, secret_name: str, rotation_lambda_arn: str):
        self.client.rotate_secret(
            SecretId=secret_name,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={'AutomaticallyAfterDays': 30},
        )

# Named aws_secrets to avoid shadowing the standard library `secrets` module
aws_secrets = AWSSecretsManager()
db_creds = aws_secrets.get_secret('prod/myapp/database')
DATABASE_URL = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/mydb"
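
Interpolating credentials straight into a connection URL breaks when a generated password contains characters such as `@`, `/`, or `:`. A small sketch of percent-encoding the user and password first is shown below. `build_postgres_url` and the optional `port` key are assumptions made for this example; the `username`, `password`, and `host` keys follow the snippet above.

```python
from urllib.parse import quote


def build_postgres_url(creds: dict, database: str = "mydb") -> str:
    """Build a PostgreSQL URL, percent-encoding the user and password."""
    user = quote(creds["username"], safe="")
    password = quote(creds["password"], safe="")
    host = creds["host"]
    port = creds.get("port")  # optional; hypothetical key for this sketch
    netloc = f"{host}:{port}" if port else host
    return f"postgresql://{user}:{password}@{netloc}/{database}"


example = {"username": "app", "password": "p@ss/w:rd", "host": "db.internal"}
print(build_postgres_url(example))
# postgresql://app:p%40ss%2Fw%3Ard@db.internal/mydb
```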

Secret Rotation Pattern

import threading
import time

class RotatingSecret:
    def __init__(self, secret_name: str, vault_client: VaultClient, refresh_interval: int = 3600):
        self.secret_name = secret_name
        self.vault = vault_client
        self.refresh_interval = refresh_interval
        self._secret = None
        self._lock = threading.RLock()
        self._refresh()
        self._start_background_refresh()

    def _refresh(self):
        # Fetch outside the lock so readers are not blocked on the network call
        secret = self.vault.get_secret(self.secret_name)
        with self._lock:
            self._secret = secret
            self._refreshed_at = time.time()

    def _start_background_refresh(self):
        def refresh_loop():
            while True:
                time.sleep(self.refresh_interval * 0.9)
                try:
                    self._refresh()
                except Exception as e:
                    print(f"Secret refresh error: {e}")
        t = threading.Thread(target=refresh_loop, daemon=True)
        t.start()

    def get(self, key: str):
        with self._lock:
            return self._secret.get(key)

# Usage: call get() each time the value is needed so rotated values are picked up
db_secret = RotatingSecret('myapp/prod', vault)
DATABASE_URL = db_secret.get('DATABASE_URL')
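
The background thread above runs until the process exits. A variant that can be stopped cleanly, and that keeps serving the last good value when a refresh fails, might look like the sketch below. `RefreshingValue` and its `fetch` callable are hypothetical names for this example; in practice `fetch` could be something like `lambda: vault.get_secret('myapp/prod')`.

```python
import threading
from typing import Callable


class RefreshingValue:
    """Caches fetch() results, refreshes in the background, keeps the last good value."""

    def __init__(self, fetch: Callable[[], dict], interval: float):
        self._fetch = fetch
        self._interval = interval
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._value = fetch()  # fail fast if the first fetch does not work
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def refresh(self) -> bool:
        """Fetch outside the lock, then swap. Returns False if the fetch failed."""
        try:
            new_value = self._fetch()
        except Exception:
            return False  # keep serving the previous value
        with self._lock:
            self._value = new_value
        return True

    def _loop(self) -> None:
        # Event.wait returns True once stop() is called, which ends the loop.
        while not self._stop.wait(self._interval):
            self.refresh()

    def get(self, key: str):
        with self._lock:
            return self._value.get(key)

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
```

Using `Event.wait` instead of `time.sleep` means `stop()` takes effect immediately rather than after the current sleep finishes.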

Never Put Secrets in Code

# Wrong: committing .env to git
echo "DATABASE_URL=postgresql://..." > .env
git add .env  # never do this

# Right: .env is listed in .gitignore, and secrets come from Vault/AWS
cat .gitignore
# .env
# *.pem
# *.key
# secrets/

# Inject secrets through environment variables
vault agent -config=agent.hcl  # Vault Agent injects the environment variables
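
On the application side, injected environment variables are best validated once at startup so a missing secret fails loudly instead of surfacing later as a connection error. A minimal sketch, assuming a hypothetical helper named `load_required_env`:

```python
import os
from typing import Mapping


def load_required_env(
    names: list[str], environ: Mapping[str, str] = os.environ
) -> dict[str, str]:
    """Return the named variables, raising if any is missing or empty."""
    missing = [n for n in names if not environ.get(n)]
    if missing:
        # Report names only; never echo secret values into logs or errors.
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {n: environ[n] for n in names}


# Example with an explicit mapping; omit `environ` to read the real environment.
settings = load_required_env(
    ["DATABASE_URL"], environ={"DATABASE_URL": "postgresql://..."}
)
print(sorted(settings))
```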