Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion-launch: A gst-launch analog for Flumotion. 
 24   
 25  The goal of flumotion-launch is to provide an easy way for testing 
 26  flumotion components, without involving much of Flumotion's core code. 
 27   
 28  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 29  into a component graph, and starts the components. An example would be:: 
 30   
 31    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 32   
 33  You can also set properties:: 
 34   
 35    flumotion-launch videotest framerate=15/2 
 36   
 37  You can link specific feeders as well:: 
 38   
 39    flumotion-launch firewire .audio ! vorbis-encoder 
 40    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 41   
 42  Components can be backreferenced using their names:: 
 43   
 44    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 45                     audiotest0. ! ogg-muxer0. 
 46   
 47  In addition, components can have plugs:: 
 48   
 49    flumotion-launch http-streamer /requestlogger-file,logfile=/dev/stdout 
 50   
 51  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 52  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 53  There is no depgraph, no feed server, no job process. Although it might 
 54  be useful in the future to add a way to use the standard interfaces to 
 55  start components via admin, manager, worker, and job instances, this 
 56  low-level interface is useful in debugging problems and should be kept. 
 57  """ 
 58   
 59   
 60  import os 
 61  import sys 
 62   
 63  from twisted.python import reflect 
 64  from twisted.internet import reactor, defer 
 65   
 66  from flumotion.common import log, common, registry, errors, messages 
 67  from flumotion.common import i18n 
 68  from flumotion.common.options import OptionParser 
 69  from flumotion.configure import configure 
 70  from flumotion.twisted import flavors 
 71   
 72  from flumotion.launch import parse 
 73   
 74  from gettext import gettext as _ 
 75   
 76  __version__ = "$Rev: 7352 $" 
 77  _headings = { 
 78      messages.ERROR: _('Error'), 
 79      messages.WARNING: _('Warning'), 
 80      messages.INFO: _('Note')} 
 81   
 82   
def err(x):
    """
    Report a fatal error and terminate the program.

    The message is written to stderr followed by a newline, after which
    SystemExit is raised with exit code 1; this function never returns.

    @param x: the error message
    @type  x: str
    """
    message = x + '\n'
    sys.stderr.write(message)
    sys.exit(1)


class ComponentWrapper(object, log.Loggable):
    """
    Wrap one component described by a flumotion-launch configuration.

    The component's entry point is resolved through the registry when the
    wrapper is constructed; the component object itself is only created by
    L{instantiate}. The remaining methods forward to that component object,
    so they can only be used after L{instantiate} has been called.
    """
    logCategory = "compwrapper"

    def __init__(self, config):
        """
        @param config: the component configuration; its 'name' and 'type'
                       entries are read here, and the whole object is
                       handed to the component procedure later.
        """
        self.name = config['name']
        self.config = config
        # self.name has to be set before this call: _getProcedure uses it
        # in its error messages
        self.procedure = self._getProcedure(config['type'])
        self.component = None

    def _getProcedure(self, type):
        """
        Look up the callable that creates a component of the given type.

        Terminates the program through L{err} when the registry entry has
        no 'component' entry point or when the entry's module cannot be
        loaded.

        @param type: the component type to look up in the registry
        @returns: the attribute of the loaded module that is named by the
                  entry's function name
        """
        component = registry.getRegistry().getComponent(type)
        try:
            entry = component.getEntryByType('component')
        except KeyError:
            err('Component %s has no component entry' % self.name)
        importname = entry.getModuleName(component.getBase())
        try:
            module = reflect.namedAny(importname)
        except Exception:
            # fetch the exception through sys.exc_info() rather than binding
            # it in the except clause, so the syntax is accepted by every
            # Python version
            e = sys.exc_info()[1]
            err('Could not load module %s for component %s: %s'
                % (importname, self.name, e))
        return getattr(module, entry.getFunction())

    def instantiate(self):
        """
        Create the component by calling the procedure with the
        configuration and a haveError callback.

        Every message handed to haveError is printed to stdout, translated
        and prefixed with a heading for its level, and is recorded.

        @returns: True if haveError had not been called by the time the
                  procedure returned, False otherwise
        @rtype:   bool
        """
        # not called "errors": that name is the flumotion.common.errors
        # module at file level
        reported = []

        def haveError(value):
            translator = i18n.Translator()
            localedir = os.path.join(configure.localedatadir, 'locale')
            # FIXME: add locales as messages from domains come in
            translator.addLocaleDir(configure.PACKAGE, localedir)
            # single-argument print: same output whether print is a
            # statement or a function
            print("%s: %s" % (_headings[value.level],
                              translator.translate(value)))
            if value.debug:
                print("%s %s" % ("Debug information:", value.debug))
            reported.append(value)

        self.component = self.procedure(self.config, haveError=haveError)
        return not reported

    def provideMasterClock(self, port):
        """
        Ask the component to provide a master clock on the given port.

        @rtype: defer.Deferred
        """
        return self.component.provide_master_clock(port)

    def set_master_clock(self, ip, port, base_time):
        """
        Forward the master clock parameters to the component.

        @returns: whatever the component's set_master_clock returns
        """
        return self.component.set_master_clock(ip, port, base_time)

    def stop(self):
        """
        Stop the component.

        @returns: whatever the component's stop returns
        """
        return self.component.stop()

    def feedToFD(self, feedName, fd):
        """
        Make the component send the named feed to a file descriptor.

        os.close is handed to the component as third argument --
        presumably the function used to dispose of the descriptor; confirm
        against the component's feedToFD.

        @param feedName: name of the feed to send
        @param fd:       file descriptor to write the feed to
        @type  fd:       int
        """
        # arguments are passed separately, the same way eatFromFD does it
        self.debug('feedToFD(feedName=%s, %d)', feedName, fd)
        return self.component.feedToFD(feedName, fd, os.close)

    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Make the component read the given feed from a file descriptor.

        @param eaterAlias: alias of the eater that receives the feed
        @param feedId:     id of the feed being eaten
        @param fd:         file descriptor to read the feed from
        @type  fd:         int
        """
        self.debug('eatFromFD(eaterAlias=%s, feedId=%s, %d)',
                   eaterAlias, feedId, fd)
        return self.component.eatFromFD(eaterAlias, feedId, fd)


