Package Bio :: Package SeqIO :: Module generic
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SeqIO.generic

  1  #!/usr/bin/env python 
  2  # Created: Tue Sep 11 17:21:54 2001 
  3  # Last changed: Time-stamp: <01/09/19 13:28:22 thomas> 
  4  # thomas@cbs.dtu.dk, http://www.cbs.dtu.dk/thomas/index.html 
  5  # File: generic.py 
  6  # based on Brad's code 
  7  # 
  8  # This code is part of the Biopython distribution and governed by its 
  9  # license.  Please see the LICENSE file that should have been included 
 10  # as part of this package. 
 11   
 12  import warnings 
 13  warnings.warn("Bio.SeqIO.generic is deprecated." \ 
 14                + " We hope the new code in Bio.SeqIO will be suitable for" \ 
 15                + " most users.  Please get in touch on the mailing lists if" \ 
 16                + " this (or its removal) causes any problems for you.", 
 17                DeprecationWarning) 
 18   
 19  import sys 
 20  import os, re, time 
 21  sys.path.insert(0, os.path.expanduser('~thomas/cbs/python/biopython')) 
 22   
 23  import string 
 24  import Bio.Alphabet 
 25   
 26  from Bio.Seq import Seq 
 27  #from Bio.SeqRecord import SeqRecord 
 28   
class SeqRecord:
    """A sequence together with its identifier, name and description.

    Note from the original author (possible backwards incompatibility):
    ids and descriptions are expected to be stripped -- no more trailing
    '\\n'.  The constructor itself stores the values it is given unchanged.

    Attributes:
        seq: the sequence object; __str__ reads its ``data`` attribute.
        id: record identifier.
        name: record name.
        description: free-text description.
        annotations: dict of annotations about the whole sequence.
        features: list of annotations about parts of the sequence.
    """

    def __init__(self, seq, id = "<unknown id>", name = "<unknown name>",
                 description = "<unknown description>"):
        self.seq = seq
        self.id = id
        self.name = name
        self.description = description
        # annotations about the whole sequence (fresh dict per instance)
        self.annotations = {}

        # annotations about parts of the sequence (fresh list per instance)
        self.features = []

    def __str__(self):
        """Return '<name> <sequence data>'."""
        return '%s %s' % (self.name, self.seq.data)
48 49
class GenericFormat:
    """Base reader/writer for record-oriented sequence files.

    A record starts on a line that begins with ``start_indicator``; every
    following line up to the next such line is sequence data.  Subclasses
    supply the indicator and implement write().
    """

    def __init__(self, instream=None, outstream=None,
                 alphabet = Bio.Alphabet.generic_alphabet,
                 start_indicator = None):
        """Store the streams, the alphabet and the record start marker.

        Nothing is read here; call find_start() to position a reader.
        """
        self.instream = instream
        self.outstream = outstream
        self.alphabet = alphabet
        # Counter compared against the index in __getitem__; -1 until
        # find_start() resets it to 0.
        self._n = -1
        # Header line of the next record, read ahead by find_start()/next().
        self._lookahead = None
        self.start_indicator = start_indicator

    def find_start(self):
        """Skip input lines until the first record header (or end of input)."""
        marker = self.start_indicator
        line = self.instream.readline()
        while line and not line.startswith(marker):
            line = self.instream.readline()
        self._lookahead = line
        self._n = 0

    def get_header(self, line):
        """Split a header line into an ``(id, description)`` tuple.

        The trailing line break and, when present, the leading
        ``start_indicator`` are removed; the first whitespace-separated word
        is the id and the stripped remainder the description.  An empty
        header yields ``("", "")``.

        Raises:
            ValueError: if ``line`` is not usable as text.  (Earlier code
                printed a message and terminated the process with exit
                status 0 instead.)
        """
        try:
            # rstrip instead of line[:-1]: the last line of a file may lack
            # a line break, and slicing would eat a real character.
            text = line.rstrip("\r\n")
            marker = self.start_indicator
            # Remove the whole marker, not just one character, so that
            # multi-character indicators do not leak into the id.
            if marker and text.startswith(marker):
                text = text[len(marker):]
            fields = text.split(None, 1)
        except (AttributeError, TypeError):
            raise ValueError('Unable to get header !!! Offending line: %r'
                             % (line,))

        if not fields:
            return ("", "")
        if len(fields) == 1:
            return (fields[0].strip(), "")
        return (fields[0].strip(), fields[1].strip())

    def next(self):
        """Return the next record as a SeqRecord, or None at end of input."""
        self._n = self._n + 1

        header = self._lookahead
        if not header: return None

        rec_id, desc = self.get_header(header)
        marker = self.start_indicator
        chunks = []
        line = self.instream.readline()
        while line and not line.startswith(marker):
            chunks.append(line.rstrip("\r\n"))
            line = self.instream.readline()

        # Either the next record's header or '' at end of input.
        self._lookahead = line

        return SeqRecord(Seq("".join(chunks), self.alphabet),
                         id = rec_id, name = rec_id, description = desc)

    def __getitem__(self, i):
        """Support the old ``for record in parser:`` idiom.

        Raises IndexError once next() returns None, which ends the loop.
        """
        # wrapper to the normal Python "for spam in list:" idiom
        assert i == self._n  # forward iteration only!
        x = self.next()
        if x is None:
            raise IndexError(i)
        return x

    def write(self, record):
        """Write one record; a no-op here, overridden by subclasses."""
        pass

    def write_records(self, records):
        """Write every record in ``records`` through write()."""
        # In general, can assume homogenous records... useful?
        for record in records:
            self.write(record)

    def close(self):
        """Close the output stream and return the result of its close()."""
        return self.outstream.close()

    def flush(self):
        """Flush the output stream and return the result of its flush()."""
        return self.outstream.flush()
