tetris_env.py
import queue as Queue
import time
from random import random

import numpy as np

import parameters as pm
from scheduler_base import Scheduler


class Tetris_Env(Scheduler):
    """Tetris-style scheduler: packs tasks onto nodes by combining a resource
    alignment score with a remaining-work score."""

    def _schedule(self):
        tic = time.time()
        if len(self.uncompleted_jobs) > 0:
            # Queue of (node_id, used_resources); it is ordered by node id,
            # not by available resources.
            node_used_resr_queue = Queue.PriorityQueue()
            for i in range(pm.CLUSTER_NUM_NODES):
                node_used_resr_queue.put((i, np.zeros(pm.NUM_RESR_TYPES)))
            while not node_used_resr_queue.empty():
                node, used_resrs = node_used_resr_queue.get()
                # Calculate per-job scores: alignment (how well the job's demand
                # fits the node's remaining resources) and remaining work.
                mean_resr_score = dict()
                mean_align_score = dict()
                for job in self.uncompleted_jobs:
                    if pm.PS_WORKER:
                        resr = job.resr_ps + job.resr_worker
                    else:
                        resr = job.resr_worker
                    mean_resr_score[job] = np.sum(resr) * (1 - job.progress / job.num_epochs)
                    mean_align_score[job] = np.sum((pm.NUM_RESR_SLOTS - used_resrs) * resr)
                # Weight balances the two score components by their means.
                weight = (float(sum(mean_align_score.values())) / len(mean_align_score)) / (
                    float(sum(mean_resr_score.values())) / len(mean_resr_score))
                if weight == 0:
                    continue
                # Rank jobs by combined score (highest first); random() breaks ties
                # so that job objects never need to be compared directly.
                score_q = Queue.PriorityQueue()
                for job in self.uncompleted_jobs:
                    score = mean_align_score[job] + weight * mean_resr_score[job]
                    score_q.put((-score, random(), job))
                while not score_q.empty():
                    _, _, job = score_q.get()
                    if job.num_workers >= pm.MAX_NUM_WORKERS:
                        continue
                    # Try to allocate resources for one more task of this job on the node.
                    if pm.PS_WORKER:
                        resr_reqs = job.resr_worker + job.resr_ps
                    else:
                        resr_reqs = job.resr_worker
                    succ, node_used_resrs = self.cluster.alloc(resr_reqs, node)
                    if succ:
                        if pm.PS_WORKER and pm.BUNDLE_ACTION:
                            # Place a worker and a PS together as one bundle.
                            self._state(job.id, "bundle")
                            job.num_workers += 1
                            job.curr_worker_placement.append(node)
                            job.num_ps += 1
                            job.curr_ps_placement.append(node)
                            job.dom_share = np.max(
                                1.0 * (job.num_workers * job.resr_worker + job.num_ps * job.resr_ps)
                                / self.cluster.CLUSTER_RESR_CAPS)
                        else:
                            self._state(job.id, "worker")
                            job.num_workers += 1
                            job.curr_worker_placement.append(node)
                            job.dom_share = np.max(
                                1.0 * (job.num_workers * job.resr_worker + job.num_ps * job.resr_ps)
                                / self.cluster.CLUSTER_RESR_CAPS)
                            if pm.PS_WORKER:
                                self._state(job.id, "ps")
                                job.num_ps += 1
                                job.curr_ps_placement.append(node)
                                job.dom_share = np.max(
                                    1.0 * (job.num_workers * job.resr_worker + job.num_ps * job.resr_ps)
                                    / self.cluster.CLUSTER_RESR_CAPS)
                        self.running_jobs.add(job)
                        # Re-queue the node only after a successful allocation so its
                        # updated usage is considered in the next round; this must stay
                        # here instead of above.
                        node_used_resr_queue.put((node, node_used_resrs))
                        break
                    else:  # failed to allocate resources on this node; move to the next node
                        break
        toc = time.time()
        self.logger.debug(self.name + ":: scheduling time: " + "%.3f" % (toc - tic) + " seconds.")
        for job in self.uncompleted_jobs:
            self.logger.debug(self.name + ":: scheduling results, num_workers: " + str(job.num_workers))


def test():
    import log
    import trace
    logger = log.getLogger(name="test.log", level="DEBUG")
    job_trace = trace.Trace(logger).get_trace()
    env = Tetris_Env("Tetris", job_trace, logger)
    while not env.end:
        env.step()
    print(env.get_results())


if __name__ == '__main__':
    test()