
使用 Go 构建 Kubernetes Operator
Kubernetes Operator 扩展了 Kubernetes API,用于自动化管理复杂的有状态应用程序。本指南介绍如何使用 controller-runtime 构建生产级 Operator。
什么是 Operator?
Operator 将人类运维知识编码为软件:
- 自定义资源定义(CRD):定义你的 API
- 控制器(Controller):监视资源并协调期望状态与实际状态
- 协调循环(Reconciliation loop):实现状态变更的核心逻辑

前提条件
# Install the Operator SDK: download the latest linux/amd64 release binary,
# mark it executable, and move it onto the PATH as `operator-sdk`.
# NOTE(review): writing to /usr/local/bin usually requires root — confirm.
curl -LO https://github.com/operator-framework/operator-sdk/releases/latest/download/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk
# Scaffold a new Operator project, then add the Database API (group "apps",
# version v1alpha1) together with its resource and controller.
operator-sdk init --domain=example.com --repo=github.com/myorg/myoperator
operator-sdk create api --group=apps --version=v1alpha1 --kind=Database --resource --controller
定义 CRD
// api/v1alpha1/database_types.go
package v1alpha1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// DatabaseSpec describes the desired state of a Database.
type DatabaseSpec struct {
	// Replicas is the desired number of database instances. The validation
	// markers restrict it to the range 1-10.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:validation:Maximum=10
	Replicas int32 `json:"replicas"`

	// Engine selects the database engine; the enum marker allows postgres,
	// mysql, or mariadb.
	// +kubebuilder:validation:Enum=postgres;mysql;mariadb
	Engine string `json:"engine"`

	// Version is the engine version to run.
	Version string `json:"version"`

	// Storage holds the storage settings for the database.
	Storage StorageSpec `json:"storage"`
}
// StorageSpec describes the storage requested for a Database.
type StorageSpec struct {
	// Size is the requested capacity written as an integer followed by a
	// binary suffix (Ki, Mi, Gi, Ti), for example "10Gi".
	// +kubebuilder:validation:Pattern=`^[0-9]+[KMGT]i$`
	Size string `json:"size"`

	// StorageClassName optionally names the storage class to use. It is a
	// pointer so that an unset value is omitted from the serialized object.
	// +optional
	StorageClassName *string `json:"storageClassName,omitempty"`
}
// DatabaseStatus describes the observed state of a Database.
type DatabaseStatus struct {
	// Conditions holds the status conditions. The markers declare the list
	// as a map keyed by the condition "type" and merged on patch.
	// +patchMergeKey=type
	// +patchStrategy=merge
	// +listType=map
	// +listMapKey=type
	Conditions []metav1.Condition `json:"conditions,omitempty"`

	// ReadyReplicas is the number of replicas reported ready.
	ReadyReplicas int32 `json:"readyReplicas,omitempty"`

	// Phase is a coarse lifecycle summary: Pending, Running, or Failed.
	Phase string `json:"phase,omitempty"`
}
// Database is the custom resource type: it pairs a desired DatabaseSpec with
// an observed DatabaseStatus. The markers below make it a root object, enable
// the status subresource, and define the extra printer columns.
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Engine",type="string",JSONPath=".spec.engine"
// +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
// +kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type Database struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec is the desired state.
	Spec DatabaseSpec `json:"spec,omitempty"`

	// Status is the observed state.
	Status DatabaseStatus `json:"status,omitempty"`
}
控制器:协调循环
// controllers/database_controller.go
package controllers
import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	appsv1alpha1 "github.com/myorg/myoperator/api/v1alpha1"
)
// DatabaseReconciler reconciles Database objects. It embeds a client.Client
// for API access and carries the runtime.Scheme used when setting owner
// references on child objects.
type DatabaseReconciler struct {
	client.Client

	// Scheme is passed to ctrl.SetControllerReference.
	Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=apps.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile drives the Database named by req towards its desired state.
//
// It fetches the Database, seeds an empty status phase with "Pending",
// reconciles the StatefulSet and the Service, and then mirrors the
// StatefulSet's ready-replica count into the Database status. The phase is
// "Running" when the ready count equals spec.replicas and "Pending" otherwise.
//
// A Database that no longer exists is not an error. Every other failure is
// returned with context so that the request is retried.
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	db := &appsv1alpha1.Database{}
	if err := r.Get(ctx, req.NamespacedName, db); err != nil {
		// The resource was deleted after the request was queued: nothing to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// A freshly created Database has no phase yet; start it at Pending.
	if db.Status.Phase == "" {
		db.Status.Phase = "Pending"
		if err := r.Status().Update(ctx, db); err != nil {
			return ctrl.Result{}, fmt.Errorf("initializing Database status: %w", err)
		}
	}

	// Errors are wrapped and returned rather than logged here, so each
	// failure is reported once by the caller instead of twice.
	sts, err := r.reconcileStatefulSet(ctx, db)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("reconciling StatefulSet: %w", err)
	}
	if err := r.reconcileService(ctx, db); err != nil {
		return ctrl.Result{}, fmt.Errorf("reconciling Service: %w", err)
	}

	phase := "Pending"
	if sts.Status.ReadyReplicas == db.Spec.Replicas {
		phase = "Running"
	}

	// Write the status only when it actually changed, so a steady-state
	// reconcile does not issue a needless API update.
	if db.Status.ReadyReplicas != sts.Status.ReadyReplicas || db.Status.Phase != phase {
		db.Status.ReadyReplicas = sts.Status.ReadyReplicas
		db.Status.Phase = phase
		if err := r.Status().Update(ctx, db); err != nil {
			return ctrl.Result{}, fmt.Errorf("updating Database status: %w", err)
		}
	}

	logger.Info("Reconciled Database", "phase", db.Status.Phase)
	return ctrl.Result{}, nil
}