128
class FastaFormat(GenericFormat):
    """FASTA reader/writer; records start with '>'."""

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        """Set '>' as record marker and, given an input stream, seek to the first record."""
        GenericFormat.__init__(self, instream, outstream, alphabet, '>')
        if instream: self.find_start()

    def write(self, record):
        """Write ``record`` as '>id description' followed by 60-column sequence lines."""
        put = self.outstream.write

        # "\n" rather than os.linesep: text-mode streams translate "\n" to
        # the platform line ending themselves, so os.linesep would come out
        # as "\r\r\n" on Windows.
        put(">%s %s\n" % (record.id, record.description))

        data = record.seq.tostring()
        for i in range(0, len(data), 60):
            put(data[i:i+60] + "\n")
143
class LargeFastaFormat(GenericFormat):
    """FASTA reader that loads the whole input on the first next() call.

    The input is read in one go and split on '>'; records are then served
    from the in-memory list.
    """

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        GenericFormat.__init__(self, instream, outstream, alphabet)
        # List of raw entries (text between '>' markers); None until loaded.
        self.entries = None

    def next(self):
        """Return the next record as a SeqRecord, or None when exhausted."""
        if self.entries is None:
            # self.instream: the bare name `instream` is not defined here.
            txt = self.instream.read()
            # Anything before the first '>' is not part of a record.
            self.entries = txt.split('>')[1:]
            self._n = -1

        self._n += 1
        if self._n >= len(self.entries): return None

        entry = self.entries[self._n]

        # First line is the header, the rest is sequence.  Split on "\n"
        # (text-mode reads never contain os.linesep on Windows) and accept
        # an entry that consists of a header only.
        parts = entry.split("\n", 1)
        header = parts[0]
        body = ""
        if len(parts) > 1:
            body = parts[1]

        # The '>' and the line break are already gone, so the header is
        # split here directly.
        fields = header.split(None, 1)
        name = ""
        desc = ""
        if fields:
            name = fields[0]
        if len(fields) > 1:
            desc = fields[1].strip()

        seq = body.replace("\r", "").replace("\n", "")
        return SeqRecord(Seq(seq, self.alphabet), id = name,
                         name = name, description = desc)

    def write(self, record):
        """Write ``record`` as '>id description' followed by 60-column sequence lines."""
        put = self.outstream.write

        # "\n" rather than os.linesep: text-mode streams do the platform
        # translation themselves.
        put(">%s %s\n" % (record.id, record.description))

        data = record.seq.tostring()
        for i in range(0, len(data), 60):
            put(data[i:i+60] + "\n")
177
class PirFormat(GenericFormat):
    """PIR-style reader/writer; records start with '>P1;'."""

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        """Set '>P1;' as record marker and, given an input stream, seek to the first record."""
        GenericFormat.__init__(self, instream, outstream, alphabet, '>P1;')
        if instream: self.find_start()

    def write(self, record):
        """Write ``record`` as '>P1;id description', 60-column sequence
        lines and a terminating '*'.

        The id and description must not contain a line break.
        """
        id = record.id
        assert "\n" not in id
        description = record.description
        assert "\n" not in description

        put = self.outstream.write
        # "\n" rather than os.linesep: text-mode streams do the platform
        # translation themselves.
        put(">P1;%s %s\n" % (id, description))

        data = record.seq.tostring()
        for i in range(0, len(data), 60):
            put(data[i:i+60] + "\n")

        # endswith() also copes with an empty sequence, where data[-1]
        # would raise IndexError.
        if not data.endswith('*'):
            put("*\n")
