Package Bio :: Package MultiProc :: Module copen
[hide private]
[frames] | no frames]

Source Code for Module Bio.MultiProc.copen

  1  """ 
  2  This implements a set of classes that wraps a file object interface 
  3  around code that executes in another process.  This allows you to fork 
  4  many different commands and let them run concurrently. 
  5   
  6  Functions: 
  7  copen_sys     Open a file-like pipe to a system command. 
  8  copen_fn      Open a file-like pipe to a python function. 
  9   
 10  """ 
 11   
 12  import warnings 
 13  warnings.warn("Bio.MultiProc is deprecated. If you want to use this code, please let the Biopython developers know by sending an email to biopython-dev@biopython.org to avoid permanent removal of Bio.MultiProc.", 
 14                DeprecationWarning) 
 15   
 16  import os 
 17  import sys 
 18  import time 
 19  import signal 
 20   
def copen_sys(syscmd, *args):
    """copen_sys(syscmd, *args) -> file-like object

    Open a file-like object that returns the output from a system
    command.  syscmd is run via os.execvp (so it is looked up on PATH)
    with args as its arguments; the command's stdout and stderr are
    connected to pipes that the returned _ProcHandle reads from.

    """
    # python requires first element to be the path
    if not args or args[0] != syscmd:
        args = [syscmd] + list(args)

    r, w = os.pipe()
    er, ew = os.pipe()

    pid = os.fork()
    if pid == 0:  # child process
        try:
            try:
                os.close(r)
                os.close(er)
                out_fd = sys.stdout.fileno()
                err_fd = sys.stderr.fileno()
                os.dup2(w, out_fd)
                os.dup2(ew, err_fd)
                # The command only needs its stdout/stderr copies of the
                # pipe ends; don't leak the original descriptors into it.
                for fd in (w, ew):
                    if fd not in (out_fd, err_fd):
                        os.close(fd)
                os.execvp(syscmd, args)  # execute it!
            except Exception:
                sys.stderr.write("%s could not be executed\n" % syscmd)
                # os._exit() does not flush Python's buffers.
                sys.stderr.flush()
        finally:
            # execvp() does not return on success, so reaching this point
            # means the command could not be started.  Always leave through
            # os._exit() so the forked child never falls back into the
            # caller's code.
            os._exit(-1)

    # parent
    os.close(w)
    os.close(ew)
    return _ProcHandle(pid, os.fdopen(r, 'r'), os.fdopen(er, 'r'))
52
def copen_fn(func, *args, **keywords):
    """copen_fn(func, *args, **keywords) -> file-like object

    Open a file-like object that returns the output from function
    call.  The object's 'read' method returns the return value from
    the function.  The function is executed as a separate process so
    any variables modified by the function do not affect the ones in
    the parent process.  The return value of the function must be
    pickle-able.

    If func raises (or its result cannot be pickled), the child writes
    the traceback to its stderr pipe and exits with status -1.

    """
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
    r, w = os.pipe()
    er, ew = os.pipe()

    pid = os.fork()
    if pid == 0:  # child process
        exit_code = -1
        try:
            # The child only writes; drop its copies of the read ends.
            os.close(r)
            os.close(er)
            # Pickle data is binary, so the result pipe is binary too.
            cwrite, errwrite = os.fdopen(w, 'wb'), os.fdopen(ew, 'w')
            try:
                output = func(*args, **keywords)
                # Pickle may fail here if the object is not pickleable.
                s = pickle.dumps(output, 1)
            except:
                # Deliberately catch everything (even SystemExit raised by
                # func) so the parent always receives a traceback.
                import traceback
                traceback.print_exc(file=errwrite)
                errwrite.flush()
            else:
                try:
                    cwrite.write(s)
                    cwrite.flush()
                except IOError:
                    # There can be an IOError if the parent is no longer
                    # listening. Ignore it.
                    pass
                exit_code = 0
        finally:
            # Whatever happened above, the forked child must never return
            # into the caller's code.
            os._exit(exit_code)

    # parent
    os.close(w)
    os.close(ew)
    return _PickleHandle(pid, os.fdopen(r, 'rb'), os.fdopen(er, 'r'))
