Package Bio :: Package Sequencing :: Module Phd
[hide private]
[frames | no frames]

Source Code for Module Bio.Sequencing.Phd

  1  """ 
  2  Parser for PHD files output by PHRED and used by PHRAP and 
  3  CONSED. 
  4   
  5  Works fine with PHRED 0.020425.c 
  6   
  7  Version 1.1, 03/09/2004 
  8  written by Cymon J. Cox (cymon@duke.edu) and Frank Kauff (fkauff@duke.edu) 
  9  Comments, bugs, problems, suggestions to one of us are welcome!
 10   
 11  Uses the Biopython Parser interface for parsing: ParserSupport.py 
 12   
 13  """ 
 14   
 15  import os 
 16  from types import * 
 17   
 18  from Bio import File 
 19  from Bio import Index 
 20  from Bio import Seq 
 21  from Bio import SeqRecord 
 22  from Bio.ParserSupport import * 
 23  from Bio.Alphabet import IUPAC 
 24   
 25  CKEYWORDS=['CHROMAT_FILE','ABI_THUMBPRINT','PHRED_VERSION','CALL_METHOD',\ 
 26          'QUALITY_LEVELS','TIME','TRACE_ARRAY_MIN_INDEX','TRACE_ARRAY_MAX_INDEX',\ 
 27          'TRIM','TRACE_PEAK_AREA_RATIO','CHEM','DYE'] 
 28   
class Record:
    """Container for the data read from one PHD record.

    Attributes (values right after construction):
    file_name    ''
    comments     dictionary with one lower-cased key per CKEYWORDS entry,
                 every value None
    sites        empty list
    seq          ''
    seq_trimmed  ''
    """

    def __init__(self):
        self.file_name = ''
        # Pre-populate every known comment key so that lookups never fail,
        # even for keywords that are absent from a given record.
        self.comments = dict.fromkeys([keyword.lower() for keyword in CKEYWORDS])
        self.sites = []
        self.seq = ''
        self.seq_trimmed = ''
class Iterator:
    """Iterate over a stream holding one or more PHD records.

    Methods:
    next   Return the next record from the stream, or None.
    """

    def __init__(self, handle, parser=None):
        """__init__(self, handle, parser=None)

        Create a new iterator.  handle is a file-like object, i.e. any
        object providing a readline() method.  parser is an optional object
        whose parse() method turns each record into another form; if it is
        None, the raw text of each record is returned.

        Raises ValueError if handle has no readline() method.
        """
        # Duck-type the handle instead of comparing against FileType /
        # InstanceType: that comparison rejected perfectly usable file-like
        # objects implemented as new-style or C classes.
        if not hasattr(handle, 'readline'):
            raise ValueError("I expected a file handle or file-like object")
        self._uhandle = File.UndoHandle(handle)
        self._parser = parser

    def next(self):
        """next(self) -> object

        Return the next PHD record from the stream, or None when the stream
        is exhausted.  The record is the raw text when no parser was given,
        otherwise the result of parser.parse() on a handle over that text.
        """
        lines = []
        while True:
            line = self._uhandle.readline()
            if not line:
                break
            # A BEGIN_SEQUENCE line seen after some lines were collected
            # belongs to the following record: push it back and stop.
            if lines and line.startswith('BEGIN_SEQUENCE'):
                self._uhandle.saveline(line)
                break
            lines.append(line)

        if not lines:
            return None

        data = ''.join(lines)
        if self._parser is not None:
            return self._parser.parse(File.StringHandle(data))
        return data

    def __iter__(self):
        """Return an iterator that calls next() until it returns None."""
        return iter(self.next, None)
class RecordParser(AbstractParser):
    """Parse the text of one PHD record into a Record object."""

    def __init__(self):
        self._consumer = _RecordConsumer()
        self._scanner = _Scanner()

    def parse(self, handle):
        """parse(self, handle) -> object

        Scan one PHD record from handle and return the consumer's data
        attribute.  handle may already be a File.UndoHandle; any other
        handle is wrapped in one before scanning.
        """
        if not isinstance(handle, File.UndoHandle):
            handle = File.UndoHandle(handle)
        self._scanner.feed(handle, self._consumer)
        return self._consumer.data