197
class EMBLFormat(GenericFormat):
    """Reader/writer for EMBL/Swiss-Prot style flat files.

    Each line is a two-character line code followed by text from column 6
    on; lines starting with two blanks are sequence data and '//' ends a
    record.  The writer emits the same 5-column prefix that the reader
    strips with ``line[5:]``.
    """

    # Line codes written first (after ID), in this order.
    order = ['AC', 'DT', 'DE', 'GN', 'OS', 'OC', 'DR']

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        """Set 'ID ' as record marker and, given an input stream, seek to the first record."""
        GenericFormat.__init__(self, instream, outstream, alphabet, 'ID ')
        if instream: self.find_start()

    def next(self):
        """Return the next record as a SeqRecord, or None at end of input.

        All lines are collected per line code into a dict of lists, which
        becomes the record's ``annotations``.  The sequence is the
        concatenation of the 'SQ' entries after the first (the SQ header).
        """
        self._n = self._n + 1

        line = self._lookahead
        if not line: return None

        fields = {}
        while line:
            if line[:2] == '//': break
            code = line[:2]
            # Two blanks: a sequence-data line, filed under 'SQ'.  (The
            # slice is two characters wide, so a one-blank comparison could
            # never match.)
            if code == '  ': code = 'SQ'
            # Blank lines carry no line code; do not record them.
            if line.strip():
                fields.setdefault(code, []).append(line[5:].strip())

            line = self.instream.readline()
        assert 'ID' in fields

        self._lookahead = self.instream.readline()

        # NOTE(review): blanks inside sequence lines are kept as they are,
        # as before -- confirm whether they should be removed.
        seq = Seq("".join(fields.get('SQ', [])[1:]), self.alphabet)
        ID = fields['ID'][0].split()[0]

        # First DE line, or '' when the record has none (indexing an empty
        # default list always raised IndexError).
        description = ''
        de_lines = fields.get('DE')
        if de_lines:
            description = de_lines[0]

        rec = SeqRecord(seq, id = ID, name = ID, description = description)
        rec.annotations = fields

        return rec

    def write(self, record):
        """Write ``record`` in EMBL/Swiss-Prot style.

        Uses ``record.annotations`` (line code -> list of texts) where
        available and falls back to a generated ID/SQ block otherwise.
        Line codes starting with 'R' are currently not written (see TODO).
        """
        annotations = record.annotations
        length = len(record.seq)
        put = self.outstream.write

        def emit(code, text):
            # Line code padded to the 5-column prefix; "\n" because
            # text-mode streams translate line endings themselves.
            put('%-5s%s\n' % (code, text))

        if 'ID' in annotations:
            emit('ID', annotations['ID'][0])
        else:
            dataclass = 'STANDARD;'
            division = 'PRT;' # fix that to change for DNA sequence
            put('ID   %-12s%+12s%+10s% 6d AA.\n'
                % (record.id, dataclass, division, length))

        # ID and SQ are handled separately; copy the keys so that entries
        # can be removed.
        remaining = [code for code in annotations.keys()
                     if code not in ('ID', 'SQ')]

        for code in self.order:
            if code not in remaining: continue
            remaining.remove(code)
            for text in annotations[code]:
                emit(code, text)

        for code in remaining:
            if code[:1] == 'R': continue
            # TODO
            # fix the order of all R* features
            for text in annotations[code]:
                emit(code, text)

        if 'SQ' in annotations:
            # First entry is the SQ header text, the rest are sequence
            # lines, written back with the blank 5-column prefix.
            sq_lines = annotations['SQ']
            if sq_lines:
                emit('SQ', sq_lines[0])
            for text in sq_lines[1:]:
                put('     %s\n' % text)
        else:
            emit('SQ', 'SEQUENCE%4d AA;' % length)
            data = record.seq.tostring()
            for i in range(0, len(data), 60):
                put('     ' + data[i:i+60] + '\n')

        put('//\n')
