Package flumotion :: Package worker :: Module job
[hide private]

Source Code for Module flumotion.worker.job

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28  import sys 
 29   
 30  from twisted.internet import defer, reactor 
 31   
 32  from flumotion.common import errors, log 
 33  from flumotion.common import messages 
 34  from flumotion.common.i18n import N_, gettexter 
 35  from flumotion.configure import configure 
 36  from flumotion.worker import base 
 37   
 38  __version__ = "$Rev: 7162 $" 
 39  T_ = gettexter() 
 40   
 41   
class ComponentJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a flumotion-job process that runs a component.
    """

    def haveMind(self):
        """
        Drive the job through component creation once its mind is attached.

        The job is asked for its pid, which is used to look up the job
        info. The job is then told to bootstrap (manager connection info
        plus bundles) and afterwards to create its component. The start
        set is told whether creation succeeded or failed.

        @returns: a deferred firing once the startup sequence has run.
        """

        def onCreated(_, avatarId):
            self.debug('job started component with avatarId %s',
                       avatarId)
            # FIXME: drills down too much?
            self._heaven._startSet.createSuccess(avatarId)

        def onCreateFailed(failure, job):
            msg = log.getFailureMessage(failure)
            if not failure.check(errors.ComponentCreateError):
                self.warning('unhandled error creating component %s: %s',
                             job.avatarId, msg)
            else:
                self.warning('could not create component %s of type %s: %s',
                             job.avatarId, job.type, msg)
            # FIXME: drills down too much?
            # the failure is consumed here; the start set carries it on
            self._heaven._startSet.createFailed(job.avatarId, failure)

        def askCreate(_, job):
            self.debug("asking job to create component with avatarId %s, "
                       "type %s", job.avatarId, job.type)
            return self.mindCallRemote('create', job.avatarId, job.type,
                                       job.moduleName, job.methodName,
                                       job.nice, job.conf)

        def onPid(pid):
            self.pid = pid
            info = self._heaven.getManagerConnectionInfo()
            transport = 'tcp'
            if info.use_ssl:
                transport = 'ssl'
            job = self._heaven.getJobInfo(pid)
            workerName = self._heaven.getWorkerName()

            startup = self.mindCallRemote('bootstrap', workerName,
                                          info.host, info.port, transport,
                                          info.authenticator, job.bundles)
            startup.addCallback(askCreate, job)
            startup.addCallback(onCreated, job.avatarId)
            startup.addErrback(onCreateFailed, job)
            return startup

        pidDeferred = self.mindCallRemote("getPid")
        pidDeferred.addCallback(onPid)
        return pidDeferred

    def stop(self):
        """
        Ask the job to stop.

        @returns: a deferred marking completed stop; it has already fired
                  if the job has logged out (no mind).
        """
        if self.mind:
            self.debug('stopping')
            return self.mindCallRemote('stop')
        self.debug('already logged out')
        return defer.succeed(None)

    def sendFeed(self, feedName, fd, eaterId):
        """
        Tell the feeder to send the given feed to the given fd.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to feed %s to fd',
                   fd, feedName)

        # The component may have logged out, leaving us without a mind.
        # Checking for that any earlier would only open a race, so it is
        # handled here: returning False makes the caller drop the fd.
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False
        return self._sendFileDescriptor(
            fd, "sendFeed %s %s" % (feedName, eaterId))

    def receiveFeed(self, eaterAlias, fd, feedId):
        """
        Tell the feeder to receive the given feed from the given fd.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to eat %s from fd',
                   fd, eaterAlias)

        # same race consideration as in sendFeed
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False
        return self._sendFileDescriptor(
            fd, "receiveFeed %s %s" % (eaterAlias, feedId))

    def perspective_cleanShutdown(self):
        """
        This notification from the job process will be fired when it is
        shutting down, so that although the process might still be
        around, we know it's OK to accept new start requests for this
        avatar ID.
        """
        self.info("component %s shutting down cleanly", self.avatarId)
        # FIXME: drills down too much?
        self._heaven._startSet.shutdownStart(self.avatarId)
151 152
153 -class ComponentJobInfo(base.JobInfo):
154 __slots__ = ('conf', ) 155
156 - def __init__(self, pid, avatarId, type, moduleName, methodName, 157 nice, bundles, conf):
161 162
class ComponentJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that spawns a flumotion-job process for each component.
    """
    avatarClass = ComponentJobAvatar

    def getManagerConnectionInfo(self):
        """
        Gets the L{flumotion.common.connection.PBConnectionInfo}
        describing how to connect to the manager.

        @rtype: L{flumotion.common.connection.PBConnectionInfo}
        """
        return self.brain.managerConnectionInfo

    def spawn(self, avatarId, type, moduleName, methodName, nice,
              bundles, conf):
        """
        Spawn a new job.

        This will spawn a new flumotion-job process, running under the
        requested nice level. When the job logs in, it will be told to
        load bundles and run a function, which is expected to return a
        component.

        @param avatarId:   avatarId the component should use to log in
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       nice level
        @type  nice:       int
        @param bundles:    ordered list of (bundleName, bundlePath) for this
                           component
        @type  bundles:    list of (str, str)
        @param conf:       component configuration
        @type  conf:       dict

        @returns: the deferred obtained from the start set for this
                  avatarId.
        """
        startDeferred = self._startSet.createStart(avatarId)

        protocol = base.JobProcessProtocol(self, avatarId, self._startSet)
        jobExecutable = os.path.join(configure.bindir, 'flumotion-job')
        if not os.path.exists(jobExecutable):
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist", jobExecutable)
        args = [jobExecutable, avatarId, self._socketPath]
        program = jobExecutable

        # Optionally run some jobs under valgrind. FLU_VALGRIND_JOB holds a
        # comma-separated list of full component avatar IDs. The python
        # interpreter running flumotion-job is what gets valgrinded;
        # valgrinding the script itself would require --trace-children.
        if 'FLU_VALGRIND_JOB' in os.environ:
            if avatarId in os.environ['FLU_VALGRIND_JOB'].split(','):
                program = 'valgrind'
                args = ['valgrind', '--leak-check=full', '--num-callers=24',
                        '--leak-resolution=high', '--show-reachable=yes',
                        'python'] + args

        # the child inherits our environment plus our debug settings
        env = dict(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(protocol, program, env=env,
                                       args=args,
                                       childFDs={0: 0, 1: 1, 2: 2})
        protocol.setPid(process.pid)

        jobInfo = ComponentJobInfo(process.pid, avatarId, type, moduleName,
                                   methodName, nice, bundles, conf)
        self.addJobInfo(process.pid, jobInfo)
        return startDeferred
241 242
class CheckJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a flumotion-job process used to run checks.
    """

    def haveMind(self):
        """
        Ask the job for its pid once its mind is attached, record it, and
        mark the start of this job as successful.

        @returns: a deferred firing once the pid has been recorded.
        """

        def gotPid(pid):
            self.pid = pid
            job = self._heaven.getJobInfo(pid)
            # FIXME: drills down too much?
            self._heaven._startSet.createSuccess(job.avatarId)

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        Stop the check job by sending it SIGTERM.

        @returns: a deferred marking completed stop. It has already fired
                  when returned; the signal has been sent, but the process
                  may not have exited yet. A deferred is returned, as in
                  ComponentJobAvatar.stop, so callers can chain on it.
        """
        self._heaven._startSet.shutdownStart(self.avatarId)
        self._heaven.killJob(self.avatarId, signal.SIGTERM)
        return defer.succeed(None)

    def perspective_cleanShutdown(self):
        """
        Notification from the job process that it is shutting down.
        Only logged for check jobs.
        """
        self.debug("job is stopping")
266 267
class CheckJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that runs checks inside flumotion-job processes.

    A job that completes a check without timing out is put back in a pool
    and reused for later checks. An idle pooled job is asked to stop after
    C{_timeout} seconds.
    """
    avatarClass = CheckJobAvatar

    # counter used to build unique 'check-N' avatar IDs
    _checkCount = 0
    # seconds a check may run before its job gets SIGTERM; also how long
    # an idle job stays in the pool before it is asked to stop
    _timeout = 45
    # seconds to wait after SIGTERM before escalating to SIGKILL
    _killGrace = 5

    def __init__(self, brain):
        base.BaseJobHeaven.__init__(self, brain)

        # job processes that are available to do work (i.e. not actively
        # running checks), as (job avatar, expiry DelayedCall) tuples
        self.jobPool = []

    def getCheckJobFromPool(self):
        """
        Get a job avatar to run a check in.

        Reuses the oldest idle job from the pool if there is one (and
        cancels its expiry). Otherwise spawns a new flumotion-job process
        with a fresh 'check-N' avatar ID.

        @returns: a deferred firing with the job avatar.
        """
        if self.jobPool:
            job, expireDC = self.jobPool.pop(0)
            expireDC.cancel()
            self.debug('running check in already-running job %s',
                       job.avatarId)
            return defer.succeed(job)

        avatarId = 'check-%d' % (self._checkCount, )
        self._checkCount += 1

        self.debug('spawning new job %s to run a check', avatarId)
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        argv = [executable, avatarId, self._socketPath]

        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, executable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)
        # A check job has no component type, module, method, nice level
        # or bundles. Pass None for the type instead of the builtin
        # `type`, which is all that name resolved to here.
        jobInfo = base.JobInfo(process.pid, avatarId, None, None, None,
                               None, [])
        self._jobInfos[process.pid] = jobInfo

        def haveMind(_):
            # we have a mind, in theory; return the job avatar
            return self.avatars[avatarId]

        d.addCallback(haveMind)
        return d

    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
        """
        Run a check function in a check job.

        The job is bootstrapped with C{bundles}, then asked to call
        C{methodName} from C{moduleName} with the given arguments. If the
        call has not completed within C{_timeout} seconds, the job is sent
        SIGTERM. SIGKILL follows C{_killGrace} seconds later unless the
        call has completed by then. A job whose check completed in time
        is returned to the pool.

        @returns: a deferred that fires with the check's result, or fails
                  with its failure. If the check timed out, it instead
                  fires with a L{messages.Result} holding a
                  "Check timed out." error.
        """

        def haveJob(job):

            def callProc(_):
                return job.mindCallRemote('runFunction', moduleName,
                                          methodName, *args, **kwargs)

            def timeout(sig):
                self.killJobByPid(job.pid, sig)

            def haveResult(res):
                # If the SIGTERM timer is no longer active it has already
                # fired. The job was signalled, so replace whatever came
                # back with a timeout error and don't pool the job.
                if not termtimeout.active():
                    self.info("Discarding error %s", res)
                    res = messages.Result()
                    res.add(messages.Error(
                        T_(N_("Check timed out.")),
                        debug=("Timed out running %s." % methodName)))
                else:

                    def expire():
                        if (job, expireDC) in self.jobPool:
                            self.debug('stopping idle check job process %s',
                                       job.avatarId)
                            self.jobPool.remove((job, expireDC))
                            job.mindCallRemote('stop')
                    expireDC = reactor.callLater(self._timeout, expire)
                    self.jobPool.append((job, expireDC))

                if termtimeout.active():
                    termtimeout.cancel()
                if killtimeout.active():
                    killtimeout.cancel()
                return res

            # Kill the job if the check takes too long. SIGTERM comes
            # first; SIGKILL is delayed by a grace period so the job has
            # a chance to exit cleanly before being killed outright.
            termtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGTERM)
            killtimeout = reactor.callLater(self._timeout + self._killGrace,
                                            timeout, signal.SIGKILL)

            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
                                   None, None, None, None, bundles)
            d.addCallback(callProc)
            d.addCallbacks(haveResult, haveResult)
            return d

        d = self.getCheckJobFromPool()
        d.addCallback(haveJob)
        return d