所有文章

k8s源码分析-scheduler

本文以kubenetes v1.7为例,说明kube-scheduler组件的启动流程与工作原理。

入口

scheduler的main函数定义在plugin/cmd/kube-scheduler/scheduler.go中,main函数代码也比较清晰:

  1. 创建默认配置对象
  2. 将配置对象的指针传给命令行解析器,然后命令行解析器把解析到的各选项的值写入到配置对象中
  3. 如果用户指定了version选项则打印版本信息并退出
  4. 将配置对象传给Run()函数,然后就开始启动Scheduler了

创建客户端

kubeClient, leaderElectionClient, err := createClients(s)

Run()函数一开始先创建了一个kubernetes的客户端,用来连接kube-apiserver组件以获取集群信息,这个客户端对象会被包含在Scheduler对象中。

创建缓存更新器

informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
podInformer := factory.NewPodInformer(kubeClient, 0)

然后又根据客户端创建出来两个Informer对象,它跟客户端在一个包中,其作用是允许用户提供一些事件监听器(watcher),然后它有一个Run方法,启动以后会一直循环从kube-apiserver中查询我们想要的信息,比如节点状态、新增Pod等等,如果有变化就会触发我们注册的相应的监听器对应的动作,然后本地有一个缓存对象,用来存放这些查询到的信息,这时只是创建,它们的Run方法还没有被调用。

创建Scheduler

sched, err := CreateScheduler(

Scheduler对象的创建与另外两个对象密切相关,一个是Config,它与Scheduler定义在一个文件中:plugin/pkg/scheduler/scheduler.go,另一个是ConfigFactory,定义在plugin/pkg/scheduler/factory/factory.go,它们的关系大概为:

  1. ConfigFactory的主要工作是维护本地已缓存调度资源,比如等待调度的Pod、已调度的Pod、集群节点列表、PV/PVC列表等,并由Informer循环地从apiserver中把资源更新到本地,当然还包括向队列增删改查的函数,这些函数由Informer提供。

  2. Scheduler是对ConfigFactory的高级抽象,相对包含的函数少一些,因为它封装出了更高级的功能,使用起来更简单。

  3. 而Config是Scheduler中的一个字段,Config没有函数只有一些字段,主要的作用是包含了Scheduler运行时需要的资源,这个Config对象包含的元素如下:

    type Config struct {
    	SchedulerCache schedulercache.Cache
    	Ecache     *core.EquivalenceCache
    	NodeLister algorithm.NodeLister
    	Algorithm  algorithm.ScheduleAlgorithm
    	Binder     Binder
    	PodConditionUpdater PodConditionUpdater
    	PodPreemptor PodPreemptor
    	NextPod func() *v1.Pod
    	WaitForCacheSync func() bool
    	Error func(*v1.Pod, error)
    	Recorder record.EventRecorder
    	StopEverything chan struct{}
    }
    

    其中Algorithm字段自然就是调度算法了,NodeLister字段表示集群中所有节点的列表,Binder用来将指定Pod绑定到某一主机上。

默认调度算法

默认的调度算法对象在pkg/scheduler/factory/factory.go:CreateFromKeys()函数中被创建:

algo := core.NewGenericScheduler(
	c.schedulerCache,
	c.equivalencePodCache,
	c.podQueue,
	predicateFuncs,
	predicateMetaProducer,
	priorityConfigs,
	priorityMetaProducer,
	extenders,
	c.volumeBinder,
	c.pVCLister,
	c.alwaysCheckAllPredicates,
	c.disablePreemption,
)

所以调度一个Pod的具体实现就在这个genericScheduler结构体中定义了,关键函数是它的Schedule()函数了:

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
...

Schedule()函数完成的工作如下:

  1. 先过滤掉不合法的Node,如果过滤后只剩一个Node,就把Pod调度到该Node并结束本次调度工作:

    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    
  2. 通过该算法对象中指定的打分函数,并行地为每个节点打分。不管是指定的哪个打分函数,这些打分函数预期设置为0-10,0是最低优先级得分(最不喜欢的节点),10是最高,每个优先级函数也可以有 将自身权重,优先级函数返回的节点得分乘以权重得到加权得分,最后将所有得分合并(加)得到所有节点的总加权得分:

    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    
  3. 打分过程比较复杂,上面函数中的g.prioritizers字段是一个函数集,这些函数提供了不同的规则为Node打分,它们是在集群启动时被注删到factory对象中,而genericScheduler对象在创建时引用了这些函数,以下是注册所有默认算法的代码:

    pkg/scheduler/algorithmprovider/defaults/defaults.go

    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
    
  4. 为所有Node打分后得到一个以分数排序的节点列表,如果有多个第一名,就从这些第一名中以循环方式选择一个节点:

    return g.selectHost(priorityList)
    

调度器中几个重要的对象的创建大概就这么多,后面就开始启动了。

启动Metrics服务

go startHTTP(s)

Metrics是一个用于查询调试信息和运行状态信息的REST API Server,它定义如下:

func startHTTP(s *options.SchedulerServer) {
	mux := http.NewServeMux()
	healthz.InstallHandler(mux)
	if s.EnableProfiling {
		mux.HandleFunc("/debug/pprof/", pprof.Index)
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
...
mux.Handle("/metrics", prometheus.Handler())
...

服务的默认端口是10251,所以我们可以这样查看Scheduler的运行时信息:

curl -i localhost:10251/metrics

启动缓存更新器

go podInformer.Informer().Run(stop)
informerFactory.Start(stop)

启动Scheduler

sched.Run()

终于看到启动Schduler的代码了,看看它是怎么启动的: plugin/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

调度Pod

Scheduler启动后,Scheduler对象的scheduleOne()函数会被循环调用,每调用一次就完成一次调度Pod的操作:

  1. 执行sched.config.NextPod()获取下一个需要被调度的Pod,sched.config就是我们前面说到的Config对象,这时候就它发挥作用的时候了。

  2. 通过启动时指定的调度算法(如果不指定,则默认使用我们上面讲过的算法)得出该Pod应该被调度到哪节点上,关键代码:

    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    
  3. 绑定Pod和主机

    err := sched.bind(assumedPod, &v1.Binding{
    	ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
    	Target: v1.ObjectReference{
    		Kind: "Node",
    		Name: suggestedHost,
    	},
    })
    
  4. 将绑定信息写回到kube-apiserver,这一步是在上面的sched.bind()这个函数上完成的,关键代码是err := sched.config.Binder.Bind(b)Bind()是一个接口函数,它的实现在ConfigFactory对象中,定义如下:

    // Bind just does a POST binding RPC.
    func (b *binder) Bind(binding *v1.Binding) error {
    	glog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
    	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
    }
    

    最后将本次的操作信息写入到Metrics服务中。

总结

从调度器的实现上来看,它与apiserver之间的解耦合做的非常彻底,完全没有依赖,我们甚至可以通过REST API手动来完成Pod与主机的绑定。文中提到客户端是一个单独的项目在Github上,你可以把它引用在自己的项目中来完成与k8s集群的交互,当然可以模仿调度器用Informer的方式实现更高级的功能。

以上就是Scheduler组件启动的全总流程,希望你读完本文后对Scheduler有一个比较清晰的认识,以更好的使用你的k8s集群。


编写日期:2018-01-14