从之前的分析中,已经知道了kube-batch的启动过程。kube-batch总共有4个过程。这里我们从Allocate开始。

目录:

一: 流程解释

二:代码说明

一: 流程解释

在allocate.go中:找到Execute函数。首先用文字解释一下整个的过程:

(1)将kube-batch的job放入对应的队列。这是一个具有优先级的队列。

(2)依次遍历这些队列,如果为空就跳过

(3)如果不为空,依次从队列中pop出一个job.即接下来要调度这个job

(4)取出这个job对应的所有Tasks(即要绑定的pod),对每个task进行假绑定,这里的假绑定意思是 只是更新task的状态,先记录pod绑定在哪个节点上。当达到JobReady时,进行真正的绑定。这样就实现了一次性绑定了好几个Pod.

(5)更新job的信息,将pod重新加入队列。跳出循环,再次进行调度。

对上面流程有两个地方需要再解释一下:

(1) jobReady 作用是什么,gang scheduler和这个有什么关系?

allocate每次都是对task进行假绑定。jobReady是一个信号。表示现在可以进行真正的绑定了。

在gang.go的75行,实现了这个接口:

1
2
3
4
5
6
7
func jobReady(obj interface{}) bool {
	job := obj.(*api.JobInfo)

	occupied := readyTaskNum(job)

	return occupied >= job.MinAvailable
}

可以看出来,gang scheduler中 就是通过数量上的判断来进行限制的。job.MinAvailable这个是podgroup的minNumber数量。这样就使得每次调度的时候,只有当MinAvailable个task准备好了之后。才会进行调度,从而达到gang scheduler的效果。

(2)为什么job还要重新加入队列。这个job不是已经调度了吗?

因为有可能job的Tasks数量会多于 job.MinAvailable。例如,一个job有8个task,但是它指定的podgroup的 minNumber=4。这样调度时会首先调度4个task.当真正绑定之后。剩余没绑定的4个task是一个新的job.所有需要重新加入队列。