281
class GCGFormat(GenericFormat):
    """Reader/writer for single-sequence GCG files.

    Everything before the first line containing '..' is description; the
    first word of that line is the id and all later lines are sequence.
    """

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        GenericFormat.__init__(self, instream, outstream, alphabet)

    def next(self):
        """Return the record as a SeqRecord, or None at end of input.

        The rest of the stream is consumed as sequence, so a second call
        returns None.  None is also returned when no '..' header line is
        found.
        """
        readline = self.instream.readline  # bare `instream` is not defined here
        line = readline()
        if not line: return None

        desc_parts = []
        while line.find('..') == -1:
            desc_parts.append(line.strip())
            line = readline()
            # End of input without a '..' line: stop instead of looping
            # forever on empty reads.
            if not line: return None
        desc = ''.join(desc_parts)

        id = line.split()[0]

        seq_parts = []
        while 1:
            line = readline()
            if not line: break
            # Keep letters and '-' only; drops position numbers and blanks.
            seq_parts.append(re.sub('[^a-zA-Z-]', '', line).upper())
        seq = ''.join(seq_parts)

        return SeqRecord(Seq(seq, self.alphabet),
                         id = id, name = id, description = desc)

    def write(self, record):
        """Write ``record``: description, header line, numbered 60-column lines.

        The header line ends with '..', the divider that next() looks for.
        """
        id = record.id
        description = record.description

        put = self.outstream.write

        if not description: description = id
        put(description)
        # endswith() works for empty text and does not depend on the length
        # of os.linesep.
        if not description.endswith('\n'): put('\n')

        timestamp = time.strftime('%B %d, %Y %H:%M', time.localtime(time.time()))
        # NOTE(review): no 'Check:' checksum field is written -- confirm
        # whether downstream tools need one.
        put('%s Length: %d %s Type: P ..\n' % (id, len(record.seq), timestamp))
        data = record.seq.tostring()
        for i in range(0, len(data), 60):
            put('% 6d %s\n' % (i+1, data[i:i+60]))

        put('\n')
325
class ClustalFormat(GenericFormat):
    """Read-only parser for Clustal alignments.

    The whole alignment is parsed on the first next() call; records are
    then returned in order of first appearance.
    """

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        GenericFormat.__init__(self, instream, outstream, alphabet)
        # name -> full (upper-cased) sequence; None until parsed
        self.sequences = None
        # names in order of first appearance; None until parsed
        self.ids = None

    def ParseAlignment(self):
        """Read the alignment from ``instream`` into ``ids`` / ``sequences``.

        The first line is taken as the header (a warning goes to stderr if
        it does not start with 'CLUSTAL').  Lines starting with a blank and
        lines with fewer than two fields are skipped; otherwise the first
        field is the name and the second the sequence chunk, and any
        further fields are ignored.
        """
        # Set up empty results first so that an empty stream leaves usable
        # containers behind instead of None.
        self.ids = []
        self.sequences = {}
        self._n = -1

        line = self.instream.readline()
        if not line: return
        if not line.startswith('CLUSTAL'):
            sys.stderr.write('Warning file does not start with CLUSTAL header\n')

        chunks = {}
        while 1:
            line = self.instream.readline()
            if not line: break
            if line[0] == ' ': continue
            fields = line.split()
            if len(fields) < 2: continue
            name = fields[0]
            seq = fields[1]
            if name not in chunks:
                self.ids.append(name)
                chunks[name] = []
            chunks[name].append(seq.upper())

        for name in self.ids:
            self.sequences[name] = ''.join(chunks[name])

    def next(self):
        """Return the next aligned sequence as a SeqRecord, or None when done."""
        if self.ids is None: self.ParseAlignment()

        self._n += 1
        if self._n >= len(self.ids): return None

        name = self.ids[self._n]
        seq = self.sequences[name]

        return SeqRecord(Seq(seq, self.alphabet), id = name, name = name,
                         description = 'Clustal Alignment')
