-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgtools.py
11119 lines (10765 loc) · 451 KB
/
gtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# vim: set syntax=none nospell:
# #####################
# Script name: gtools.py
# Created by: geoff.mcnamara@gmail.com
# Created on: 2018
# Purpose: this is a quick-n-dirty set of tools that might be useful
# ... much room for improvement and lots of danger points - not recommended for anything you care about # noqa:
# ... use this file in any way and all your data could be at risk ;)
# ... lots of debugging code still remains and legacy code that should be removed # noqa:
# ... this file is constantly changing so nothing should depend on it ;)
# ... depends on python3!
# LICENSE = MIT
# WARNING: Use at your own risk!
# Side effects include but not limited: severe nosebleeds and complete loss of data.
# This file is subject to frequent change!
# Please contact me anytime. I willingly accept compliments and I tolerate complaints (most of the time).
# Requires: docopt, dbug.py, gftools.py, and others
# #####################################
# To see an overview:
# python3 -m pydoc gtools
# note: because dbug is pulled in with import you can call it directly from this module as well, also funcname
# to have access:
# in bash --- export PYTHONPATH="${PYTHONPATH}:/home/geoffm/dev/python/gmodules"
# or
# import sys
# sys.path.append("/home/geoffm/dev/python/gmodules")
# #####################################
# ### IMPORTS ### #
import os
import sys
import shutil
import glob
import re
import subprocess
from datetime import datetime # , date
from math import ceil
from urllib.request import urlopen
import inspect
from inspect import (getframeinfo, currentframe)
from docopt import docopt
import time
import requests
import threading
import configparser
import itertools
# ### GLOBALS ### #
# console = Console()
# Timestamp taken once, when the module is imported, formatted as YYYYMMDD-HHMM.
dtime = datetime.now().strftime("%Y%m%d-%H%M")
# File name (without directory) of this module.
SCRIPT = os.path.basename(__file__)
# Text style name -> numeric code (presumably the ANSI SGR attribute numbers - TODO confirm against the consumer).
# NOTE(review): 'bright' maps to the int 1 while every other value is a str - confirm the consumer tolerates both.
styles_d = {'normal': '0', 'bold': '1', 'bright': 1, 'dim': '2', 'italic': '3', 'underline': '4', 'blink': '5', 'fast_blink': '6', 'reverse': '7', 'crossed_out': '9'}
# Foreground color name -> code fragment; most values carry a leading ';' (presumably so they can be appended
# directly after a style code - TODO confirm).
# NOTE(review): 'normal' is '38' with no leading ';', unlike its siblings - confirm this is intentional.
fg_colors_d = {'black': ";30",
'red': ";31",
'green': ";32", # I don't like "their" green, should "my" green (;38) instead or even better is rgb(0,215,0)
# 'green': ";38",
'yellow': ';33',
'blue': ';34',
'magenta': ';35',
'cyan': ';36',
'white': ';37',
'normal': '38',
'bold normal': ';39'}
# Background color name -> code; unlike fg_colors_d these values have no ';' prefix.
bg_colors_d = {'black': "40",
'red': "41",
'green': "42",
'yellow': "43",
"blue": "44",
'magenta': "45",
"cyan": "46",
'white': "47",
'normal': '48',
'normal1': '49'}
# ############### #
# ### CLASSES ### #
# ############### #
# ############
class Spinner:
    """
    purpose: prints an animated spinner in place (after an optional message) while the body of a `with` block runs
    input: message="": str
    options:
        - style: str        # default 'pipe'; one of ellipsis, pipe, box, vbar, growing_bar (or pump), missing_box,
                            #   solid_box, arrow, clock, bar, balloons, moon, dot, braille, pulse, batball
                            #   (any other value falls back to four dots)
        - prog|progressive|progress: bool   # progressive: frames accumulate and are then erased; most styles
                                            #   force this on or off (see _STYLES)
        - color: str        # spinner color
        - colors: list      # when it holds more than one color the spinner rotates through them
        - txt_color|txt_clr|text_color|text_clr: str    # message color
        - elapsed|etime|time: bool          # show the elapsed seconds next to the spinner
        - time_color|time_clr|elapsed_clr|...: str      # elapsed time color, defaults to the text color
        - centered: bool
        - delay: float      # seconds between frames, default 0.2
        - length|width: int # style_len for styles that do not force their own, default 4
    requires:
        import sys
        import time
        import threading
        import itertools
    return: none (the context manager yields the Spinner instance itself)
    notes:
        - the animation only runs when sys.stdout is a tty; the message is always written
    use:
        with Spinner("Working...", style='bar'):
            long_proc()
        with Spinner("Doing long work...", style='vbar', progressive=True, colors=shades('red')):
            for x in range(100):
                time.sleep(1)
    """

    # Animation table: style name -> frames plus the settings that style forces.
    #   "prog":   True/False forces progressive mode on/off; absent keeps the caller's choice
    #   "length": an int forces style_len, "chars" means len(frames); absent keeps the caller's choice
    _STYLES = {
        'ellipsis': {"chars": ['.', '.', '.', '.'], "prog": True, "length": "chars"},
        'pipe': {"chars": ['/', '-', '\\', '|'], "prog": False},
        'arrow': {"chars": ["←", "↖", "↑", "↗", "→", "↘", "↓", "↙"], "prog": False},
        # every frame must be ONE glyph: a missing comma here used to fuse two clock faces into a single
        # two-character frame that could not be backed over cleanly
        'clock': {"chars": ['🕛', '🕧', '🕐', '🕜', '🕑', '🕝', '🕒', '🕞', '🕓', '🕟', '🕔', '🕠',
                            '🕕', '🕡', '🕖', '🕢', '🕗', '🕣', '🕘', '🕤', '🕙', '🕥', '🕚', '🕦'],
                  "prog": False, "length": "chars"},
        'moon': {"chars": ['🌑', '🌘', '🌗', '🌖', '🌕', '🌔', '🌓', '🌒'], "prog": False},
        'vbar': {"chars": ['_', '▁', '▂', '▃', '▄', '▅', '▆', '▇', '█', '▇', '▆', '▅', '▄', '▃', '▂', '▁', '_'],
                 "length": "chars"},
        'growing_bar': {"chars": ['\u258F', '\u258E', '\u258D', '\u258C', '\u258B', '\u258A', '\u2589', '\u2588',
                                  '\u2589', '\u258A', '\u258B', '\u258C', '\u258D', '\u258E', '\u258F', ' '],
                        "prog": False},
        'bar': {"chars": ['█', '█', '█', '█', '█'], "prog": True, "length": 20},
        'braille': {"chars": ['⣾', '⣷', '⣯', '⣟', '⡿', '⢿', '⣻', '⣽'], "prog": False},
        'dot': {"chars": ['⠁', '⠂', '⠄', '⡀', '⢀', '⠠', '⠐', '⠈'], "prog": False},
        'box': {"chars": ['◰', '◳', '◲', '◱'], "prog": False},
        'solid_box': {"chars": ['\u2598', '\u259D', '\u2597', '\u2596'], "prog": False},
        'missing_box': {"chars": ['\u259F', '\u2599', '\u259B', '\u259C'], "prog": False},
        'balloons': {"chars": ['.', 'o', 'O', '@', '*'], "prog": False},
        'pulse': {"chars": ['.', 'o', 'O', 'o'], "prog": False},
        'batball': {"chars": ['q', 'p', 'b'], "prog": False},
    }
    # alternative spellings accepted for a style (for the spelling challenged)
    _STYLE_ALIASES = {
        'elipsis': 'ellipsis', 'elipses': 'ellipsis', 'ellipses': 'ellipsis',
        'pump': 'growing_bar',
        'boxes': 'box',
        'solid_boxes': 'solid_box',
        'hole_boxes': 'missing_box',
    }

    def __init__(self, message="", *args, delay=0.2, style="pipe", **kwargs):
        """
        purpose: reads the options, selects the animation frames and writes the (optionally centered) message
        notes: the spinner thread is not started here, see __enter__
        """
        # --== config ==--
        # one lookup with every alias; a second, narrower lookup used to overwrite this and drop text_color/text_clr
        txt_color = kvarg_val(["text_color", "txt_color", "text_clr", "txt_clr"], kwargs, dflt="")
        self.color = kvarg_val(["color", 'spinner_clr', 'spinner_color', 'clr'], kwargs, dflt="")
        self.colors = kvarg_val(["colors", "color", "spinner_colrs", "spinner_clrs"], kwargs, dflt=[])
        self.COLOR = sub_color(self.color)
        self.TXT_COLOR = sub_color(txt_color)
        self.elapsed = bool_val(['elapsed', 'elapse', "time", "timed"], args, kwargs, dflt=False)
        time_color = kvarg_val(["time_color", 'time_clr', 'elapsed_clr', 'elapsed_time_clr', 'elapsed_color', 'elapse_color', 'elapse_clr'], kwargs, dflt=txt_color)
        self.time_color = time_color
        self.TIME_COLOR = sub_color(time_color)
        self.etime = bool_val(["etime", "show_elapsed", 'elpased_time', 'elapsed', 'time'], args, kwargs, dflt=False)
        self.centered = bool_val(['center', 'centered'], args, kwargs, dflt=False)
        self.RESET = sub_color('reset')
        self.start_time = time.time()
        self.style = style = kvarg_val(["style", 'type'], kwargs, dflt=style)
        # the docstring has always advertised the longer spellings, so honor them as well as 'prog'
        self.prog = bool_val(["prog", "progressive", "progress"], args, kwargs, dflt=True)
        self.style_len = kvarg_val(["length", "width"], kwargs, dflt=4)
        # --== normalize ==--
        if isinstance(self.colors, str):
            # just to make sure it is a list before going forward
            self.colors = [self.colors]
        # --== select the frames ==--
        spec = None
        if isinstance(style, str):
            spec = self._STYLES.get(self._STYLE_ALIASES.get(style, style))
        if spec is None:
            spinner_chrs = ['.', '.', '.', '.']  # unknown style: four dots, caller's prog and length are kept
        else:
            spinner_chrs = list(spec["chars"])
            if spec.get("prog") is not None:
                self.prog = spec["prog"]
            forced_len = spec.get("length")
            if forced_len == "chars":
                self.style_len = len(spinner_chrs)
            elif forced_len is not None:
                self.style_len = forced_len
        # --== state ==--
        self.chr_cnt = 1
        self.clr_cnt = 0
        self.spinner = itertools.cycle(spinner_chrs)
        self.delay = kvarg_val("delay", kwargs, dflt=delay)
        self.busy = False
        self.spinner_visible = False
        self.thread = None
        # created here (not only in __enter__) so write_next/spin_backover never hit a missing attribute
        self._screen_lock = threading.Lock()
        # --== message ==--
        message = printit(message, "str", 'noprnt', color=txt_color)
        if self.centered:
            # shift the centered message left to leave room for the spinner (and the elapsed time)
            spinner_len = 1
            if self.prog:
                spinner_len = len(spinner_chrs)
            if self.etime:
                spinner_len += 2
            shift = -(spinner_len)
            message = printit(message, 'centered', prnt=False, rtrn_type="str", color=txt_color, shift=shift)
        sys.stdout.write(message)

    def _write_elapsed(self):
        """
        purpose: writes the elapsed seconds after the cursor and then backs the cursor over them again
        """
        elapsed_time = round(time.time() - self.start_time, 2)
        # fixed width keeps the back-over length stable - if we go over 999 seconds the width will grow
        elapsed_time = f"{elapsed_time:>6}"
        sys.stdout.write(f" {self.TIME_COLOR}{elapsed_time}{self.RESET}")
        sys.stdout.write("\b" * (len(elapsed_time) + 1))

    def write_next(self):
        """
        purpose: writes the next frame of the spinner unless one is already showing
        """
        with self._screen_lock:
            if self.spinner_visible:
                return
            if len(self.colors) > 1:
                # rotate through the supplied colors, one per frame
                self.COLOR = sub_color(self.colors[self.clr_cnt])
                self.clr_cnt = (self.clr_cnt + 1) % len(self.colors)
            spinner_chr = self.COLOR + str(next(self.spinner)) + self.RESET
            if self.style == 'clock':
                if self.chr_cnt >= self.style_len:
                    self.chr_cnt = 0
                else:
                    self.chr_cnt += 1
            if self.style == 'clock' and nclen(spinner_chr) > 1:
                # Guard kept from the original: a clock frame longer than one character is replaced by a
                # blank. It was needed because of the fused two-glyph frame (fixed in _STYLES).
                # NOTE(review): assumes nclen() counts characters rather than display columns - confirm
                sys.stdout.write(' ')
            else:
                sys.stdout.write(spinner_chr)
            self.spinner_visible = True
            sys.stdout.flush()

    def spin_backover(self, cleanup=False):
        """
        purpose: backs the cursor over the spinner; in progressive mode the frames are left in place until
                 style_len of them have been written and are then erased one at a time
        options:
            - cleanup: bool   # also blank the spinner, return to the line start and restore the cursor
        """
        with self._screen_lock:
            if not self.spinner_visible:
                return
            if not self.prog:
                # back over the chr if not prog (progressive)
                sys.stdout.write('\b')
            else:
                self.chr_cnt += 1
                if self.chr_cnt > self.style_len:
                    # we hit style_len... print elapsed time... then erase back to the first position
                    if self.etime:
                        self._write_elapsed()
                        sys.stdout.flush()
                    while self.chr_cnt > 1:
                        self.chr_cnt -= 1
                        sys.stdout.write("\b")
                        sys.stdout.write(" ")
                        sys.stdout.write("\b")
                        time.sleep(self.delay)
                        sys.stdout.flush()
                    sys.stdout.write(" ")
                    sys.stdout.write("\b")
                    sys.stdout.flush()
                    time.sleep(self.delay)
            if self.style in ('clock', 'moon'):
                # these glyphs get one extra backspace
                sys.stdout.write('\b')
            sys.stdout.write('\x1b[?25l')  # Hide cursor
            self.spinner_visible = False
            if cleanup:
                sys.stdout.write(' ')  # overwrite spinner with blank
                sys.stdout.write('\r')  # back to the start of the line
                sys.stdout.write("\x1b[?25h")  # Restore cursor
            sys.stdout.flush()

    def spin_proc(self):
        """
        purpose: thread body - draws a frame, waits `delay` seconds, backs over it, until busy is cleared
        """
        while self.busy:
            self.write_next()
            if self.etime and not self.prog:
                self._write_elapsed()
            time.sleep(self.delay)
            self.spin_backover()

    def __enter__(self):
        """
        purpose: starts the spinner thread (only when stdout is a tty)
        returns: self, so `with Spinner(...) as spinner:` works
        """
        try:
            if sys.stdout.isatty():
                self.busy = True
                self.thread = threading.Thread(target=self.spin_proc)
                self.thread.start()
        except Exception as Error:
            # best effort: a spinner that can not start must never break the wrapped work
            self.busy = False
            dbug(Error)
        return self

    def __exit__(self, exc_type, exc_val, exc_traceback):
        """
        purpose: stops the spinner thread, cleans up the spinner and moves the cursor to the next line
        notes: exceptions raised inside the `with` body are not suppressed
        """
        self.busy = False
        if sys.stdout.isatty():
            self.spin_backover(cleanup=True)
        worker = getattr(self, "thread", None)
        if worker is not None and worker.is_alive():
            # wait (at most about one `delay`) for the thread to finish, otherwise a late frame could be
            # written after the cleanup and leave the cursor hidden
            worker.join()
        self.thread = None
        sys.stdout.write("\x1b[?25h")  # Restore cursor - always, the thread may have hidden it again
        sys.stdout.write('\r')
        sys.stdout.flush()
        print()  # force the cursor to the next line