class _Scanner:
    """Scan PHD-formatted data and report its parts to a consumer.

    Methods:
    feed   Feed one PHD record.
    """

    def feed(self, handle, consumer):
        """feed(self, handle, consumer)

        Feed in PHD data for scanning.  handle must be a File.UndoHandle
        over PHD data.  consumer is the object whose methods are called as
        the parts of the record are recognised.  Nothing is scanned when
        handle.peekline() returns a false value.
        """
        assert isinstance(handle, File.UndoHandle), \
            "handle must be an UndoHandle"
        if handle.peekline():
            self._scan_record(handle, consumer)

    def _scan_record(self, uhandle, consumer):
        """Scan one record: header line, comment section, then DNA section."""
        self._scan_begin_sequence(uhandle, consumer)
        self._scan_comments(uhandle, consumer)
        self._scan_dna(uhandle, consumer)
        consumer.end_sequence()

    def _scan_begin_sequence(self, uhandle, consumer):
        """Pass the BEGIN_SEQUENCE line to consumer.begin_sequence."""
        read_and_call(uhandle, consumer.begin_sequence, start='BEGIN_SEQUENCE')

    def _scan_comments(self, uhandle, consumer):
        """Scan the BEGIN_COMMENT ... END_COMMENT section.

        Every line starting with "KEYWORD:" for a KEYWORD in CKEYWORDS is
        passed to the consumer method named after the lower-cased keyword.
        """
        read_and_call_while(uhandle, consumer.noevent, blank=1)
        read_and_call(uhandle, consumer.noevent, start='BEGIN_COMMENT')
        read_and_call_while(uhandle, consumer.noevent, blank=1)

        while True:
            for kw in CKEYWORDS:
                handler = getattr(consumer, kw.lower())
                if attempt_read_and_call(uhandle, handler, start=kw + ':'):
                    # Recognised keyword: restart the keyword search for
                    # the next line.
                    break
            else:
                # No keyword matched the upcoming line: the keyword lines
                # are finished.
                break

        read_and_call_while(uhandle, consumer.noevent, blank=1)
        read_and_call(uhandle, consumer.noevent, start='END_COMMENT')

    def _scan_dna(self, uhandle, consumer):
        """Pass every line between BEGIN_DNA and END_DNA to consumer.read_dna.

        Blank lines and the BEGIN_DNA marker are skipped; scanning stops at
        the END_DNA marker.  Markers are compared after stripping
        surrounding whitespace, so "\\r\\n" line endings are accepted.

        Raises ValueError if the input ends before END_DNA is seen.
        """
        while True:
            line = uhandle.readline()
            if not line:
                # readline() returns '' only at end of input.  Without this
                # check a truncated record would never leave the loop.
                raise ValueError("Unexpected end of PHD data: END_DNA not found")
            text = line.strip()
            if not text or text == 'BEGIN_DNA':
                continue
            if text == 'END_DNA':
                break
            consumer.read_dna(line)
class _RecordConsumer(AbstractConsumer):
    """Consumer that converts the events of one PHD record to a Record.

    The Record being built is available as the data attribute; it is None
    until begin_sequence has been called.
    """

    def __init__(self):
        self.data = None

    def _value(self, line, keyword):
        """Return the stripped text that follows "KEYWORD:" on line.

        line is expected to start with keyword immediately followed by a
        colon.  Slicing by the keyword's length keeps the offset in step
        with the keyword and does not rely on a trailing newline.
        """
        return line[len(keyword) + 1:].strip()

    def begin_sequence(self, line):
        """Start a new Record; the text after 'BEGIN_SEQUENCE ' is its name."""
        self.data = Record()
        self.data.file_name = line[15:].rstrip()

    def end_sequence(self):
        """Build seq, and seq_trimmed when trim information is present.

        seq is made from the first item of every entry of data.sites.
        seq_trimmed is the [first:last] slice of the same bases, using the
        first two values of the 'trim' comment.  When the record had no
        TRIM line, seq_trimmed is left at its initial value.
        """
        bases = ''.join([site[0] for site in self.data.sites])
        self.data.seq = Seq.Seq(bases, IUPAC.IUPACAmbiguousDNA())
        trim = self.data.comments['trim']
        # comments['trim'] is still None when no TRIM line was scanned;
        # indexing it unconditionally raised TypeError for such records.
        if trim is not None:
            first, last = trim[0], trim[1]
            self.data.seq_trimmed = Seq.Seq(bases[first:last],
                                            IUPAC.IUPACAmbiguousDNA())

    def chromat_file(self, line):
        """Store the CHROMAT_FILE value as a string."""
        self.data.comments['chromat_file'] = self._value(line, 'CHROMAT_FILE')

    def abi_thumbprint(self, line):
        """Store the ABI_THUMBPRINT value as an int."""
        self.data.comments['abi_thumbprint'] = int(self._value(line, 'ABI_THUMBPRINT'))

    def phred_version(self, line):
        """Store the PHRED_VERSION value as a string."""
        self.data.comments['phred_version'] = self._value(line, 'PHRED_VERSION')

    def call_method(self, line):
        """Store the CALL_METHOD value as a string."""
        self.data.comments['call_method'] = self._value(line, 'CALL_METHOD')

    def quality_levels(self, line):
        """Store the QUALITY_LEVELS value as an int."""
        self.data.comments['quality_levels'] = int(self._value(line, 'QUALITY_LEVELS'))

    def time(self, line):
        """Store the TIME value as a string."""
        self.data.comments['time'] = self._value(line, 'TIME')

    def trace_array_min_index(self, line):
        """Store the TRACE_ARRAY_MIN_INDEX value as an int."""
        self.data.comments['trace_array_min_index'] = \
            int(self._value(line, 'TRACE_ARRAY_MIN_INDEX'))

    def trace_array_max_index(self, line):
        """Store the TRACE_ARRAY_MAX_INDEX value as an int."""
        self.data.comments['trace_array_max_index'] = \
            int(self._value(line, 'TRACE_ARRAY_MAX_INDEX'))

    def trim(self, line):
        """Store the TRIM value as an (int, int, float) tuple."""
        first, last, prob = self._value(line, 'TRIM').split()
        self.data.comments['trim'] = (int(first), int(last), float(prob))

    def trace_peak_area_ratio(self, line):
        """Store the TRACE_PEAK_AREA_RATIO value as a float."""
        self.data.comments['trace_peak_area_ratio'] = \
            float(self._value(line, 'TRACE_PEAK_AREA_RATIO'))

    def chem(self, line):
        """Store the CHEM value as a string."""
        self.data.comments['chem'] = self._value(line, 'CHEM')

    def dye(self, line):
        """Store the DYE value as a string."""
        self.data.comments['dye'] = self._value(line, 'DYE')

    def read_dna(self, line):
        """Append one (base, quality, location) tuple of strings to sites.

        line must split on whitespace into exactly three fields.
        """
        base, quality, location = line.split()
        self.data.sites.append((base, quality, location))