366
class NexusFormat(GenericFormat):
    """Read-only parser for the data matrix of a NEXUS file.

    The matrix is parsed on the first next() call; records are then
    returned in order of first appearance.
    """

    def __init__(self, instream=None, outstream=None, alphabet = Bio.Alphabet.generic_alphabet):
        GenericFormat.__init__(self, instream, outstream, alphabet)
        # name -> concatenated sequence; None until parsed
        self.sequences = None
        # names in order of first appearance; None until parsed
        self.ids = None

    def ParseNexus(self):
        """Read name/sequence pairs from ``instream`` into ``ids`` / ``sequences``.

        Skips forward to a line containing 'begin data;', then to a line
        containing 'matrix' (both case-insensitive), then reads lines until
        one contains ';'.  On each line the first field is the name and the
        remaining fields are joined as sequence; lines with fewer than two
        fields are skipped.
        """
        # Set up empty results first so that an empty stream leaves usable
        # containers behind instead of None.
        self.ids = []
        self.sequences = {}
        self._n = -1

        readline = self.instream.readline
        line = readline()
        if not line: return

        # search for the data block
        while line and line.lower().find('begin data;') == -1:
            line = readline()

        # search for the matrix block
        while line and line.lower().find('matrix') == -1:
            line = readline()

        chunks = {}
        while 1:
            # read name, sequence pairs until first ';'
            line = readline()
            if not line: break
            if line.find(';') > -1: break

            fields = line.split()
            if len(fields) < 2: continue

            name = fields[0]
            if name not in chunks:
                self.ids.append(name)
                chunks[name] = []
            chunks[name].extend(fields[1:])

        for name in self.ids:
            self.sequences[name] = ''.join(chunks[name])

    def next(self):
        """Return the next sequence as a SeqRecord, or None when done."""
        if self.ids is None: self.ParseNexus()

        self._n += 1
        if self._n >= len(self.ids): return None

        name = self.ids[self._n]
        seq = self.sequences[name]

        return SeqRecord(Seq(seq, self.alphabet), id = name, name = name,
                         description = '')
427
class ReadSeq:
    """Convert sequence files between the formats listed in ``fdict``."""

    def __init__(self):
        # format name (lower case) -> reader/writer class
        self.fdict = {
            'fasta': FastaFormat,
            'largefasta': LargeFastaFormat,
            'embl': EMBLFormat,
            'pir': PirFormat,
            'gcg': GCGFormat,
            'clustal': ClustalFormat, # read only
            'nexus': NexusFormat, # read only
            }

    def Convert(self, informat, outformat, instream=None, outstream=None):
        """Read records in ``informat`` and write them in ``outformat``.

        Args:
            informat, outformat: format names, looked up case-insensitively
                in ``fdict``.
            instream: object with a ``read`` method, a file name, or
                '-'/None for stdin.
            outstream: object with a ``write`` method, a file name, or
                '-'/None for stdout.

        An unknown format is reported on stderr and the method returns
        without converting.  Files opened here from a name are closed
        before returning; streams passed in by the caller are left open.
        """
        instream = instream or sys.stdin
        outstream = outstream or sys.stdout

        if instream == '-': instream = sys.stdin
        if outstream == '-': outstream = sys.stdout

        # Resolve both formats before touching any file, and catch only the
        # lookup failure so that real reader/writer errors are not
        # mislabelled as an unknown format.
        try:
            reader_class = self.fdict[informat.lower()]
            writer_class = self.fdict[outformat.lower()]
        except (KeyError, AttributeError):
            sys.stderr.write('Unknown format: %s -> %s\n' % (informat, outformat))
            return

        opened = []
        try:
            # Anything that cannot be read from / written to directly is
            # taken to be a file name.
            if not hasattr(instream, 'read'):
                instream = open(instream)
                opened.append(instream)
            if not hasattr(outstream, 'write'):
                outstream = open(outstream, 'w+')
                opened.append(outstream)

            reader = reader_class(instream=instream)
            writer = writer_class(outstream = outstream)

            while 1:
                rec = reader.next()
                if not rec: break
                writer.write(rec)
        finally:
            for handle in opened:
                handle.close()
if __name__ == '__main__':
    # Command line use: <instream> <informat> <outstream> <outformat>

    readseq = ReadSeq()

    try:
        instream = sys.argv[1]
        informat = sys.argv[2]
        outstream = sys.argv[3]
        outformat = sys.argv[4]
    except IndexError:
        p = os.path.basename(sys.argv[0])
        usage = sys.stderr.write
        usage('Usage: %s <instream> <informat> <outstream> <outformat>\n' % p)
        usage('\twhere "-" can be used for stdin resp. stdout\n')
        usage('\tKnown formats: %s\n' % ', '.join(readseq.fdict.keys()))

        usage('\n\te.g. %s eftu.fas fasta eftu.emb embl\n' % p)
        usage('\tor zcat test.aln.gz | %s - clustal - fasta\n' % p)
        # Missing arguments are an error: exit with a non-zero status.
        sys.exit(1)

    readseq.Convert(informat, outformat, instream, outstream)