-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler_base.py
212 lines (194 loc) · 8.91 KB
/
scheduler_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import random
import numpy as np
import trace
import log
import parameters as pm
from cluster import Cluster
import queue as Queue
class Scheduler(object):
    """Base class for a time-slotted job scheduler.

    One call to :meth:`step` advances the simulation by one timeslot:
    ``_prepare`` (reset per-slot state and admit newly arrived jobs),
    ``_schedule`` (placement policy, to be provided by a child class) and
    ``_progress`` (run every job in ``running_jobs`` for one step).

    Attributes set here:
        name: scheduler name, e.g. 'DRF'.
        trace: container indexed by timeslot; ``trace[ts]`` is iterated to
            obtain the jobs arriving at ``ts``.
        logger: logger used for debug/info output.
        cluster: ``Cluster`` instance built with the logger.
        running_jobs / uncompleted_jobs / completed_jobs: job sets.
        curr_ts: index of the current timeslot.
        end: True once ``pm.TOT_NUM_JOBS`` jobs have completed.
        rewards: one reward value appended per timeslot.
    """

    def __init__(self, name, trace, logger):
        """Create the scheduler; build a logger from ``name`` if none is given."""
        self.name = name  # e.g., 'DRF'
        self.trace = trace
        if logger is None:
            # A name is needed to create the fallback logger.
            assert name
            self.logger = log.getLogger(name=name, fh=False)
        else:
            self.logger = logger
        self.cluster = Cluster(self.logger)
        self.running_jobs = set()
        self.uncompleted_jobs = set()
        self.completed_jobs = set()
        self.curr_ts = 0
        self.end = False
        self.rewards = []

    def step(self) -> list:
        """Advance the simulation by one timeslot.

        Returns:
            ``self.data``: the (input, label) pairs recorded through
            ``_state`` during this timeslot.
        """
        assert not self.end
        self._prepare()
        self._schedule()
        self._progress()
        if len(self.completed_jobs) == pm.TOT_NUM_JOBS:
            self.end = True
        self.curr_ts += 1
        return self.data

    def get_job_jcts(self) -> dict:
        """Return ``{job.id: completion time}`` for every completed job.

        The completion time is ``end_time - arrv_time + 1.0`` (in timeslots).
        """
        return {
            job.id: job.end_time - job.arrv_time + 1.0
            for job in self.completed_jobs
        }

    def _prepare(self):
        """Reset per-timeslot state and admit jobs arriving at ``curr_ts``."""
        self.cluster.clear()
        self.data = []
        self.running_jobs.clear()

        # Admit the jobs whose arrival timeslot is the current one.
        if self.curr_ts in self.trace:
            for job in self.trace[self.curr_ts]:
                job.reset()  # must reset since it is trained for multiple epochs
                self.uncompleted_jobs.add(job)
                self.logger.debug(job.info())

        # Every uncompleted job starts the timeslot with no allocation.
        for job in self.uncompleted_jobs:
            job.num_workers = 0
            job.curr_worker_placement = []
            if pm.PS_WORKER:
                job.num_ps = 0
                job.curr_ps_placement = []

        # Sort based on used resources from smallest to largest for load
        # balancing; every node starts at 0 used resources.
        self.node_used_resr_queue = Queue.PriorityQueue()
        for node_idx in range(pm.CLUSTER_NUM_NODES):
            self.node_used_resr_queue.put((0, node_idx))

    def _schedule(self):
        """Placement policy hook; the base implementation only logs a notice."""
        self.logger.info("This method is to be implemented on child class!")

    def _progress(self) -> None:
        """Run every running job for one step and record the slot's reward.

        Each job in ``running_jobs`` executes ``job.step()``; the returned
        epoch count, normalised by ``job.num_epochs``, is added to the reward.
        A job whose progress reaches ``job.real_num_epochs`` gets its
        ``end_time`` set and moves from ``uncompleted_jobs`` to
        ``completed_jobs``.
        """
        reward = 0
        # Iterate over a snapshot so the loop is independent of set mutation.
        for job in self.running_jobs.copy():
            epoch = job.step()
            reward += epoch / job.num_epochs
            if job.progress >= job.real_num_epochs:
                if pm.FINE_GRAIN_JCT:
                    job.end_time = self.curr_ts - 1 + job.get_run_time_in_ts()
                else:
                    job.end_time = self.curr_ts
                # The job is intentionally left in running_jobs; that set is
                # cleared at the start of the next timeslot by _prepare().
                self.uncompleted_jobs.remove(job)
                self.completed_jobs.add(job)
        if pm.NUM_UNCOMPLETED_JOB_REWARD:
            # Alternative reward: the number of jobs still uncompleted.
            reward = len(self.uncompleted_jobs)
        self.rewards.append(reward)

    def observe(self) -> np.ndarray:
        """Build the state matrix for the current scheduling window.

        Uncompleted jobs that can still receive resources are ordered
        according to ``pm.JOB_SORT_PRIORITY`` ("Resource", "Arrival" or
        "Progress") and the first ``pm.SCHED_WINDOW_SIZE`` of them are written
        into a ``pm.STATE_DIM`` array: one row per enabled feature in
        ``pm.INPUTS_GATE`` (several rows for the binary TYPE encoding), one
        column per window slot. Unused cells hold 0 when ``pm.ZERO_PADDING``
        is set, otherwise -1.

        Side effect: ``self.window_jobs[slot]`` is set to the job placed in
        each slot (``None`` for empty slots).

        Returns:
            The filled state array.
        """
        # cluster_state = self.cluster.get_cluster_state()
        # for test, first use dominant resource share of each job as input state
        q = Queue.PriorityQueue()
        # `seq` is a unique tie-breaker placed before the job object: without
        # it, two jobs with equal sort keys would make the queue compare the
        # job objects themselves, which raises TypeError for classes that do
        # not define ordering.
        for seq, job in enumerate(self.uncompleted_jobs):
            if pm.PS_WORKER:
                # and, not or: skip only when both workers and ps are maxed out
                if job.num_workers >= pm.MAX_NUM_WORKERS and job.num_ps >= pm.MAX_NUM_WORKERS:
                    continue
            else:
                if job.num_workers >= pm.MAX_NUM_WORKERS:  # not schedule it any more
                    continue

            if pm.JOB_SORT_PRIORITY == "Resource":
                q.put((job.dom_share, job.arrv_time, seq, job))
            elif pm.JOB_SORT_PRIORITY == "Arrival":
                # Jobs with the same arrival time are ordered randomly.
                q.put((job.arrv_time, random.random(), seq, job))
            elif pm.JOB_SORT_PRIORITY == "Progress":
                q.put((1 - job.progress / job.num_epochs, job.arrv_time, seq, job))
            # NOTE(review): any other JOB_SORT_PRIORITY value enqueues nothing,
            # so the window stays empty -- confirm this is intended.

        if pm.ZERO_PADDING:
            state = np.zeros(shape=pm.STATE_DIM)  # zero padding instead of -1
        else:
            state = np.full(pm.STATE_DIM, -1.0)

        self.window_jobs = [None] * pm.SCHED_WINDOW_SIZE

        slot_order = np.arange(pm.SCHED_WINDOW_SIZE)  # default keep order
        if pm.JOB_ORDER_SHUFFLE:
            slot_order = np.random.choice(
                pm.SCHED_WINDOW_SIZE, pm.SCHED_WINDOW_SIZE, replace=False)

        # resource share / job arrival / progress
        # the following is mentioned in the paper 4.1 policy neural network
        for order in slot_order:
            if q.empty():
                continue
            job = q.get()[-1]
            j = 0  # current feature row
            for (feature, enable) in pm.INPUTS_GATE:
                # INPUTS_GATE=[("TYPE",True),("STAY",False),("PROGRESS",False),("DOM_RESR",False), ("WORKERS",True)]
                if not enable:
                    continue
                if feature == "TYPE":
                    if not pm.INPUT_RESCALE:
                        if not pm.TYPE_BINARY:
                            state[j][order] = job.type
                        else:
                            # 4-bit binary encoding, one bit per row.
                            bin_str = "{0:b}".format(job.type).zfill(4)
                            for bin_ch in bin_str:
                                state[j][order] = int(bin_ch)
                                j += 1
                            # Compensate for the shared `j += 1` below.
                            j -= 1
                    else:
                        state[j][order] = float(job.type) / 8
                elif feature == "STAY":
                    if not pm.INPUT_RESCALE:
                        state[j][order] = self.curr_ts - job.arrv_time
                    else:
                        state[j][order] = float(self.curr_ts - job.arrv_time) / 100
                elif feature == "PROGRESS":
                    state[j][order] = 1 - job.progress / job.num_epochs
                elif feature == "DOM_RESR":
                    state[j][order] = job.dom_share
                elif feature == "WORKERS":
                    if not pm.INPUT_RESCALE:
                        state[j][order] = job.num_workers
                    else:
                        state[j][order] = float(job.num_workers) / pm.MAX_NUM_WORKERS
                elif feature == "PS":
                    if not pm.INPUT_RESCALE:
                        state[j][order] = job.num_ps
                    else:
                        state[j][order] = float(job.num_ps) / pm.MAX_NUM_WORKERS
                else:
                    raise RuntimeError
                j += 1
            self.window_jobs[order] = job

        # backlog = float(max(len(self.uncompleted_jobs) - pm.SCHED_WINDOW_SIZE, 0))/len(pm.TOT_NUM_JOBS)
        backlog = max(len(self.uncompleted_jobs) - pm.SCHED_WINDOW_SIZE, 0)
        self.logger.debug(
            f"ts: {self.curr_ts}"
            f" backlog: {backlog}"
            f" completed jobs: {len(self.completed_jobs)}"
            f" uncompleted jobs: {len(self.uncompleted_jobs)}")
        return state

    def _state(self, label_job_id,
               role="worker"):  # whether this action selection leads to worker increment or ps increment
        """Record one (input, label) sample for the chosen job and role.

        Calls :meth:`observe` to obtain the network input, builds a one-hot
        label of length ``pm.ACTION_DIM`` marking the window slot of
        ``label_job_id`` (and, with ``pm.PS_WORKER``, the chosen ``role``:
        "worker", "ps" or -- only with ``pm.BUNDLE_ACTION`` -- "bundle"), and
        appends the pair to ``self.data``. The label stays all-zero when the
        job is not in the window or the role is not recognised.
        """
        # cluster_state = self.cluster.get_cluster_state()
        nn_input = self.observe()  # NN input
        label = np.zeros(pm.ACTION_DIM)
        for i in range(pm.SCHED_WINDOW_SIZE):
            job = self.window_jobs[i]
            if job and job.id == label_job_id:  # increase worker/ps for specific job
                if pm.PS_WORKER:
                    if pm.BUNDLE_ACTION:
                        # Three actions per slot: worker, ps, bundle.
                        if role == "worker":
                            label[i * 3] = 1
                        elif role == "ps":
                            label[i * 3 + 1] = 1
                        elif role == "bundle":
                            label[i * 3 + 2] = 1
                    else:
                        # Two actions per slot: worker, ps.
                        if role == "worker":
                            label[i * 2] = 1
                        elif role == "ps":
                            label[i * 2 + 1] = 1
                else:
                    label[i] = 1
        self.data.append((nn_input, label))  # add trajectory, for gradient calculation.

    def get_results(self) -> (int, float, float, float):
        """Return the final results of the run.

        Returns:
            A tuple ``(num_completed_jobs, avg_jct, makespan, avg_reward)``
            where the job completion time is ``end_time - arrv_time + 1.0``,
            the makespan is the largest ``end_time + 1.0`` and the average
            reward is the mean of ``self.rewards``.

        Raises:
            ValueError: if no job has completed yet.
        """
        # Guard before any aggregation: max() on an empty sequence would
        # otherwise fail with a message that hides the real cause.
        if not self.completed_jobs:
            raise ValueError("get_results() requires at least one completed job")
        jct_list = [(job.end_time - job.arrv_time + 1.0) for job in self.completed_jobs]
        makespan = max(job.end_time + 1.0 for job in self.completed_jobs)
        avg_reward = sum(self.rewards) / len(self.rewards)
        return (
            len(self.completed_jobs),
            1.0 * sum(jct_list) / len(jct_list),
            makespan,
            avg_reward,
        )