# ### class Spinner: ### #
# ##################
def Spinner_demo():
    """
    purpose: interactive demo of Spinner
    notes:
        - runs one fixed 5 second 'vbar' spinner, then loops: the user selects a style, a duration and the
          colors, and the chosen spinner runs for that long
        - selecting q, Q or nothing ends the demo
    """
    import ast  # local import: only this demo needs it
    with Spinner("[red on black]Working...approx: 5 sec[/] ", 'elapsed', style='vbar', prog=False,
                 colors=shades('white'), elapsed_clr="yellow! on black", centered=True):
        time.sleep(5)
    styles = ["ellipsis", "dot", "bar", "vbar", "moon", "pipe", "arrow", "balloons", 'braille', 'pulse', 'box', 'clock']
    # the stray blanks inside two of these entries (' rgb(220,0,0)', 'rgb(240, 0,0)') have been removed
    red_shades = ['rgb(20,0,0)', 'rgb(40,0,0)', 'rgb(60,0,0)', 'rgb(80,0,0)', 'rgb(100,0,0)',
                  'rgb(120,0,0)', 'rgb(140,0,0)', 'rgb(160,0,0)', 'rgb(180,0,0)', 'rgb(200,0,0)',
                  'rgb(220,0,0)', 'rgb(240,0,0)', 'rgb(254,0,0)', 'rgb(254,0,0)', 'rgb(254,0,0)', 'rgb(254,0,0)']
    while True:
        style = gselect(styles, 'centered', 'quit', rtrn="v")
        if style in ("q", "Q", ""):
            break
        wait_time = cinput("For how many seconds: (default=15) ") or 15
        try:
            # time.sleep() needs a number and the answer is presumably typed text; never go negative
            wait_time = max(0.0, float(wait_time))
        except (TypeError, ValueError):
            wait_time = 15
        color = cinput("Input a spinner color Note: if you enter a list (eg ['red', 'green', 'blue']) then the spinner will rotate colors accordingly, default is shades of red: ") or red_shades
        if isinstance(color, str) and color.strip().startswith("["):
            # the prompt invites a list literal, so turn the typed text into a real list
            try:
                color = ast.literal_eval(color.strip())
            except (ValueError, SyntaxError):
                pass  # not a valid list literal: hand the text over unchanged, as before
        txt_color = cinput("Input a text color, default is normal: ") or 'normal'
        time_color = cinput("Input a time color, default is 'yellow! on black': ") or 'yellow! on black'
        printit("Note: some spinner styles will not use the progressive setting...", 'centered')
        prog = askYN("Should we try progressive?", "n", 'centered')
        printit([f"Demo: Spinner(style: {style}", f"txt_clr: {txt_color}", f"spinner_clr: {color[:4]}...", f"time_clr: {time_color}", f"Progressive: {prog}... "], 'centered')
        with Spinner("Demo: Spinner: working... ): ", 'centered', 'elapsed', color=color, txt_color=txt_color, elapsed_clr=time_color, style=style, prog=prog):
            time.sleep(wait_time)  # to simulate a long process
