123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # -*- coding: utf-8 -*-
- """
- werkzeug.contrib.lint
- ~~~~~~~~~~~~~~~~~~~~~
- .. versionadded:: 0.5
- This module provides a middleware that performs sanity checks of the WSGI
- application. It checks that :pep:`333` is properly implemented and warns
- on some common HTTP errors such as non-empty responses for 304 status
- codes.
- This module provides a middleware, the :class:`LintMiddleware`. Wrap your
- application with it and it will warn about common problems with WSGI and
- HTTP while your application is running.
- It's strongly recommended to use it during development.
- :copyright: (c) 2014 by the Werkzeug Team, see AUTHORS for more details.
- :license: BSD, see LICENSE for more details.
- """
- try:
- from urllib.parse import urlparse
- except ImportError:
- from urlparse import urlparse
- from warnings import warn
- from werkzeug.datastructures import Headers
- from werkzeug.http import is_entity_header
- from werkzeug.wsgi import FileWrapper
- from werkzeug._compat import string_types
class WSGIWarning(Warning):
    """Warning category emitted for problems with WSGI protocol usage."""
class HTTPWarning(Warning):
    """Warning category emitted for problems with the HTTP response."""
def check_string(context, obj, stacklevel=3):
    """Emit a :class:`WSGIWarning` if *obj* is not exactly of type ``str``.

    :param context: description of where the value was seen; it is
                    interpolated into the warning message.
    :param obj: the value to check.
    :param stacklevel: forwarded to :func:`warnings.warn` so the warning is
                       attributed to a calling frame instead of this helper.
    """
    # Exact type comparison: instances of ``str`` subclasses are reported too.
    if type(obj) is not str:
        warn(WSGIWarning('%s requires bytestrings, got %s' %
                         (context, obj.__class__.__name__)),
             stacklevel=stacklevel)
class InputStream(object):
    """Proxy around a ``wsgi.input`` stream that warns on risky usage.

    Every call is forwarded to the wrapped stream; the proxy only adds
    :class:`WSGIWarning` warnings (and a :exc:`TypeError` for ``readline``
    called with more than one argument).
    """

    def __init__(self, stream):
        self._stream = stream

    def read(self, *args):
        """Forward to the wrapped ``read`` after checking the argument count.

        Warns when called without a size or with more than one argument.
        """
        argc = len(args)
        if not argc:
            message = ('wsgi does not guarantee an EOF marker on the '
                       'input stream, thus making calls to '
                       'wsgi.input.read() unsafe. Conforming servers '
                       'may never return from this call.')
            warn(WSGIWarning(message), stacklevel=2)
        elif argc > 1:
            message = 'too many parameters passed to wsgi.input.read()'
            warn(WSGIWarning(message), stacklevel=2)
        return self._stream.read(*args)

    def readline(self, *args):
        """Forward to the wrapped ``readline``, warning on every call.

        :raises TypeError: if more than one argument is passed.
        """
        argc = len(args)
        if argc > 1:
            raise TypeError('too many arguments passed to wsgi.input.readline()')
        if argc:
            message = ('wsgi.input.readline() was called with a size hint. '
                       'WSGI does not support this, although it\'s available '
                       'on all major servers.')
        else:
            message = ('Calls to wsgi.input.readline() without arguments'
                       ' are unsafe. Use wsgi.input.read() instead.')
        warn(WSGIWarning(message), stacklevel=2)
        return self._stream.readline(*args)

    def __iter__(self):
        """Return an iterator over the wrapped stream.

        If the stream is not iterable a warning is emitted and an empty
        iterator is returned instead.
        """
        try:
            lines = iter(self._stream)
        except TypeError:
            warn(WSGIWarning('wsgi.input is not iterable.'), stacklevel=2)
            lines = iter(())
        return lines

    def close(self):
        """Warn that the application closed the input, then close it."""
        message = 'application closed the input stream!'
        warn(WSGIWarning(message), stacklevel=2)
        self._stream.close()