96 97
class _ProcHandle:
    """This object provides a file-like interface to a running
    process.

    Members:
    pid          what is the PID of the subprocess?
    killsig      what signal killed the child process?
    status       what was the status of the command?

    Methods:
    close        Close this process, killing it if necessary.
    fileno       Return the fileno used to read from the process.
    wait         Wait for the process to finish.
    poll         Is the process finished?
    elapsed      How much time has this process taken?
    read
    readline
    readlines

    """
    def __init__(self, pid, cread, errread=None):
        """Create a wrapper around a running process.  pid is the
        process ID.  cread is the file object used to read from the
        child.  errread is an optional file object used to read errors
        from the child.

        The handle registers itself in the module-level _active list so
        it can be closed if this process receives SIGTERM.

        """
        _active.append(self)

        self.pid = pid
        self.status = None
        self.killsig = None

        self._start, self._end = time.time(), None
        self._cread, self._errread = cread, errread
        self._output = []  # lines not yet handed out by read*()
        self._done = 0     # set once the child was collected or killed
        self._closed = 0   # set once close() has run

    def __del__(self):
        self.close()  # kill the process

    def _kill(self):
        """Make sure the child is dead and reaped; return its killsig.

        Returns the termination signal if the child had already exited
        (0 for a normal exit) or died after SIGTERM, SIGKILL if it had
        to be killed forcibly, or None if os.waitpid/os.kill raised
        OSError.

        """
        try:
            pid, ind = os.waitpid(self.pid, os.WNOHANG)
            if pid == self.pid:  # already died; now reaped
                return ind & 0xff
            # First, try to kill it with a SIGTERM.
            os.kill(self.pid, signal.SIGTERM)
            # Wait .5 seconds for it to die.
            end = time.time() + 0.5
            while time.time() < end:
                pid, ind = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    return ind & 0xff
                time.sleep(0.1)
            # It didn't die, so kill it with a SIGKILL and reap it so it
            # does not linger as a zombie.
            os.kill(self.pid, signal.SIGKILL)
            os.waitpid(self.pid, 0)
            return signal.SIGKILL
        except OSError:
            return None

    def close(self):
        """Close the process, killing it if it is still running.

        Only the first call has an effect.  If the child's output was
        never collected, the child is killed, killsig is set to what
        _kill() reported, status is set to None, and the pipes are
        closed.

        """
        # If this gets called in the middle of object initialization,
        # the _closed attribute will not exist.
        if not hasattr(self, '_closed') or self._closed:
            return
        # on cleanup, _active may not be defined!
        if _active and self in _active:
            _active.remove(self)
        if not self._done:
            self.killsig = self._kill()
            self._end = time.time()
            self.status = None
            self._done = 1
            # _cleanup_child() never ran, so the pipes are still open.
            for handle in (self._cread, self._errread):
                if handle is not None:
                    handle.close()
        self._output = []
        self._closed = 1

    def fileno(self):
        """Return the file descriptor used to read from the process."""
        return self._cread.fileno()

    def readline(self):
        """Return the next line or '' if finished."""
        self.wait()
        if not self._output:
            return ''
        return self._output.pop(0)

    def readlines(self):
        """Return the output of the process as a list of strings."""
        self.wait()
        output = self._output
        self._output = []
        return output

    def read(self):
        """Return the output as a string."""
        self.wait()
        output = self._output
        self._output = []
        return "".join(output)

    def wait(self):
        """Wait for the process to finish and collect its output."""
        import select
        if self._done:
            return
        # wait until stuff's ready to be read
        select.select([self], [], [])
        self._cleanup_child()

    def poll(self):
        """Return a boolean.  Is the process finished running?"""
        import select
        if self._done:
            return 1
        # NOTE(review): select() only says the output pipe is readable
        # (data or EOF); _cleanup_child() then reads it to EOF, so this
        # can block while the child is still producing output.
        if select.select([self], [], [], 0)[0]:
            self._cleanup_child()
        return self._done

    def elapsed(self):
        """Return the number of seconds elapsed since the process began."""
        if self._end:  # if I've finished, return the total time
            return self._end - self._start
        return time.time() - self._start

    def _cleanup_child(self):
        """Collect the child's output and exit status after it finishes.

        Raises AssertionError if the child wrote anything to its error
        pipe.  The child is reaped and this handle is marked done before
        raising, so no zombie is left behind and later read calls do not
        touch the already-closed pipes.

        """
        if self._done:
            return

        # read the output
        # NOTE(review): stdout is drained to EOF before stderr is read; a
        # child that fills the stderr pipe buffer first will block, and
        # this read with it.
        self._output = self._cread.readlines()
        self._cread.close()
        error = None
        if self._errread is not None:
            error = self._errread.read()
            self._errread.close()

        # Remove myself from the active list.
        if _active and self in _active:
            _active.remove(self)

        pid, ind = os.waitpid(self.pid, 0)
        self.status, self.killsig = ind >> 8, ind & 0xff
        self._end = time.time()
        self._done = 1

        if error:
            # It would be nice to be able to save the exception
            # and traceback somehow, and raise it in the parent.
            raise AssertionError("Error in child process:\n\n%s" % error)
