正在加载,请稍候…

使用 Go 构建 Kubernetes Operator:自动化复杂工作负载

学习如何使用 Operator SDK 和 controller-runtime 构建自定义 Kubernetes Operator,涵盖 CRD、协调循环、状态条件等内容。

使用 Go 构建 Kubernetes Operator:自动化复杂工作负载

使用 Go 构建 Kubernetes Operator

Kubernetes Operator 扩展了 Kubernetes API,用于自动化管理复杂的有状态应用程序。本指南介绍如何使用 controller-runtime 构建生产级 Operator。

什么是 Operator?

Operator 将人类运维知识编码为软件:

  • 自定义资源定义(CRD):定义你的 API
  • 控制器(Controller):监视资源并协调期望状态与实际状态
  • 协调循环(Reconciliation loop):实现状态变更的核心逻辑

使用 Go 构建 Kubernetes Operator:自动化复杂工作负载 插图

前提条件

# Install the Operator SDK: download the latest linux/amd64 release binary,
# mark it executable, and move it onto the PATH as `operator-sdk`.
curl -LO https://github.com/operator-framework/operator-sdk/releases/latest/download/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk

# Create a new Operator project, then scaffold the Database API type
# (group "apps", version "v1alpha1") together with its controller.
operator-sdk init --domain=example.com --repo=github.com/myorg/myoperator
operator-sdk create api --group=apps --version=v1alpha1 --kind=Database --resource --controller

定义 CRD

// api/v1alpha1/database_types.go
package v1alpha1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// DatabaseSpec defines the desired state of a Database.
type DatabaseSpec struct {
    // Replicas is the desired number of database replicas.
    // The markers below restrict it to the range 1-10.
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=10
    Replicas int32 `json:"replicas"`

    // Engine selects the database engine; the marker below limits it to
    // postgres, mysql or mariadb.
    // +kubebuilder:validation:Enum=postgres;mysql;mariadb
    Engine string `json:"engine"`

    // Version is the engine version. The controller combines it with Engine
    // as "<engine>:<version>" to form the container image reference.
    Version string `json:"version"`

    // Storage holds the storage settings for the database.
    Storage StorageSpec `json:"storage"`
}

type StorageSpec struct {
    // +kubebuilder:validation:Pattern=`^[0-9]+[KMGT]i

  
    
    
    
    使用 Go 构建 Kubernetes Operator:自动化复杂工作负载 | MyUtl
    
    
    
    
    

    
    
    
    
    
    

    
    
    
    
    

    
    
    
    
    

    

    
    
    

    
    
    
    
    
    
    
    
      
    
  
  
    
    

    
    
正在加载,请稍候…
Size string `json:"size"` // +optional StorageClassName *string `json:"storageClassName,omitempty"` } // DatabaseStatus 定义观察到的状态 type DatabaseStatus struct { // +patchMergeKey=type // +patchStrategy=merge // +listType=map // +listMapKey=type Conditions []metav1.Condition `json:"conditions,omitempty"` ReadyReplicas int32 `json:"readyReplicas,omitempty"` Phase string `json:"phase,omitempty"` // Pending, Running, Failed } // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Engine",type="string",JSONPath=".spec.engine" // +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas" // +kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas" // +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" type Database struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec DatabaseSpec `json:"spec,omitempty"` Status DatabaseStatus `json:"status,omitempty"` }

控制器:协调循环

// controllers/database_controller.go
package controllers

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/predicate"

    appsv1alpha1 "github.com/myorg/myoperator/api/v1alpha1"
)