# #########################
class Transcript:
    """
    purpose: a stand-in for sys.stdout that writes everything to the terminal AND appends it to a log file
    input: filename: str   # opened in append mode
    use:
        transcript_start('logfile.log')
        print("inside file")
        transcript_stop()
        print("outside file")
    requires:
        import sys
        def transcript_start
        def transcript_stop
    notes:
        - self.terminal is whatever sys.stdout was when the instance was created
        - any stream attribute not defined here (isatty, encoding, ...) is answered by the terminal stream
    """

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.logfile = open(filename, "a")

    def write(self, message):
        """
        purpose: writes message to the terminal and to the log file
        returns: the number of characters written to the log file (the usual stream write() contract)
        """
        self.terminal.write(message)
        return self.logfile.write(message)

    def flush(self):
        """
        purpose: flushes both streams so print(..., flush=True) and sys.stdout.flush() really take effect
        """
        self.terminal.flush()
        self.logfile.flush()

    def __getattr__(self, name):
        # Only reached for attributes that are not defined on this class or instance. Hand them to the
        # real terminal stream so code that probes sys.stdout (eg sys.stdout.isatty()) keeps working
        # while a transcript is active.
        terminal = self.__dict__.get("terminal")
        if terminal is None:
            # still inside __init__ (or a half built instance): do not recurse
            raise AttributeError(name)
        return getattr(terminal, name)
