Package flumotion :: Package component :: Package misc :: Package httpserver :: Module httpfile
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpfile

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_misc_httpserver -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import string 
 23  import time 
 24   
 25  # mp4seek is a library to split MP4 files, see the MP4File class docstring 
 26  HAS_MP4SEEK = False 
 27  try: 
 28      import mp4seek.async 
 29      HAS_MP4SEEK = True 
 30  except ImportError: 
 31      pass 
 32   
 33  from twisted.web import resource, server, http 
 34  from twisted.web import error as weberror, static 
 35  from twisted.internet import defer, reactor, error, abstract 
 36  from twisted.cred import credentials 
 37  from twisted.python.failure import Failure 
 38   
 39  from flumotion.configure import configure 
 40  from flumotion.component import component 
 41  from flumotion.common import log, messages, errors, netutils 
 42  from flumotion.component.component import moods 
 43  from flumotion.component.misc.porter import porterclient 
 44  from flumotion.component.misc.httpserver import fileprovider 
 45  from flumotion.component.base import http as httpbase 
 46  from flumotion.twisted import fdserver 
 47   
 48  __version__ = "$Rev: 8008 $" 
 49   
 50  LOG_CATEGORY = "httpserver" 
 51   
 52   
53 -class BadRequest(weberror.ErrorPage):
54 """ 55 Web error for invalid requests 56 """ 57
58 - def __init__(self, message="Invalid request format"):
59 weberror.ErrorPage.__init__(self, http.BAD_REQUEST, 60 "Bad Request", message)
61 62
63 -class InternalServerError(weberror.ErrorPage):
64 """ 65 Web error for internal failures 66 """ 67
68 - def __init__(self, message="The server failed to complete the request"):
69 weberror.ErrorPage.__init__(self, http.INTERNAL_SERVER_ERROR, 70 "Internal Server Error", message)
71 72
73 -class File(resource.Resource, log.Loggable):
74 """ 75 this file is inspired by/adapted from twisted.web.static 76 """ 77 78 logCategory = LOG_CATEGORY 79 80 defaultType = "application/octet-stream" 81 82 childNotFound = weberror.NoResource("File not found.") 83 forbiddenResource = weberror.ForbiddenResource("Access forbidden") 84 badRequest = BadRequest() 85 internalServerError = InternalServerError() 86
87 - def __init__(self, path, httpauth, 88 mimeToResource=None, 89 rateController=None, 90 requestModifiers=None, 91 metadataProvider=None):
92 resource.Resource.__init__(self) 93 94 self._path = path 95 self._httpauth = httpauth 96 # mapping of mime type -> File subclass 97 self._mimeToResource = mimeToResource or {} 98 self._rateController = rateController 99 self._metadataProvider = metadataProvider 100 self._requestModifiers = requestModifiers or [] 101 self._factory = MimedFileFactory(httpauth, self._mimeToResource, 102 rateController=rateController, 103 metadataProvider=metadataProvider, 104 requestModifiers=requestModifiers)
105
106 - def getChild(self, path, request):
107 self.log('getChild: self %r, path %r', self, path) 108 # we handle a request ending in '/' as well; this is how those come in 109 if path == '': 110 return self 111 112 try: 113 child = self._path.child(path) 114 except fileprovider.NotFoundError: 115 return self.childNotFound 116 except fileprovider.AccessError: 117 return self.forbiddenResource 118 except fileprovider.InsecureError: 119 return self.badRequest 120 121 return self._factory.create(child)
122
123 - def render(self, request):
124 """ 125 The request gets rendered by asking the httpauth object for 126 authentication, which returns a deferred. 127 This deferred will callback when the request gets authenticated. 128 """ 129 130 # PROBE: incoming request; see httpstreamer.resources 131 self.debug('[fd %5d] (ts %f) incoming request %r', 132 request.transport.fileno(), time.time(), request) 133 134 d = self._httpauth.startAuthentication(request) 135 d.addCallbacks(self._requestAuthenticated, self._authenticationFailed, 136 callbackArgs=(request, ), errbackArgs=(request, )) 137 # return NOT_DONE_YET, as required by the twisted.web interfaces 138 return server.NOT_DONE_YET
139
140 - def _authenticationFailed(self, failure, request):
141 # Authentication failed; nothing more to do, just swallow the 142 # failure. The object responsible for authentication has already 143 # written a proper response to the client and closed the request. 144 pass
145
146 - def _requestAuthenticated(self, result, request):
147 # Authentication suceeded. Start rendering the request. 148 # We always want to call _terminateRequest after rendering, 149 # regardless of whether there's a failure while rendering it or not. 150 d = defer.succeed(result) 151 d.addCallback(self._renderRequest, request) 152 d.addBoth(self._terminateRequest, request) 153 return d
154
155 - def _terminateRequest(self, body, request):
156 if body == server.NOT_DONE_YET: 157 # _renderRequest will return NOT_DONE_YET if it started serving the 158 # file. This means the callback chain started by _renderRequest has 159 # finished and we're currently serving the file. 160 return 161 if isinstance(body, Failure): 162 # Something went wrong, log it 163 self.warning("Failure during request rendering: %s", 164 log.getFailureMessage(body)) 165 body = self.internalServerError.render(request) 166 if body: 167 # the callback chain from _renderRequest chose to return a string 168 # body, write it out to the client 169 request.write(body) 170 self.debug('[fd %5d] Terminate request %r', 171 request.transport.fileno(), request) 172 request.finish()
173
174 - def _renderRequest(self, _, request):
175 176 # PROBE: authenticated request; see httpstreamer.resources 177 self.debug('[fd %5d] (ts %f) authenticated request %r', 178 request.transport.fileno(), time.time(), request) 179 180 # Now that we're authenticated (or authentication wasn't requested), 181 # write the file (or appropriate other response) to the client. 182 # We override static.File to implement Range requests, and to get 183 # access to the transfer object to abort it later; the bulk of this 184 # is a direct copy of static.File.render, though. 185 try: 186 self.debug("Opening file %s", self._path) 187 provider = self._path.open() 188 except fileprovider.NotFoundError: 189 self.debug("Could not find resource %s", self._path) 190 return self.childNotFound.render(request) 191 except fileprovider.CannotOpenError: 192 self.debug("%s is a directory, can't be GET", self._path) 193 return self.childNotFound.render(request) 194 except fileprovider.AccessError: 195 return self.forbiddenResource.render(request) 196 197 # Different headers not normally set in static.File... 198 # Specify that we will close the connection after this request, and 199 # that the client must not issue further requests. 200 # We do this because future requests on this server might actually need 201 # to go to a different process (because of the porter) 202 request.setHeader('Server', 'Flumotion/%s' % configure.version) 203 request.setHeader('Connection', 'close') 204 # We can do range requests, in bytes. 205 # UGLY HACK FIXME: if pdf, then do not accept range requests 206 # because Adobe Reader plugin messes up 207 if not self._path.path.endswith('.pdf'): 208 request.setHeader('Accept-Ranges', 'bytes') 209 210 if request.setLastModified(provider.getmtime()) is http.CACHED: 211 return '' 212 213 contentType = provider.mimeType or self.defaultType 214 215 if contentType: 216 self.debug('File content type: %r' % contentType) 217 request.setHeader('content-type', contentType) 218 219 fileSize = provider.getsize() 220 # first and last byte offset we will write 221 first = 0 222 last = fileSize - 1 223 224 requestRange = request.getHeader('range') 225 if requestRange is not None: 226 # We have a partial data request. 227 # for interpretation of range, see RFC 2068 14.36 228 # examples: bytes=500-999; bytes=-500 (suffix mode; last 500) 229 self.log('range request, %r', requestRange) 230 rangeKeyValue = string.split(requestRange, '=') 231 if len(rangeKeyValue) != 2: 232 request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) 233 return '' 234 235 if rangeKeyValue[0] != 'bytes': 236 request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) 237 return '' 238 239 # ignore a set of range requests for now, only take the first 240 ranges = rangeKeyValue[1].split(',')[0] 241 l = ranges.split('-') 242 if len(l) != 2: 243 request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) 244 return '' 245 246 start, end = l 247 248 if start: 249 # byte-range-spec 250 first = int(start) 251 if end: 252 last = min(int(end), last) 253 elif end: 254 # suffix-byte-range-spec 255 count = int(end) 256 # we can't serve more than there are in the file 257 if count > fileSize: 258 count = fileSize 259 first = fileSize - count 260 else: 261 # need at least start or end 262 request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE) 263 return '' 264 265 # Start sending from the requested position in the file 266 if first: 267 # TODO: logs suggest this is called with negative values, 268 # figure out how 269 self.debug("Request for range \"%s\" of file, seeking to " 270 "%d of total file size %d", ranges, first, fileSize) 271 provider.seek(first) 272 273 # FIXME: is it still partial if the request was for the complete 274 # file ? Couldn't find a conclusive answer in the spec. 275 request.setResponseCode(http.PARTIAL_CONTENT) 276 request.setHeader('Content-Range', "bytes %d-%d/%d" % 277 (first, last, fileSize)) 278 279 request.setResponseRange(first, last, fileSize) 280 d = defer.maybeDeferred(self.do_prepareBody, 281 request, provider, first, last) 282 283 def dispatchMethod(header, request): 284 if request.method == 'HEAD': 285 # the _terminateRequest callback will be fired, and the request 286 # will be finished 287 return '' 288 return self._startRequest(request, header, provider, first, last)
289 290 d.addCallback(dispatchMethod, request) 291 292 return d
293
294 - def _startRequest(self, request, header, provider, first, last):
295 # Call request modifiers 296 for modifier in self._requestModifiers: 297 modifier.modify(request) 298 299 # PROBE: started request; see httpstreamer.resources 300 self.debug('[fd %5d] (ts %f) started request %r', 301 request.transport.fileno(), time.time(), request) 302 303 if self._metadataProvider: 304 self.log("Retrieving metadata using %r", self._metadataProvider) 305 d = self._metadataProvider.getMetadata(self._path.path) 306 else: 307 d = defer.succeed(None) 308 309 def metadataError(failure): 310 self.warning('Error retrieving metadata for file %s' 311 ' using plug %r. %r', 312 self._path.path, 313 self._metadataProvider, 314 failure.value)
315 316 d.addErrback(metadataError) 317 d.addCallback(self._configureTransfer, request, header, 318 provider, first, last) 319 320 return d 321
322 - def _configureTransfer(self, metadata, request, header, 323 provider, first, last):
324 if self._rateController: 325 self.debug("Creating RateControl object using plug %r and " 326 "metadata %r", self._rateController, metadata) 327 328 # We are passing a metadata dictionary as Proxy settings. 329 # So the rate control can use it if needed. 330 d = defer.maybeDeferred( 331 self._rateController.createProducerConsumerProxy, 332 request, metadata) 333 else: 334 d = defer.succeed(request) 335 336 def attachProxy(consumer, provider, header, first, last): 337 # If we have a header, give it to the consumer first 338 if header: 339 consumer.write(header) 340 341 # Set the provider first, because for very small file 342 # the transfer could terminate right away. 343 request._provider = provider 344 transfer = FileTransfer(provider, last + 1, consumer) 345 request._transfer = transfer 346 347 # The important NOT_DONE_YET was already returned by the render() 348 # method and the value returned here is just part of a convention 349 # between _renderRequest and _terminateRequest. The latter assumes 350 # that if the deferred chain initiated by _renderRequest will fire 351 # with NOT_DONE_YET if the transfer is in progress. 352 return server.NOT_DONE_YET
353 354 d.addCallback(attachProxy, provider, header, first, last) 355 356 return d 357
358 - def do_prepareBody(self, request, provider, first, last):
359 """ 360 I am called before the body of the response gets written, 361 and after generic header setting has been done. 362 363 I set Content-Length. 364 365 Override me to send additional headers, or to prefix the body 366 with data headers. 367 368 I can return a Deferred, that should fire with a string header. That 369 header will be written to the request. 370 """ 371 request.setHeader("Content-Length", str(last - first + 1)) 372 return ''
373 374
375 -class MimedFileFactory(log.Loggable):
376 """ 377 I create File subclasses based on the mime type of the given path. 378 """ 379 380 logCategory = LOG_CATEGORY 381 382 defaultType = "application/octet-stream" 383
384 - def __init__(self, httpauth, 385 mimeToResource=None, 386 rateController=None, 387 requestModifiers=None, 388 metadataProvider=None):
389 self._httpauth = httpauth 390 self._mimeToResource = mimeToResource or {} 391 self._rateController = rateController 392 self._requestModifiers = requestModifiers 393 self._metadataProvider = metadataProvider
394
395 - def create(self, path):
396 """ 397 Creates and returns an instance of a File subclass based 398 on the mime type of the given path. 399 """ 400 mimeType = path.mimeType or self.defaultType 401 self.debug("Create %s file for %s", mimeType, path) 402 klazz = self._mimeToResource.get(mimeType, File) 403 return klazz(path, self._httpauth, 404 mimeToResource=self._mimeToResource, 405 rateController=self._rateController, 406 requestModifiers=self._requestModifiers, 407 metadataProvider=self._metadataProvider)
408 409
410 -class FLVFile(File):
411 """ 412 I am a File resource for FLV files. 413 I can handle requests with a 'start' GET parameter. 414 This parameter represents the byte offset from where to start. 415 If it is non-zero, I will output an FLV header so the result is 416 playable. 417 """ 418 header = 'FLV\x01\x01\000\000\000\x09\000\000\000\x09' 419
420 - def do_prepareBody(self, request, provider, first, last):
421 self.log('do_prepareBody for FLV') 422 length = last - first + 1 423 ret = '' 424 425 # if there is a non-zero start get parameter, prefix the body with 426 # our FLV header 427 # each value is a list 428 try: 429 start = int(request.args.get('start', ['0'])[0]) 430 except ValueError: 431 start = 0 432 # range request takes precedence over our start parsing 433 if request.getHeader('range') is None and start: 434 self.debug('Start %d passed, seeking', start) 435 provider.seek(start) 436 length = last - start + 1 + len(self.header) 437 ret = self.header 438 439 request.setHeader("Content-Length", str(length)) 440 441 return ret
442 443
444 -class MP4File(File):
445 """ 446 I am a File resource for MP4 files. 447 If I have a library for manipulating MP4 files available, I can handle 448 requests with a 'start' GET parameter, Without the library, I ignore this 449 parameter. 450 The 'start' parameter represents the time offset from where to start, in 451 seconds. If it is non-zero, I will seek inside the file to the sample with 452 that time, and prepend the content with rebuilt MP4 tables, to make the 453 output playable. 454 """ 455
456 - def do_prepareBody(self, request, provider, first, last):
457 self.log('do_prepareBody for MP4') 458 length = last - first + 1 459 ret = '' 460 461 # if there is a non-zero start get parameter, split the file, prefix 462 # the body with the regenerated header and seek inside the provider 463 try: 464 start = float(request.args.get('start', ['0'])[0]) 465 except ValueError: 466 start = 0 467 # range request takes precedence over our start parsing 468 if request.getHeader('range') is None and start and HAS_MP4SEEK: 469 self.debug('Start %f passed, seeking', start) 470 provider.seek(0) 471 d = self._split_file(provider, start) 472 473 def seekAndSetContentLength(header_and_offset): 474 header, offset = header_and_offset 475 # the header is a file-like object with the file pointer at the 476 # end, the offset is a number 477 length = last - offset + 1 + header.tell() 478 provider.seek(offset) 479 request.setHeader("Content-Length", str(length)) 480 header.seek(0) 481 return header.read()
482 483 def seekingFailed(failure): 484 # swallow the failure and serve the file from the beginning 485 self.warning("Seeking in MP4 file %s failed: %s", provider, 486 log.getFailureMessage(failure)) 487 provider.seek(0) 488 request.setHeader('Content-Length', str(length)) 489 return ret
490 491 d.addCallback(seekAndSetContentLength) 492 d.addErrback(seekingFailed) 493 return d 494 else: 495 request.setHeader('Content-Length', str(length)) 496 return defer.succeed(ret) 497
498 - def _split_file(self, provider, start):
499 d = defer.Deferred() 500 501 def read_some_data(how_much, from_where): 502 if how_much: 503 provider.seek(from_where) 504 read_d = provider.read(how_much) 505 read_d.addCallback(splitter.feed) 506 read_d.addErrback(d.errback) 507 else: 508 d.callback(splitter.result())
509 510 splitter = mp4seek.async.Splitter(start) 511 splitter.start(read_some_data) 512 513 return d 514 515
516 -class FileTransfer(log.Loggable):
517 """ 518 A class to represent the transfer of a file over the network. 519 """ 520 521 logCategory = LOG_CATEGORY 522 523 consumer = None 524
525 - def __init__(self, provider, size, consumer):
526 """ 527 @param provider: an asynchronous file provider 528 @type provider: L{fileprovider.File} 529 @param size: file position to which file should be read 530 @type size: int 531 @param consumer: consumer to receive the data 532 @type consumer: L{twisted.internet.interfaces.IFinishableConsumer} 533 """ 534 self.provider = provider 535 self.size = size 536 self.consumer = consumer 537 self.written = self.provider.tell() 538 self.bytesWritten = 0 539 self._pending = None 540 self._again = False # True if resume was called while waiting for data 541 self._finished = False # Set when we finish a transfer 542 self.debug("Calling registerProducer on %r", consumer) 543 consumer.registerProducer(self, 0)
544
545 - def resumeProducing(self):
546 if not self.consumer: 547 return 548 self._produce()
549
550 - def pauseProducing(self):
551 pass
552
553 - def stopProducing(self):
554 self.debug('Stop producing from %s at %d/%d bytes', 555 self.provider, self.provider.tell(), self.size) 556 # even though it's the consumer stopping us, from looking at 557 # twisted code it looks like we still are required to 558 # unregister and notify the request that we're done... 559 self._terminate()
560
561 - def _produce(self):
562 if self._pending: 563 # We already are waiting for data, just remember more is needed 564 self._again = True 565 return 566 self._again = False 567 d = self.provider.read(min(abstract.FileDescriptor.bufferSize, 568 self.size - self.written)) 569 self._pending = d 570 d.addCallbacks(self._cbGotData, self._ebReadFailed)
571
572 - def _cbGotData(self, data):
573 self._pending = None 574 575 # We might have got a stopProducing before the _cbGotData callback has 576 # been fired, so we might be in the _finished state. If so, just 577 # return. 578 if self._finished: 579 return 580 581 if data: 582 # WARNING! This call goes back to the reactor! Read the comment in 583 # _writeToConsumer! 584 self._writeToConsumer(data) 585 586 # We again might be in _finished state, because we might just 587 # got out of the reactor after writing some data to the consumer. 588 # 589 # The story goes thusly: 590 # 1) you write the last data chunk 591 # 2) before you get out of _writeToConsumer(), the _cbGotData gets 592 # fired again 593 # 3) because it's the last write (we've written the entire file) 594 # _terminate() gets called 595 # 4) consumer and provider are set to None 596 # 5) you return from the _writeToConsumer call 597 # 598 # If this happened, just exit (again) 599 if self._finished: 600 return 601 602 if self.provider.tell() == self.size: 603 self.debug('Written entire file of %d bytes from %s', 604 self.size, self.provider) 605 self._terminate() 606 elif self._again: 607 # Continue producing 608 self._produce()
609
610 - def _ebReadFailed(self, failure):
611 self._pending = None 612 self.warning('Failure during file %s reading: %s', 613 self.provider, log.getFailureMessage(failure)) 614 self._terminate()
615
616 - def _writeToConsumer(self, data):
617 self.written += len(data) 618 self.bytesWritten += len(data) 619 # this .write will spin the reactor, calling .doWrite and then 620 # .resumeProducing again, so be prepared for a re-entrant call 621 self.consumer.write(data)
622
623 - def _terminate(self):
624 self.provider.close() 625 self.provider = None 626 self.consumer.unregisterProducer() 627 self.consumer.finish() 628 self.consumer = None 629 self._finished = True
630