// DatabaseReconciler reconciles Database objects. It embeds client.Client for
// reading and writing API objects and carries the runtime.Scheme that is passed
// to ctrl.SetControllerReference when owner references are set on child objects.
type DatabaseReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile brings the cluster in line with the Database named by req: it
// ensures the StatefulSet and Service exist, then mirrors the StatefulSet's
// ready-replica count and a derived phase into the Database status.
//
// Errors are returned wrapped with context instead of being logged here;
// controller-runtime logs every error returned from Reconcile and requeues the
// request, so logging them locally as well would report each failure twice.
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // Fetch the Database instance. A NotFound error means it was deleted after
    // the request was queued, which is not a failure.
    db := &appsv1alpha1.Database{}
    if err := r.Get(ctx, req.NamespacedName, db); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // A freshly created Database has no phase yet; start it as Pending.
    if db.Status.Phase == "" {
        db.Status.Phase = "Pending"
        if err := r.Status().Update(ctx, db); err != nil {
            return ctrl.Result{}, fmt.Errorf("initializing status: %w", err)
        }
    }

    sts, err := r.reconcileStatefulSet(ctx, db)
    if err != nil {
        return ctrl.Result{}, fmt.Errorf("reconciling StatefulSet: %w", err)
    }

    if err := r.reconcileService(ctx, db); err != nil {
        return ctrl.Result{}, fmt.Errorf("reconciling Service: %w", err)
    }

    // Derive the phase from how many replicas the StatefulSet reports ready.
    phase := "Pending"
    if sts.Status.ReadyReplicas == db.Spec.Replicas {
        phase = "Running"
    }

    // Only write the status subresource when something actually changed, so a
    // steady-state reconcile does not issue a redundant API write.
    if db.Status.ReadyReplicas != sts.Status.ReadyReplicas || db.Status.Phase != phase {
        db.Status.ReadyReplicas = sts.Status.ReadyReplicas
        db.Status.Phase = phase
        if err := r.Status().Update(ctx, db); err != nil {
            return ctrl.Result{}, fmt.Errorf("updating status: %w", err)
        }
    }

    logger.Info("Reconciled Database", "phase", db.Status.Phase)
    return ctrl.Result{}, nil
}

使用 Go 构建 Kubernetes Operator:自动化复杂工作负载 插图

创建子资源

// reconcileStatefulSet ensures the StatefulSet for db exists and carries the
// desired replica count and pod template. It returns the object that was
// created or updated; on error the returned StatefulSet is nil.
func (r *DatabaseReconciler) reconcileStatefulSet(
    ctx context.Context,
    db *appsv1alpha1.Database,
) (*appsv1.StatefulSet, error) {
    desired := r.desiredStatefulSet(db)

    // Make db the controlling owner so the StatefulSet is garbage-collected
    // together with it.
    if err := ctrl.SetControllerReference(db, desired, r.Scheme); err != nil {
        return nil, fmt.Errorf("setting owner reference on StatefulSet: %w", err)
    }

    existing := &appsv1.StatefulSet{}
    err := r.Get(ctx, client.ObjectKeyFromObject(desired), existing)
    switch {
    case errors.IsNotFound(err):
        // Nothing there yet: create it from the desired state.
        if err := r.Create(ctx, desired); err != nil {
            return nil, fmt.Errorf("creating StatefulSet: %w", err)
        }
        return desired, nil
    case err != nil:
        return nil, fmt.Errorf("getting StatefulSet: %w", err)
    }

    // Overwrite the mutable parts of the spec with the desired values. The
    // update is sent unconditionally; no diff against the live object is made.
    existing.Spec.Replicas = desired.Spec.Replicas
    existing.Spec.Template = desired.Spec.Template
    if err := r.Update(ctx, existing); err != nil {
        return nil, fmt.Errorf("updating StatefulSet: %w", err)
    }

    return existing, nil
}

// desiredStatefulSet builds the StatefulSet that should exist for db. The
// object shares db's name and namespace, runs a single "database" container
// whose image is "<engine>:<version>", and exposes the engine's default port.
func (r *DatabaseReconciler) desiredStatefulSet(db *appsv1alpha1.Database) *appsv1.StatefulSet {
    labels := map[string]string{
        "app":        db.Name,
        "managed-by": "database-operator",
    }

    // Copy the replica count so the returned object does not hold a pointer
    // into the caller's Database.
    replicas := db.Spec.Replicas

    return &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name,
            Namespace: db.Namespace,
            Labels:    labels,
        },
        Spec: appsv1.StatefulSetSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "database",
                            Image: fmt.Sprintf("%s:%s", db.Spec.Engine, db.Spec.Version),
                            Ports: []corev1.ContainerPort{
                                {ContainerPort: databasePort(db.Spec.Engine), Name: "db"},
                            },
                        },
                    },
                },
            },
        },
    }
}