# ### class Transcript: ### #
def write_file(data, fname, *args, **kwargs):
    """
    purpose: writes data to a file typically as a csv file (or a .dat file) - creates a new file, overwrites an
             existing file (the default) or appends to it
    input:
        - data: list of lists (rows of cols)
        - fname: str
    options:
        - colnames: list   # must be a list; NOTE(review): it is only validated - no header line is built from it yet
        - bak: bool        # default=True; an existing fname is copied to fname.bak first (and an existing
                           #   fname.bak to fname.prev)
        - prnt: bool       # default=True; report that the file was re-written
        - ask: bool        # default=True; calls dbug('ask') before overwriting
                           #   NOTE(review): presumably a breakpoint/confirmation - verify what dbug does with 'ask'
        - append: bool     # whether to append data - the default=False whereby submitted data will overwrite the file
        - ind|id: bool     # accepted but currently without effect
    returns: data (unchanged) on success, None when the input is rejected
    Notes:
        - only use this to write a data file = either csv (or a dat file which is a csv file with the header commented)
        - assumes the first row of data holds the colnames
        - comment lines ('#...') already in the file are kept and written first when the file is overwritten
        - when an existing .dat file is overwritten the first row of data is dropped, the header is expected to
          already be in the file as a comment line
    """
    # --== Config ==--
    prnt = bool_val(["print", "prnt", "show", "verbose"], args, kwargs, dflt=True)  # True for now TODO
    ask_b = bool_val(["ask"], args, kwargs, dflt=True)  # True for now TODO
    bak = bool_val(["bak"], args, kwargs, dflt=True)
    colnames = kvarg_val(["colnames", "col_names", "cols"], kwargs, dflt=[])
    append_b = bool_val(["append", "a"], args, kwargs, dflt=False)
    # --== Convert ==--
    if isinstance(data, str):
        data = [data]
    # --== Validate ==--
    # these messages were plain strings before, so the {placeholders} were printed literally
    if not isinstance(colnames, list):
        dbug(f"Colnames must be a list ... colnames: {colnames} ... returning...")
        return
    if not islol(data):
        dbug(f"Data is not a list of lists and it should be. data: {data}")
        return
    if not data or len(data[0]) < 2:  # arbitrary
        dbug(f"Data should have more content: {data[:3]}")
        return
    data_lines = [comma_join(row) for row in data]  # one comma delimited string per row
    # --== Read what is already there ==--
    comment_lines = []  # comment lines found in the existing file, they survive an overwrite
    target_exists = file_exists(fname)
    if target_exists:
        with open(fname, "r") as f:
            for ln in f:
                ln = ln.rstrip("\n")
                if ln.startswith("#"):
                    comment_lines.append(ln)
        if not append_b and fname.endswith(".dat"):
            # a .dat file keeps its header as a comment line, so do not write the header row again
            data_lines = data_lines[1:]
    # --== Backup ==--
    if bak and target_exists:
        # only an existing file can be backed up - copying a missing one raised FileNotFoundError
        bak_fname = fname + ".bak"
        if file_exists(bak_fname):
            shutil.copy(bak_fname, fname + ".prev")
        shutil.copy(fname, bak_fname)
        printit(f"File: {fname} has been backed up to: {fname}.bak", 'centered')
    # --== Write ==--
    if append_b:
        # the rows are written as comma joined lines (calling .endswith() on the raw list rows raised
        # AttributeError)
        mode = "a"
        out_lines = data_lines
    else:
        # either creates a new file or overwrites an existing file
        mode = "w"
        out_lines = comment_lines + data_lines
        if ask_b:
            dbug('ask')
    with open(fname, mode) as f:
        f.writelines(ln if ln.endswith("\n") else ln + "\n" for ln in out_lines)
    if prnt and not append_b:
        printit(f"The file fname: {fname} has be re-written...", 'centered')
    return data