class ErrorStream(object):
    """Proxy around the ``wsgi.errors`` stream handed to the application.

    Written values are checked with :func:`check_string`, closing the stream
    emits a :class:`WSGIWarning`, and every call is forwarded to the wrapped
    stream.
    """

    def __init__(self, stream):
        self._stream = stream

    def write(self, s):
        """Type-check *s* and write it to the wrapped stream."""
        check_string('wsgi.error.write()', s)
        self._stream.write(s)

    def flush(self):
        """Flush the wrapped stream."""
        self._stream.flush()

    def writelines(self, seq):
        """Write every item of *seq* through :meth:`write`."""
        for line in seq:
            # Write the individual item, not the whole sequence.
            self.write(line)

    def close(self):
        """Warn that the application closed the error stream, then close it."""
        warn(WSGIWarning('application closed the error stream!'),
             stacklevel=2)
        self._stream.close()
class GuardedWrite(object):
    """Callable wrapper around the ``write`` callable of ``start_response``.

    :param write: the callable returned by the wrapped ``start_response``.
    :param chunks: list to which the length of every written chunk is
                   appended.
    """

    def __init__(self, write, chunks):
        self._write = write
        self._chunks = chunks

    def __call__(self, s):
        """Type-check *s*, pass it to the wrapped callable, record its size."""
        check_string('write()', s)
        # ``_write`` is itself the write callable, not a file-like object.
        self._write(s)
        self._chunks.append(len(s))
