1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
// Execute runs the allocate action against the given session.
//
// It groups the session's jobs into one job priority queue per queue ID, then
// loops: pop a queue, pop one job from that queue, and try to place the job's
// pending tasks (those with a non-empty resource request) onto nodes. A task is
// placed either with ssn.Allocate, when its InitResreq fits the node's Idle
// resources, or with ssn.Pipeline, when it fits the node's Releasing resources.
// The loop ends when the queue priority queue is empty.
func (alloc *allocateAction) Execute(ssn *framework.Session) {
glog.V(3).Infof("Enter Allocate ...")
defer glog.V(3).Infof("Leaving Allocate ...")
// Priority queue of queues, ordered by ssn.QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// Per-queue priority queue of jobs, keyed by queue ID.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
// First, put every job of the session into the job queue of the queue it belongs to.
// NOTE(review): the queue is pushed once per job with no de-duplication, so a
// queue holding several jobs appears several times in `queues`.
for _, job := range ssn.Jobs {
if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
} else {
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
job.Namespace, job.Name, job.Queue)
continue
}
// Lazily create the job priority queue (ordered by ssn.JobOrderFn) for this queue.
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
}
glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
// Pending tasks per job, built on first visit and reused when the job is
// popped again later in this call.
pendingTasks := map[api.JobID]*util.PriorityQueue{}
for {
if queues.Empty() {
break
}
// Pop the next queue and check whether it has a job that needs scheduling.
queue := queues.Pop().(*api.QueueInfo)
// An overused queue is dropped here: `continue` skips the re-push at the
// bottom of the loop.
if ssn.Overused(queue) {
glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
jobs, found := jobsMap[queue.UID]
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
// A queue with no remaining jobs is likewise not pushed back.
if !found || jobs.Empty() {
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
// Pop jobs from the queue one at a time for scheduling.
job := jobs.Pop().(*api.JobInfo)
if _, found := pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
// Skip BestEffort task in 'allocate' action.
if task.Resreq.IsEmpty() {
glog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
task.Namespace, task.Name)
continue
}
tasks.Push(task)
}
pendingTasks[job.UID] = tasks
}
tasks := pendingTasks[job.UID]
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)
// The loop that actually schedules tasks: each iteration tentatively binds one
// task (original author's note: a "fake" bind that marks the task as handled).
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}
task := tasks.Pop().(*api.TaskInfo)
assigned := false
glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
len(ssn.Nodes), job.Namespace, job.Name)
//any task that doesn't fit will be the last processed
//within this loop context so any existing contents of
//NodesFitDelta are for tasks that eventually did fit on a
//node
// The long section below picks a suitable node for the task: it first
// filters the nodes, then selects the best node that satisfies the task,
// and then updates the job's information for that task.
if len(job.NodesFitDelta) > 0 {
job.NodesFitDelta = make(api.NodeResourceMap)
}
// Filter step: keep only the nodes for which ssn.PredicateFn returns no error.
for _, node := range ssn.Nodes {
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
// TODO (k82cn): Enable eCache for performance improvement.
if err := ssn.PredicateFn(task, node); err != nil {
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
continue
} else {
predicateNodes = append(predicateNodes, node)
}
}
// Scoring step: bucket the filtered nodes by the score from ssn.NodeOrderFn;
// a node whose scoring fails is logged and left out.
for _, node := range predicateNodes {
score, err := ssn.NodeOrderFn(task, node)
if err != nil {
glog.V(3).Infof("Error in Calculating Priority for the node:%v", err)
} else {
nodeScores[score] = append(nodeScores[score], node)
}
}
// NOTE(review): presumably returns the nodes of the best score bucket —
// confirm against util.SelectBestNode.
selectedNodes := util.SelectBestNode(nodeScores)
for _, node := range selectedNodes {
// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle) {
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Namespace, task.Name, node.Name)
// Important: this calls ssn.Allocate (per the original author, the
// Allocate function in session.go). On failure, try the next node.
if err := ssn.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, ssn.UID, err)
continue
}
assigned = true
break
} else {
//store information about missing resources
// NOTE(review): this write assumes job.NodesFitDelta is non-nil (the reset
// above only runs when it is non-empty) — confirm it is initialized
// where the job is created.
job.NodesFitDelta[node.Name] = node.Idle.Clone()
// NOTE(review): the delta uses task.Resreq while the fit check above uses
// task.InitResreq — confirm this is intended.
job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)
}
// Reached only when the task did not fit the node's idle resources.
// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
// NOTE(review): the error value is not included in the log message below.
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.UID)
continue
}
assigned = true
break
}
}
// If the task could not be placed on any node (for example, not enough
// resources), stop processing this job's remaining tasks.
if !assigned {
break
}
// Once the session reports the job as ready, push it back into its job
// queue and move on to scheduling the next job.
if ssn.JobReady(job) {
jobs.Push(job)
break
}
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
|