# ### EOB def write_file() ### #
# #############################
def transcript_start(filename, *args, **kwargs):
    """
    purpose: from here on send everything written to sys.stdout to the terminal and also append it to filename
    options:
        - prnt|printit|show: bool   # announce the start (default=False); printed before the switch
        - centered: bool            # center that announcement
    returns: None - sys.stdout is replaced with a Transcript(filename) instance
    """
    announce = bool_val(["prnt", 'printit', 'show'], args, kwargs, dflt=False)
    center_it = bool_val(['centered'], args, kwargs, dflt=False)
    if announce:
        printit(f"Starting transcript out to {filename}", centered=center_it)
    tee = Transcript(filename)
    sys.stdout = tee
# ########################
def transcript_stop(*args, **kwargs):
    """
    purpose: stop the active transcript - close its log file and give sys.stdout back to the terminal
    options:
        - prnt|printit|show: bool   # announce the stop (default=False); printed after the switch
        - centered: bool            # center that announcement
    returns: None
    notes: does nothing when no transcript is active (this used to raise AttributeError)
    """
    prnt = bool_val(["prnt", 'printit', 'show'], args, kwargs, dflt=False)
    centered_b = bool_val(['centered'], args, kwargs, dflt=False)
    active = sys.stdout
    logfile = getattr(active, "logfile", None)
    terminal = getattr(active, "terminal", None)
    if logfile is None or terminal is None:
        # sys.stdout is not a transcript - nothing to stop
        return
    logfile.close()
    sys.stdout = terminal
    if prnt:
        printit("Stopping transcript out", centered=centered_b)
def transcript_demo():
    """
    purpose: demo of transcript_start() and transcript_stop()
    notes: removes ./gtools-transcript-demo.out if it exists, records one boxed message into it and then
           prints the contents of that file
    """
    file = "./gtools-transcript-demo.out"
    if file_exists(file):
        printit(f"Removing existing file: {file}", 'centered')
        os.remove(file)
    printit(f"We are now going to write all output to file: {file}", 'centered')
    transcript_start(file, 'prnt', 'centered')
    # this message was missing its f prefix, so a literal "{file}" was printed
    printit(f"This output will also be found in file: {file}", 'boxed', 'centered')
    transcript_stop('prnt', 'centered')
    printit(f"Here are the contents of file: {file}", 'centered')
    contents = cat_file(file)
    printit(contents)
# #######################################
class ThreadWithReturn(threading.Thread):
    """
    purpose: a threading.Thread whose join() hands back the return value of the target
    requires:
        import threading
    usage:
        t1 = ThreadWithReturn(target=run_cmd, args=(cmd,))
        t1.start()
        # ... do other work ...
        result = t1.join()
    notes:
        - join() returns None while the target has not finished (timeout expired), when the target raised,
          or when no target was given
    """

    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None  # filled in by run()

    def run(self):
        """
        purpose: runs the target in the new thread and keeps its return value
        """
        # like threading.Thread.run(), tolerate a thread that was created without a target
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        """
        purpose: waits for the thread to finish, same as threading.Thread.join()
        options:
            - timeout: float | None   # seconds to wait, None (the default) waits until the thread ends
        returns: whatever the target returned
        """
        threading.Thread.join(self, timeout)
        return self._return