class GuardedIterator(object):
    """Iterator wrapper that checks the application's response iterable.

    Works with both the Python 2 (``next``) and the Python 3 (``__next__``)
    iterator protocols.

    :param iterator: the iterable returned by the application.
    :param headers_set: list that is empty until the response was started
                        and afterwards unpacks to ``(status_code, headers)``.
    :param chunks: list collecting the length of every chunk sent.
    """

    def __init__(self, iterator, headers_set, chunks):
        self._iterator = iterator
        self._app_iter = iter(iterator)
        self.closed = False
        self.headers_set = headers_set
        self.chunks = chunks

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next chunk, warning about protocol violations."""
        if self.closed:
            warn(WSGIWarning('iterated over closed app_iter'),
                 stacklevel=2)
        # The builtin ``next`` works for iterators of either Python version.
        rv = next(self._app_iter)
        if not self.headers_set:
            warn(WSGIWarning('Application returned before it '
                             'started the response'), stacklevel=2)
        check_string('application iterator items', rv)
        self.chunks.append(len(rv))
        return rv

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def close(self):
        """Close the wrapped iterable and validate the finished response.

        The checks on status code, headers and body size only run if the
        response was started (``headers_set`` is non-empty).
        """
        self.closed = True
        if hasattr(self._iterator, 'close'):
            self._iterator.close()

        if self.headers_set:
            status_code, headers = self.headers_set
            bytes_sent = sum(self.chunks)
            content_length = headers.get('content-length', type=int)

            if status_code == 304:
                for key, value in headers:
                    key = key.lower()
                    if key not in ('expires', 'content-location') and \
                       is_entity_header(key):
                        warn(HTTPWarning('entity header %r found in 304 '
                                         'response' % key))
                if bytes_sent:
                    warn(HTTPWarning('304 responses must not have a body'))
            elif 100 <= status_code < 200 or status_code == 204:
                if content_length != 0:
                    warn(HTTPWarning('%r responses must have an empty '
                                     'content length' % status_code))
                if bytes_sent:
                    warn(HTTPWarning('%r responses must not have a body' %
                                     status_code))
            elif content_length is not None and content_length != bytes_sent:
                warn(WSGIWarning('Content-Length and the number of bytes '
                                 'sent to the client do not match.'))

    def __del__(self):
        if not self.closed:
            # Best effort only: any failure while warning from a finalizer
            # is deliberately ignored.
            try:
                warn(WSGIWarning('Iterator was garbage collected before '
                                 'it was closed.'))
            except Exception:
                pass
class LintMiddleware(object):
    """This middleware wraps an application and warns on common errors.
    Among other things it currently checks for the following problems:

    -   invalid status codes
    -   non-bytestrings sent to the WSGI server
    -   strings returned from the WSGI application
    -   non-empty conditional responses
    -   unquoted etags
    -   relative URLs in the Location header
    -   unsafe calls to wsgi.input
    -   unclosed iterators

    Detected errors are emitted using the standard Python :mod:`warnings`
    system and usually end up on :data:`stderr`.

    ::

        from werkzeug.contrib.lint import LintMiddleware
        app = LintMiddleware(app)

    :param app: the application to wrap
    """

    def __init__(self, app):
        self.app = app

    def check_environ(self, environ):
        """Warn about problems with the WSGI *environ* passed by the server.

        Checks the type of the environ, the presence of the required keys,
        the WSGI version and the leading slash of ``SCRIPT_NAME`` and
        ``PATH_INFO``.
        """
        if type(environ) is not dict:
            warn(WSGIWarning('WSGI environment is not a standard python dict.'),
                 stacklevel=4)
        for key in ('REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
                    'wsgi.version', 'wsgi.input', 'wsgi.errors',
                    'wsgi.multithread', 'wsgi.multiprocess',
                    'wsgi.run_once'):
            if key not in environ:
                warn(WSGIWarning('required environment key %r not found'
                                 % key), stacklevel=3)
        if environ['wsgi.version'] != (1, 0):
            warn(WSGIWarning('environ is not a WSGI 1.0 environ'),
                 stacklevel=3)

        script_name = environ.get('SCRIPT_NAME', '')
        if script_name and script_name[:1] != '/':
            warn(WSGIWarning('SCRIPT_NAME does not start with a slash: %r'
                             % script_name), stacklevel=3)
        path_info = environ.get('PATH_INFO', '')
        if path_info[:1] != '/':
            warn(WSGIWarning('PATH_INFO does not start with a slash: %r'
                             % path_info), stacklevel=3)

    def check_start_response(self, status, headers, exc_info):
        """Validate the arguments the application gave to ``start_response``.

        :param status: the status string.
        :param headers: the list of ``(name, value)`` header tuples.
        :param exc_info: the optional third argument, or ``None``.
        :return: a tuple ``(status_code, headers)`` with the status code as
                 integer and the headers wrapped in a :class:`Headers`
                 object.
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value into the message.
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % status),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must tuple 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers

    def check_headers(self, headers):
        """Warn about a badly formed ``ETag`` or a ``Location`` without host."""
        etag = headers.get('etag')
        if etag is not None:
            if etag.startswith(('W/', 'w/')):
                if etag.startswith('w/'):
                    warn(HTTPWarning('weak etag indicator should be upcase.'),
                         stacklevel=4)
                # Strip the weak indicator before checking the quotes.
                etag = etag[2:]
            if not (etag[:1] == etag[-1:] == '"'):
                warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

        location = headers.get('location')
        if location is not None:
            if not urlparse(location).netloc:
                warn(HTTPWarning('absolute URLs required for location header'),
                     stacklevel=4)

    def check_iterator(self, app_iter):
        """Warn if the application returned a plain string as response."""
        if isinstance(app_iter, string_types):
            warn(WSGIWarning('application returned string.  Response will '
                             'send character for character to the client '
                             'which will kill the performance.  Return a '
                             'list or iterable instead.'), stacklevel=3)

    def __call__(self, *args, **kwargs):
        """Call the wrapped application with checking proxies installed.

        The environ's input and error streams, the file wrapper and
        ``start_response`` are replaced by checking versions; the returned
        iterable is wrapped in a :class:`GuardedIterator`.
        """
        if len(args) != 2:
            warn(WSGIWarning('Two arguments to WSGI app required'), stacklevel=2)
        if kwargs:
            warn(WSGIWarning('No keyword arguments to WSGI app allowed'),
                 stacklevel=2)
        environ, start_response = args

        self.check_environ(environ)
        environ['wsgi.input'] = InputStream(environ['wsgi.input'])
        environ['wsgi.errors'] = ErrorStream(environ['wsgi.errors'])

        # hook our own file wrapper in so that applications will always
        # iterate to the end and we can check the content length
        environ['wsgi.file_wrapper'] = FileWrapper

        # Both lists are shared with the guards created below.
        headers_set = []
        chunks = []

        def checking_start_response(*args, **kwargs):
            if len(args) not in (2, 3):
                # ``stacklevel`` belongs to ``warn``, not to the warning
                # constructor (which rejects keyword arguments).
                warn(WSGIWarning('Invalid number of arguments: %s, expected '
                                 '2 or 3' % len(args)), stacklevel=2)
            if kwargs:
                warn(WSGIWarning('no keyword arguments allowed.'))

            status, headers = args[:2]
            if len(args) == 3:
                exc_info = args[2]
            else:
                exc_info = None

            headers_set[:] = self.check_start_response(status, headers,
                                                       exc_info)
            return GuardedWrite(start_response(status, headers, exc_info),
                                chunks)

        app_iter = self.app(environ, checking_start_response)
        self.check_iterator(app_iter)
        return GuardedIterator(app_iter, headers_set, chunks)
|