k8s源码分析-创建Pod流程
本文从源码层面解释kubernetes从接收创建Pod的指令到实际创建Pod的整个过程。
1.1 监听用户请求
监听的任务是由kube-apiserver
这个组件来完成的,它实际上是一个WEB服务,一般是以双向TLS认证方式启动的,所以在启动时需要提供证书、私钥、客户端的CA证书和CA私钥,当然也支持HTTP的方式,启动后就开始监听用户请求了。
1.2 对请求分类
kube-apiserver
中的WEB服务在启动时注册了很多的Handler,golang中的Handler相当于Java中的servlet或者是Spring中的Controller,是对某一业务逻辑的封装,通俗点说,一个Handler负责对一个URI请求的处理,而在kube-apiserver
中,Handler被封装成了一个叫Store的对象,怎么封装的呢?比如/api/v1/namespaces/{namespace}/pods
这个URI对应了一个叫PodStorage
的Store,这个Store中包含了对/api/v1/namespaces/{namespace}/pods
的多个Handler,这些Handler有的是处理创建请求,有的是处理删除请求等等,代表了对一种资源的操作集。
我们来看看这个Store的定义:
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {
CreateStrategy rest.RESTCreateStrategy
AfterCreate ObjectFunc
UpdateStrategy rest.RESTUpdateStrategy
AfterUpdate ObjectFunc
DeleteStrategy rest.RESTDeleteStrategy
AfterDelete ObjectFunc
...
}
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (runtime.Object, error) {
func (e *Store) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
...
因为每种资源所需要的操作不一样,所以Store中只包含了基本的通用的操作,作为一个基础类。
1.3 处理请求
在Store的定义中有一个Storage storage.Interface
字段,Store中的创建、更新、删除等操作,比如上面的Create()
函数中会调用这个对象中的Create()
方法,也就是说这个Storage对象包含了一组更低级的操作,可以看作是数据的持久化层,这些操作都是通用的,而Store可以用Storage中的功能组合出具有不同功能的控制层对象,也就是Store对象啊,,好吧我们距离真相又进了一步,那这个Storage storage.Interface
对象又是怎样实现的呢?
1.4 数据存储到ETCD
Storage
字段是一个叫Interface
的类型,里面定义了一些数据持久层的操作,这里就不贴出来了,我们更关心它的实现,我们先来看看Interface实例的创建吧,它的创
建工作是由一个工厂类负责的:
staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2Storage(c)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
好吧,看到这里就彻底明白了,根据配置中指定的存储服务创建不同的Storage对象,并且目前只支持ETCD2和ETCD3两种存储服务,持久层所做的增删改查就是对ETCD中
数据的增删改查,总结一下Storage对象也是就Interface被创建的过程:
- 根据配置中提供的存储服务名称选择进入相应的创建函数,也就是上面的逻辑。
- 从配置对象中取出我们启动
kube-apiserver
时指定的ETCD的证书证书、私钥和CA证书(如果开启了双向证),用于apiserver和ETCD通信,用这些信息生成一个配置对象。
- 用配置对象创建一个ETCD的客户端。
- 创建一个
k8s.io/apiserver/pkg/storage/etcd3.store
对象并让其持有这个ETCD客户端
- 将这个对象作为Interface返回。
1.5 调用ETCD客户端
那么这个k8s.io/apiserver/pkg/storage/etcd3.store
对象就成了关键啦,因为它现了Interface的所有接口函数,我们看看它的Create()
函数据是怎样实现的:
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
其中txnResp, err := s.client.KV.Txn(ctx).If(
这一行就是调用ETCD客户端了。
2.1 调度Pod
调度工作是由kube-scheduler
负责的,而且不受kube-apiserver
控制,kube-scheduler
通过kube-apiserver
的REST API不断地检查是否有新的且还没有被调度的Pod,如果如有则根据配置中的调度算法为其绑定到某个节点,绑定的过程也是通过REST API将信息写入到kube-apiserver
中的,对应的REST API定义如下:
pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
restStorageMap := map[string]rest.Storage{
...
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
...
"pods/binding"
和"bindings"
两个API就是了。
下面是REST API/api/v1/namespaces/{my-ns}/pods
对应的Store对象的定义:
pkg/registry/core/pod/storage/storage.go
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
...
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
其中的Binding
字段就是关于绑定逻辑的Store了。
3.1 创建Pod
最后的创建工作由各个节点上的kubelet
组件负责,工作原理同kube-scheduler
一样,通过REST API从kube-apiserver
循环监听是否有新创建的并且已经被绑定到自己节点的Pod,如果有则在自己的节点上创建相应的Docker容器并设置容器的网络、端口转发、内存和CPU限制等,然后把结果用REST API通知给kube-apiserver
。
kubelet创建Pod的过程,主要逻辑在k8s源码的pkg/kubelet/kuberuntime/kuberuntime_manager.go文件的SyncPod()
函数中:
检查Pod信息是否有更改,如果是则杀死现有的Pod,包括Pod中的所有容器。
创建SandBox(沙箱),其实就是启动pause容器,这个过程中包含了一些重要操作:
创建SandBox的函数:pkg/kubelet/dockershim/docker_sandbox.go:79
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
拉取pause容器的镜像。
启动pause容器。
写入resolv.conf文件,覆盖docker默认创建的resolv.conf文件:pkg/kubelet/dockershim/docker_sandbox.go:140
containerInfo, err := ds.client.InspectContainer(createResp.ID)
设置Pod网络,也就是调用CNI来为容器添加网卡:pkg/kubelet/dockershim/docker_sandbox.go:163
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
启动init容器。
最后启动所有业务容器:pkg/kubelet/kuberuntime/kuberuntime_manager.go:712
for _, idx := range podContainerChanges.ContainersToStart { if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil { ... }
至此Pod就算创建完成了,如果创建了相应的service,那么proxy组件会为service在物理机上通过iptables
工具创建数据转发规则,将集群内的请求Pod的数据转发到相应的Pod中,如果Pod有多个副本,则以负载均衡的方式分别转发给这个Pod。