1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
Flumotion-launch: A gst-launch analog for Flumotion.

The goal of flumotion-launch is to provide an easy way to test
Flumotion components, without involving much of Flumotion's core code.

Flumotion-launch takes a terse gst-launch-like syntax, translates that
into a component graph, and starts the components. An example would be::

    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer

You can also set properties::

    flumotion-launch videotest framerate=15/2

You can link specific feeders as well::

    flumotion-launch firewire .audio ! vorbis-encoder
    flumotion-launch firewire firewire0.audio ! vorbis-encoder

Components can be backreferenced using their names::

    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \
                     audiotest0. ! ogg-muxer0.

In addition, components can have plugs::

    flumotion-launch http-streamer /requestlogger-file,logfile=/dev/stdout

Flumotion-launch explicitly avoids much of Flumotion's core logic. It
does not import flumotion.manager, flumotion.admin, or flumotion.worker.
There is no depgraph, no feed server, no job process. Although it might
be useful in the future to add a way to use the standard interfaces to
start components via admin, manager, worker, and job instances, this
low-level interface is useful in debugging problems and should be kept.
57 """
58
59
60 import os
61 import sys
62
63 from twisted.python import reflect
64 from twisted.internet import reactor, defer
65
66 from flumotion.common import log, common, registry, errors, messages
67 from flumotion.common import i18n
68 from flumotion.common.options import OptionParser
69 from flumotion.configure import configure
70 from flumotion.twisted import flavors
71
72 from flumotion.launch import parse
73
74 from gettext import gettext as _
75
# Revision string; the "$Rev: ... $" form looks like an expanded
# version-control keyword (NOTE(review): confirm it is still maintained).
__version__ = "$Rev: 7352 $"

# Translated heading text for each message level constant.
_headings = {
    messages.ERROR: _('Error'),
    messages.WARNING: _('Warning'),
    messages.INFO: _('Note'),
}
81
82
84 sys.stderr.write(x + '\n')
85 raise SystemExit(1)
86
87
129
134
137
140
144
145 - def eatFromFD(self, eaterAlias, feedId, fd):
149
150
def make_pipes(wrappers):
    """Create an OS pipe for every feed that some component eats.

    NOTE(review): the ``def`` line was missing from the listing this block
    was recovered from.  The name and the single ``wrappers`` parameter are
    taken from the call ``make_pipes(wrappers)`` elsewhere in this file.

    @param wrappers: component wrappers.  Each is read for ``name`` and
                     ``config``; the wrapper owning a feed must also
                     provide ``feedToFD(feedName, fd)``.
    @returns: dict mapping feedId to a ``(read_fd, start)`` pair, where
              ``start()`` calls ``feedToFD(feederName, write_fd)`` on the
              wrapper whose name is the component part of the feed id.
    @raises KeyError: if a feed id names a component that is not among
                      ``wrappers``.
    """
    fds = {}
    wrappersByName = dict((wrapper.name, wrapper) for wrapper in wrappers)

    def starter(wrapper, feedName, write):
        # Factory function: binds the *current* wrapper/feed/fd values, so
        # each callable keeps its own arguments instead of sharing the
        # loop variables below.
        return lambda: wrapper.feedToFD(feedName, write)

    for wrapper in wrappers:
        eaters = wrapper.config.get('eater', {})
        for eaterName in eaters:
            # Each entry is a (feedId, eaterAlias) pair; the alias is not
            # needed to build the pipe.
            for feedId, _eaterAlias in eaters[eaterName]:
                compName, feederName = common.parseFeedId(feedId)
                read, write = os.pipe()
                log.debug('launch', '%s: read from fd %d, write to fd %d',
                          feedId, read, write)
                start = starter(wrappersByName[compName], feederName, write)
                # NOTE(review): keyed by feedId only, so a feed listed by
                # more than one eater gets a new pipe each time and only
                # the last pair is kept.
                fds[feedId] = (read, start)
    return fds
169
170
172
173
def start_components(wrappers, fds):
    """Instantiate all components, set up clocking, and start them in order.

    NOTE(review): the ``def`` line was missing from the listing this block
    was recovered from.  The name and parameters are taken from the call
    ``start_components(wrappers, fds)`` elsewhere in this file.

    @param wrappers: component wrappers; each is used through ``config``,
                     ``instantiate()``, ``eatFromFD()``, ``stop()`` and,
                     where clocking applies, ``provideMasterClock()`` /
                     ``set_master_clock()``.
    @param fds:      dict mapping feedId to ``(read_fd, start)``.
    @returns: a Deferred.  On success its result is the
              ``(need_sync, clocking)`` tuple threaded through the start
              callbacks.  On failure every wrapper is asked to stop and
              the failure is passed on.
    """

    def provide_clock():
        # Wrappers whose config carries a truthy 'clock-master' entry need
        # clock synchronisation.
        need_sync = [x for x in wrappers if x.config['clock-master']]

        if need_sync:
            # The master is the wrapper that names itself as clock master.
            master = None
            for x in need_sync:
                if x.config['clock-master'] == x.config['avatarId']:
                    master = x
                    break
            assert master
            # The master provides the clock, so it must not be slaved to it.
            need_sync.remove(master)
            # NOTE(review): 7600 - 1 is kept exactly as written; presumably
            # a port number -- confirm against provideMasterClock().
            d = master.provideMasterClock(7600 - 1)

            def addNeedSync(clocking):
                # Pair the clocking result with the list of slaves so that
                # do_start() receives both.
                return need_sync, clocking
            d.addCallback(addNeedSync)
            return d
        else:
            return defer.succeed((None, None))

    def do_start(synchronization, wrapper):
        need_sync, clocking = synchronization

        # Hook up every eater to the read end of its pipe, then trigger the
        # feeding side.
        eaters = wrapper.config.get('eater', {})
        for eaterName in eaters:
            for feedId, eaterAlias in eaters[eaterName]:
                read, start = fds[feedId]
                wrapper.eatFromFD(eaterAlias, feedId, read)
                start()
        # Only wrappers still listed in need_sync get the master clock.
        if (not need_sync) or (wrapper not in need_sync) or (not clocking):
            clocking = None
        if clocking:
            wrapper.set_master_clock(*clocking)
        # Pass the same tuple on to the next wrapper's do_start callback.
        return synchronization

    def do_stop(failure):
        for wrapper in wrappers:
            wrapper.stop()
        return failure

    for wrapper in wrappers:
        if not wrapper.instantiate():
            # NOTE(review): this returns before do_stop is attached, so
            # wrappers instantiated earlier are not stopped here.
            return defer.fail(errors.ComponentStartError(wrapper))
    d = provide_clock()
    for wrapper in wrappers:
        d.addCallback(do_start, wrapper)
    d.addErrback(do_stop)
    return d
226
227
def main(args):
    """Run flumotion-launch: parse the command line and start components.

    NOTE(review): the ``def`` line was missing from the listing this block
    was recovered from.  The ``args`` parameter is evident from the body;
    the name ``main`` is assumed by convention and must be confirmed
    against whatever invokes this function.

    @param args: argument vector handed to the option parser.  The first
                 element of what the parser leaves over is dropped before
                 the rest is parsed as a component pipeline.
    @returns: 0 when only the version was printed or the reactor ran
              without a recorded failure, 1 when a failure was recorded.
    """
    from flumotion.common import setup
    setup.setupPackagePath()
    from flumotion.configure import configure
    log.debug('launch', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('launch', 'Running against Twisted version %s' %
              twisted.copyright.version)
    from flumotion.project import project
    for p in project.list():
        log.debug('launch', 'Registered project %s version %s' % (
            p, project.get(p, 'version')))

    parser = OptionParser(domain="flumotion-launch")

    log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    i18n.installGettext()

    if options.verbose:
        log.setFluDebug("*:3")

    if options.version:
        # Single-argument parenthesised print behaves the same as the
        # print statement on Python 2.
        print(common.version("flumotion-launch"))
        return 0

    # Applied after --verbose, so this is the last debug setting made when
    # both options are given.
    if options.debug:
        log.setFluDebug(options.debug)

    # parse (the launch-syntax module), not parser (the option parser).
    configs = parse.parse_args(args[1:])

    wrappers = [ComponentWrapper(config) for config in configs]

    fds = make_pipes(wrappers)

    # Ad-hoc flags stored on the reactor object: 'running' becomes True
    # once the reactor has started processing calls, 'failure' records
    # that start-up went wrong.
    reactor.running = False
    reactor.failure = False
    reactor.callLater(0, lambda: setattr(reactor, 'running', True))

    d = start_components(wrappers, fds)

    def errback(failure):
        log.debug('launch', log.getFailureMessage(failure))
        print("Error occurred: %s" % failure.getErrorMessage())
        failure.printDetailedTraceback()
        reactor.failure = True
        # Stop the reactor only if it is already running; a failure before
        # that is handled by the guard below.
        if reactor.running:
            print("Stopping reactor.")
            reactor.stop()
    d.addErrback(errback)

    # Indentation was lost in the listing.  The reactor run is placed under
    # this guard because a failure delivered synchronously above leaves
    # reactor.running False, so errback would never stop a reactor started
    # afterwards.
    if not reactor.failure:
        print('Running the reactor. Press Ctrl-C to exit.')

        log.debug('launch', 'Starting reactor')
        reactor.run()

        log.debug('launch', 'Reactor stopped')

    if reactor.failure:
        return 1
    return 0
298