k8s源码分析-apiserver
本系列文章是基于kubernetes1.7版本的。
main函数
apiserver的入口定义在cmd/kube-apiserver/apiserver.go
文件中:
func main() {
rand.Seed(time.Now().UTC().UnixNano())
// 创建一个默认配置对象
s := options.NewServerRunOptions()
// 构建命令行对象
s.AddFlags(pflag.CommandLine)
// 解析命令行参数
flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
// 如果指定了-version选项,则打印版本号然后退出
verflag.PrintAndExitIfRequested()
// 启动服务,并从管道中监听停止信号,该通道可能永远不会写入数据
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
这个main函数看起来是比较直观的,中文的注释是我加上去的,便于阅读。
初始化配置
在main函数中的第一行是为随机数产生器用当前时间提供了一个基数,避免多次启动产生相同的值,说明在这个组件中可能有些功能依赖了随数机。
s := options.NewServerRunOptions()
显然是在创建一个配置对象,不过这个对象中只包含一些默认参数。
s.AddFlags(pflag.CommandLine)
是构建一个命令行对象,怎么构建的呢,其实就是s
在pflag.CommandLine
这个对象中加了很多选项,并把自己的在很多字段以指针的方式注入到了pflag.CommandLine
中,这些选项是经过分组的,如ETCD相关配置、apiserver相关的请求超时时间等。
紧接着flag.InitFlags()
解析所有命令行参数,并通过指针把值放到配置对象s
中。
其中flag
这个包是k8s对开源项目github.com/spf13/pflag
的封装,而这个项目又是基于golang标准库中的flag包开发的,并且封装了一些实用的函数,可以让你快速生成自己的命令行选项。
准备资源
然后调用到了Run()函数,定义在cmd/kube-apiserver/app/server.go文件中,这里代码较多就不全部贴出来了,为了保持文章的可读性,我会尽量减少函数展开的层数。
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
上面代码判断是否安装在云主机上,如果是,并且指定了密钥文件,也是就--ssh-keyfile
选项,则安装key到所有云主机的实例中,然后使用该密钥文件创建一个连接器nodeTunneler
,用来访问其它节点。
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
这句是利用最初的配置对象runOptions
创建用于启动apiserver的配置对象,与runOptions
不同的是,它还包括了启动apiserver所需的资源。
该函数中做了以下几件事:
- 根据用户指定的
--admission-control
列表加载相应的admission插件,这些插件的作用是用来过滤API请求的,可以用来作请求的合法性检测,比如我们对所有get csr
的请求进行用户验证,如果该用户权限太低则拒绝该请求;也可以修改某个请求,比如为所有create deployments
的请求加上scale=1
等等。默认不加载任何规则。 - 如果有必要的配置没有设置则为其设置默认值,如Service IP段、序列化缓存大小等。
- 验证必要配置项,如果在此阶段有错误配置项,则所有错误信息将被打包返回。
- 如果验证通过则开始连接ETCD,并启动同步信息的进程,该进程会一直监听apiserver的数据并同步到ETCD集群中。
- 最后创建
master.Config
对象,也就是kubeAPIServerConfig
变量,然后返回。
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)
这行代码是比较核心的,它的作用如下:
- 首先对
kubeAPIServerConfig
做一个配置信息的检查,如果有空缺的字段则为其补全。 - 安装以
/api/v1
开头的的REST API,安装API的函数是pkg/master/master.go
文件内的InstallLegacyAPI()
函数,而把RUI和Handler对应起来的是pkg/registry/core/rest/storage_core.go
文件内的NewLegacyRESTStorage()
函数,我们贴出来一小段:
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
...
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
...
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
...
可见在k8s中,把Handler封装成了Storage
,其实就是一个Storage
对象负责一个REST API的增删改查。
- 然后把最终的启动逻辑作为钩子,注册到
kubeAPIServer
中,但这时还没有启动,注册钩子的代码如下:
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(context.StopCh)
return nil
})
我们来说一下这个钩子函数,可以看到这里做了一个启动操作,这个sharedInformers
是个集合,它包含了多个SharedInformer
对象,也说是说这个启动操作其实是启动所有的SharedInformer
,这个SharedInformer
又是什么呢?它其实是一个缓存器,其它如kube-scheduler、kubelet等组件跟apiserver通信时都是通过缓存间接通信的,而这个SharedInformer
一方面负责收集客户端和其它组件发来的请求,一方面负责通知该事件的关注者,使关注者可以做出相应的动作,还有就是它会定其把缓存中的数据同步到ETCD中。
以下是SharedInformer
接口的定义,在shared_informer.go文件中:
type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
前两个函数都是向该事件添加一个监听器,也就是前面说的关注者,只不过使用第二个函数可以自定义同步数据的周期间隔时间。
GetStore()
用来获取该事件类型的处理器,有新事件时该Store对应的逻辑会被调起。
Run()
就是启动该SharedInformer
的各个进程了。
启动服务
此时我们还在app.Run()
函数中,在函数的最后才是真正启动Server的逻辑:
aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
我们展开句尾的.Run(stopCh)
看一下,这正是API服务启动的关键代码,它定义在:
kubernetes/vendor/k8s.io/apiserver/pkg/server/serve.go
func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
secureServer := &http.Server{
Addr: s.SecureServingInfo.BindAddress,
Handler: s.Handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: &tls.Config{
NameToCertificate: s.SecureServingInfo.SNICerts,
MinVersion: tls.VersionTLS12,
NextProtos: []string{"h2", "http/1.1"},
},
}
if s.SecureServingInfo.Cert != nil {
secureServer.TLSConfig.Certificates = []tls.Certificate{*s.SecureServingInfo.Cert}
}
// 中间省略...
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
}
- 函数的开头就是定义一个Server,这在golang中是一个标准的Server.
- 其中的
s.Handler
是所有REST API对应的handler,golang中的handler相当于java中的servlet,或者是Spring VMC中的controller. - 读取配置中提供的证书、CA、私钥等,注入到Server对象中。
- 最后一行是启动Server,里面是非阻塞式启动的,逻辑较简单,这里就不再展开了。
- 调用刚才注册的启动钩子函数,启动各个
SharedInformer
。 - 监听
stopCh
通道并阻塞主进程。
到这里就是apiserver启动的全部流程了,下次细讲一下apiserver的工作流程,以及它如何处理Client发来的创建、更新、删除等请求。