k8s源码分析-scheduler
本文以kubenetes v1.7为例,说明kube-scheduler
组件的启动流程与工作原理。
入口
scheduler的main函数定义在plugin/cmd/kube-scheduler/scheduler.go
中,main函数代码也比较清晰:
- 创建默认配置对象
- 将配置对象的指针传给命令行解析器,然后命令行解析器把解析到的各选项的值写入到配置对象中
- 如果用户指定了
version
选项则打印版本信息并退出
- 将配置对象传给
Run()
函数,然后就开始启动Scheduler了
创建客户端
kubeClient, leaderElectionClient, err := createClients(s)
Run()
函数一开始先创建了一个kubernetes的客户端,用来连接kube-apiserver
组件以获取集群信息,这个客户端对象会被包含在Scheduler对象中。
创建缓存更新器
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
podInformer := factory.NewPodInformer(kubeClient, 0)
然后又根据客户端创建出来两个Informer
对象,它跟客户端在一个包中,其作用是允许用户提供一些事件监听器(watcher),然后它有一个Run方法,启动以后会一直循环从kube-apiserver
中查询我们想要的信息,比如节点状态、新增Pod等等,如果有变化就会触发我们注册的相应的监听器对应的动作,然后本地有一个缓存对象,用来存放这些查询到的信息,这时只是创建,它们的Run方法还没有被调用。
创建Scheduler
sched, err := CreateScheduler(
Scheduler对象的创建与另外两个对象密切相关,一个是Config,它与Scheduler定义在一个文件中:plugin/pkg/scheduler/scheduler.go
,另一个是ConfigFactory,定义在plugin/pkg/scheduler/factory/factory.go
,它们的关系大概为:
ConfigFactory的主要工作是维护本地已缓存调度资源,比如等待调度的Pod、已调度的Pod、集群节点列表、PV/PVC列表等,并由
Informer
循环地从apiserver
中把资源更新到本地,当然还包括向队列增删改查的函数,这些函数由Informer
提供。
Scheduler是对ConfigFactory的高级抽象,相对包含的函数少一些,因为它封装出了更高级的功能,使用起来更简单。
而Config是Scheduler中的一个字段,Config没有函数只有一些字段,主要的作用是包含了Scheduler运行时需要的资源,这个Config对象包含的元素如下:
type Config struct { SchedulerCache schedulercache.Cache Ecache *core.EquivalenceCache NodeLister algorithm.NodeLister Algorithm algorithm.ScheduleAlgorithm Binder Binder PodConditionUpdater PodConditionUpdater PodPreemptor PodPreemptor NextPod func() *v1.Pod WaitForCacheSync func() bool Error func(*v1.Pod, error) Recorder record.EventRecorder StopEverything chan struct{} }
其中
Algorithm
字段自然就是调度算法了,NodeLister
字段表示集群中所有节点的列表,Binder
用来将指定Pod绑定到某一主机上。
默认调度算法
默认的调度算法对象在pkg/scheduler/factory/factory.go:CreateFromKeys()
函数中被创建:
algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
extenders,
c.volumeBinder,
c.pVCLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
)
所以调度一个Pod的具体实现就在这个genericScheduler
结构体中定义了,关键函数是它的Schedule()
函数了:
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
...
Schedule()
函数完成的工作如下:
先过滤掉不合法的Node,如果过滤后只剩一个Node,就把Pod调度到该Node并结束本次调度工作:
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
通过该算法对象中指定的打分函数,并行地为每个节点打分。不管是指定的哪个打分函数,这些打分函数预期设置为0-10,0是最低优先级得分(最不喜欢的节点),10是最高,每个优先级函数也可以有 将自身权重,优先级函数返回的节点得分乘以权重得到加权得分,最后将所有得分合并(加)得到所有节点的总加权得分:
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
打分过程比较复杂,上面函数中的
g.prioritizers
字段是一个函数集,这些函数提供了不同的规则为Node打分,它们是在集群启动时被注删到factory对象中,而genericScheduler
对象在创建时引用了这些函数,以下是注册所有默认算法的代码:pkg/scheduler/algorithmprovider/defaults/defaults.go
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
为所有Node打分后得到一个以分数排序的节点列表,如果有多个第一名,就从这些第一名中以循环方式选择一个节点:
return g.selectHost(priorityList)
调度器中几个重要的对象的创建大概就这么多,后面就开始启动了。
启动Metrics服务
go startHTTP(s)
Metrics是一个用于查询调试信息和运行状态信息的REST API Server,它定义如下:
func startHTTP(s *options.SchedulerServer) {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
...
mux.Handle("/metrics", prometheus.Handler())
...
服务的默认端口是10251,所以我们可以这样查看Scheduler的运行时信息:
curl -i localhost:10251/metrics
启动缓存更新器
go podInformer.Informer().Run(stop)
informerFactory.Start(stop)
启动Scheduler
sched.Run()
终于看到启动Schduler的代码了,看看它是怎么启动的:
plugin/pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
调度Pod
Scheduler启动后,Scheduler对象的scheduleOne()
函数会被循环调用,每调用一次就完成一次调度Pod的操作:
执行
sched.config.NextPod()
获取下一个需要被调度的Pod,sched.config
就是我们前面说到的Config对象,这时候就它发挥作用的时候了。
通过启动时指定的调度算法(如果不指定,则默认使用我们上面讲过的算法)得出该Pod应该被调度到哪节点上,关键代码:
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
绑定Pod和主机
err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: suggestedHost, }, })
将绑定信息写回到
kube-apiserver
,这一步是在上面的sched.bind()
这个函数上完成的,关键代码是err := sched.config.Binder.Bind(b)
,Bind()
是一个接口函数,它的实现在ConfigFactory对象中,定义如下:// Bind just does a POST binding RPC. func (b *binder) Bind(binding *v1.Binding) error { glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) }
最后将本次的操作信息写入到Metrics服务中。
总结
从调度器的实现上来看,它与apiserver之间的解耦合做的非常彻底,完全没有依赖,我们甚至可以通过REST API手动来完成Pod与主机的绑定。文中提到客户端是一个单独的项目在Github上,你可以把它引用在自己的项目中来完成与k8s集群的交互,当然可以模仿调度器用Informer的方式实现更高级的功能。
以上就是Scheduler组件启动的全总流程,希望你读完本文后对Scheduler有一个比较清晰的认识,以更好的使用你的k8s集群。