Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import random 
 24  import socket 
 25  import string 
 26  import time 
 27  from urllib2 import urlparse 
 28   
 29  from twisted.cred import portal 
 30  from twisted.internet import protocol, reactor, address, error, defer 
 31  from twisted.spread import pb 
 32  from zope.interface import implements 
 33   
 34  from flumotion.common import medium, log, messages, errors 
 35  from flumotion.common.i18n import N_, gettexter 
 36  from flumotion.component import component 
 37  from flumotion.component.component import moods 
 38  from flumotion.twisted import credentials, fdserver, checkers 
 39  from flumotion.twisted import reflect 
 40   
 41  __version__ = "$Rev: 7983 $" 
 42  T_ = gettexter() 
 43   
 44   
class PorterAvatar(pb.Avatar, log.Loggable):
    """
    An Avatar in the porter representing a streamer.

    The avatar keeps a reference to the client's mind; the porter uses
    C{mind.broker.transport} to pass file descriptors to the streamer.
    """

    def __init__(self, avatarId, porter, mind):
        """
        @param avatarId: The identifier the streamer logged in with
        @param porter:   The porter this avatar registers paths with
        @type  porter:   L{Porter}
        @param mind:     The client-side reference for this login
        """
        self.avatarId = avatarId
        self.porter = porter

        # The underlying transport is now accessible as
        # self.mind.broker.transport, on which we can call sendFileDescriptor
        self.mind = mind

    def isAttached(self):
        """
        @returns: True while the streamer is logged in, i.e. until
                  L{logout} has cleared the mind.
        """
        # Identity test: "!= None" would invoke the mind's own comparison
        # method instead of simply checking whether logout() cleared it.
        return self.mind is not None

    def logout(self):
        """
        Forget the mind. Registered as the logout callable by
        L{PorterRealm.requestAvatar}.
        """
        self.debug("porter client %s logging out", self.avatarId)
        self.mind = None

    def perspective_registerPath(self, path):
        """
        Remote call: ask the porter to route C{path} to this avatar.
        """
        self.log("Perspective called: registering path \"%s\"" % path)
        self.porter.registerPath(path, self)

    def perspective_deregisterPath(self, path):
        """
        Remote call: ask the porter to stop routing C{path} to this avatar.
        """
        self.log("Perspective called: deregistering path \"%s\"" % path)
        self.porter.deregisterPath(path, self)

    def perspective_registerPrefix(self, prefix):
        """
        Remote call: ask the porter to route every path starting with
        C{prefix} to this avatar.
        """
        self.log("Perspective called: registering default")
        self.porter.registerPrefix(prefix, self)

    def perspective_deregisterPrefix(self, prefix):
        """
        Remote call: ask the porter to drop this avatar's registration for
        C{prefix}.
        """
        self.log("Perspective called: deregistering default")
        self.porter.deregisterPrefix(prefix, self)
