Trees | Indices | Help |
---|
|
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

from flumotion.common import log
from flumotion.extern.fdpass import fdpass

from twisted.internet import unix, main, address, tcp
from twisted.spread import pb

import errno
import os
import socket
import struct
import time

__version__ = "$Rev: 7982 $"


# Heavily based on
# http://twistedmatrix.com/trac/browser/sandbox/exarkun/copyover/server.py
# and client.py
# Thanks for the inspiration!

# Since we're doing this over a stream socket, our file descriptor messages
# aren't guaranteed to be received alone; they could arrive along with some
# unrelated data.
# So, we prefix the message with a 16 byte magic signature, and a length,
# and if we receive file descriptors decode based on this.
#
# map() instead of a string to workaround gettext encoding problems.
#
MAGIC_SIGNATURE = ''.join(map(chr, [253, 252, 142, 127, 7, 71, 185, 234,
                                    161, 117, 238, 216, 220, 54, 200, 163]))


# NOTE(review): this listing was recovered from a flattened HTML rendering in
# which every "class"/"def" header line was stripped.  The headers below are
# rebuilt from what the surviving bodies call and reference; class names and
# base classes that are not visible in the surviving text (everything except
# FDServer and PassableServerConnection) must be confirmed against the
# upstream file.


# NOTE(review): the whole definition of FDServer (original listing lines
# 54-58) is missing; only its name survives, through the port class below.
# The method is reconstructed from the framing that the reading side decodes
# (16 byte signature, native unsigned int length, payload).  The method name
# and the fdpass.writefds() call signature are assumptions - TODO confirm.
class FDServer(unix.Server):
    """
    Server-side UNIX socket transport that can send a file descriptor,
    plus an associated message, to its peer.
    """

    def sendFileDescriptor(self, fileno, data=""):
        """
        Send one file descriptor with an accompanying payload.

        @param fileno: the file descriptor to pass to the peer
        @param data:   payload delivered together with the descriptor
        """
        # Frame the payload so that the receiver can find it even if it
        # arrives mixed with unrelated stream data.
        message = struct.pack("@16sI", MAGIC_SIGNATURE, len(data)) + data
        return fdpass.writefds(self.fileno(), [fileno], message)


class FDPort(unix.Port):
    """
    UNIX listening port whose accepted connections are L{FDServer}s.
    """
    transport = FDServer


class FDClient(unix.Client):
    """
    Client-side UNIX socket transport that understands file descriptors
    arriving alongside the normal data stream.
    """

    def doRead(self):
        """
        Read pending data and file descriptors from the socket and hand
        them to the protocol.

        Plain data goes to protocol.dataReceived(); file descriptors and the
        message framed with them go to protocol.fileDescriptorsReceived().

        @returns: None when there is nothing (more) to do,
                  main.CONNECTION_DONE when an empty message was read,
                  main.CONNECTION_LOST on a read error other than
                  EWOULDBLOCK/EAGAIN, or the first true value returned by
                  one of the protocol callbacks.
        """
        if not self.connected:
            return
        try:
            (fds, message) = fdpass.readfds(self.fileno(), 64 * 1024)
        except OSError as e:
            if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                # Nothing to read right now; try again on the next wakeup.
                return
            else:
                return main.CONNECTION_LOST
        else:
            if not message:
                return main.CONNECTION_DONE

            if len(fds) > 0:
                # Look for our magic cookie in (possibly) the midst of other
                # data. Pass surrounding chunks, if any, onto dataReceived(),
                # which (undocumentedly) must return None unless a failure
                # occurred.
                # Pass the actual FDs and their message to
                # fileDescriptorsReceived()
                offset = message.find(MAGIC_SIGNATURE)
                if offset < 0:
                    # Old servers did not send this; be hopeful that this
                    # doesn't have bits of other protocol (i.e. PB) mixed up
                    # in it.
                    return self.protocol.fileDescriptorsReceived(fds, message)
                elif offset > 0:
                    # Ordinary stream data that preceded the signature.
                    ret = self.protocol.dataReceived(message[0:offset])
                    if ret:
                        return ret

                # The 4 byte length follows the 16 byte signature; the
                # payload starts right after it.
                msglen = struct.unpack("@I", message[offset+16:offset+20])[0]
                offset += 20
                ret = self.protocol.fileDescriptorsReceived(fds,
                    message[offset:offset+msglen])
                if ret:
                    return ret

                if offset+msglen < len(message):
                    # Ordinary stream data that followed the FD message.
                    return self.protocol.dataReceived(message[offset+msglen:])
                return ret
            else:
                # self.debug("No FDs, passing to dataReceived")
                return self.protocol.dataReceived(message)


