1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os
import socket
import time
import errno
import string
import resource
import fcntl

import gst

try:
    from twisted.web import http
except ImportError:
    from twisted.protocols import http

from twisted.web import server, resource as web_resource
from twisted.internet import reactor, defer
from twisted.python import reflect

from flumotion.configure import configure
from flumotion.common import errors

from flumotion.common import common, log, keycards

from flumotion.component.base import http as httpbase
47
48 __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']
49 __version__ = "$Rev: 7931 $"
50
51 HTTP_NAME = 'FlumotionHTTPServer'
52 HTTP_VERSION = configure.version
53
54 ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
55 <html>
56 <head>
57 <title>%(code)d %(error)s</title>
58 </head>
59 <body>
60 <h2>%(code)d %(error)s</h2>
61 </body>
62 </html>
63 """
64
65 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
66
67
68
69
71
72 __reserve_fds__ = 50
73
74 logCategory = 'httpstreamer'
75
76
77
78
79 isLeaf = True
80
82 """
83 @param streamer: L{MultifdSinkStreamer}
84 """
85 self.streamer = streamer
86 self.httpauth = httpauth
87
88 self._requests = {}
89
90 self.maxclients = self.getMaxAllowedClients(-1)
91 self.maxbandwidth = -1
92
93
94 self._redirectOnFull = None
95
96 self._removing = {}
97
98 socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
99 self.loggers = streamer.plugs.get(socket, [])
100
101 socket = \
102 'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
103 self.modifiers = streamer.plugs.get(socket, [])
104
105 self.logfilter = None
106
107 web_resource.Resource.__init__(self)
108
110
111
112 if fd in self._requests:
113 request = self._requests[fd]
114 self._removeClient(request, fd, stats)
115 else:
116 self.warning('[fd %5d] not found in _requests' % fd)
117
119 """
120 Start to remove all the clients connected (this will complete
121 asynchronously from another thread)
122
123 Returns a deferred that will fire once they're all removed.
124 """
125 l = []
126 for fd in self._requests:
127 self._removing[fd] = defer.Deferred()
128 l.append(self._removing[fd])
129 self.streamer.remove_client(fd)
130
131 return defer.DeferredList(l)
132
134 self.putChild(path, self)
135
137 self.logfilter = logfilter
138
140 """
141 Close the logfile, then reopen using the previous logfilename
142 """
143 for logger in self.loggers:
144 self.debug('rotating logger %r' % logger)
145 logger.rotate()
146
147 - def logWrite(self, fd, ip, request, stats):
148
149 headers = request.getAllHeaders()
150
151 if stats:
152 bytes_sent = stats[0]
153 time_connected = int(stats[3] / gst.SECOND)
154 else:
155 bytes_sent = -1
156 time_connected = -1
157
158 args = {'ip': ip,
159 'time': time.gmtime(),
160 'method': request.method,
161 'uri': request.uri,
162 'username': '-',
163 'get-parameters': request.args,
164 'clientproto': request.clientproto,
165 'response': request.code,
166 'bytes-sent': bytes_sent,
167 'referer': headers.get('referer', None),
168 'user-agent': headers.get('user-agent', None),
169 'time-connected': time_connected}
170
171 l = []
172 for logger in self.loggers:
173 l.append(defer.maybeDeferred(
174 logger.event, 'http_session_completed', args))
175
176 return defer.DeferredList(l)
177
179 self.info('setting maxclients to %d' % limit)
180 self.maxclients = self.getMaxAllowedClients(limit)
181
182 self.info('set maxclients to %d' % self.maxclients)
183
185 self.maxbandwidth = limit
186 self.info("set maxbandwidth to %d", self.maxbandwidth)
187
189 self._redirectOnFull = url
190
191
192
194 """
195 Write out the HTTP headers for the incoming HTTP request.
196
197 @rtype: boolean
198 @returns: whether or not the file descriptor can be used further.
199 """
200 fd = request.transport.fileno()
201 fdi = request.fdIncoming
202
203
204 if fd == -1:
205 self.info('[fd %5d] Client gone before writing header' % fdi)
206
207 return False
208 if fd != request.fdIncoming:
209 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
210
211 return False
212
213 content = self.streamer.get_content_type()
214 request.setHeader('Server', HTTP_SERVER)
215 request.setHeader('Date', http.datetimeToString())
216 request.setHeader('Cache-Control', 'no-cache')
217 request.setHeader('Cache-Control', 'private')
218 request.setHeader('Content-type', content)
219
220
221 for modifier in self.modifiers:
222 modifier.modify(request)
223
224
225 headers = []
226 for name, value in request.headers.items():
227 headers.append('%s: %s\r\n' % (name.capitalize(), value))
228 for cookie in request.cookies:
229 headers.append('%s: %s\r\n' % ("Set-Cookie", cookie))
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244 try:
245
246
247
248
249 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
250
251 request.startedWriting = True
252 return True
253 except OSError, (no, s):
254 if no == errno.EBADF:
255 self.info('[fd %5d] client gone before writing header' % fd)
256 elif no == errno.ECONNRESET:
257 self.info(
258 '[fd %5d] client reset connection writing header' % fd)
259 else:
260 self.info(
261 '[fd %5d] unhandled write error when writing header: %s'
262 % (fd, s))
263
264 del request
265 return False
266
268 if self.streamer.caps == None:
269 self.debug('We have no caps yet')
270 return False
271
272 return True
273
275 """
276 maximum number of allowed clients based on soft limit for number of
277 open file descriptors and fd reservation. Increases soft limit to
278 hard limit if possible.
279 """
280 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
281 import sys
282 version = sys.version_info
283
284 if maxclients != -1:
285 neededfds = maxclients + self.__reserve_fds__
286
287
288
289
290 if version[:3] == (2, 4, 3) and \
291 not hasattr(socket, "has_2_4_3_patch"):
292 self.warning(
293 'Setting hardmax to 1024 due to python 2.4.3 bug')
294 hardmax = 1024
295
296 if neededfds > softmax:
297 lim = min(neededfds, hardmax)
298 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
299 return lim - self.__reserve_fds__
300 else:
301 return maxclients
302 else:
303 return softmax - self.__reserve_fds__
304
306 if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
307 return True
308 elif self.maxbandwidth >= 0:
309
310 if ((len(self._requests) + 1) *
311 self.streamer.getCurrentBitrate() >= self.maxbandwidth):
312 return True
313 return False
314
316 """
317 Add a request, so it can be used for statistics.
318
319 @param request: the request
320 @type request: twisted.protocol.http.Request
321 """
322
323 fd = request.transport.fileno()
324 self._requests[fd] = request
325
327 """
328 Returns whether we want to log a request from this IP; allows us to
329 filter requests from automated monitoring systems.
330 """
331 if self.logfilter:
332 return not self.logfilter.isInRange(ip)
333 else:
334 return True
335
337 """
338 Removes a request and add logging.
339 Note that it does not disconnect the client; it is called in reaction
340 to a client disconnecting.
341 It also removes the keycard if one was created.
342
343 @param request: the request
344 @type request: L{twisted.protocols.http.Request}
345 @param fd: the file descriptor for the client being removed
346 @type fd: L{int}
347 @param stats: the statistics for the removed client
348 @type stats: GValueArray
349 """
350
351
352 self.debug('[fd %5d] (ts %f) finishing request %r',
353 request.transport.fileno(), time.time(), request)
354
355 ip = request.getClientIP()
356 if self._logRequestFromIP(ip):
357 d = self.logWrite(fd, ip, request, stats)
358 else:
359 d = defer.succeed(True)
360 self.info('[fd %5d] Client from %s disconnected' % (fd, ip))
361
362
363
364
365 self.httpauth.cleanupAuth(fd)
366
367 self.debug('[fd %5d] (ts %f) closing transport %r', fd, time.time(),
368 request.transport)
369
370
371
372 del self._requests[fd]
373 request.transport.loseConnection()
374
375 self.debug('[fd %5d] closed transport %r' % (fd, request.transport))
376
377 def _done(_):
378 if fd in self._removing:
379 self.debug("client is removed; firing deferred")
380 removeD = self._removing.pop(fd)
381 removeD.callback(None)
382
383
384 self.debug('[fd %5d] (ts %f) finished request %r',
385 fd, time.time(), request)
386
387 d.addCallback(_done)
388 return d
389
406
407
408
409
410
440
442 self.debug('Not sending data, it\'s not ready')
443 return server.NOT_DONE_YET
444
463
465
466 fdi = request.fdIncoming
467 if not self._writeHeaders(request):
468 self.debug("[fd %5d] not adding as a client" % fdi)
469 return
470 self._addClient(request)
471
472
473
474
475
476
477
478
479
480
481
482 fd = fdi
483 self.debug("[fd %5d] taking away from Twisted" % fd)
484 reactor.removeReader(request.transport)
485
486
487
488
489 try:
490 fcntl.fcntl(fd, fcntl.F_GETFL)
491 except IOError, e:
492 if e.errno == errno.EBADF:
493 self.warning("[fd %5d] is not actually open, ignoring" % fd)
494 else:
495 self.warning("[fd %5d] error during check: %s (%d)" % (
496 fd, e.strerror, e.errno))
497 return
498
499
500 self.streamer.add_client(fd)
501 ip = request.getClientIP()
502
503
504 self.debug('[fd %5d] (ts %f) started request %r',
505 fd, time.time(), request)
506
507 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
508
509 render_GET = _render
510 render_HEAD = _render
511
512
513 -class HTTPRoot(web_resource.Resource, log.Loggable):
528