创建子资源
// reconcileStatefulSet makes sure the StatefulSet for db exists and carries
// the desired replica count and pod template.
//
// The desired object is built with desiredStatefulSet and given db as its
// controller owner. If no StatefulSet exists under that key it is created and
// the desired object is returned. Otherwise the existing object's replicas
// and template are overwritten, the update is sent unconditionally, and the
// existing object is returned.
func (r *DatabaseReconciler) reconcileStatefulSet(
	ctx context.Context,
	db *appsv1alpha1.Database,
) (*appsv1.StatefulSet, error) {
	want := r.desiredStatefulSet(db)

	// The owner reference lets the StatefulSet be garbage-collected with db.
	if err := ctrl.SetControllerReference(db, want, r.Scheme); err != nil {
		return nil, err
	}

	current := &appsv1.StatefulSet{}
	getErr := r.Get(ctx, client.ObjectKeyFromObject(want), current)
	switch {
	case errors.IsNotFound(getErr):
		// Nothing there yet: create it.
		if createErr := r.Create(ctx, want); createErr != nil {
			return nil, fmt.Errorf("creating StatefulSet: %w", createErr)
		}
		return want, nil
	case getErr != nil:
		return nil, getErr
	}

	// Carry the desired replicas and template over onto the live object.
	current.Spec.Replicas = want.Spec.Replicas
	current.Spec.Template = want.Spec.Template
	if updateErr := r.Update(ctx, current); updateErr != nil {
		return nil, fmt.Errorf("updating StatefulSet: %w", updateErr)
	}
	return current, nil
}
// databasePort returns the default listening port for engine: 3306 for mysql
// and mariadb, and 5432 (PostgreSQL) for everything else.
func databasePort(engine string) int32 {
	switch engine {
	case "mysql", "mariadb":
		return 3306
	default:
		return 5432
	}
}