# NOTE(review): original listing lines 114-117 were lost in extraction and
# nothing else in this file refers to what they defined, so the definition
# that stood here could not be recovered - restore it from the upstream file.


class FDPassingBroker(pb.Broker, log.Loggable):
    """
    A pb.Broker subclass that handles FDs being passed to it (with associated
    data) over the same connection as the normal PB data stream.
    When an FD is seen, it creates new protocol objects for them from the
    childFactory attribute.
    """
    # FIXME: looks like we can only use our own subclasses that take
    # three __init__ args

    # NOTE(review): the parameter order is reconstructed from the body;
    # confirm it against the callers.
    def __init__(self, childFactory, connectionClass, **kwargs):
        """
        @param childFactory:    factory whose buildProtocol() is called for
                                every file descriptor received
        @param connectionClass: subclass of L{twisted.internet.tcp.Connection}
        """
        pb.Broker.__init__(self, **kwargs)

        self.childFactory = childFactory
        self._connectionClass = connectionClass

    # This is the complex bit. If our underlying transport receives a file
    # descriptor, this gets called - along with the data we got with the FD.
    # We create an appropriate protocol object, and attach it to the reactor.

    def fileDescriptorsReceived(self, fds, message):
        """
        Wrap a received file descriptor in a socket and build a connection
        for it with a protocol from childFactory.

        @param fds:     the file descriptors received; exactly one is expected
        @param message: the data that was sent along with the descriptors
        """
        if len(fds) == 1:
            fd = fds[0]

            # Note that we hardcode IPv4 here!
            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)

            # PROBE: received fd; see porter.py
            self.debug("[fd %5d] (ts %f) received fd from %d, created socket",
                       sock.fileno(), time.time(), fd)

            # Undocumentedly (other than a comment in
            # Python/Modules/socketmodule.c), socket.fromfd() calls dup() on
            # the passed FD before it actually wraps it in a socket object.
            # So, we need to close the FD that we originally had...
            os.close(fd)

            try:
                peeraddr = sock.getpeername()
            except socket.error:
                self.info("Socket disconnected before being passed to client")
                sock.close()
                return

            # Based on bits in tcp.Port.doRead()
            addr = address._ServerFactoryIPv4Address('TCP',
                peeraddr[0], peeraddr[1])
            protocol = self.childFactory.buildProtocol(addr)

            self._connectionClass(sock, protocol, peeraddr, message)
        else:
            self.warning("Unexpected: FD-passing message with len(fds) != 1")
            # Nobody is going to use these descriptors; close them so that
            # they do not leak in this process.  Best effort only: a failed
            # close leaves us no worse off than before.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError:
                    pass


class _SocketMaybeCloser(tcp._SocketCloser):
    """
    Socket closer that can skip the shutdown() step, selected through the
    keepSocketAlive attribute.
    """
    keepSocketAlive = False

    def _closeSocket(self):
        # We override this (from tcp._SocketCloser) so that we can close
        # sockets properly in the normal case, but once we've passed our
        # socket on via the FD-channel, we just close() it (not calling
        # shutdown() which will close the TCP channel without closing
        # the FD itself)
        if self.keepSocketAlive:
            try:
                self.socket.close()
            except socket.error:
                pass
        else:
            tcp.Server._closeSocket(self)


class PassableServerConnection(_SocketMaybeCloser, tcp.Server):
    """
    A subclass of tcp.Server that permits passing the FDs used to other
    processes (by just calling close(2) rather than shutdown(2) on them)
    """
    pass


class PassableServerPort(tcp.Port):
    """
    TCP listening port whose accepted connections are
    L{PassableServerConnection}s.
    """
    transport = PassableServerConnection
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Sun Sep 13 13:20:25 2009 | http://epydoc.sourceforge.net |