二:代码说明

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
func (alloc *allocateAction) Execute(ssn *framework.Session) {
	glog.V(3).Infof("Enter Allocate ...")
	defer glog.V(3).Infof("Leaving Allocate ...")

	// 这是优先级队列,即队列里面的内容是有优先级的
	queues := util.NewPriorityQueue(ssn.QueueOrderFn)
	jobsMap := map[api.QueueID]*util.PriorityQueue{}

	//首先将所有的kube-batch job放入
	for _, job := range ssn.Jobs {
		if queue, found := ssn.Queues[job.Queue]; found {
			queues.Push(queue)
		} else {
			glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
				job.Namespace, job.Name, job.Queue)
			continue
		}

		if _, found := jobsMap[job.Queue]; !found {
			jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
		}

		glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
		jobsMap[job.Queue].Push(job)
	}

	glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))

	pendingTasks := map[api.JobID]*util.PriorityQueue{}

	for {
		if queues.Empty() {
			break
		}
		// 从第一个队列开始寻找是否有job需要调度
		queue := queues.Pop().(*api.QueueInfo)
		if ssn.Overused(queue) {
			glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
			continue
		}

		jobs, found := jobsMap[queue.UID]

		glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)

		if !found || jobs.Empty() {
			glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
			continue
		}
		
		// 从队列中依次弹出job进行调度
		job := jobs.Pop().(*api.JobInfo)
		if _, found := pendingTasks[job.UID]; !found {
			tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
			for _, task := range job.TaskStatusIndex[api.Pending] {
				// Skip BestEffort task in 'allocate' action.
				if task.Resreq.IsEmpty() {
					glog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
						task.Namespace, task.Name)
					continue
				}

				tasks.Push(task)
			}
			pendingTasks[job.UID] = tasks
		}
		tasks := pendingTasks[job.UID]

		glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
			tasks.Len(), job.Namespace, job.Name)
		
		// 具体调度Task的循环,每次都假绑定一个Task,表示这个task已经完成
		for !tasks.Empty() {
			predicateNodes := []*api.NodeInfo{}
			nodeScores := map[int][]*api.NodeInfo{}

			task := tasks.Pop().(*api.TaskInfo)
			assigned := false

			glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
				len(ssn.Nodes), job.Namespace, job.Name)

			//any task that doesn't fit will be the last processed
			//within this loop context so any existing contents of
			//NodesFitDelta are for tasks that eventually did fit on a
			//node
			
			// 后面的很长一般分,就是为task选择一个合适的node。
			//主要内容是先过滤,然后选择一个满足task的最优节点,然后更新job中该task的信息
			if len(job.NodesFitDelta) > 0 {
				job.NodesFitDelta = make(api.NodeResourceMap)
			}
			for _, node := range ssn.Nodes {
				glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
					task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)

				// TODO (k82cn): Enable eCache for performance improvement.
				if err := ssn.PredicateFn(task, node); err != nil {
					glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
						task.Namespace, task.Name, node.Name, err)
					continue
				} else {
					predicateNodes = append(predicateNodes, node)
				}
			}
			for _, node := range predicateNodes {
				score, err := ssn.NodeOrderFn(task, node)
				if err != nil {
					glog.V(3).Infof("Error in Calculating Priority for the node:%v", err)
				} else {
					nodeScores[score] = append(nodeScores[score], node)
				}
			}
			selectedNodes := util.SelectBestNode(nodeScores)
			for _, node := range selectedNodes {
				// Allocate idle resource to the task.
				if task.InitResreq.LessEqual(node.Idle) {
					glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
						task.Namespace, task.Name, node.Name)
					// !!!这里需要重点注意,这里调用了session.go中Allocate函数。 下面会将这个的作用
					if err := ssn.Allocate(task, node.Name); err != nil {         
						glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
							task.UID, node.Name, ssn.UID, err)
						continue
					}
					assigned = true
					break
				} else {
					//store information about missing resources
					job.NodesFitDelta[node.Name] = node.Idle.Clone()
					job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
					glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
						task.Namespace, task.Name, node.Name)
				}

				// Allocate releasing resource to the task if any.
				if task.InitResreq.LessEqual(node.Releasing) {
					glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
						task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
					if err := ssn.Pipeline(task, node.Name); err != nil {
						glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
							task.UID, node.Name, ssn.UID)
						continue
					}

					assigned = true
					break
				}
			}
			
			
			//如果绑定某个task过程中失败,比如资源不足。那么就会跳出这个循环。
			if !assigned {
				break
			}
			// 将job重新加入队列,然后进行下一个job的调度。
			if ssn.JobReady(job) {
				jobs.Push(job)
				break
			}
		}
		// Added Queue back until no job in Queue.  
		queues.Push(queue)
	}

session.go中Allocate函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
	if err := ssn.cache.AllocateVolumes(task, hostname); err != nil {
		return err
	}

	// 这里这是更新task的状态。
	// Only update status in session
	job, found := ssn.Jobs[task.Job]
	if found {
		if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
			glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
				task.Namespace, task.Name, api.Allocated, ssn.UID, err)
			return err
		}
	} else {
		glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
			task.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}

	task.NodeName = hostname

	if node, found := ssn.Nodes[hostname]; found {
		if err := node.AddTask(task); err != nil {
			glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
				task.Namespace, task.Name, hostname, ssn.UID, err)
			return err
		}
		glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
			task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
	} else {
		glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.",
			hostname, ssn.UID)
		return fmt.Errorf("failed to find node %s", hostname)
	}

    //gang.go中有,这里是真正的绑定了,当jobReady时,调用dispatch函数对所有的Allocated的task进行绑定。
	// dispatch就在该函数的下面。内容也很直观,就是调用k8s的接口,真正的绑定pod
	if ssn.JobReady(job) {
		for _, task := range job.TaskStatusIndex[api.Allocated] {
			if err := ssn.dispatch(task); err != nil {                    // 如果job准备好了,就直接真正绑定所有准备好的任务??
				glog.Errorf("Failed to dispatch task <%v/%v>: %v",
					task.Namespace, task.Name, err)
				return err
			}
		}
	}

	return nil
}

总结:

感觉自己的文字表达能力还是不行,还需要更多的锻炼。 结合代码的注释和上面的流程说明一起看会更容易理解。

在session.go中可以看到,每次为task分配资源时,首先都是更新状态,只有达到jobReady时,才真正的绑定到具体的某个结点上。

当然如果当前要调度的job1,它需要的资源不足,那么当前这个job1就会跳出循环,找下一个要进行调度的job。不用担心,job1中已经绑定的task所占的资源。backfill操作会将Job1清空。