
生产环境中的密钥管理
HashiCorp Vault 设置
# Install and initialize Vault: start the server with the given config file
vault server -config=vault-config.hcl
# Initialize: 5 key shares are generated, with a threshold of 3
vault operator init -key-shares=5 -key-threshold=3
# Unseal (requires 3 of the 5 keys); KEY1..KEY3 are placeholders for real key shares
vault operator unseal KEY1
vault operator unseal KEY2
vault operator unseal KEY3
# Log in
# NOTE(review): ROOT_TOKEN is a placeholder; a token passed as an argument may
# end up in shell history — confirm this is acceptable for the target environment
vault login ROOT_TOKEN

Vault KV 密钥
# Enable the KV secrets engine v2, mounted at the path "secret"
vault secrets enable -path=secret kv-v2
# Store secrets; JWT_SECRET is generated on the spot from 32 random bytes (base64-encoded)
vault kv put secret/myapp/prod \
DATABASE_URL="postgresql://user:pass@host/db" \
JWT_SECRET="$(openssl rand -base64 32)"
# Read secrets (JSON output)
vault kv get -format=json secret/myapp/prod
# List versions
vault kv metadata get secret/myapp/prod
动态数据库凭证
# Enable the database secrets engine
vault secrets enable database
# Configure PostgreSQL
# NOTE(review): username/password below look like placeholder admin credentials
# for the connection — replace before use and confirm how they are supplied
vault write database/config/mydb \
plugin_name=postgresql-database-plugin \
connection_url="postgresql://{{username}}:{{password}}@localhost/mydb" \
allowed_roles="readonly,readwrite" \
username="vault" \
password="vaultpass"
# Create a role: each credential is a login role granted SELECT on all tables
# in schema public, with a 1h default TTL and a 24h maximum TTL
vault write database/roles/readonly \
db_name=mydb \
creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
default_ttl="1h" \
max_ttl="24h"
# Fetch dynamic credentials (a new user every time!)
vault read database/creds/readonly

Python Vault 客户端
import hvac
import os
from functools import lru_cache
from datetime import datetime, timedelta
class VaultClient:
    """Small wrapper around ``hvac.Client`` for KV v2 secrets and dynamic DB credentials.

    When no token is supplied, the client logs in through the Kubernetes auth
    method using the pod's service-account JWT and the ``myapp`` role.
    """

    def __init__(self, url: str, token: str = None):
        """Create the underlying hvac client and authenticate it.

        Args:
            url: Address of the Vault server.
            token: Vault token. If omitted (or empty), Kubernetes auth is used.

        Raises:
            OSError: If the service-account token file cannot be read.
        """
        self.client = hvac.Client(url=url, token=token)
        if not token:
            # Kubernetes auth: exchange the pod's service-account JWT for a Vault token.
            with open(
                '/var/run/secrets/kubernetes.io/serviceaccount/token',
                encoding='utf-8',
            ) as f:
                # Strip so a trailing newline in the file is not sent as part of the JWT.
                k8s_token = f.read().strip()
            self.client.auth.kubernetes.login(
                role='myapp',
                jwt=k8s_token,
            )

    def get_secret(self, path: str) -> dict:
        """Return the key/value payload stored at ``path`` on the ``secret`` KV v2 mount."""
        resp = self.client.secrets.kv.v2.read_secret_version(
            path=path,
            mount_point='secret',
        )
        # KV v2 nests the payload under data.data (the outer "data" also carries metadata).
        return resp['data']['data']

    def get_db_credentials(self, role: str) -> dict:
        """Request dynamic database credentials for ``role``.

        Returns:
            A dict with ``username``, ``password``, ``lease_id`` and
            ``lease_duration``. ``lease_id`` is what ``renew_lease`` expects.

        Raises:
            ValueError: If Vault returns nothing for ``database/creds/<role>``.
        """
        resp = self.client.read(f'database/creds/{role}')
        if resp is None:
            # client.read yields None for an unknown path; fail with a clear message
            # instead of a TypeError on the subscript below.
            raise ValueError(f"No database credentials available for role {role}")
        creds = resp['data']
        return {
            'username': creds['username'],
            'password': creds['password'],
            'lease_id': resp['lease_id'],
            'lease_duration': resp['lease_duration'],
        }

    def renew_lease(self, lease_id: str, increment: int = 3600):
        """Ask Vault to extend ``lease_id`` by ``increment`` seconds.

        Returns:
            The response from ``sys.renew_lease`` (previously discarded), so the
            caller can inspect the lease duration actually granted.
        """
        return self.client.sys.renew_lease(lease_id=lease_id, increment=increment)
