通过Operator自动暴露集群内部服务
本文最后更新于 493 天前,其中的信息可能已经有所发展或是发生改变。

需求背景

运维团队希望管控部署在k8s集群里对外暴露的服务,开发团队无需关心服务如何暴露给用户。

image-20230507203329600

红色部分

开发团队创建应用的manifests,开发团队可以为Service:资源添加annotationingress/http:true来决定终端用户是否可以访问到该服务,默认不能访问到。至于具体如何让用户可以访问到服务,开发团队不需要关心。

绿色部分

custom controller需要监听Service资源,当Service.发生变化时

  • 如果新增Service时
    • 包含指定annotation创建Ingress资源对象
    • 不包含指定annttation,忽略
  • 如果删除Servicel时删除Ingress资源对象
  • 如果更新Servicel时
    • 包含指定annotation,检查Ingress资源对象是否存在,不存在则创建,存在则忽略
    • 不包含指定annotation,检查 Ingress资源对象是否存在,存在则删除,不存在则忽略

蓝色部分

Ingress Controller使用nginx ingress Controller,类似集群网关(ingress controller 也有 apisix实现),根据 Ingress为我们更新nginx的配置,最后,终端用户便可以通过Ingress Controlleri的地址访问到开发团队指定的服务。

实现

主要步骤

  1. 创建clientset,用于操作资源
  2. 创建serviceInformer、ingressInformer
    1. 通过informer里的indexer获取资源
    2. 添加事件处理
      1. 将事件对应的资源对象的key放进workqueue
  3. 创建work gorounite,从workqueue消费事件
  4. 创建或删除对应的ingress

拿创建service举例

  1. 通过serviceInformer注册的新增事件处理,得到创建的service
  2. 将service的key放进workqueue
  3. work gorounite从workqueue里获取到key
  4. 通过serviceInformer的indexer获取到service
  5. 判断是否具有ingress/http:trueannotation,有的话,查询对应的ingress是否存在,不存在就创建ingress

代码

main.go

func main() {
    // config
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        clusterConfig, err := rest.InClusterConfig()
        if err != nil {
            panic(err)
        }
        config = clusterConfig
    }

    // client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // factory
    factory := informers.NewSharedInformerFactory(clientset, 0)

    // informer
    serviceInformer := factory.Core().V1().Services()
    ingressInformer := factory.Networking().V1().Ingresses()

    // addEvent
    controller := NewController(clientset, serviceInformer, ingressInformer)

    // start
    stopCh := make(chan struct{})
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    controller.Run(stopCh)
}

controller.go

const (
    workNum  = 5
    maxRetry = 10
)

type Controller struct {
    // 从indexer里获取数据
    serviceLister v13.ServiceLister
    ingressLister v14.IngressLister
    client        *kubernetes.Clientset
    queue         workqueue.RateLimitingInterface
}

func NewController(client *kubernetes.Clientset, serviceInformer v1.ServiceInformer, ingressInformer v12.IngressInformer) *Controller {
    c := &Controller{
        client:        client,
        serviceLister: serviceInformer.Lister(),
        ingressLister: ingressInformer.Lister(),
        queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
            Name: "ingress",
        }),
    }
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: c.addService, UpdateFunc: c.updateService})
    // ingress被删除时,只要service还存在,就会重新创建ingress
    ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: c.deleteIngress})
    return c
}

func (c Controller) addService(obj interface{}) {
    fmt.Println("add service")
    c.enqueue(obj)
}

func (c Controller) updateService(oldObj interface{}, newObj interface{}) {
    fmt.Println("update service")
    if reflect.DeepEqual(oldObj, newObj) {
        return
    }
    c.enqueue(newObj)
}

func (c Controller) deleteIngress(obj interface{}) {
    fmt.Println("delete ingress")
    ig := obj.(*v15.Ingress)
    _, ok := ig.Annotations["ingress/http"]
    if !ok {
        return
    }

    // 重新创建ingress
    c.enqueue(obj)
}

func (c Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    c.queue.Add(key)
}

func (c Controller) Run(ch chan struct{}) {
    fmt.Println("controller run")
    for i := 0; i < workNum; i++ {
        go c.work()
    }
    <-ch
}

func (c Controller) work() {
    for c.processNextItem() {
    }
}

func (c Controller) processNextItem() bool {
    item, shutdown := c.queue.Get()
    if shutdown {
        return false
    }

    defer c.queue.Done(item)
    err := c.syncService(item.(string))
    if err != nil {
        // 异常情况进行重试
        c.handleErr(item.(string), err)
    }
    return true
}

func (c Controller) handleErr(key string, err error) {
    if c.queue.NumRequeues(key) <= maxRetry {
        c.queue.Add(key)
        return
    }

    c.queue.Forget(key)
    runtime.HandleError(err)
}

func (c Controller) syncService(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    ingressExist := false
    _, err = c.ingressLister.Ingresses(namespace).Get(name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
    } else {
        ingressExist = true
    }

    serviceExist := false
    service, err := c.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
    } else {
        serviceExist = true
    }

    annotationExist := false
    if serviceExist {
        if _, ok := service.Annotations["ingress/http"]; ok {
            annotationExist = true
        }
    }

    if annotationExist && !ingressExist {
        if err := c.createIngress(service); err != nil {
            return err
        }
    } else if !annotationExist && ingressExist {
        if err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, v16.DeleteOptions{}); err != nil {
            return err
        }
    }

    return nil
}

func (c Controller) createIngress(service *v17.Service) error {
    ingress := &v15.Ingress{}
    ingress.Name = service.Name
    ingress.Namespace = service.Namespace
    ingress.Annotations = map[string]string{
        "ingress/http": "true",
    }

    // 删除service,ingress也会被删除
    ingress.ObjectMeta.OwnerReferences = []v16.OwnerReference{
        *v16.NewControllerRef(service, v16.SchemeGroupVersion.WithKind("Service")),
    }

    pathType := v15.PathTypePrefix
    ingressClassName := "nginx"
    ingress.Spec = v15.IngressSpec{
        Rules: []v15.IngressRule{
            {
                Host: "test.com",
                IngressRuleValue: v15.IngressRuleValue{
                    HTTP: &v15.HTTPIngressRuleValue{
                        Paths: []v15.HTTPIngressPath{
                            {
                                Path:     "/",
                                PathType: &pathType,
                                Backend: v15.IngressBackend{
                                    Service: &v15.IngressServiceBackend{
                                        Name: service.Name,
                                        Port: v15.ServiceBackendPort{Number: 80},
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        IngressClassName: &ingressClassName,
    }

    _, err := c.client.NetworkingV1().Ingresses(service.Namespace).Create(context.TODO(), ingress, v16.CreateOptions{})
    return err
}
作者:Yuyy
博客:https://yuyy.info

评论

  1. mark
    Windows Edge
    1年前
    2023-6-10 7:48:34

    博主,您这个博客主题好漂亮,我也使用了这个主题,但是不会修改,而且还会有一些报错,有点难搞,博主可否分享一下修改后的主题呀,感谢博客;我的邮箱:awmking163@163.com

    • Yuyy
      博主
      mark
      Macintosh Chrome
      1年前
      2023-6-10 10:18:52

      谢谢,不过这个主题我没有修改代码之类的,只是改了些主题自带的设置。你想知道具体哪块的修改啊?我看看我是咋设置的

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