255
class _PickleHandle:
    """File-like handle on a child process whose output is one pickled
    Python object (see copen_fn).  Attribute access is delegated to an
    internal _ProcHandle, except for readline/readlines.

    Members:
    pid          what is the PID of the subprocess?
    killsig      what signal killed the child process?
    status       what was the status of the command?

    Methods:
    close        Close this process, killing it if necessary.
    fileno       Return the fileno used to read from the process.
    wait         Wait for the process to finish.
    poll         Is the process finished?
    elapsed      How much time has this process taken?
    read         Return a Python object.

    """
    def __init__(self, pid, cread, errread=None):
        """Create a wrapper around a running process.  pid is the
        process ID.  cread is the file object used to read from the
        child.  errread is an optional file object used to read errors
        from the child.

        """
        self._phandle = _ProcHandle(pid, cread, errread)

    def __getattr__(self, attr):
        # Only reached for attributes not found normally.  If _phandle
        # itself is missing (e.g. an instance created via __new__ whose
        # __init__ never ran), delegating would recurse forever.
        if attr == '_phandle':
            raise AttributeError(attr)
        # This object does not support 'readline' or 'readlines'
        if attr.startswith('readline'):
            raise AttributeError(attr)
        return getattr(self._phandle, attr)

    def read(self):
        """Return the unpickled Python object, or '' if there was no output."""
        try:
            import cPickle as pickle
        except ImportError:
            import pickle
        r = self._phandle.read()
        if not r:
            return r
        return pickle.loads(r)


# Handle SIGTERM below

# Keep a list of all the active child processes.  If the process is
# forcibly killed, e.g. by a SIGTERM, make sure the child processes
# die too.
_active = []  # list of _ProcHandle objects

_HANDLING = 0  # set while _handle_sigterm runs, to avoid re-entry
def _handle_sigterm(signum, stackframe):
    """Handle SIGTERM: close all active child processes, then hand the
    signal to the handler that was installed before this module's.

    signum and stackframe are the standard signal-handler arguments.
    """
    global _HANDLING
    if _HANDLING:
        return
    _HANDLING = 1
    _cleanup()
    # Call the previous handler by reinstalling it and re-sending the
    # signal.  signal.getsignal() returns None when that handler was not
    # installed from Python; it cannot be reinstalled, so fall back to the
    # default action instead of silently swallowing the SIGTERM.
    previous = _PREV_SIGTERM
    if previous is None:
        previous = signal.SIG_DFL
    signal.signal(signal.SIGTERM, previous)
    os.kill(os.getpid(), signum)
319
def _cleanup():
    """Close all active commands, killing any child still running."""
    # Iterate over a copy: close() removes each handle from _active.
    for obj in _active[:]:
        obj.close()

# Remember the previous SIGTERM handler so _handle_sigterm can chain to it.
_PREV_SIGTERM = signal.getsignal(signal.SIGTERM)
try:
    signal.signal(signal.SIGTERM, _handle_sigterm)
except ValueError:
    # signal.signal() only works in the main thread; when this module is
    # first imported from another thread, run without the SIGTERM cleanup
    # instead of failing the import.
    pass