# Build a client for the server named by VAULT_ADDR. No token is passed, so
# VaultClient takes its Kubernetes-auth branch.
vault = VaultClient(url=os.getenv('VAULT_ADDR'))
# Read the KV v2 secret stored at myapp/prod.
secrets = vault.get_secret('myapp/prod')
Kubernetes Secrets 加密
# Enable encryption at rest (encryption-config.yaml)
# NOTE(review): the Kubernetes docs rate aescbc below secretbox/KMS providers —
# confirm this provider choice for production.
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
  - resources:
      - secrets
    providers:
      # The first provider in the list is the one used to encrypt new writes.
      - aescbc:
          keys:
            - name: key1
              secret: BASE64_ENCODED_32_BYTE_KEY
      - identity: {}  # fallback: lets data stored unencrypted still be read
# External Secrets Operator - sync Vault into K8s secrets
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: myapp-secrets
spec:
  # Re-read the remote values every hour.
  refreshInterval: 1h
  secretStoreRef:
    kind: ClusterSecretStore
    name: vault-backend
  target:
    name: myapp-secrets
    creationPolicy: Owner
  data:
    # Each entry maps one property of the remote secret to one key of the target Secret.
    - secretKey: DATABASE_URL
      remoteRef:
        key: secret/myapp/prod
        property: DATABASE_URL
    - secretKey: JWT_SECRET
      remoteRef:
        key: secret/myapp/prod
        property: JWT_SECRET

