highway_layout_v19.py
import json
import math
import os.path
import time
from copy import deepcopy
from load_global_graph import load_G
import networkx as nx
import utilities as ut
from visualize import plot_unpartitioned_G
import itertools
from scipy.spatial.distance import cityblock
"""
v3: graph edge is unweighted
    => highly possible to find path (2 direction) with overlapping edges
v4: graph with weighted edge design
    => problem: edge cases where limiting edges does not resolve conflicts at intersection
    => simply removing edge/adding edge cost does not resolve node conflicts
v5: weighted & directed edge design
    => when creating the original graph, each undirected edge is replaced by 2 opposite directed edges (conjugates)
    => when an edge is occupied by a hw, remove its conjugate => avoid actual conflict
    => when an edge is occupied by a hw, add cost c in [0, inf) => avoid too much traffic
    => in this way, each road can have at most 1 in/out in each direction, a max of 4 in/out
v6: add file output function
v7: add "lock" function
    set lock as L
v8: what "lock"s each "lock" can reach => lock reachability
    replace end of road with X
v9: try to fix two problems of the previous "lock":
    1. lock not connected to any town nodes => cannot reach from town
    2. lock disconnects town nodes => town becomes disconnected
v10: refactor code & add documentation up to v9
v11: modifications on making the result T-connected
    - this makes the result more likely to be T-connected when graph is square
    1. find outer highway endpoints
    2. try to make them connected clockwise, before laying out the other highways
    * this step can be done recursively, layer by layer, possible extension... *
    Definition of T-connected:
        for each town1-town2 pair, there is a direct highway from town1 to town2, where:
            adj 2 town nodes (within same town) are bi-direction connected
            adj town node & lock node are bi-direction connected
            adj 2 lock nodes are NOT connected
            adj lock node & highway node are bi-direction connected
            adj 2 highway nodes are only connected based on highway direction
v12: build town classes, check town connectivity
v13: add more constraints on picking outer highway endpoints
    1. the end point picked is as close to the ideal as possible (max is max_search_around)
    2. the end point picked MUST be connected with 4 surrounding map nodes (except edges?)
    => result T-connected maps:
        Boston_0_256 (0), ost003d (0), den312d (0), random-64-64-20 (0), den520d (1)
v14: improve inner highway layout algorithm:
    1. try all adjacent (s, t), same as before
    2. if no path is found between (s, t), try: (s-, t) & (s, t+) until both are found or s- & t+ run out
v15: output update for lock_reachability => no more json, now locks.txt
v16: highway connection output => rl_edges.txt
    - previous: in previous versions, highways.txt does not show direction at intersections
    - update: rl_edges.txt contains the direction of outputs for each highway node (4 dir binary)
v17: output for visualization => vis_gmap.json
v18: fix ideal_end_pts problem (top right corner)
    - can't simply switch between floor & ceiling, example: (7.3 will have one 7 one 8)
    - fixed by using a double to store the summation, then always cast (trim) to int
v19: fix some bugs in a1v1
"""


def largest_connected_component(original_G):
    """
    Find & Return the largest connected component in original_G as the new graph to work on
    :param original_G: original_G read from the mapf-baseline map
    :return new_G: the new_G corresponds to the largest connected component
    """
    if nx.is_connected(original_G):
        return original_G
    new_G = original_G.subgraph(max(nx.connected_components(original_G), key=len)).copy()
    assert nx.is_connected(new_G)
    return new_G


def ideal_strips(nrows, ncols, max_town_size):
    """
    Calculate the ideal number of strips (both x & y directions) based on expected max town size
    :param nrows: number of rows in the mapf-map (corresponds to horizontal highways)
    :param ncols: number of cols in the mapf-map (corresponds to vertical highways)
    :param max_town_size: maximum town size expected, should be smaller than the desired
    :return ideal_num_strip: ideal number of strips in both directions
    """
    max_size = nrows * ncols  # max possible number of nodes in G
    ideal_num_grid = max_size // max_town_size  # ideal number of grids (ideally each grid surrounded by 4 hw in 4 dir)
    ideal_num_strip = math.ceil(math.sqrt(ideal_num_grid))  # ideal number of highways in each direction
    ideal_num_strip = ideal_num_strip + 1 if ideal_num_strip % 2 != 0 else ideal_num_strip  # divisible by 2
    return ideal_num_strip
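

# Worked example (an illustrative sketch, not part of the original pipeline): the arithmetic of
# ideal_strips for a hypothetical 64 x 64 map with max_town_size = 100. The helper name
# _ideal_strips_example and the chosen numbers are assumptions made for illustration only.
import math


def _ideal_strips_example(nrows=64, ncols=64, max_town_size=100):
    max_size = nrows * ncols  # 64 * 64 = 4096 cells at most
    num_grid = max_size // max_town_size  # 4096 // 100 = 40 towns wanted
    num_strip = math.ceil(math.sqrt(num_grid))  # ceil(6.32...) = 7
    if num_strip % 2 != 0:  # an odd count is rounded up to the next even one
        num_strip += 1
    return num_strip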


def ideal_endpts_dict(nrows, ncols, n_highway_hor, n_highway_ver):
    """
    Get ideal endpoints of highways, ignoring obstacles, as dict representing 2D matrix
    :param nrows: number of rows in the mapf-map (corresponds to horizontal highways)
    :param ncols: number of cols in the mapf-map (corresponds to vertical highways)
    :param n_highway_hor: number of expected horizontal highways
    :param n_highway_ver: number of expected vertical highways
    :return endpts: 2D matrix, each index corresponds to the 'ideal' highway end point location
    :return x_width: expected width of separation of horizontal highways
    :return y_width: expected width of separation of vertical highways
    """
    assert n_highway_hor > 1 and n_highway_ver > 1  # make sure there is more than 1 lane in each direction
    assert n_highway_hor % 2 == 0 and n_highway_ver % 2 == 0  # make sure each direction has a multiple of 2 lanes
    # access element by calling endpts[row][col]
    endpts = dict()
    # init the empty 2D dict with None everywhere
    for i in range((n_highway_hor + 2)):
        endpts[i] = dict()
        for j in range((n_highway_ver + 2)):
            endpts[i][j] = None  # None means no end points there
    # x's: horizontal index
    x_width = ncols // (n_highway_ver + 1)
    xs = []
    prev_add = 0
    for i in range(n_highway_ver + 2):
        if i == 0:
            xs.append(0)
        elif i == n_highway_ver + 1:
            xs.append(ncols - 1)
        else:
            # accumulate the exact (float) spacing and truncate to int (v18 fix), so that the
            # widths are evenly distributed on average
            prev_add += ncols / (n_highway_ver + 1)
            xs.append(int(prev_add))
    # y's: vertical index
    y_width = nrows // (n_highway_hor + 1)
    ys = []
    prev_add = 0
    for i in range(n_highway_hor + 2):
        if i == 0:
            ys.append(0)
        elif i == n_highway_hor + 1:
            ys.append(nrows - 1)
        else:
            # accumulate the exact (float) spacing and truncate to int (v18 fix), so that the
            # widths are evenly distributed on average
            prev_add += nrows / (n_highway_hor + 1)
            ys.append(int(prev_add))
    # plug xs, ys into the endpts dict in corresponding index
    for i in range((n_highway_hor + 2)):
        for j in range((n_highway_ver + 2)):
            endpts[i][j] = ut.Coord(xs[j], ys[i], '')
            # print("endpts i={},j={} with coord x={},y={}".format(i, j, xs[j], ys[i]))
    assert x_width > 2 and y_width > 2  # make sure towns are not too tiny
    return endpts, x_width, y_width
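

# Illustrative sketch of the spacing rule used in ideal_endpts_dict (the v18 fix): keep a
# floating-point running sum of the exact spacing and truncate it with int() at every step.
# _even_positions is a hypothetical helper written for this example, not a function of this module.
def _even_positions(size, n_inner):
    step = size / (n_inner + 1)  # exact spacing, may be fractional
    positions, running = [0], 0.0  # first endpoint sits on index 0
    for _ in range(n_inner):
        running += step
        positions.append(int(running))  # truncate the running sum, never the step itself
    positions.append(size - 1)  # last endpoint sits on the last index
    return positions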


def actual_endpts_dict(G, ideal_endpts, max_search_around):
    """
    Get actual endpoints of highways, considering obstacles and max_search_around, as dict for 2D matrix
    :param G: the undirected graph corresponds to the input map (returned from load_G)
    :param ideal_endpts: returned from ideal_endpts_dict storing the ideal endpoints (might be invalid)
    :param max_search_around: max manhattan distance searching around when finding actual endpoints
    :return actual: 2D matrix, each index corresponds to the 'actual' highway end point location (must exist in G)
    """
    # init the actual 2D dict with Nones
    actual = dict()
    for i in range(len(ideal_endpts)):
        actual[i] = dict()
        for j in range(len(ideal_endpts[i])):
            actual[i][j] = None  # None: did not find any actual by default
    # find actual highway endpts in G for each corresponding ideal
    for i in range(len(ideal_endpts)):
        for j in range(len(ideal_endpts[i])):
            assert ideal_endpts[i][j]  # ideal should never be None
            if ideal_endpts[i][j]:
                loc = ideal_endpts[i][j]
                # NOTE: top right corner might need a bigger max_search_around (~2n) than other nodes
                for l1_dist in range(max_search_around + 1):
                    # potential: all valid index (>=0) that are EXACTLY l1_dist (manhattan) from loc
                    potential = surrounding_loc(loc, l1_dist)
                    for p_loc in potential:
                        if p_loc in G.nodes:
                            # TODO: tie breaking can be optimized here, or maybe pass the entire array?
                            p_surr = surrounding_loc(p_loc, 1)
                            surrounding_count = 0  # number of neighbours of p_loc that are map nodes
                            for p_s in p_surr:
                                if p_s in G.nodes:
                                    surrounding_count += 1
                            # NOTE: this condition is never true, so the "<4 neighbours" filter is
                            # effectively disabled (TODO: decide whether it should be `< 4`)
                            if surrounding_count < 0:
                                continue
                            actual[i][j] = p_loc  # pick the first (closest) candidate that exists in G
                            # print("i={},j={},actual[i][j]={}".format(i, j, p_loc))
                            break
                    else:
                        continue
                    break
    return actual


def get_outer_hw_endpts(actual_endpts):
    """
    Get the outer highway endpoints
    1. select the potential endpoints out from actual_endpts
    2. sort them in the clockwise highway order
    :param actual_endpts: 2D matrix, each index corresponds to the 'actual' highway end point location (must exist in G)
    :return final_endpts_list: a list storing index of outer highway endpoints in clockwise order
    """
    # init data structure to store the endpoints
    endpts_set = set()  # potential endpoints set
    endpts_index_set = set()  # potential endpoints set's index
    # init important constants to use
    length = len(actual_endpts)
    half_len = int((length + 1) / 2)  # >= half means above, < half means below
    # upper row
    for i in range(length):
        for j in range(length):
            if actual_endpts[j][i] is not None:
                endpts_set.add(actual_endpts[j][i])
                endpts_index_set.add((j, i))
                break
    # right col
    for i in range(length):
        for j in range(length - 1, -1, -1):
            if actual_endpts[i][j] is not None:
                endpts_set.add(actual_endpts[i][j])
                endpts_index_set.add((i, j))
                break
    # lower row
    for i in range(length):
        for j in range(length - 1, -1, -1):
            if actual_endpts[j][i] is not None:
                endpts_set.add(actual_endpts[j][i])
                endpts_index_set.add((j, i))
                break
    # left col
    for i in range(length):
        for j in range(length):
            if actual_endpts[i][j] is not None:
                endpts_set.add(actual_endpts[i][j])
                endpts_index_set.add((i, j))
                break
    # start with top-left, start index = row, col
    row, col = -1, -1
    start, current = None, None
    while col < length:
        col += 1
        row = -1
        while row < length:
            row += 1
            if (row, col) in endpts_index_set:
                start = row, col, quartile(row, col, half_len)
                current = start
                break
        else:
            continue
        break
    # construct the end points in order
    assert start is not None
    final_endpts_list = []  # storing index of endpoints
    stop = False
    while not stop:
        row, col = current[0], current[1]
        final_endpts_list.append((row, col))  # add current to the ordered index list
        endpts_index_set.remove((row, col))  # remove current from the set (avoid picking again)
        # 1st quartile: try up first, then right, finally down
        if current[2] == 1:
            # try going up (right)
            for c in range(col, length):  # col -> length: go right
                for r in range(row, -1, -1):  # row-1 -> 0: go up
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
            # try going down, larger col has larger priority => go as right as possible
            # try until get to half-row or +3
            for r in range(row + 1, int(max(half_len, row + 4))):  # row+1 -> +4: go down
                for c in range(col, length):  # col -> len: go right (b.c. left is better)
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
        # 2nd quartile: try left first, then up, finally right
        if current[2] == 2:
            # try going left (up)
            for r in range(row, -1, -1):  # row -> 0: go up
                for c in range(col, -1, -1):  # col-1 -> 0: go left
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
            # try going right, smaller row has larger priority => go as up as possible
            # try until get to half-col or +3
            for c in range(col + 1, int(max(half_len, col + 4))):  # col+1 -> +4: go right
                for r in range(row, -1, -1):  # row -> 0: go up (b.c. down is better)
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
        # 3rd quartile: try down first, then left, finally up
        if current[2] == 3:
            # try going down (left)
            for c in range(col, -1, -1):  # col -> 0: go left
                for r in range(row, length):  # row+1 -> length: go down
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
            # try going up, smaller col has larger priority => go as left as possible
            # try until get to half-row or +3
            for r in range(row - 1, int(min(half_len - 1, row - 4)), -1):  # row-1 -> -4: go up
                for c in range(col, -1, -1):  # col -> 0: go left (b.c. right is better)
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
        # 4th quartile: try right first, then down, finally left
        if current[2] == 4:
            # try going right (down)
            for r in range(row, length):  # row -> length: go down
                for c in range(col, length):  # col+1 -> length: go right
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
            # try going left, larger row has larger priority => go as down as possible
            # try until get to half-col or -3
            for c in range(col - 1, int(min(half_len - 1, col - 4)), -1):  # col-1 -> -4: go left
                for r in range(row, length):  # row -> length: go down (b.c. up is better)
                    if (r, c) in endpts_index_set:
                        row, col = r, c
                        break
                else:
                    continue
                break
            if row != current[0] or col != current[1]:  # already find the next 'current'
                current = row, col, quartile(row, col, half_len)
                stop = True if current == start else False
                continue
        # still cannot find any, then pick the nearest as the next index
        min_dist = 10000
        target = (-1, -1)
        for (x, y) in endpts_index_set:
            i_dist = cityblock((x, y), (row, col))
            if i_dist < min_dist:
                min_dist = i_dist
                target = (x, y)
        row, col = target if target != (-1, -1) else (row, col)
        if row != current[0] or col != current[1]:  # already find the next 'current'
            current = row, col, quartile(row, col, half_len)
            stop = True if current == start else False
            continue
        # post-processing, determining if stop
        if len(endpts_index_set) == 0:
            print("Success")
            stop = True
            continue
        if row == current[0] and col == current[1]:
            print("Not Success")
            stop = True
            continue
    return final_endpts_list


def quartile(row, col, half):
    """
    Return quartile index of row, col based on half
    :param row: row index
    :param col: col index
    :param half: half of the length of actual_endpts
    :return index: quartile index (same as the quadrant order in a math 2D grid)
    """
    if row < half and col < half:
        return 2  # left-top
    if row < half and col >= half:
        return 1  # right-top
    if row >= half and col < half:
        return 3  # left-bottom
    if row >= half and col >= half:
        return 4  # right-bottom


def actual_endpts_graph(actual_endpts):
    """
    Return graph storing prospective highways endpoints' corresponding location in actual_endpts index
    :param actual_endpts: returned from actual_endpts_dict storing the actual endpoints (valid)
    :return G_endpts_index: graph storing highways endpoints' corresponding location in actual_endpts index
    """
    # TODO: considering the new Greedy, this function might be useless
    # TODO: handle outer most highway differently?
    G_endpts_index = nx.Graph()
    for i in range(len(actual_endpts)):
        for j in range(len(actual_endpts[i])):
            if actual_endpts[i][j]:
                # only add edge between two adjacent, not none endpoints
                G_endpts_index.add_node(ut.Coord(i, j, ''), pos=(i, j))
    for u in G_endpts_index.nodes:
        for direction in ut.directions:
            v = ut.coord_add(u, ut.dir_to_vec[direction])
            if v in G_endpts_index.nodes:  # add adjacent nodes (1 away from current node)
                G_endpts_index.add_edge(u, v)
    G_endpts_index.remove_edges_from(nx.selfloop_edges(G_endpts_index))  # remove self-loops...
    return G_endpts_index


def surrounding_loc(loc, l1_dist):
    """
    Get all valid index (>=0) that are EXACTLY l1_dist from loc, as an array, in O(l1_dist)
    :param loc: the center location, the target of l1_dist
    :param l1_dist: the manhattan distance
    :return surrounding locations as an array, shape should be diamond with loc at center
    """
    new_loc = []
    if l1_dist == 0:
        new_loc = [loc]
        return new_loc
    for i in range(1, l1_dist + 1):
        j = l1_dist - i
        if j == 0:  # the 4 on-axis corners of the diamond (i starts at 1, so only j can be 0)
            new_loc.append((loc[0] + i, loc[1] + j))
            new_loc.append((loc[0] - i, loc[1] + j))
            new_loc.append((loc[0] + j, loc[1] + i))
            new_loc.append((loc[0] + j, loc[1] - i))
        else:  # one point per quadrant on the diamond's sides
            new_loc.append((loc[0] + i, loc[1] + j))
            new_loc.append((loc[0] + i, loc[1] - j))
            new_loc.append((loc[0] - i, loc[1] + j))
            new_loc.append((loc[0] - i, loc[1] - j))
    new_loc = [ut.Coord(x, y, '') for (x, y) in new_loc if x >= 0 and y >= 0]
    return new_loc
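

# Illustrative sketch: the same "diamond ring" enumeration as surrounding_loc, written with
# plain (x, y) tuples instead of ut.Coord so it can run on its own. _l1_ring is a hypothetical
# helper for this example; unlike surrounding_loc it returns the points sorted.
def _l1_ring(center, dist):
    cx, cy = center
    if dist == 0:
        return [(cx, cy)]
    ring = set()
    for dx in range(-dist, dist + 1):
        dy = dist - abs(dx)  # remaining budget goes to the y offset
        ring.add((cx + dx, cy + dy))
        ring.add((cx + dx, cy - dy))
    # drop negative indices, as surrounding_loc does
    return sorted(p for p in ring if p[0] >= 0 and p[1] >= 0)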


def highway_span(highway):
    """
    Get the max difference in x (horizontal span) & y (vertical span) in a highway
    :param highway: the highway to work on
    :return: x_span and y_span
    """
    max_x, min_x, max_y, min_y = -1, 100000, -1, 100000
    for node in highway:
        # simply keep track of max_x, min_x, max_y, min_y of nodes in the highway
        x, y = node[0], node[1]
        max_x = max(x, max_x)
        max_y = max(y, max_y)
        min_x = min(x, min_x)
        min_y = min(y, min_y)
    return (max_x - min_x), (max_y - min_y)


def plan_outer_highways(G_directed, final_endpts_list, actual_endpts):
    """
    Plan the outer-most highway in clockwise direction, this highway MUST be connected
    :param G_directed: the directed version of G, returned from build_directed_from_undirected
    :param final_endpts_list: the outer highway endpoints list in the correct order storing index of endpoints
    :param actual_endpts: actual endpoints of highways, considering obstacles and max_search_around, as 2D matrix
    :return all_highways: list storing all highway segments (outer)
    """
    # all_highways in format ([hw as list], hor_ver, direction) tuple
    all_highways = []
    endpts_size = len(final_endpts_list)
    has_path_from = dict()
    has_path_to = dict()
    # init a graph to keep track of highway connectivity
    outer_hw_dG = nx.DiGraph()
    for i in range(endpts_size):
        outer_hw_dG.add_node(final_endpts_list[i])
    for i in range(endpts_size):  # iterate through final_endpts_list in order to get index
        if i + 1 < endpts_size:
            i_curr, i_next = final_endpts_list[i], final_endpts_list[i + 1]
        else:
            i_curr, i_next = final_endpts_list[i], final_endpts_list[0]
        has_path_from[i_curr] = False
        has_path_to[i_next] = False
        # get actual highway endpoints for s and t for the current highway
        actual_s = actual_endpts[i_curr[0]][i_curr[1]]
        actual_t = actual_endpts[i_next[0]][i_next[1]]
        try:
            # get dijkstra_path of the highway.
            path = nx.dijkstra_path(G_directed, actual_s, actual_t)
            # append path & path info to all_highways in format ([hw as list], hor_ver, direction) tuple
            all_highways.append((path, -1, -1))  # -1, -1 indicating it is special outer highway
            # update edge properties in G_directed so later plans have access to the updated version
            for k in range(1, len(path)):  # k (not i) so the endpoint index is not shadowed
                curr_node = path[k]  # current highway node
                prev_node = path[k - 1]  # previous highway node
                highway_edge = (prev_node, curr_node)  # highway_edge (in the direction of highway)
                conj_edge = (curr_node, prev_node)  # conj_edge (in the opposite direction of highway)
                assert highway_edge in G_directed.edges  # highway_edge should be in G_directed for sure
                # remove conjugate edge in the path => avoid conflict!!!
                if conj_edge in G_directed.edges:
                    G_directed.remove_edge(conj_edge[0], conj_edge[1])
            has_path_from[i_curr] = True
            has_path_to[i_next] = True
            outer_hw_dG.add_edge(i_curr, i_next)
        except nx.NetworkXNoPath:
            pass
    # now deal with has_path_to[i] == False endpoints
    for i in range(endpts_size):
        if not has_path_to[final_endpts_list[i]]:  # has_path_to[i_curr] == False
            before = range(i - 2, -1, -1)  # start trying from i-2 since i-1 failed
            after = range(endpts_size - 1, i + 1, -1)
            for i_p in list(before) + list(after):  # try i_prev
                i_curr, i_prev = final_endpts_list[i], final_endpts_list[i_p]
                actual_s = actual_endpts[i_prev[0]][i_prev[1]]
                actual_t = actual_endpts[i_curr[0]][i_curr[1]]
                try:
                    # get dijkstra_path of the highway.
                    path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                    # append path & path info to all_highways in format ([hw as list], hor_ver, direction) tuple
                    all_highways.append((path, -1, -1))  # -1, -1 indicating it is special outer highway
                    # update edge properties in G_directed so later plans have access to the updated version
                    for k in range(1, len(path)):  # k (not i) so the endpoint index is not shadowed
                        curr_node = path[k]  # current highway node
                        prev_node = path[k - 1]  # previous highway node
                        highway_edge = (prev_node, curr_node)  # highway_edge (in the direction of highway)
                        conj_edge = (curr_node, prev_node)  # conj_edge (in the opposite direction of highway)
                        assert highway_edge in G_directed.edges  # highway_edge should be in G_directed for sure
                        # remove conjugate edge in the path => avoid conflict!!!
                        if conj_edge in G_directed.edges:
                            G_directed.remove_edge(conj_edge[0], conj_edge[1])
                    has_path_from[i_prev] = True
                    has_path_to[i_curr] = True
                    outer_hw_dG.add_edge(i_prev, i_curr)
                    break
                except nx.NetworkXNoPath:
                    pass
    # now deal with has_path_from[i] == False endpoints
    for i in range(endpts_size):
        if not has_path_from[final_endpts_list[i]]:  # has_path_from[i_curr] == False
            after = range(i + 2, endpts_size)  # start trying from i+2 since i+1 failed
            before = range(0, i - 1)
            for i_n in list(after) + list(before):  # try i_next
                i_curr, i_next = final_endpts_list[i], final_endpts_list[i_n]
                actual_s = actual_endpts[i_curr[0]][i_curr[1]]
                actual_t = actual_endpts[i_next[0]][i_next[1]]
                try:
                    # get dijkstra_path of the highway.
                    path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                    # append path & path info to all_highways in format ([hw as list], hor_ver, direction) tuple
                    all_highways.append((path, -1, -1))  # -1, -1 indicating it is special outer highway
                    # update edge properties in G_directed so later plans have access to the updated version
                    for k in range(1, len(path)):  # k (not i) so the endpoint index is not shadowed
                        curr_node = path[k]  # current highway node
                        prev_node = path[k - 1]  # previous highway node
                        highway_edge = (prev_node, curr_node)  # highway_edge (in the direction of highway)
                        conj_edge = (curr_node, prev_node)  # conj_edge (in the opposite direction of highway)
                        assert highway_edge in G_directed.edges  # highway_edge should be in G_directed for sure
                        # remove conjugate edge in the path => avoid conflict!!!
                        if conj_edge in G_directed.edges:
                            G_directed.remove_edge(conj_edge[0], conj_edge[1])
                    has_path_from[i_curr] = True
                    has_path_to[i_next] = True
                    outer_hw_dG.add_edge(i_curr, i_next)
                    break
                except nx.NetworkXNoPath:
                    pass
    return all_highways
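

# Illustrative sketch of the conjugate-edge rule applied whenever a highway is laid: each edge
# (u, v) on the path keeps its direction and the opposite edge (v, u) is deleted, so no later
# highway can run against it. A plain set of (u, v) tuples stands in for the networkx DiGraph,
# and _reserve_path is a hypothetical helper written only for this example.
def _reserve_path(directed_edges, path):
    remaining = set(directed_edges)
    for prev_node, curr_node in zip(path, path[1:]):
        assert (prev_node, curr_node) in remaining  # the highway edge itself must exist
        remaining.discard((curr_node, prev_node))  # drop the conjugate edge if still present
    return remaining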
def plan_highways(
G_directed, G_endpts_index, actual_endpts, visited_add_cost, max_allowed_cost, x_width, y_width,
restricted_box=True, r_box_factor=10
):
"""
Main Plan Highway Layout Function
=> plan the highway on G, return a list of a bunch of list (each element: a highway segment)
:param G_directed: the directed version of G, returned from build_directed_from_undirected
:param G_endpts_index: graph storing prospective highways endpoints' corresponding location in actual_endpts index
:param actual_endpts: actual endpoints of highways, considering obstacles and max_search_around, as 2D matrix
:param visited_add_cost: cost added to an edge when a highway used it, avoid heavy traffic jam
:param max_allowed_cost: maximum cost allowed on an edge before removing it, avoid heavy traffic jam
:param x_width: expected width of separation of horizontal highways
:param y_width: expected width of separation of vertical highways
:param restricted_box: True if we want to limit the range a highway can span (non-straight)
:param r_box_factor: how wide can a highway span in regard to its 'width' (how non-straight)
:return all_highways: list storing all highway segments (inner)
"""
all_highways = []
# horizontal
hor_ver = 1
for sy in range(len(actual_endpts)): # number of rows
ty = sy
direction = (sy + 1) % 2 # direction: alternating 0 and 1
for sx in range(len(actual_endpts[sy]) - 1): # number of cols - 1
tx = sx + 1
# get actual highway endpoints for s and t for the current highway, based on direction, order matters
if not direction:
temp_x, temp_y = sx, sy
sx, sy = tx, ty
tx, ty = temp_x, temp_y
actual_s = actual_endpts[sy][sx]
actual_t = actual_endpts[ty][tx]
# only lay highway between two adjacent, not None highway endpoints
if actual_s is None or actual_t is None:
continue # TODO: what about non-adjacent?
from_s, to_t = False, False
try:
# get dijkstra_path of the highway. if direction=1: from s to t; if direction=0: from t to s
path = nx.dijkstra_path(G_directed, actual_s, actual_t)
# check if the highway is in the required box range, if not, the highway is improper, remove it
if restricted_box and r_box_factor != 0:
# get the max difference in x (horizontal span) & y (vertical span) in a highway
max_dx, max_dy = highway_span(path)
# if any (x or y) span is larger than the box, remove the highway by 'continue' without adding
if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
or (hor_ver == 0 and max_dy > y_width * r_box_factor):
print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
raise nx.NetworkXNoPath('box error')
# append path & path info to all_highways in format ([hw as list], hor_ver, direction) tuple
all_highways.append((path, hor_ver, direction))
# update edge properties in G_directed so later plans have access to the updated version
for i in range(1, len(path)):
curr_node = path[i] # current highway node
prev_node = path[i - 1] # previous highway node
highway_edge = (prev_node, curr_node) # highway_edge (in the direction of highway)
conj_edge = (curr_node, prev_node) # conj_edge (in the opposite direction of highway)
assert highway_edge in G_directed.edges # highway_edge should be in G_directed for sure
# remove conjugate edge in the path => avoid conflict!!!
if conj_edge in G_directed.edges:
G_directed.remove_edge(conj_edge[0], conj_edge[1])
# update edge cost in the path by add visited_add_cost => avoid traffic jam!!!
if visited_add_cost > 0: # update weight
new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] + visited_add_cost
if new_weight > max_allowed_cost != 0: # if too much traffic, remove
G_directed.remove_edge(highway_edge[0], highway_edge[1])
else: # otherwise, add cost to weight
G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
from_s, to_t = True, True # if path found, should set flags True's
            except nx.NetworkXNoPath:
                print("\tno path between {} and {}".format(actual_s, actual_t))
            if not from_s and not to_t:
                # no direct highway: collect alternative endpoint indices in the row, nearest first
                if sx < tx: # to the right
                    sx_prev = list(range(sx - 1, -1, -1))
                    tx_after = list(range(tx + 1, len(actual_endpts[sy])))
                else: # to the left
                    sx_prev = list(range(sx + 1, len(actual_endpts[sy])))
                    tx_after = list(range(tx - 1, -1, -1))
                for sx_new in sx_prev: # repair to_t: keep t and try alternative sources
                    actual_s = actual_endpts[sy][sx_new]
                    actual_t = actual_endpts[ty][tx]
                    if actual_s is None or actual_t is None:
                        continue
                    try: # same as the try above
                        path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                        if restricted_box and r_box_factor != 0:
                            max_dx, max_dy = highway_span(path)
                            if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
                                    or (hor_ver == 0 and max_dy > y_width * r_box_factor):
                                print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
                                continue
                        all_highways.append((path, hor_ver, direction))
                        for i in range(1, len(path)):
                            curr_node, prev_node = path[i], path[i - 1]
                            highway_edge, conj_edge = (prev_node, curr_node), (curr_node, prev_node)
                            assert highway_edge in G_directed.edges
                            if conj_edge in G_directed.edges:
                                G_directed.remove_edge(conj_edge[0], conj_edge[1])
                            if visited_add_cost > 0:
                                new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] \
                                             + visited_add_cost
                                if max_allowed_cost != 0 and new_weight > max_allowed_cost:
                                    G_directed.remove_edge(highway_edge[0], highway_edge[1])
                                else:
                                    G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
                        print("to_t solved")
                        break # path found from sx_new to t: stop searching
                    except nx.NetworkXNoPath:
                        continue
                for tx_new in tx_after: # repair from_s: keep s and try alternative targets
                    actual_s = actual_endpts[sy][sx]
                    actual_t = actual_endpts[ty][tx_new]
                    if actual_s is None or actual_t is None:
                        continue
                    try: # same as the try above
                        path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                        if restricted_box and r_box_factor != 0:
                            max_dx, max_dy = highway_span(path)
                            if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
                                    or (hor_ver == 0 and max_dy > y_width * r_box_factor):
                                print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
                                continue
                        all_highways.append((path, hor_ver, direction))
                        for i in range(1, len(path)):
                            curr_node, prev_node = path[i], path[i - 1]
                            highway_edge, conj_edge = (prev_node, curr_node), (curr_node, prev_node)
                            assert highway_edge in G_directed.edges
                            if conj_edge in G_directed.edges:
                                G_directed.remove_edge(conj_edge[0], conj_edge[1])
                            if visited_add_cost > 0:
                                new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] \
                                             + visited_add_cost
                                if max_allowed_cost != 0 and new_weight > max_allowed_cost:
                                    G_directed.remove_edge(highway_edge[0], highway_edge[1])
                                else:
                                    G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
                        print("from_s solved")
                        break # path found from s to tx_new: stop searching
                    except nx.NetworkXNoPath:
                        continue
    # vertical
    hor_ver = 0
    for sx in range(len(actual_endpts[0])): # number of cols
        tx = sx
        direction = sx % 2 # direction: alternating 0 and 1
        for sy in range(len(actual_endpts) - 1): # number of rows - 1
            ty = sy + 1
            # get the actual endpoints s and t of the current highway; their order depends on direction
            if not direction:
                sx, sy, tx, ty = tx, ty, sx, sy # swap s and t
            actual_s = actual_endpts[sy][sx]
            actual_t = actual_endpts[ty][tx]
            # only lay a highway between two adjacent highway endpoints that are not None
            if actual_s is None or actual_t is None:
                continue # TODO: what about non-adjacent?
            from_s, to_t = False, False
            try:
                # get dijkstra_path of the highway. if direction=1: from s to t; if direction=0: from t to s
                path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                # check that the highway stays within the required box range; if not, it is improper: reject it
                if restricted_box and r_box_factor != 0:
                    # get the max difference in x (horizontal span) & y (vertical span) in a highway
                    max_dx, max_dy = highway_span(path)
                    # if the checked span is larger than the box, reject the highway without adding it
                    if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
                            or (hor_ver == 0 and max_dy > y_width * r_box_factor):
                        print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
                        raise nx.NetworkXNoPath('box error')
                # append path & path info to all_highways as a ([hw as list], hor_ver, direction) tuple
                all_highways.append((path, hor_ver, direction))
                # update edge properties in G_directed so that later highways are planned on the updated graph
                for i in range(1, len(path)):
                    curr_node = path[i] # current highway node
                    prev_node = path[i - 1] # previous highway node
                    highway_edge = (prev_node, curr_node) # edge in the direction of the highway
                    conj_edge = (curr_node, prev_node) # edge in the opposite direction of the highway
                    assert highway_edge in G_directed.edges # every edge of the path must exist in G_directed
                    # remove the conjugate edge so that no later highway runs against this one
                    if conj_edge in G_directed.edges:
                        G_directed.remove_edge(conj_edge[0], conj_edge[1])
                    # add visited_add_cost to every edge on the path to discourage traffic jams
                    if visited_add_cost > 0: # update weight
                        new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] + visited_add_cost
                        # max_allowed_cost == 0 disables the cap; otherwise remove edges with too much traffic
                        if max_allowed_cost != 0 and new_weight > max_allowed_cost:
                            G_directed.remove_edge(highway_edge[0], highway_edge[1])
                        else: # otherwise, store the increased weight
                            G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
                from_s, to_t = True, True # path found: set both flags to True
            except nx.NetworkXNoPath:
                print("\tno path between {} and {}".format(actual_s, actual_t))
            if not from_s and not to_t:
                # NOTE: in the vertical pass tx == sx always holds, so this test is never true and the
                # else branch always runs; the candidates below are column indices within rows sy and ty
                if sx < tx: # downwards
                    sx_prev = list(range(sx - 1, -1, -1))
                    tx_after = list(range(tx + 1, len(actual_endpts[sy])))
                else: # upwards
                    sx_prev = list(range(sx + 1, len(actual_endpts[sy])))
                    tx_after = list(range(tx - 1, -1, -1))
                for sx_new in sx_prev: # repair to_t: keep t and try alternative sources
                    actual_s = actual_endpts[sy][sx_new]
                    actual_t = actual_endpts[ty][tx]
                    if actual_s is None or actual_t is None:
                        continue
                    try: # same as the try above
                        path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                        if restricted_box and r_box_factor != 0:
                            max_dx, max_dy = highway_span(path)
                            if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
                                    or (hor_ver == 0 and max_dy > y_width * r_box_factor):
                                print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
                                continue
                        all_highways.append((path, hor_ver, direction))
                        for i in range(1, len(path)):
                            curr_node, prev_node = path[i], path[i - 1]
                            highway_edge, conj_edge = (prev_node, curr_node), (curr_node, prev_node)
                            assert highway_edge in G_directed.edges
                            if conj_edge in G_directed.edges:
                                G_directed.remove_edge(conj_edge[0], conj_edge[1])
                            if visited_add_cost > 0:
                                new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] \
                                             + visited_add_cost
                                if max_allowed_cost != 0 and new_weight > max_allowed_cost:
                                    G_directed.remove_edge(highway_edge[0], highway_edge[1])
                                else:
                                    G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
                        print("to_t solved")
                        break # path found from sx_new to t: stop searching
                    except nx.NetworkXNoPath:
                        continue
                for tx_new in tx_after: # repair from_s: keep s and try alternative targets
                    actual_s = actual_endpts[sy][sx]
                    actual_t = actual_endpts[ty][tx_new]
                    if actual_s is None or actual_t is None:
                        continue
                    try: # same as the try above
                        path = nx.dijkstra_path(G_directed, actual_s, actual_t)
                        if restricted_box and r_box_factor != 0:
                            max_dx, max_dy = highway_span(path)
                            if (hor_ver == 1 and max_dx > x_width * r_box_factor) \
                                    or (hor_ver == 0 and max_dy > y_width * r_box_factor):
                                print("\tno path between {} and {} in the restricted box".format(actual_s, actual_t))
                                continue
                        all_highways.append((path, hor_ver, direction))
                        for i in range(1, len(path)):
                            curr_node, prev_node = path[i], path[i - 1]
                            highway_edge, conj_edge = (prev_node, curr_node), (curr_node, prev_node)
                            assert highway_edge in G_directed.edges
                            if conj_edge in G_directed.edges:
                                G_directed.remove_edge(conj_edge[0], conj_edge[1])
                            if visited_add_cost > 0:
                                new_weight = G_directed.edges[highway_edge[0], highway_edge[1]]['weight'] \
                                             + visited_add_cost
                                if max_allowed_cost != 0 and new_weight > max_allowed_cost:
                                    G_directed.remove_edge(highway_edge[0], highway_edge[1])
                                else:
                                    G_directed.edges[highway_edge[0], highway_edge[1]].update({'weight': new_weight})
                        print("from_s solved")
                        break # path found from s to tx_new: stop searching
                    except nx.NetworkXNoPath:
                        continue
    return all_highways
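

# Illustrative sketch only (not part of the original module): plan_highways repeats the same
# "raise the weight or drop the edge" rule and the same fallback index ranges several times.
# The two pure helpers below show those rules in isolation. The names `bumped_weight` and
# `fallback_indices` are hypothetical, and the sketch assumes, as the inline checks do, that
# max_allowed_cost == 0 means "no cap". A caller would still guard with visited_add_cost > 0.
def bumped_weight(weight, visited_add_cost, max_allowed_cost):
    """Return the increased edge weight, or None if the edge should be removed instead."""
    new_weight = weight + visited_add_cost
    if max_allowed_cost != 0 and new_weight > max_allowed_cost:
        return None # too much traffic: the caller would remove the edge
    return new_weight


def fallback_indices(sx, tx, row_len):
    """Return (sx_prev, tx_after): alternative source / target indices, nearest first."""
    if sx < tx: # highway runs towards larger indices
        return list(range(sx - 1, -1, -1)), list(range(tx + 1, row_len))
    # highway runs towards smaller indices
    return list(range(sx + 1, row_len)), list(range(tx - 1, -1, -1))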


def build_highway_on_empty(all_highways):
    """
    Build the highways in all_highways on a new empty graph that stores only highway information
    (node, direction, etc.)
    :param all_highways: list of all highways returned from plan_highways
    :return G_hw: new graph with ONLY these highways, nodes in format ut.HighwayCoord
    """
    G_hw = nx.Graph()
    # each highway in all_highways is a tuple ([hw as list], hor_ver, direction)
    for highway, hor_ver, direction in all_highways:
        assert len(highway) > 1
        prev_dir = '' # direction of the previous node (for use when the end is reached)
        # add highway nodes to G_hw
        for i in range(len(highway)):
            # get direction depending on current node & next node on the highway (4 cases)
            node = highway[i]
            x_curr, y_curr = node[0], node[1]
            next_node = highway[i + 1] if i + 1 < len(highway) else None
            if next_node is not None: # next node exists: get direction of node based on next_node
                x_next, y_next = next_node[0], next_node[1]
                # 4 kinds of relation between 2 connected grid nodes
                if x_curr == x_next and y_next == y_curr + 1: # up/north
                    hw_dir = 'u'
                elif x_curr == x_next and y_next == y_curr - 1: # down/south
                    hw_dir = 'd'
                elif y_next == y_curr and x_next == x_curr + 1: # right/east
                    hw_dir = 'r'
                elif y_next == y_curr and x_next == x_curr - 1: # left/west
                    hw_dir = 'l'
                else: # next node exists, so the direction must be one of the four
                    raise AssertionError('consecutive highway nodes must be grid neighbours')
            else: # next node does not exist => end of hw => use 'X' as a placeholder direction
                hw_dir = 'X'
            prev_dir = hw_dir
            # init a temp highway node with x, y index, only to check whether G_hw contains this node already
            temp_hw_node = ut.HighwayCoord(x_curr, y_curr, '')
            if temp_hw_node in G_hw:
                # if the node is already in G_hw, new_label = original label with hw_dir appended
                original_node = G_hw.nodes[temp_hw_node]
                original_label = original_node['label']
                new_label = original_label + hw_dir
                # NOTE: remove_node also removes every edge already attached to this node
                G_hw.remove_node(temp_hw_node) # remove the original node
            else:
                # if the node is not in G_hw yet, simply new_label = hw_dir
                new_label = hw_dir
            # add the new node to G_hw, with new_label as the direction label (may contain multiple directions)
            new_node = ut.HighwayCoord(x_curr, y_curr, new_label)
            G_hw.add_node(new_node, pos=(x_curr, y_curr), label=new_label)
        # add highway edges to G_hw (undirected)
        for i in range(len(highway) - 1):
            node, next_node = highway[i], highway[i + 1]
            n_hw_curr = ut.HighwayCoord(node[0], node[1], '')
            n_hw_next = ut.HighwayCoord(next_node[0], next_node[1], '')
            G_hw.add_edge(n_hw_curr, n_hw_next)
        # TODO: when next node does not exist, should still add edge based on direction, otherwise miss end edge
    return G_hw
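

# Illustrative sketch only (not part of the original module): the four-way direction test in
# build_highway_on_empty can also be written as a lookup on the (dx, dy) step between two nodes.
# The names `_STEP_LABELS`, `step_label` and `path_labels` are hypothetical, and plain (x, y)
# tuples stand in for the module's coordinate type.
_STEP_LABELS = {(0, 1): 'u', (0, -1): 'd', (1, 0): 'r', (-1, 0): 'l'}


def step_label(node, next_node):
    """Return 'u', 'd', 'r' or 'l' for the step node -> next_node, or 'X' at the end of a highway."""
    if next_node is None:
        return 'X'
    delta = (next_node[0] - node[0], next_node[1] - node[1])
    if delta not in _STEP_LABELS:
        raise ValueError('nodes are not grid neighbours: {} -> {}'.format(node, next_node))
    return _STEP_LABELS[delta]


def path_labels(path):
    """Return one direction label per node of the path; the last node gets the placeholder 'X'."""
    return [step_label(path[i], path[i + 1] if i + 1 < len(path) else None) for i in range(len(path))]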


def highway_post_processing(G_original, G_highway_only, all_highways):
    """
    Derive several graphs from the original graph and the highway graph for later use.
    Note that G_original is modified in place: all highway nodes are removed from it.
    :param G_original: (ut.Coord) the undirected original graph returned from load_G
    :param G_highway_only: (ut.HighwayCoord) highway graph G_hw returned from build_highway_on_empty
    :param all_highways: all highways as list returned from plan_highways => [([hw as list], hor_ver, direction)]
    :return G_together: (ut.HighwayCoord) containing highway nodes, town nodes, temp lock nodes
    :return G_town_only: (ut.Coord) containing only non-highway nodes (possibly town & locks, no distinction)
    :return G_highway_only: (ut.HighwayCoord) same G_highway_only as passed in, no change from build_highway_on_empty
    :return G_temp_locks: (ut.HighwayCoord) containing only temp lock nodes with no edges
    :return G_final_town: (ut.HighwayCoord) containing all non-highway nodes (possibly town & locks, no distinction)
    """
    all_actual_highways = [hw[0] for hw in all_highways] # actual highway locations list
    all_hw_nodes = list(set(itertools.chain.from_iterable(all_actual_highways))) # all highway nodes, no repeats
    # remove all hw_nodes from G_original => G_original now contains town nodes only
    for hw_node in all_hw_nodes:
        G_original.remove_node(hw_node)
    # init G_town_only, containing only town nodes (ut.Coord)
    G_town_only = deepcopy(G_original)
    # init G_final_town as empty graph (later should contain towns & temporary locks)
    G_final_town = nx.Graph()
    # init G_together from G_highway_only, containing only highway nodes (ut.HighwayCoord)
    G_together = deepcopy(G_highway_only)
    # init G_temp_locks as empty graph (later should contain temporary locks only)
    G_temp_locks = nx.Graph()
    for town_node in G_town_only.nodes: # iterate over each town node (ut.Coord) in G_town_only
        tx, ty = town_node[0], town_node[1] # town x, y index
        # surroundings: the 4 nodes surrounding town_node (ut.HighwayCoord, no label)
        surroundings = [ut.HighwayCoord(tx + 1, ty, ''), ut.HighwayCoord(tx, ty + 1, ''),
                        ut.HighwayCoord(tx - 1, ty, ''), ut.HighwayCoord(tx, ty - 1, '')]
        # if any surrounding node is in G_highway_only (a highway node) => mark town_node as temp 'lock'
        if any(s in G_highway_only for s in surroundings):
            # add it to G_together & G_temp_locks as a temp lock
            G_together.add_node(ut.HighwayCoord(tx, ty, 'lock'), pos=(tx, ty), label='lock')
            G_temp_locks.add_node(ut.HighwayCoord(tx, ty, 'lock'), pos=(tx, ty), label='lock')
        # otherwise, keep town_node as a normal town node with '.'
        else:
            # add it to G_together as a town
            G_together.add_node(ut.HighwayCoord(tx, ty, '.'), pos=(tx, ty), label='.')
        # either way, add town_node to G_final_town (ut.HighwayCoord label '.')
        G_final_town.add_node(ut.HighwayCoord(tx, ty, '.'), pos=(tx, ty), label='.')
    # add all edges between towns to G_together and G_final_town
    for tn_u, tn_v in G_town_only.edges:
        tn_u_x, tn_u_y = tn_u[0], tn_u[1]
        tn_v_x, tn_v_y = tn_v[0], tn_v[1]
        u_node, v_node = ut.HighwayCoord(tn_u_x, tn_u_y, '.'), ut.HighwayCoord(tn_v_x, tn_v_y, '.')
        G_together.add_edge(u_node, v_node)
        G_final_town.add_edge(u_node, v_node)
    return G_together, G_town_only, G_highway_only, G_temp_locks, G_final_town
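

# Illustrative sketch only (not part of the original module): the temp-lock rule used in
# highway_post_processing, reduced to plain (x, y) tuples instead of ut.Coord / ut.HighwayCoord.
# A town cell is labelled 'lock' when at least one of its 4 grid neighbours is a highway cell,
# and '.' otherwise. The name `classify_town_cells` is hypothetical.
def classify_town_cells(town_cells, highway_cells):
    """Return a dict mapping each town cell (x, y) to 'lock' or '.'."""
    highway_cells = set(highway_cells) # set membership keeps each neighbour test cheap
    labels = {}
    for x, y in town_cells:
        neighbours = [(x + 1, y), (x, y + 1), (x - 1, y), (x, y - 1)]
        labels[(x, y)] = 'lock' if any(n in highway_cells for n in neighbours) else '.'
    return labels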


def get_valid_locks(G_together, G_temp_locks, G_final_town):
    """
    Process the temp locks and keep only the valid ones (many temp locks are invalid and can lead to errors)
    :param G_together: (ut.HighwayCoord) containing highway nodes, town nodes, temp lock nodes
    :param G_temp_locks: (ut.HighwayCoord) containing only temp lock nodes with no edges
    :param G_final_town: (ut.HighwayCoord) containing all non-highway nodes (possibly town & locks, no distinction)
    :return G_locks: (ut.HighwayCoord) containing the finalized, valid, true locks (next to town & highway)
    :return G_together_new: (ut.HighwayCoord) updated, containing highway nodes, town nodes, valid lock nodes
    """
    # init G_locks as empty graph (later should contain valid locks only)
    G_locks = nx.Graph()
    # init G_together_new from G_together (containing all highway nodes, town nodes, temp lock nodes)
    G_together_new = deepcopy(G_together)
    # separate original towns as a list (towns are connected components in G_final_town, should include temp locks)
    towns = [G_final_town.subgraph(c).copy() for c in nx.connected_components(G_final_town)]
    for town_and_lock in towns: # iterate over each town
        # town_temp_locks: temp locks (from G_temp_locks) in the town (town_and_lock) as a list
        town_temp_locks = [t_node for t_node in town_and_lock.nodes if t_node in G_temp_locks.nodes]
        town_temp_locks_copy = deepcopy(town_temp_locks) # init town_temp_locks_copy (to keep a copy of locks)
        # town_without_lock: town_and_lock with the temp locks removed => town nodes only!
        town_without_lock = deepcopy(town_and_lock)
        for l in town_temp_locks:
            town_without_lock.remove_node(l)
        # 1. Greedy fix for problem type 1: lock not connected to any town node => cannot be reached from town
        for l in town_temp_locks_copy: # iterate over each temp lock l (ut.HighwayCoord)
            if l not in town_temp_locks: # make sure this temp lock is still in town_temp_locks (keep updating)