-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathconcatFasta.py
executable file
·103 lines (90 loc) · 2.27 KB
/
concatFasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python
import os
import sys
def main():
if len(sys.argv) <= 1:
print("No files provided!")
print('Usage: ./concatFasta.py *.fasta - or - ./concatFasta.py 1.fas 2.fas...')
sys.exit(1)
files = sys.argv[1:]
#print("concatenating fastas using the following order:")
#print("if this is incorrect, change something")
pre=None
samps=dict()
#loop through and get list of samples
for file in sorted(files):
for s in read_fasta(file):
samps[s[0]] = ""
for file in sorted(files):
#print(file)
pre=file.split("_")[0]
#get seqlen
seqlen = None
#seen
seen = dict()
for s in read_fasta(file):
seen[s[0]] = 0
seqlen = len(s[1])
samps[s[0]] = samps[s[0]] + s[1]
for key in samps.keys():
if key not in seen:
samps[key] = samps[key] + Nrepeats("N", seqlen)
print("Using prefix from files to write output:",pre)
oname = pre + ".fasta"
write_fasta(oname, samps)
def Nrepeats(pattern, N):
ret = ""
for i in range(int(N)):
ret = ret + str(pattern)
return(ret)
#write fasta from dict
def write_fasta(name, d):
with open(name, 'w') as fh:
try:
for sample in d.keys():
to_write = ">" + str(sample) + "\n" + d[sample] + "\n"
fh.write(to_write)
except IOError as e:
print("Could not read file:",e)
sys.exit(1)
except Exception as e:
print("Unexpected error:",e)
sys.exit(1)
finally:
fh.close()
#Read samples as FASTA. Generator function
def read_fasta(fas):
if os.path.exists(fas):
with open(fas, 'r') as fh:
try:
contig = ""
seq = ""
for line in fh:
line = line.strip()
if not line:
continue
#print(line)
if line[0] == ">": #Found a header line
#If we already loaded a contig, yield that contig and
#start loading a new one
if contig:
yield([contig,seq]) #yield
contig = "" #reset contig and seq
seq = ""
split_line = line.split()
contig = (split_line[0].replace(">",""))
else:
seq += line
#Iyield last sequence, if it has both a header and sequence
if contig and seq:
yield([contig,seq])
except IOError:
print("Could not read file ",fas)
sys.exit(1)
finally:
fh.close()
else:
raise FileNotFoundError("File %s not found!"%fas)
#Call main function
if __name__ == '__main__':
main()