def make_pipes(wrappers):
    """
    Create an OS pipe for every feed that one of the wrappers eats.

    @param wrappers: the wrappers of all components being launched; each
                     must have a name and a config
    @returns: dict of feedId -> (read fd, start), where calling start()
              makes the wrapper of the feeding component feed to the write
              end of the pipe
    @rtype:   dict
    """
    # NOTE: the result is keyed by feedId only, so a feedId that is eaten
    # more than once keeps just the pipe that was created last
    by_name = {}
    for wrapper in wrappers:
        by_name[wrapper.name] = wrapper

    def make_starter(feeder, feedName, write_fd):
        # bind the values now; a closure created directly inside the loops
        # below would see the loop variables as they are when it is called
        def start():
            return feeder.feedToFD(feedName, write_fd)
        return start

    pipes = {}  # feedcompname:feeder => (fd, start())
    for wrapper in wrappers:
        for feeds in wrapper.config.get('eater', {}).values():
            for feedId, _eaterAlias in feeds:
                compName, feederName = common.parseFeedId(feedId)
                read_fd, write_fd = os.pipe()
                log.debug('launch', '%s: read from fd %d, write to fd %d',
                          feedId, read_fd, write_fd)
                start = make_starter(by_name[compName], feederName, write_fd)
                pipes[feedId] = (read_fd, start)
    return pipes