# # #############
# class DoMenu:
# # #########
# """
# deprecated for gselect
# NOTE: to test this: echo 1 | python3 -m doctest -v ./gtools.py
# Use:
# Make sure you have the functions called for:
# do_item1() do_item2() do_item3() do_item4() etc
# Note: this is a dictionary ... hence do_select({title: function})
# # >>> main_menu = DoMenu("Main")
# # >>> main_menu.add_selection({"item1":cls})
# # >>> main_menu.do_select() # doctest: +ELLIPSIS
# # ============================================================
# # Main
# # ============================================================
# # 1 item1...
# # ...
# """
# # #####################
# def __init__(self, name):
# self.name = name
# # dbug(f"args:{self.args}")
# self.selections = {}
# self.choices = {}
# self.length = 60
# self.cls = False
# self.center = True
#
# def add_selection(self, selection):
# """
# selection needs to be a dictionary element ie:
# {"get this done":do_it} where do_it is actually a function name
# """
# # dbug(f"selection:{selection}")
# if not isinstance(selection, dict):
# dbug("Selection: " + selection + "must be a dictionary")
# self.selections.update(selection)
# """--== SEP_LINE ==--"""
# def do_select(self):
# if self.cls:
# cls()
# cnt = 0
# # dbug(f"self.itms:{self.itms}")
# # dbug(f"self.selections:{self.selections}")
# mmax = max(self.selections)
# mmax_len = len(mmax)
# side_len = mmax_len / 2
# position = int((self.length + 2) / 2 - side_len)
# # dbug(f"mmax:{mmax} mmax_len:{mmax_len} side_len:{side_len} position:{position}") # noqa:
# do_title_three_line(self.name, self.length)
# # do_line("=",self.length)
# # do_title(self.name,"+",self.length)
# # do_line("=",self.length)
# #
# self.add_selection({"Quit [Qq] or blank": exit})
# for k, v in self.selections.items():
# cnt += 1
# print(f"{cnt:{position}} {k}")
# # print("{:position}} {}".format(cnt, k))
# # dbug(f"k:{k} v:{v}")
# self.choices.update({str(cnt): v})
# doline("=", self.length)
# # prompt = input(f"Please make your selection [q=quit or 1-{cnt}]: ")
# prompt = input("Please make your selection [q=quit or 1-{}]: ".format(cnt))
# ans = input(centered(prompt, self.length))
# doline("=", self.length)
# # dbug(f"Selected: {ans} Action: {self.choices[ans]}")
# # dbug(f"self.choices:{self.choices}")
# if ans in ("q", "Q", ""):
# sys.exit()
# ret = self.choices[ans]()
# # dbug(f"returning ret: {ret}")
# return ret
# # EOB class DoMenu:
# ### FUNCTIONS ### #
# ################
def funcname(n=0):
    """
    purpose: gives the name of the function that called funcname() - primarily used for debugging
    options:
        - n: int   # how many additional frames to climb (0 = the direct caller, 1 = its caller, ...)
    returns: str
    """
    depth = n + 1  # frame 0 would be funcname() itself
    wanted_frame = sys._getframe(depth)
    return wanted_frame.f_code.co_name
# ###########
def lineno():
    """
    purpose: gives the line number from which lineno() was called - primarily used for debugging
    returns: str (the number as text)
    """
    caller_frame = inspect.currentframe().f_back
    return f"{caller_frame.f_lineno}"
# ##############################################
def ddbug(msg="", *args, ask=False, exit=False):
    """
    purpose: this is for use by dbug only... as dbug can't call itself
    input: msg: str
    options:
        - ask: bool    # after printing, ask whether to exit (also enabled by a positional 'ask')
        - exit: bool   # after printing, exit via sys.exit() (also enabled by a positional 'exit')
    returns: the printed text: "*** DDBUG ***: [file; function (..): lineno ] msg"
    notes: file, function and lineno describe the caller of ddbug()
    """
    # honor BOTH spellings - the keyword values used to be thrown away by the positional check
    ask = bool(ask) or 'ask' in args
    exit = bool(exit) or 'exit' in args
    caller = currentframe().f_back
    filename = os.path.basename(str(inspect.getfile(caller)))
    fname = str(caller.f_code.co_name)
    line_no = str(caller.f_lineno)
    msg = f"*** DDBUG ***: [{filename}; {fname} (..): {line_no} ] {msg}"
    try:
        print(msg)
    except Exception as e:
        # printing the very same text again would only fail again (eg on an encoding error),
        # so fall back to an ascii-safe rendering
        print(f"print({msg!a}) failed Error: {e!a}")
    if ask:
        ans = input("Do you want to exit? ")
        if ans in ("y", "Y", "yes", "Yes"):
            sys.exit()
    if exit:
        sys.exit()
    return msg
