所有文章

k8s源码分析-创建Pod流程

本文从源码层面解释kubernetes从接收创建Pod的指令到实际创建Pod的整个过程。

1.1 监听用户请求

监听的任务是由kube-apiserver这个组件来完成的,它实际上是一个WEB服务,一般是以双向TLS认证方式启动的,所以在启动时需要提供证书、私钥、客户端的CA证书和CA私钥,当然也支持HTTP的方式,启动后就开始监听用户请求了。

1.2 对请求分类

kube-apiserver中的WEB服务在启动时注册了很多的Handler,golang中的Handler相当于Java中的servlet或者是Spring中的Controller,是对某一业务逻辑的封装,通俗点说,一个Handler负责对一个URI请求的处理,而在kube-apiserver中,Handler被封装成了一个叫Store的对象,怎么封装的呢?比如/api/v1/namespaces/{namespace}/pods这个URI对应了一个叫PodStorage的Store,这个Store中包含了对/api/v1/namespaces/{namespace}/pods的多个Handler,这些Handler有的是处理创建请求,有的是处理删除请求等等,代表了对一种资源的操作集。

我们来看看这个Store的定义: staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

type Store struct {
    CreateStrategy rest.RESTCreateStrategy
    AfterCreate ObjectFunc
    UpdateStrategy rest.RESTUpdateStrategy
    AfterUpdate ObjectFunc
    DeleteStrategy rest.RESTDeleteStrategy
    AfterDelete ObjectFunc
...
}

func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {

func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
...

因为每种资源所需要的操作不一样,所以Store中只包含了基本的通用的操作,作为一个基础类。

1.3 处理请求

在Store的定义中有一个Storage storage.Interface字段,Store中的创建、更新、删除等操作,比如上面的Create()函数中会调用这个对象中的Create()方法,也就是说这个Storage对象包含了一组更低级的操作,可以看作是数据的持久化层,这些操作都是通用的,而Store可以用Storage中的功能组合出具有不同功能的控制层对象,也就是Store对象啊,,好吧我们距离真相又进了一步,那这个Storage storage.Interface对象又是怎样实现的呢?

1.4 数据存储到ETCD

Storage字段是一个叫Interface的类型,里面定义了一些数据持久层的操作,这里就不贴出来了,我们更关心它的实现,我们先来看看Interface实例的创建吧,它的创 建工作是由一个工厂类负责的: staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        switch c.Type {
        case storagebackend.StorageTypeETCD2:
                return newETCD2Storage(c)
        case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
                return newETCD3Storage(c)
        default:
                return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
        }
}

好吧,看到这里就彻底明白了,根据配置中指定的存储服务创建不同的Storage对象,并且目前只支持ETCD2和ETCD3两种存储服务,持久层所做的增删改查就是对ETCD中 数据的增删改查,总结一下Storage对象也是就Interface被创建的过程:

  1. 根据配置中提供的存储服务名称选择进入相应的创建函数,也就是上面的逻辑。
  2. 从配置对象中取出我们启动kube-apiserver时指定的ETCD的证书证书、私钥和CA证书(如果开启了双向证),用于apiserver和ETCD通信,用这些信息生成一个配置对象。
  3. 用配置对象创建一个ETCD的客户端。
  4. 创建一个k8s.io/apiserver/pkg/storage/etcd3.store对象并让其持有这个ETCD客户端
  5. 将这个对象作为Interface返回。

1.5 调用ETCD客户端

那么这个k8s.io/apiserver/pkg/storage/etcd3.store对象就成了关键啦,因为它现了Interface的所有接口函数,我们看看它的Create()函数据是怎样实现的: vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go

// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
        if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
                return errors.New("resourceVersion should not be set on objects to be created")
        }
        data, err := runtime.Encode(s.codec, obj)
        if err != nil {
                return err
        }
        key = path.Join(s.pathPrefix, key)

        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
                return err
        }

        newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
        if err != nil {
                return storage.NewInternalError(err.Error())
        }

        txnResp, err := s.client.KV.Txn(ctx).If(
                notFound(key),
        ).Then(
                clientv3.OpPut(key, string(newData), opts...),
        ).Commit()
        if err != nil {
                return err
        }
        if !txnResp.Succeeded {
                return storage.NewKeyExistsError(key, 0)
        }

        if out != nil {
                putResp := txnResp.Responses[0].GetResponsePut()
                return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
        }
        return nil
}

其中txnResp, err := s.client.KV.Txn(ctx).If(这一行就是调用ETCD客户端了。

2.1 调度Pod

调度工作是由kube-scheduler负责的,而且不受kube-apiserver控制,kube-scheduler通过kube-apiserver的REST API不断地检查是否有新的且还没有被调度的Pod,如果如有则根据配置中的调度算法为其绑定到某个节点,绑定的过程也是通过REST API将信息写入到kube-apiserver中的,对应的REST API定义如下: pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
        restStorageMap := map[string]rest.Storage{
...
                "pods/binding":     podStorage.Binding,
                "bindings":         podStorage.Binding,
...

"pods/binding""bindings"两个API就是了。

下面是REST API/api/v1/namespaces/{my-ns}/pods对应的Store对象的定义: pkg/registry/core/pod/storage/storage.go

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
...
        return PodStorage{
                Pod:         &REST{store, proxyTransport},
                Binding:     &BindingREST{store: store},

其中的Binding字段就是关于绑定逻辑的Store了。

3.1 创建Pod

最后的创建工作由各个节点上的kubelet组件负责,工作原理同kube-scheduler一样,通过REST API从kube-apiserver循环监听是否有新创建的并且已经被绑定到自己节点的Pod,如果有则在自己的节点上创建相应的Docker容器并设置容器的网络、端口转发、内存和CPU限制等,然后把结果用REST API通知给kube-apiserver

kubelet创建Pod的过程,主要逻辑在k8s源码的pkg/kubelet/kuberuntime/kuberuntime_manager.go文件的SyncPod()函数中:

  1. 检查Pod信息是否有更改,如果是则杀死现有的Pod,包括Pod中的所有容器。

  2. 创建SandBox(沙箱),其实就是启动pause容器,这个过程中包含了一些重要操作:

    • 创建SandBox的函数:pkg/kubelet/dockershim/docker_sandbox.go:79

      func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
      
    • 拉取pause容器的镜像。

    • 启动pause容器。

    • 写入resolv.conf文件,覆盖docker默认创建的resolv.conf文件:pkg/kubelet/dockershim/docker_sandbox.go:140

      containerInfo, err := ds.client.InspectContainer(createResp.ID)
      
    • 设置Pod网络,也就是调用CNI来为容器添加网卡:pkg/kubelet/dockershim/docker_sandbox.go:163

      err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
      
  3. 启动init容器。

  4. 最后启动所有业务容器:pkg/kubelet/kuberuntime/kuberuntime_manager.go:712

    for _, idx := range podContainerChanges.ContainersToStart {
    		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
    ...
    }
    

至此Pod就算创建完成了,如果创建了相应的service,那么proxy组件会为service在物理机上通过iptables工具创建数据转发规则,将集群内的请求Pod的数据转发到相应的Pod中,如果Pod有多个副本,则以负载均衡的方式分别转发给这个Pod。

相关资料


编写日期:2017-12-30