def start_components(wrappers, fds):
    """
    Instantiate the components, set up clocking and connect the feeds.

    All wrappers are instantiated first. Then, if any component is
    configured with a clock master, the master is asked to provide its
    clock. Finally each wrapper in turn is connected to the pipes of the
    feeds it eats and, if it needs synchronization, is given the master
    clock. If anything in that chain fails, all wrappers are stopped and
    the failure is passed on.

    @param wrappers: the wrappers of all components being launched
    @param fds:      dict of feedId -> (read fd, start), as returned by
                     make_pipes
    @returns: a deferred that fails with ComponentStartError if a
              component could not be instantiated, and that otherwise
              fires once every wrapper has been started
    @rtype:   defer.Deferred
    @raises AssertionError: if components ask for a clock master but none
                            of them is that master
    """
    # figure out the links and start the components

    def provide_clock():
        # second phase: clocking
        need_sync = [x for x in wrappers if x.config['clock-master']]
        if not need_sync:
            return defer.succeed((None, None))

        master = None
        for x in need_sync:
            if x.config['clock-master'] == x.config['avatarId']:
                master = x
                break
        # raised explicitly instead of using an assert statement, so the
        # check is still made when Python runs with -O
        if master is None:
            raise AssertionError(
                'none of the components that need a clock master is the '
                'clock master itself')
        # the master provides the clock; only the others are slaved to it
        need_sync.remove(master)
        d = master.provideMasterClock(7600 - 1)  # hack!

        def addNeedSync(clocking):
            return need_sync, clocking
        d.addCallback(addNeedSync)
        return d

    def do_start(synchronization, wrapper):
        need_sync, clocking = synchronization

        # connect every feed this component eats, then let the feeding
        # side start writing to the pipe
        eaters = wrapper.config.get('eater', {})
        for eaterName in eaters:
            for feedId, eaterAlias in eaters[eaterName]:
                read, start = fds[feedId]
                wrapper.eatFromFD(eaterAlias, feedId, read)
                start()
        # hand over clocking data only if this component needs it
        if need_sync and wrapper in need_sync and clocking:
            wrapper.set_master_clock(*clocking)
        # passed through unchanged for the next wrapper's do_start
        return synchronization

    def do_stop(failure):
        for wrapper in wrappers:
            wrapper.stop()
        return failure

    for wrapper in wrappers:
        if not wrapper.instantiate():
            # we don't have a ComponentState, so we cheat and give the
            # exception a wrapper
            return defer.fail(errors.ComponentStartError(wrapper))
    d = provide_clock()
    for wrapper in wrappers:
        d.addCallback(do_start, wrapper)
    d.addErrback(do_stop)
    return d


def main(args):
    """
    Entry point of flumotion-launch.

    Parses the options, turns the remaining arguments into component
    configurations, wraps and connects the components, starts them and runs
    the reactor until it is stopped.

    @param args: the command line; the first item left after option
                 parsing is skipped (presumably the program name) and the
                 rest is parsed as the launch description
    @type  args: list of str
    @returns: 0 after the reactor has stopped normally or after printing
              the version, 1 if starting the components failed
    @rtype:   int
    """
    from flumotion.common import setup
    setup.setupPackagePath()
    from flumotion.configure import configure
    log.debug('launch', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('launch', 'Running against Twisted version %s' %
              twisted.copyright.version)
    from flumotion.project import project
    for p in project.list():
        log.debug('launch', 'Registered project %s version %s' % (
            p, project.get(p, 'version')))

    parser = OptionParser(domain="flumotion-launch")

    log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    i18n.installGettext()

    # verbose overrides --debug
    if options.verbose:
        log.setFluDebug("*:3")

    # handle all options
    if options.version:
        print(common.version("flumotion-launch"))
        return 0

    # --debug is only applied when --verbose was not given, so that
    # --verbose keeps the precedence stated above
    if options.debug and not options.verbose:
        log.setFluDebug(options.debug)

    # note parser versus parse
    configs = parse.parse_args(args[1:])

    # load the modules, make the component
    wrappers = [ComponentWrapper(config) for config in configs]

    # make socket pairs
    fds = make_pipes(wrappers)

    # The errback below may fire before the reactor has been started. These
    # two flags let it tell whether there is a reactor to stop, and let the
    # code after it tell whether to run the reactor at all.
    reactor.running = False
    reactor.failure = False
    reactor.callLater(0, lambda: setattr(reactor, 'running', True))

    d = start_components(wrappers, fds)

    def errback(failure):
        log.debug('launch', log.getFailureMessage(failure))
        print("Error occurred: %s" % failure.getErrorMessage())
        failure.printDetailedTraceback()
        reactor.failure = True
        if reactor.running:
            print("Stopping reactor.")
            reactor.stop()
    d.addErrback(errback)

    if not reactor.failure:
        print('Running the reactor. Press Ctrl-C to exit.')

        log.debug('launch', 'Starting reactor')
        reactor.run()

        log.debug('launch', 'Reactor stopped')

    if reactor.failure:
        return 1
    return 0