80 81
class PorterRealm(log.Loggable):
    """
    The realm behind the porter's portal: it hands a L{PorterAvatar} to
    every streamer that logs into the porter.
    """
    implements(portal.IRealm)

    def __init__(self, porter):
        """
        @param porter: The porter passed on to every avatar created here.
        @type  porter: L{Porter}
        """
        self.porter = porter

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create the avatar for a login.

        @returns: a (pb.IPerspective, avatar, avatar.logout) tuple
        @raises NotImplementedError: if pb.IPerspective was not requested
        """
        self.log("Avatar requested for avatarId %s, mind %r, interfaces %r",
                 avatarId, mind, interfaces)

        # Only PB perspectives are served by this realm.
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        streamer = PorterAvatar(avatarId, self.porter, mind)
        return pb.IPerspective, streamer, streamer.logout
104 105
class PorterMedium(component.BaseComponentMedium):
    """
    Medium exposing the porter's connection details to remote callers.
    """

    def remote_getPorterDetails(self):
        """
        Report how to reach and log into the porter.

        @returns: the tuple (path, username, password, port, interface),
                  where path is the porter's socket path and port is its
                  iptables port.
        """
        porter = self.comp
        return (porter._socketPath,
                porter._username,
                porter._password,
                porter._iptablesPort,
                porter._interface)
117 118
class Porter(component.BaseComponent, log.Loggable):
    """
    The porter optionally sits in front of a set of streamer components.
    The porter is what actually deals with incoming connections on a socket.
    It decides which streamer to direct the connection to, then passes the FD
    (along with some amount of already-read data) to the appropriate streamer.
    """

    componentMediumClass = PorterMedium

    def init(self):
        """
        Reset all porter state to its unconfigured defaults.
        """
        # We maintain a map of path -> avatar (the underlying transport is
        # accessible from the avatar, we need this for FD-passing)
        self._mappings = {}
        # prefix -> avatar; consulted when no exact path mapping exists
        self._prefixes = {}

        self._socketlistener = None

        self._socketPath = None
        self._username = None
        self._password = None
        self._port = None
        self._iptablesPort = None
        self._porterProtocol = None

        self._interface = ''

    def registerPath(self, path, avatar):
        """
        Register a path as being served by a streamer represented by this
        avatar. Will remove any previous registration at this path.

        @param path:   The path to register
        @type  path:   str
        @param avatar: The avatar representing the streamer to direct this
                       path to
        @type  avatar: L{PorterAvatar}
        """
        self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
        if path in self._mappings:
            self.warning("Replacing existing mapping for path \"%s\"" % path)

        self._mappings[path] = avatar

    def deregisterPath(self, path, avatar):
        """
        Attempt to deregister the given path. A deregistration will only be
        accepted if the mapping is to the avatar passed.

        @param path:   The path to deregister
        @type  path:   str
        @param avatar: The avatar representing the streamer being
                       deregistered
        @type  avatar: L{PorterAvatar}
        """
        if path in self._mappings:
            if self._mappings[path] == avatar:
                self.debug("Removing porter mapping for \"%s\"" % path)
                del self._mappings[path]
            else:
                self.warning(
                    "Mapping not removed: refers to a different avatar")
        else:
            self.warning("Mapping not removed: no mapping found")

    def registerPrefix(self, prefix, avatar):
        """
        Register a destination for all requests directed to anything beginning
        with a specified prefix. Where there are multiple matching prefixes,
        the longest is selected.

        @param prefix: The path prefix to register
        @type  prefix: str
        @param avatar: The avatar being registered
        @type  avatar: L{PorterAvatar}
        """

        self.debug("Setting prefix \"%s\" for porter", prefix)
        if prefix in self._prefixes:
            self.warning("Overwriting prefix")

        self._prefixes[prefix] = avatar

    def deregisterPrefix(self, prefix, avatar):
        """
        Attempt to deregister the destination for a prefix. This will only
        succeed if the prefix is currently mapped to this avatar.

        @param prefix: The path prefix to deregister
        @type  prefix: str
        @param avatar: The avatar being deregistered
        @type  avatar: L{PorterAvatar}
        """
        if prefix not in self._prefixes:
            self.warning("Mapping not removed: no mapping found")
            return

        if self._prefixes[prefix] == avatar:
            self.debug("Removing prefix destination from porter")
            del self._prefixes[prefix]
        else:
            self.warning(
                "Not removing prefix destination: expected avatar not found")

    def findPrefixMatch(self, path):
        """
        Find the avatar registered for the longest prefix of path.

        @param path: The requested path
        @type  path: str
        @returns: The avatar for the longest matching prefix, or None if no
                  registered prefix matches.
        """
        found = None
        # TODO: Horribly inefficient. Replace with pathtree code.
        for prefix in self._prefixes:
            self.log("Checking: %r, %r" % (prefix, path))
            # Compare against None rather than relying on truthiness, so
            # that a registered empty-string prefix (which matches every
            # path) is a valid result instead of being mistaken for
            # "nothing found".
            if (path.startswith(prefix) and
                (found is None or len(found) < len(prefix))):
                found = prefix

        if found is None:
            return None
        return self._prefixes[found]

    def findDestination(self, path):
        """
        Find a destination Avatar for this path. An exact path mapping wins
        over any prefix match.

        @returns: The Avatar for this mapping, or None.
        """

        if path in self._mappings:
            return self._mappings[path]
        else:
            return self.findPrefixMatch(path)

    def generateSocketPath(self):
        """
        Generate a socket pathname in an appropriate location.

        The name is reserved by creating a temporary file named
        'flumotion.porter.<random>.<pid>'; the file itself is left in place
        and do_setup unlinks the path before listening on it.

        @returns: the generated path name
        """
        # Also see worker/worker.py:_getSocketPath(), and note that
        # this suffers from the same potential race.
        import tempfile
        fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.')
        os.close(fd)

        return name

    def generateRandomString(self, numchars):
        """
        Generate a random US-ASCII string of length numchars, made up of
        upper- and lower-case letters.

        @param numchars: number of characters to generate
        @type  numchars: int
        """
        chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        # The result is used as the porter's login username/password, so
        # draw from the operating system's entropy source rather than the
        # predictable default generator.
        rng = random.SystemRandom()
        return "".join([rng.choice(chars) for _ in range(numchars)])

    def have_properties(self):
        """
        Read the component properties into the porter's configuration
        attributes (socket path, credentials, ports, protocol, interface).
        """
        props = self.config['properties']

        # NOTE(review): presumably maps the legacy 'socket_path' key onto
        # 'socket-path'; the helper is defined outside this file.
        self.fixRenamedProperties(props,
            [('socket_path', 'socket-path')])

        # We can operate in two modes: explicitly configured (necessary if
        # you want to handle connections from components in other managers),
        # and self-configured (which is sufficient for slaving only streamers
        # within this manager).
        if 'socket-path' in props:
            # Explicitly configured
            self._socketPath = props['socket-path']
            self._username = props['username']
            self._password = props['password']
        else:
            # Self-configuring. Use a randomly created username/password, and
            # a socket with a random name.
            self._username = self.generateRandomString(12)
            self._password = self.generateRandomString(12)
            self._socketPath = self.generateSocketPath()

        self._port = int(props['port'])
        # The iptables port defaults to the listening port.
        self._iptablesPort = int(props.get('iptables-port', self._port))
        self._porterProtocol = props.get('protocol',
            'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
        self._interface = props.get('interface', '')

    def do_stop(self):
        """
        Stop listening on the fd-passing socket.

        @returns: whatever stopListening() returned, or None if the porter
                  was not listening.
        """
        d = None
        if self._socketlistener:
            # stopListening() calls (via a callLater) connectionLost(), which
            # will unlink our socket, so we don't need to explicitly delete it.
            d = self._socketlistener.stopListening()
        self._socketlistener = None
        return d

    def do_setup(self):
        """
        Start both listeners: the PB/fd-passing socket that streamers log
        into, and the TCP port that clients connect to.

        @returns: None on success; a failed Deferred carrying
                  L{errors.ComponentSetupHandledError} if either listener
                  could not be created (an error message is added and the
                  mood is set to sad first).
        """
        # Create our combined PB-server/fd-passing channel
        self.have_properties()
        realm = PorterRealm(self)
        checker = checkers.FlexibleCredentialsChecker()
        checker.addUser(self._username, self._password)

        p = portal.Portal(realm, [checker])
        serverfactory = pb.PBServerFactory(p)

        try:
            # Rather than a normal listenTCP() or listenUNIX(), we use
            # listenWith so that we can specify our particular Port, which
            # creates Transports that we know how to pass FDs over.
            try:
                os.unlink(self._socketPath)
            except OSError:
                # Best effort: the path usually does not exist yet.
                pass

            self._socketlistener = reactor.listenWith(
                fdserver.FDPort, self._socketPath, serverfactory)
            self.debug("Now listening on socketPath %s" % self._socketPath)
        except error.CannotListenError:
            self.warning("Failed to create socket %s" % self._socketPath)
            m = messages.Error(T_(N_(
                "Network error: socket path %s is not available."),
                self._socketPath))
            self.addMessage(m)
            self.setMood(moods.sad)
            return defer.fail(errors.ComponentSetupHandledError())

        # Create the class that deals with the specific protocol we're proxying
        # in this porter.
        try:
            proto = reflect.namedAny(self._porterProtocol)
            self.debug("Created proto %r" % proto)
        except (ImportError, AttributeError):
            self.warning("Failed to import protocol '%s', defaulting to HTTP" %
                self._porterProtocol)
            proto = HTTPPorterProtocol

        # And of course we also want to listen for incoming requests in the
        # appropriate protocol (HTTP, RTSP, etc.)
        factory = PorterProtocolFactory(self, proto)
        try:
            reactor.listenWith(
                fdserver.PassableServerPort, self._port, factory,
                interface=self._interface)
            self.debug("Now listening on port %d" % self._port)
        except error.CannotListenError:
            self.warning("Failed to listen on port %d" % self._port)
            m = messages.Error(T_(N_(
                "Network error: TCP port %d is not available."), self._port))
            self.addMessage(m)
            self.setMood(moods.sad)
            return defer.fail(errors.ComponentSetupHandledError())
359 360
class PorterProtocolFactory(protocol.Factory):
    """
    Factory producing one porter protocol instance per incoming connection.
    """

    def __init__(self, porter, protocol):
        """
        @param porter:   The porter handed to every protocol instance
        @param protocol: The protocol class to instantiate; it is called
                         with the porter as its only argument
        """
        self.protocol = protocol
        self._porter = porter

    def buildProtocol(self, addr):
        """
        Instantiate the configured protocol class for a new connection.

        @param addr: The peer address (unused)
        @returns: the new protocol instance, with its factory attribute set
                  to this factory
        """
        instance = self.protocol(self._porter)
        instance.factory = self
        return instance
371 372
class PorterProtocol(protocol.Protocol, log.Loggable):
    """
    The base porter is capable of accepting HTTP-like protocols (including
    RTSP) - it reads the first line of a request, and makes the decision
    solely on that.

    We can't guarantee that we read precisely a line, so the buffer we
    accumulate will actually be larger than what we actually parse.

    @cvar MAX_SIZE:   the maximum number of bytes allowed for the first line
    @cvar delimiters: a list of valid line delimiters I check for
    @cvar PORTER_CLIENT_TIMEOUT: seconds after which a client is dropped
    """

    logCategory = 'porterprotocol'

    # Don't permit a first line longer than this.
    MAX_SIZE = 4096

    # Timeout any client connected to the porter for longer than this. A normal
    # client should only ever be connected for a fraction of a second.
    PORTER_CLIENT_TIMEOUT = 30

    # We accept more than the strictly correct '\r\n'. At the other end, this
    # gets processed by a full protocol implementation, so being flexible
    # hurts us not at all. Order matters only for ties: '\r\n' is listed
    # before '\r' so that a CRLF is consumed as a single delimiter.
    delimiters = ['\r\n', '\n', '\r']

    def __init__(self, porter):
        """
        @param porter: The porter used to look up destinations
        @type  porter: L{Porter}
        """
        self._buffer = ''
        self._porter = porter
        self.requestId = None # a string that should identify the request

        # The timeout clock starts as soon as the protocol is created.
        self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT,
                                            self._timeout)

    def connectionMade(self):
        """
        Assign a request-id to the new connection and log its arrival.
        """

        self.requestId = self.generateRequestId()
        # PROBE: accepted connection
        self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection",
                   self.transport.fileno(), time.time(), self.requestId)

        protocol.Protocol.connectionMade(self)

    def _timeout(self):
        """
        Drop a client that stayed connected for PORTER_CLIENT_TIMEOUT
        seconds.
        """
        self._timeoutDC = None
        self.debug("Timing out porter client after %d seconds",
                   self.PORTER_CLIENT_TIMEOUT)
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """
        Cancel the pending timeout, if it has not fired yet.
        """
        if self._timeoutDC:
            self._timeoutDC.cancel()
            self._timeoutDC = None

    def _findFirstDelimiter(self):
        """
        Return the delimiter that occurs earliest in the buffer, or None if
        the buffer contains none of them. When two delimiters start at the
        same position, the one listed first in C{delimiters} wins.
        """
        best = None
        bestPos = -1
        for delim in self.delimiters:
            pos = self._buffer.find(delim)
            if pos != -1 and (best is None or pos < bestPos):
                best = delim
                bestPos = pos
        return best

    def dataReceived(self, data):
        """
        Accumulate data until the first request line is complete, then hand
        the connection's file descriptor, together with everything read so
        far, to the streamer registered for the requested identifier.

        The connection is dropped if the first line exceeds MAX_SIZE, cannot
        be parsed, or yields no identifier. If no attached destination is
        found, a not-found response is written first; if passing the FD
        fails, a service-unavailable response is written first.
        """
        self._buffer = self._buffer + data
        self.log("Got data, buffer now \"%s\"" % self._buffer)

        # We accept more than just '\r\n' (the true HTTP line end) in the
        # interests of compatibility. Split on whichever delimiter comes
        # first in the buffer, so that a later line ending of a different
        # kind cannot be mistaken for the end of the first line.
        delim = self._findFirstDelimiter()
        if delim is None:
            # Failed to find a valid delimiter.
            self.log("No valid delimiter found")
            if len(self._buffer) > self.MAX_SIZE:

                # PROBE: dropping
                self.debug("[fd %5d] (ts %f) (request-id %r) dropping, "
                           "buffer exceeded",
                           self.transport.fileno(), time.time(),
                           self.requestId)

                return self.transport.loseConnection()
            else:
                # No delimiter found; haven't reached the length limit yet.
                # Wait for more data.
                return

        line, remaining = self._buffer.split(delim, 1)

        # Got a line. self._buffer is still our entire buffer, should be
        # provided to the slaved process.
        parsed = self.parseLine(line)
        if not parsed:
            self.log("Couldn't parse the first line")
            return self.transport.loseConnection()

        identifier = self.extractIdentifier(parsed)
        if not identifier:
            self.log("Couldn't find identifier in first line")
            return self.transport.loseConnection()

        if self.requestId:
            self.log("Injecting request-id %r", self.requestId)
            parsed = self.injectRequestId(parsed, self.requestId)
            # Since injecting the token might have modified the parsed
            # representation of the request, we need to reconstruct the
            # buffer. Fortunately, we know which delimiter we split on, what
            # the remaining part is, and that we only split the buffer in
            # two parts.
            self._buffer = delim.join((self.unparseLine(parsed), remaining))

        # PROBE: request
        self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   identifier)

        # Ok, we have an identifier. Is it one we know about, or do we have
        # a default destination?
        destinationAvatar = self._porter.findDestination(identifier)

        if not destinationAvatar or not destinationAvatar.isAttached():
            if destinationAvatar:
                self.debug("There was an avatar, but it logged out?")

            # PROBE: no destination; see send fd
            self.debug(
                "[fd %5d] (ts %f) (request-id %r) no destination avatar found",
                self.transport.fileno(), time.time(), self.requestId)

            self.writeNotFoundResponse()
            return self.transport.loseConnection()

        # Transfer control over this FD. Pass all the data so-far received
        # along in the same message. The receiver will push that data into
        # the Twisted Protocol object as if it had been normally received,
        # so it looks to the receiver like it has read the entire data stream
        # itself.

        # PROBE: send fd; see no destination and fdserver.py
        self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   destinationAvatar.avatarId)

        # TODO: Check out blocking characteristics of sendFileDescriptor, fix
        # if it blocks.
        try:
            destinationAvatar.mind.broker.transport.sendFileDescriptor(
                self.transport.fileno(), self._buffer)
        except OSError as e:
            self.warning("[fd %5d] failed to send FD: %s",
                         self.transport.fileno(), log.getExceptionMessage(e))
            self.writeServiceUnavailableResponse()
            return self.transport.loseConnection()

        # PROBE: sent fd; see no destination and fdserver.py
        self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s",
                   self.transport.fileno(), time.time(), self.requestId,
                   destinationAvatar.avatarId)

        # After this, we don't want to do anything with the FD, other than
        # close our reference to it - but not close the actual TCP connection.
        # We set keepSocketAlive to make loseConnection() only call close()
        # rather than shutdown() then close()
        self.transport.keepSocketAlive = True
        self.transport.loseConnection()

    def parseLine(self, line):
        """
        Parse the initial line of the request. Return an object that can be
        used to uniquely identify the stream being requested by passing it to
        extractIdentifier, or None if the request is unreadable.

        Subclasses should override this.
        """
        raise NotImplementedError

    def unparseLine(self, parsed):
        """
        Recreate the initial request line from the parsed representation. The
        recreated line does not need to be exactly identical, but both
        parseLine(unparseLine(parsed)) and parsed should contain the same
        information (i.e. unparseLine should not lose information).

        unparseLine has to return a valid line from the porter protocol's
        scheme point of view (for instance, HTTP).

        Subclasses should override this.
        """
        raise NotImplementedError

    def extractIdentifier(self, parsed):
        """
        Extract a string that uniquely identifies the requested stream from the
        parsed representation of the first request line.

        Subclasses should override this, depending on how they implemented
        parseLine.
        """
        raise NotImplementedError

    def generateRequestId(self):
        """
        Return a string that will uniquely identify the request.

        connectionMade calls this unconditionally, so subclasses must
        override it. A false return value (None or an empty string) makes
        dataReceived skip injectRequestId; subclasses that return real
        request-ids should also implement injectRequestId.
        """
        raise NotImplementedError

    def injectRequestId(self, parsed, requestId):
        """
        Take the parsed representation of the first request line and a string
        token, return a parsed representation of the request line with the
        request-id possibly mixed into it.

        Subclasses should override this if they generate request-ids.
        """
        # by default, ignore the request-id
        return parsed

    def writeNotFoundResponse(self):
        """
        Write a response indicating that the requested resource was not found
        in this protocol.

        Subclasses should override this to use the correct protocol.
        """
        raise NotImplementedError

    def writeServiceUnavailableResponse(self):
        """
        Write a response indicating that the requested resource was
        temporarily unavailable in this protocol.

        Subclasses should override this to use the correct protocol.
        """
        raise NotImplementedError
605 606
class HTTPPorterProtocol(PorterProtocol):
    """
    Porter protocol for HTTP: requests are routed on the path component of
    the URL in the request line, and the request-id is appended to the
    query string.

    @cvar scheme:             the URL scheme of this protocol
    @cvar protos:             the protocol versions accepted in a request
                              line
    @cvar requestIdParameter: name of the query parameter carrying the
                              request-id
    @cvar requestIdBitsNo:    number of random bits in a request-id
    """
    scheme = 'http'
    protos = ["HTTP/1.0", "HTTP/1.1"]
    requestIdParameter = 'FLUREQID'
    requestIdBitsNo = 256

    def parseLine(self, line):
        """
        Split a request line into its three space-separated parts.

        @returns: a (method, parsed_url, proto) tuple, or None if the line
                  does not have three parts, names a protocol version not
                  in C{protos}, or its URL cannot be parsed.
        """
        try:
            # Unpacking raises ValueError when the line has fewer than
            # three space-separated parts.
            (method, location, proto) = [
                part.strip() for part in line.split(' ', 2)]

            if proto not in self.protos:
                return None

            # Currently, we just use the URL parsing code from urllib2
            parsed_url = urlparse.urlparse(location)

            return method, parsed_url, proto

        except ValueError:
            return None

    def unparseLine(self, parsed):
        """
        Rebuild a request line from the tuple returned by parseLine.
        """
        method, parsed_url, proto = parsed
        return ' '.join((method, urlparse.urlunparse(parsed_url), proto))

    def generateRequestId(self):
        """
        @returns: C{requestIdBitsNo} random bits as a lower-case hexadecimal
                  string.
        """
        # Remember to return something that does not need quoting to be put in
        # a GET parameter. This way we spare ourselves the effort of quoting in
        # injectRequestId. '%x' is used instead of hex() because hex() of a
        # Python 2 long carries a trailing 'L'.
        return '%x' % random.getrandbits(self.requestIdBitsNo)

    def injectRequestId(self, parsed, requestId):
        """
        Append C{requestIdParameter=requestId} to the query string of the
        parsed URL.

        @returns: a (method, parsed_url, proto) tuple in which parsed_url is
                  a plain 6-tuple carrying the extended query string.
        """
        method, parsed_url, proto = parsed
        # assuming no need to escape the requestId, see generateRequestId
        sep = ''
        if parsed_url[4] != '':
            # There already is a query string; chain onto it.
            sep = '&'
        query_string = ''.join((parsed_url[4],
                                sep, self.requestIdParameter, '=',
                                requestId))
        # Index 4 of the parsed URL is the query component.
        parsed_url = (parsed_url[:4] +
                      (query_string, )
                      + parsed_url[5:])
        return method, parsed_url, proto

    def extractIdentifier(self, parsed):
        """
        @returns: the path component of the requested URL.
        """
        method, parsed_url, proto = parsed
        # Currently, we just return the path part of the URL.
        return parsed_url[2]

    def writeNotFoundResponse(self):
        """
        Write an HTTP 404 response to the client.
        """
        self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")

    def writeServiceUnavailableResponse(self):
        """
        Write an HTTP 503 response to the client.
        """
        self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                             "Service temporarily unavailable")
663 664
class RTSPPorterProtocol(HTTPPorterProtocol):
    """
    Porter protocol for RTSP: request lines are handled exactly as for
    HTTP, only the accepted protocol version and the error responses
    differ.
    """
    scheme = 'rtsp'
    protos = ["RTSP/1.0"]

    def writeNotFoundResponse(self):
        """
        Write an RTSP 404 response to the client.
        """
        self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")

    def writeServiceUnavailableResponse(self):
        """
        Write an RTSP 503 response to the client.
        """
        self.transport.write("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                             "Service temporarily unavailable")
675