// databasePort returns the default listening port for engine: 3306 for mysql
// and mariadb, and 5432 (the PostgreSQL default) for everything else.
func databasePort(engine string) int32 {
    switch engine {
    case "mysql", "mariadb":
        return 3306
    default:
        return 5432
    }
}

状态条件

// setCondition inserts or replaces the condition of type condType in
// db.Status.Conditions. When a condition of that type already exists with the
// same status, its LastTransitionTime is carried over; otherwise the current
// time is recorded. ObservedGeneration is always taken from db.Generation.
func (r *DatabaseReconciler) setCondition(
    db *appsv1alpha1.Database,
    condType string,
    status metav1.ConditionStatus,
    reason, message string,
) {
    next := metav1.Condition{
        Type:               condType,
        Status:             status,
        Reason:             reason,
        Message:            message,
        LastTransitionTime: metav1.Now(),
        ObservedGeneration: db.Generation,
    }

    conds := db.Status.Conditions
    for idx := range conds {
        if conds[idx].Type != condType {
            continue
        }
        // The status did not flip, so the previous transition time still holds.
        if conds[idx].Status == status {
            next.LastTransitionTime = conds[idx].LastTransitionTime
        }
        conds[idx] = next
        return
    }

    // No condition of this type yet: add it.
    db.Status.Conditions = append(conds, next)
}

// Usage example: record the "Available" condition as true once all replicas
// are running, or as false while replicas are still starting.
r.setCondition(db, "Available", metav1.ConditionTrue, "ReplicasReady", "All replicas are running")
r.setCondition(db, "Available", metav1.ConditionFalse, "ReplicasPending", "Waiting for replicas to start")

监视相关资源

// SetupWithManager registers the reconciler with mgr. It watches Database
// objects as the primary resource and the StatefulSets and Services they own.
//
// GenerationChangedPredicate is attached to the Database watch only. Installed
// through WithEventFilter it would apply to every watch, and because status
// updates do not bump metadata.generation, ready-replica changes on the owned
// StatefulSet would be filtered out and never trigger a reconcile.
func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        // Reconcile the Database itself only when its spec changes.
        For(&appsv1alpha1.Database{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
        // Any event on an owned StatefulSet, including status updates,
        // triggers a reconcile of its owner.
        Owns(&appsv1.StatefulSet{}).
        // Likewise for owned Services.
        Owns(&corev1.Service{}).
        Complete(r)
}

使用 Go 构建 Kubernetes Operator:自动化复杂工作负载 插图

部署 Operator

# Generate the CRD manifests from the Go types
make manifests

# Install the CRDs
make install

# Run the operator locally for testing
make run

# Build and push the image
make docker-build IMG=my-registry/myoperator:v0.1.0
make docker-push IMG=my-registry/myoperator:v0.1.0

# Deploy to the cluster
make deploy IMG=my-registry/myoperator:v0.1.0

# Create a Database resource
cat <<EOF | kubectl apply -f -
apiVersion: apps.example.com/v1alpha1
kind: Database
metadata:
  name: my-postgres
  namespace: default
spec:
  engine: postgres
  version: "15.4"
  replicas: 3
  storage:
    size: 10Gi
EOF

最佳实践

  1. 幂等协调:始终向期望状态协调,不对当前状态做任何假设
  2. 所有者引用:为子资源设置 ctrl.SetControllerReference
  3. 状态子资源:将 .status 与 .spec 分开更新
  4. 出错时重新入队:对于临时错误返回 ctrl.Result{RequeueAfter: time.Second * 10}
  5. 终结器:在删除前使用终结器进行清理
  6. RBAC:使用最小必要权限

总结

Kubernetes Operator 自动化复杂的运维任务:

  • 使用 CRD 定义期望状态
  • 实现协调循环以收敛到期望状态
  • 使用所有者引用实现自动垃圾回收
  • 按照 Kubernetes 约定更新状态条件
  • 使用 envtest 进行控制器单元测试