# ### EOB def ddbug(msg="", *args, ask=False, exit=False): ### #
# ############
def dbug(xvar="", *args, exit=False, ask=False, prnt=True, **kwargs):
# ########
"""
purpose: display DEBUG file, function, line number and your msg or local variable content
required: xvar: str ... can be a local variable, a python statement, a string, a list
options:
-ask: bool # forces a breakpoint, stops execution and asks the user before continuing
-here: bool # will only return the DEBUG info (file, function. linenumber) as a string without printing - typically used like this boxed("my box", footer=dbug('here'))
-boxed: bool # boxes debug output
-box_color: str # declares box_color default is 'red! on black'
-color: str # declares text output color
-centered: bool # centers the debug output
-lst: bool # forces printing the variable contents as a list (like printit()) - allows displaying ANSI codes properly
-titled: bool # only works when 'boxed'... puts the DEBUG info into the title of a box This should probably be the default...
-footerred: bool # only works when 'boxed'... puts the DEBUG info into footer of a box
returns: prints out debug info (unless 'here' option invoked)
notes: Enjoy!
To test:
run: python3 -m doctest -v dbug.py
# >>> a = "xyz"
# >>> dbug_var(a)
DEBUG: [dbug.py; <module>:1] a:xyz
'1'
"""
"""--== Imports ==--"""
# from gtools import bool_val, askYN, kvarg_val, boxed, centered # kvarg_val
"""--== Config ==--"""
center = bool_val(['center', 'centered'], args, kwargs)
box = bool_val(['box', 'boxed'], args, kwargs, dflt=False)
color = kvarg_val('color', kwargs, dflt='')
# box_color = kvarg_val('box_color', kwargs, dflt="red! on black")
box_color = kvarg_val('box_color', kwargs, dflt="white! on red")
# if 'center' in args or 'centered' in args:
# center = True
DBUG = kvarg_val(['dbug', 'DBUG'], kwargs, dflt=True) # True unless specified as False
if not DBUG:
# the idea here is if dbug looks like this: {where DBUG is True or >0) dbug(..., dbug=DBUG) then this func will honor the value of DBUG (ie True or False)
# this should ease the pain for change ... now you can do this
# DBUG = bool_val('dbug', args)
# and then dbug(f"whatever here", dbug=DBUG)
# ddbug(f"DBUG: {DBUG} therefore I am returning [empty handed sort of speak]")
# further... you could put DBUG = 0 (or False) in GLOBAL vars and use docopts to turn on or off DBUG
return
EXIT = bool_val('exit', args, kwargs, dflt=False)
ask_b = bool_val('ask', args, kwargs, dflt=False)
lineno_b = bool_val(['lineno_b', 'lineno', 'line_no', 'line_number'], args, kwargs, dflt=False)
here_b = bool_val('here', args, kwargs, dflt=False)
# title = kvarg_val("title", kwargs, dflt="")
# footer = kvarg_val("footer", kwargs, dflt="")
list_b = bool_val(["lst", "list"], args, kwargs, dflt=False) # print the list in lines if xvar contains a list (like printit for dbug)
titled = bool_val("titled", args, kwargs, dflt=True) # consider making this the default - this puts the title=debug_info in the box
footered = bool_val(["footered", "footerred"], args, kwargs, dflt=False) # yes, this is footerred <-- TODO probably need to change or eliminate this
"""--== Init ==--"""
# title = ""
# footer = ""
if footered:
box = True
"""--== SEP_LINE ==--"""
# ddbug(f"xvar: {xvar}")
if str(xvar) == 'lineno':
lineno_b = True
if str(xvar) == 'here':
here_b = True
if str(xvar) == 'ask':
ask_b = True
# see below where msg_literal gets changed when it is == 'ask'
# fullpath filename
i_filename = str(inspect.getfile(currentframe().f_back))
# filename without path
filename = os.path.basename(i_filename)
# fname = function name
fname = str(getframeinfo(currentframe().f_back).function)
lineno = str(inspect.currentframe().f_back.f_lineno)
if lineno_b:
return lineno
if here_b:
# if 'only' in str(xvar).lower():
# return f"{filename}:{fname}:{lineno}"
# if 'now' in str(xvar).lower():
# return f"DEBUG: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} [{filename}:{fname}:{lineno}]"
return f"DEBUG: [{filename}:{fname}:{lineno}]"
# this inconsistently holds what we want for the most part ...just wrong format
# you would have to put this directly into your code and include a msg
# ie eg: <frame at 0x7f4d6e4b25e0, file '/home/geoffm/././t.py', line 3000, code tdbug>
frame = inspect.currentframe().f_back
def do_prnt(rtrn_msg, *args, ask=ask_b, **kwargs): # , ask=ask_b, centered=centered):
titled = bool_val("titled", args, kwargs, dflt=True)
footered = bool_val("footered", args, kwargs, dflt=False)
# title = kvarg_val("title", kwargs, dflt="")
footer = kvarg_val("footer", kwargs, dflt="")
to_prnt = f"{rtrn_msg}"
# ddbug(rtrn_msg)
if "\n" in to_prnt:
to_prnt = "\n" + to_prnt
to_prnt = f"DEBUG: [{filename}:{fname}:{lineno}] {rtrn_msg}"
# to_prnt = f"DEBUG: [{filename}:{fname}:{lineno}] {rtrn_msg} {msg_literal}:"
# ddbug(f"to_prnt: {to_prnt}")
if box:
# dbug(f"titled: {titled}")
if titled or footered:
phrases = to_prnt.split(']')
# ddbug(f"phrases: {phrases}")
if titled:
# ddbug(titled)
my_title = phrases[0] + "]"
# ddbug(f"phrases: {phrases}")
# ddbug(f"phrases[0]: {phrases[0]}")
# ddbug(f"my_title: {my_title}")
to_prnt = phrases[1]
# ddbug(f"to_prnt: {to_prnt}")
# this is such a kludge ... arghhh TODO !!!!!
work_l = phrases[1].split("\n")
# ddbug(f"work_l[0]: {work_l[0]}")
# ddbug(f"msg_literal: {msg_literal}")
tst_first_line = work_l[0].strip()
tst_first_line = tst_first_line.rstrip(":")
# ddbug(f"tst_first_line: {tst_first_line}")
if tst_first_line == msg_literal.strip():
# ddbug("bingo-bongo")
# ddbug(f"work_l[1:]: {work_l[1:]}")
to_prnt = "\n".join(work_l[1:])
if footered:
footer = phrases[0] + "]"
to_prnt = phrases[1]
to_prnt = boxed(to_prnt, color=color, box_color=box_color, title=my_title, footer=footer)
if center:
# num_rows, columns = os.popen("stty size", "r").read().split()
# lfill = (int(columns) - len(to_prnt)) // 2
# to_prnt = " " * lfill + to_prnt
to_prnt = centered(to_prnt)
if isinstance(to_prnt, list):
# dbug(center)
# ddbug(to_prnt)
for ln in to_prnt:
# ddbug(ln)
# print(phrases[1] + "?: " + ln)
print(ln)
else:
print(f"{to_prnt}")
if ask_b:
# ddbug(f"ask_b: {ask_b}")
if not askYN("Continue?: ", "y", centered=center):
sys.exit()
# ddbug('ask')
if EXIT:
sys.exit()
# title = kvarg_val('title', kvargs, "")
# ### EOB def do_prnt(rtrn_msg, *args, ask=ask_b, **kwargs): # , ask=ask_b, centered=centered): ### #
line_literal = inspect.getframeinfo(frame).code_context[0].strip() # the literal string including dbug(...)
# ddbug(f"line_literal: {line_literal}")
msg_literal = re.search(r"\((.*)?\).*", line_literal).group(1) # ?non-greedy search
# ddbug(f"msg_literal: {msg_literal}")
# ddbug(f"msg_literal: {msg_literal}")
if msg_literal.replace("'", "") == 'ask' or msg_literal.replace('"', '') == "ask":
# ddbug(f"msg_literal: {msg_literal} args: {args}")
ask_b = True
rtrn_msg = "Enter or 'y' to continue... anything else to exit... "
to_prnt = f"DEBUG: [{filename}:{fname}:{lineno}] {rtrn_msg}"
if not askYN(to_prnt):
sys.exit()
return
# try:
# ddbug(f"msg_literal: {msg_literal}")
# # import pyparsing as pp
# all_args = pp.commaSeparatedList.parseString(msg_literal).asList()
# msg_literal = all_args[0]
# ddbug(f"msg_literal: {msg_literal} all_args: {all_args}")
# # import abracadabra
# except Exception as Error:
all_args = msg_literal.split(",")
msg_literal = all_args[0]
"""--== SEP_LINE ==--"""
lvars = inspect.currentframe().f_back.f_locals
# ddbug(f"msg_literal: {msg_literal}")
if msg_literal.startswith('f"') or msg_literal.startswith("f'"):
do_prnt(xvar, args, ask=ask_b, footered=footered, title=titled)
return
if msg_literal.startswith('"') or msg_literal.startswith("'"):
# ddbug(f"msg_literal: {msg_literal}")
msg_literal = msg_literal[1:-1]
# ddbug(f"msg_literal: {msg_literal} lvas: {lvars}")
found = False
if msg_literal in lvars:
# ddbug(f"Looks like this msg_literal: {msg_literal} is in lvars")
found = True
# ddbug(f"found: {found}")
for k, v in lvars.items():
# ddbug(f"lvars: {lvars}")
# ddbug(f"testing msg_literal: {msg_literal} against k: {k}")
if msg_literal == k or found: # in loop below we might break out and miss testing if in lvars hence the found value
# ddbug(f"looks like msg_literal: {msg_literal} == k: {k}")
try:
if "\n" in str(xvar) and list_b:
xvar = xvar.split("\n")
xvar = [x for x in xvar if x != ""]
except Exception as Error:
ddbug(f"Error: {Error}")
pass
# so this is in the local vars so lets build it as a string
# ddbug(f"msg_literal: {msg_literal} xvar: {xvar}")
if isinstance(xvar, list) and list_b:
# contents of msg_literal (xvar) will be treated like printit as best we can
# ddbug(f"xvar: {xvar} islol(xvar): {islol(xvar)} islos(xvar): {islos(xvar)}")
# orig_xvar = xvar
if islol(xvar):
lines1 = []
# ddbug("here now")
if islos(xvar):
for elem in xvar:
lines1.append("\n".join(elem))
xvar = lines1
# ddbug(xvar)
else:
lines = xvar
xvar = "You used 'lst' with dbug. Too many levels deep. Using printit with a box."
# printit("You used 'lst' with dbug. Too many levels deep. Using printit with a box.")
my_prnt = f"DEBUG: [{filename}:{fname}:{lineno}] {msg_literal}"
printit(lines, 'boxed', title="Too many layers deep for dbug to handle. Just using printit (boxed)", footer=f"{my_prnt}", box_color="red on white")
return
list_b = False
# ddbug(lines)
if list_b:
# ddbug(xvar)
xvar = "\n" + "\n".join(xvar)
# ddbug(xvar)
# ddbug(f"type(orig_xvar): {type(orig_xvar)} islos(orig_xvar): {islos(orig_xvar)} type(xvar): {type(xvar)} islos(xvar): {islos(xvar)}")
if islos(xvar):
# ddbug(f"hmmm this looks like xvar: {xvar} islos", 'ask')
lines0 = []
for ln in xvar:
lines0.append(ln)
xvar = lines0
# ddbug(f"xvar: {xvar}")
# for ln in xvar:
# ddbug(f"ln: {ln}")