// desiredStatefulSet builds the StatefulSet that should exist for db.
//
// The object is named after db and lives in db's namespace. It runs a single
// "database" container whose image is "<engine>:<version>" and which exposes
// the engine's default port under the name "db". The same label set is used
// for the object, the selector, and the pod template.
//
// NOTE(review): db.Spec.Storage is not consumed here, so no volume claim is
// requested — confirm whether that is handled elsewhere.
func (r *DatabaseReconciler) desiredStatefulSet(db *appsv1alpha1.Database) *appsv1.StatefulSet {
	labels := map[string]string{
		"app":        db.Name,
		"managed-by": "database-operator",
	}

	// Copy the count so the StatefulSet does not hold a pointer into db.
	replicas := db.Spec.Replicas

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name,
			Namespace: db.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "database",
							Image: fmt.Sprintf("%s:%s", db.Spec.Engine, db.Spec.Version),
							Ports: []corev1.ContainerPort{
								{ContainerPort: databasePort(db.Spec.Engine), Name: "db"},
							},
						},
					},
				},
			},
		},
	}
}
状态条件
// setCondition records a condition of type condType in db.Status.Conditions.
//
// An existing entry of the same type is replaced in place; if its status is
// unchanged, the previous LastTransitionTime is kept so the timestamp only
// moves on a real transition. When no entry of that type exists, the new
// condition is appended. ObservedGeneration is taken from db.Generation.
func (r *DatabaseReconciler) setCondition(
	db *appsv1alpha1.Database,
	condType string,
	status metav1.ConditionStatus,
	reason, message string,
) {
	updated := metav1.Condition{
		Type:               condType,
		Status:             status,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: db.Generation,
	}

	conds := db.Status.Conditions
	for idx := range conds {
		if conds[idx].Type != condType {
			continue
		}
		// Same status as before: this is not a transition.
		if conds[idx].Status == status {
			updated.LastTransitionTime = conds[idx].LastTransitionTime
		}
		conds[idx] = updated
		return
	}
	db.Status.Conditions = append(conds, updated)
}
// Usage example: set the "Available" condition to true or to false, each
// with a reason and a human-readable message.
r.setCondition(db, "Available", metav1.ConditionTrue, "ReplicasReady", "All replicas are running")
r.setCondition(db, "Available", metav1.ConditionFalse, "ReplicasPending", "Waiting for replicas to start")
监视相关资源
// SetupWithManager registers the reconciler with mgr.
//
// The controller watches Database objects plus the StatefulSets and Services
// they own. The generation predicate is attached to the Database watch only:
// used as a controller-wide event filter it would also drop status-only
// updates of owned StatefulSets (their generation does not change when
// readyReplicas does), and the Database status would never be refreshed.
func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		// Reconcile a Database only when its generation changes.
		For(&appsv1alpha1.Database{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		// Owned StatefulSets trigger reconciliation on every change.
		Owns(&appsv1.StatefulSet{}).
		// Owned Services trigger reconciliation as well.
		Owns(&corev1.Service{}).
		Complete(r)
}

部署 Operator
# Generate the CRD manifests from the Go types
make manifests
# Install the CRDs into the cluster
make install
# Run the operator locally for testing
make run
# Build and push the operator image
make docker-build IMG=my-registry/myoperator:v0.1.0
make docker-push IMG=my-registry/myoperator:v0.1.0
# Deploy the operator to the cluster
make deploy IMG=my-registry/myoperator:v0.1.0
# Create a Database resource (the nesting below is significant YAML)
cat <<EOF | kubectl apply -f -
apiVersion: apps.example.com/v1alpha1
kind: Database
metadata:
  name: my-postgres
  namespace: default
spec:
  engine: postgres
  version: "15.4"
  replicas: 3
  storage:
    size: 10Gi
EOF
最佳实践
- 幂等协调:始终向期望状态协调,不假设状态
- 所有者引用:为子资源设置 ctrl.SetControllerReference
- 状态子资源:将 .status 与 .spec 分开更新
- 出错时重新入队:对于临时错误返回 ctrl.Result{RequeueAfter: time.Second * 10}
- 终结器:在删除前使用终结器进行清理
- RBAC:使用最小必要权限
总结
Kubernetes Operator 自动化复杂的运维任务:
- 使用 CRD 定义期望状态
- 实现协调循环以收敛到期望状态
- 使用所有者引用实现自动垃圾回收
- 按照 Kubernetes 约定更新状态条件
- 使用 envtest 进行控制器单元测试