AWS Secrets Manager
import boto3
import json
from botocore.exceptions import ClientError
from functools import lru_cache
import time
class AWSSecretsManager:
    """Read and rotate JSON secrets in AWS Secrets Manager with a small in-memory cache."""

    def __init__(self, region: str = 'us-east-1'):
        """Create the boto3 ``secretsmanager`` client for ``region`` and an empty cache."""
        self.client = boto3.client('secretsmanager', region_name=region)
        # secret name -> {'value': parsed dict, 'time': monotonic timestamp of the fetch}
        self._cache = {}
        self._cache_ttl = 300  # seconds (5 minutes)

    def get_secret(self, secret_name: str) -> dict:
        """Return the JSON-decoded secret, served from cache while it is fresh.

        Args:
            secret_name: Name or ARN passed to ``get_secret_value`` as ``SecretId``.

        Returns:
            The parsed secret. The cached object itself is returned, so callers
            should treat it as read-only.

        Raises:
            ValueError: If the secret does not exist (chained from the ClientError).
            ClientError: For any other AWS error.
            json.JSONDecodeError: If the secret payload is not valid JSON.
        """
        # Check the cache. A monotonic clock keeps the TTL immune to wall-clock jumps.
        cached = self._cache.get(secret_name)
        if cached and time.monotonic() - cached['time'] < self._cache_ttl:
            return cached['value']

        # Only the AWS call can raise ClientError, so keep the try body to that call.
        try:
            resp = self.client.get_secret_value(SecretId=secret_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                raise ValueError(f"Secret {secret_name} not found") from e
            raise

        if 'SecretString' in resp:
            payload = resp['SecretString']
        else:
            payload = resp['SecretBinary'].decode('utf-8')
        value = json.loads(payload)

        self._cache[secret_name] = {'value': value, 'time': time.monotonic()}
        return value

    def rotate_secret(self, secret_name: str, rotation_lambda_arn: str, rotation_days: int = 30):
        """Configure rotation of ``secret_name`` through the given Lambda.

        Args:
            secret_name: Name or ARN of the secret.
            rotation_lambda_arn: ARN of the rotation Lambda function.
            rotation_days: Value sent as ``AutomaticallyAfterDays`` (default 30,
                the previously hard-coded period).
        """
        self.client.rotate_secret(
            SecretId=secret_name,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={'AutomaticallyAfterDays': rotation_days},
        )
# Manager for the default region (us-east-1).
# NOTE(review): if this snippet shares a module with the Vault example, this
# rebinds the name `secrets` used there — confirm that is intended.
secrets = AWSSecretsManager()
# Fetch the database credential secret.
db_creds = secrets.get_secret('prod/myapp/database')
# NOTE(review): values are interpolated verbatim; a password containing
# URL-reserved characters (e.g. '@' or '/') would presumably yield a malformed
# URL — consider percent-encoding.
DATABASE_URL = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/mydb"
密钥轮换模式
import threading
import time
class RotatingSecret:
    """Keep a Vault secret fresh by re-reading it on a background daemon thread.

    The secret is fetched once in the constructor and then again every
    ``refresh_interval * 0.9`` seconds. If a refresh fails, the error is logged
    and the last successfully fetched value stays in place.
    """

    def __init__(self, secret_name: str, vault_client: "VaultClient", refresh_interval: int = 3600):
        """Fetch the secret once, then start the background refresher.

        Args:
            secret_name: Path handed to ``vault_client.get_secret``.
            vault_client: Object exposing ``get_secret(path) -> dict``.
            refresh_interval: Nominal refresh period in seconds.

        Raises:
            Exception: Whatever the initial ``get_secret`` call raises; the
                first fetch is not guarded.
        """
        self.secret_name = secret_name
        self.vault = vault_client
        self.refresh_interval = refresh_interval
        self._secret = None
        self._lock = threading.RLock()
        # Set by stop(); also lets the refresher wake up early instead of sleeping.
        self._stop_event = threading.Event()
        self._refresh()
        self._start_background_refresh()

    def _refresh(self):
        """Fetch the secret and publish it to readers."""
        # Fetch outside the lock so readers are never blocked on network I/O;
        # the lock only guards the swap.
        fresh = self.vault.get_secret(self.secret_name)
        with self._lock:
            self._secret = fresh
            self._refreshed_at = time.time()

    def _start_background_refresh(self):
        """Start the daemon thread that calls ``_refresh`` periodically."""
        import logging

        logger = logging.getLogger(__name__)

        def refresh_loop():
            # wait() returns True once stop() has been called, which ends the loop.
            while not self._stop_event.wait(self.refresh_interval * 0.9):
                try:
                    self._refresh()
                except Exception as e:
                    # Best effort: keep serving the previous value and try again later.
                    logger.warning("Secret refresh error: %s", e)

        self._thread = threading.Thread(target=refresh_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the background refresher to exit; the cached value stays readable."""
        self._stop_event.set()

    def get(self, key: str):
        """Return the current value for ``key``, or ``None`` if the key is absent."""
        with self._lock:
            return self._secret.get(key)
# Usage: always obtain the current secret value
db_secret = RotatingSecret('myapp/prod', vault)
# NOTE(review): this assignment reads the value once; later rotations are only
# visible to code that calls db_secret.get('DATABASE_URL') again at the point of use.
DATABASE_URL = db_secret.get('DATABASE_URL')
绝不将密钥写在代码中
# Wrong: .env committed to git
echo "DATABASE_URL=postgresql://..." > .env
git add .env # never do this
# Right: .env is listed in .gitignore, secrets come from Vault/AWS
cat .gitignore
# .env
# *.pem
# *.key
# secrets/
# Use environment-variable injection
vault agent -config=agent.hcl # Vault agent injects the environment variables