# -*- coding: utf-8 -*-
"""
    werkzeug.contrib.lint
    ~~~~~~~~~~~~~~~~~~~~~

    .. versionadded:: 0.5

    This module provides a middleware that performs sanity checks of the WSGI
    application.  It checks that :pep:`333` is properly implemented and warns
    on some common HTTP errors such as non-empty responses for 304 status
    codes.

    This module provides a middleware, the :class:`LintMiddleware`.  Wrap your
    application with it and it will warn about common problems with WSGI and
    HTTP while your application is running.

    It's strongly recommended to use it during development.

    :copyright: (c) 2014 by the Werkzeug Team, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""
try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

from functools import partial
from warnings import warn

from werkzeug.datastructures import Headers
from werkzeug.http import is_entity_header
from werkzeug.wsgi import FileWrapper
from werkzeug._compat import string_types


class WSGIWarning(Warning):
    """Warning class for WSGI warnings."""


class HTTPWarning(Warning):
    """Warning class for HTTP warnings."""


def check_string(context, obj, stacklevel=3):
    """Emit a :class:`WSGIWarning` if `obj` is not exactly a ``str``.

    :param context: short description of where the value was seen; it is
                    interpolated into the warning message.
    :param obj: the value to check.
    :param stacklevel: forwarded to :func:`warnings.warn` so that the
                       warning is attributed to a caller's frame instead
                       of to this helper.
    """
    if type(obj) is not str:
        warn(WSGIWarning('%s requires bytestrings, got %s' %
                         (context, obj.__class__.__name__)),
             stacklevel=stacklevel)


class InputStream(object):
    """Proxy around ``wsgi.input`` that warns about unsafe usage.

    Every call is forwarded to the wrapped stream after the warning
    (if any) was emitted.
    """

    def __init__(self, stream):
        self._stream = stream

    def read(self, *args):
        """Forward to the wrapped stream's ``read``.

        Warns if called without a size or with more than one argument.
        """
        if len(args) == 0:
            warn(WSGIWarning('wsgi does not guarantee an EOF marker on the '
                             'input stream, thus making calls to '
                             'wsgi.input.read() unsafe.  Conforming servers '
                             'may never return from this call.'),
                 stacklevel=2)
        elif len(args) != 1:
            warn(WSGIWarning('too many parameters passed to wsgi.input.read()'),
                 stacklevel=2)
        return self._stream.read(*args)

    def readline(self, *args):
        """Forward to the wrapped stream's ``readline``.

        Warns if called without arguments or with a size hint.

        :raises TypeError: if more than one argument is passed.
        """
        if len(args) == 0:
            warn(WSGIWarning('Calls to wsgi.input.readline() without arguments'
                             ' are unsafe.  Use wsgi.input.read() instead.'),
                 stacklevel=2)
        elif len(args) == 1:
            warn(WSGIWarning('wsgi.input.readline() was called with a size hint. '
                             'WSGI does not support this, although it\'s available '
                             'on all major servers.'),
                 stacklevel=2)
        else:
            raise TypeError('too many arguments passed to wsgi.input.readline()')
        return self._stream.readline(*args)

    def __iter__(self):
        """Iterate over the wrapped stream.

        If the wrapped stream is not iterable a warning is emitted and an
        empty iterator is returned instead.
        """
        try:
            return iter(self._stream)
        except TypeError:
            warn(WSGIWarning('wsgi.input is not iterable.'), stacklevel=2)
            return iter(())

    def close(self):
        """Warn that the application closed the input stream, then close it."""
        warn(WSGIWarning('application closed the input stream!'),
             stacklevel=2)
        self._stream.close()


class ErrorStream(object):
    """Proxy around ``wsgi.errors`` that type-checks what is written."""

    def __init__(self, stream):
        self._stream = stream

    def write(self, s):
        """Check `s` with :func:`check_string` and write it to the stream."""
        check_string('wsgi.error.write()', s)
        self._stream.write(s)

    def flush(self):
        """Flush the wrapped stream."""
        self._stream.flush()

    def writelines(self, seq):
        """Write every item of `seq` through :meth:`write`."""
        for line in seq:
            # write each individual line (not the whole sequence) so that
            # every item is type-checked and written exactly once.
            self.write(line)

    def close(self):
        """Warn that the application closed the error stream, then close it."""
        warn(WSGIWarning('application closed the error stream!'),
             stacklevel=2)
        self._stream.close()


class GuardedWrite(object):
    """Wrapper for the ``write`` callable returned by ``start_response``.

    :param write: the callable returned by the server's ``start_response``.
    :param chunks: a list to which the length of each written chunk is
                   appended.
    """

    def __init__(self, write, chunks):
        self._write = write
        self._chunks = chunks

    def __call__(self, s):
        """Check `s`, pass it to the wrapped callable and record its length."""
        check_string('write()', s)
        # `_write` is the write callable itself, not a file-like object.
        self._write(s)
        self._chunks.append(len(s))


class GuardedIterator(object):
    """Wrapper for the application iterator that checks the response.

    :param iterator: the iterable returned by the wrapped application.
    :param headers_set: a list that is filled with ``status_code, headers``
                        once the application called ``start_response``; it
                        is empty until then.
    :param chunks: a list to which the length of each yielded item is
                   appended.
    """

    def __init__(self, iterator, headers_set, chunks):
        self._iterator = iterator
        # the builtin ``next`` works on Python 2 and 3, unlike the
        # Python 2 only ``.next`` method of iterators.
        self._next = partial(next, iter(iterator))
        self.closed = False
        self.headers_set = headers_set
        self.chunks = chunks

    def __iter__(self):
        return self

    def next(self):
        """Return the next item of the wrapped iterator.

        Warns if the iterator was already closed, if the response was not
        started before the first item was produced, or if the item is not
        a ``str``.  ``StopIteration`` of the wrapped iterator propagates.
        """
        if self.closed:
            warn(WSGIWarning('iterated over closed app_iter'),
                 stacklevel=2)
        rv = self._next()
        if not self.headers_set:
            warn(WSGIWarning('Application returned before it '
                             'started the response'), stacklevel=2)
        check_string('application iterator items', rv)
        self.chunks.append(len(rv))
        return rv

    # Python 3 iterator protocol
    __next__ = next

    def close(self):
        """Close the wrapped iterator and check the finished response.

        If the response was started, the recorded status code and headers
        are compared with the number of bytes that were sent and warnings
        are emitted for inconsistencies.
        """
        self.closed = True
        if hasattr(self._iterator, 'close'):
            self._iterator.close()

        if self.headers_set:
            status_code, headers = self.headers_set
            bytes_sent = sum(self.chunks)
            content_length = headers.get('content-length', type=int)

            if status_code == 304:
                for key, value in headers:
                    key = key.lower()
                    if key not in ('expires', 'content-location') and \
                       is_entity_header(key):
                        warn(HTTPWarning('entity header %r found in 304 '
                                         'response' % key))
                if bytes_sent:
                    warn(HTTPWarning('304 responses must not have a body'))
            elif 100 <= status_code < 200 or status_code == 204:
                if content_length != 0:
                    warn(HTTPWarning('%r responses must have an empty '
                                     'content length' % status_code))
                if bytes_sent:
                    warn(HTTPWarning('%r responses must not have a body' %
                                     status_code))
            elif content_length is not None and content_length != bytes_sent:
                warn(WSGIWarning('Content-Length and the number of bytes '
                                 'sent to the client do not match.'))

    def __del__(self):
        if not self.closed:
            # best effort only: a failure while warning from a finalizer
            # must not propagate.
            try:
                warn(WSGIWarning('Iterator was garbage collected before '
                                 'it was closed.'))
            except Exception:
                pass


class LintMiddleware(object):
    """This middleware wraps an application and warns on common errors.
    Among other things it currently checks for the following problems:

    -   invalid status codes
    -   non-bytestrings sent to the WSGI server
    -   strings returned from the WSGI application
    -   non-empty conditional responses
    -   unquoted etags
    -   relative URLs in the Location header
    -   unsafe calls to wsgi.input
    -   unclosed iterators

    Detected errors are emitted using the standard Python :mod:`warnings`
    system and usually end up on :data:`stderr`.

    ::

        from werkzeug.contrib.lint import LintMiddleware
        app = LintMiddleware(app)

    :param app: the application to wrap
    """

    def __init__(self, app):
        self.app = app

    def check_environ(self, environ):
        """Warn about problems with the WSGI environment.

        Checks the type of `environ`, the presence of the required keys,
        the WSGI version and the leading slash of ``SCRIPT_NAME`` and
        ``PATH_INFO``.
        """
        if type(environ) is not dict:
            warn(WSGIWarning('WSGI environment is not a standard python dict.'),
                 stacklevel=4)
        for key in ('REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
                    'wsgi.version', 'wsgi.input', 'wsgi.errors',
                    'wsgi.multithread', 'wsgi.multiprocess',
                    'wsgi.run_once'):
            if key not in environ:
                warn(WSGIWarning('required environment key %r not found'
                                 % key), stacklevel=3)
        # use .get() so that a missing key (already reported above) results
        # in a warning instead of a KeyError.
        if environ.get('wsgi.version') != (1, 0):
            warn(WSGIWarning('environ is not a WSGI 1.0 environ'),
                 stacklevel=3)

        script_name = environ.get('SCRIPT_NAME', '')
        if script_name and script_name[:1] != '/':
            warn(WSGIWarning('SCRIPT_NAME does not start with a slash: %r'
                             % script_name), stacklevel=3)
        path_info = environ.get('PATH_INFO', '')
        if path_info[:1] != '/':
            warn(WSGIWarning('PATH_INFO does not start with a slash: %r'
                             % path_info), stacklevel=3)

    def check_start_response(self, status, headers, exc_info):
        """Warn about problems with the arguments of ``start_response``.

        :param status: the status string passed by the application.
        :param headers: the header list passed by the application.
        :param exc_info: the optional ``exc_info`` value or `None`.
        :return: a tuple in the form ``(status_code, headers)`` where
                 `status_code` is an integer and `headers` a
                 :class:`~werkzeug.datastructures.Headers` object.
        :raises ValueError: if the first token of `status` can not be
                            converted to an integer.
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % status),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must tuple 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers

    def check_headers(self, headers):
        """Warn about unquoted etags, lowercase weak etag indicators and
        ``Location`` headers without a network location.
        """
        etag = headers.get('etag')
        if etag is not None:
            if etag.startswith(('W/', 'w/')):
                if etag.startswith('w/'):
                    warn(HTTPWarning('weak etag indicator should be upcase.'),
                         stacklevel=4)
                etag = etag[2:]
            if not (etag[:1] == etag[-1:] == '"'):
                warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

        location = headers.get('location')
        if location is not None:
            if not urlparse(location).netloc:
                warn(HTTPWarning('absolute URLs required for location header'),
                     stacklevel=4)

    def check_iterator(self, app_iter):
        """Warn if the application returned a string as response iterable."""
        if isinstance(app_iter, string_types):
            warn(WSGIWarning('application returned string.  Response will '
                             'send character for character to the client '
                             'which will kill the performance.  Return a '
                             'list or iterable instead.'), stacklevel=3)

    def __call__(self, *args, **kwargs):
        """Call the wrapped application with a guarded environment.

        ``wsgi.input`` and ``wsgi.errors`` are replaced with checking
        proxies, ``start_response`` is wrapped, and the application's
        return value is wrapped in a :class:`GuardedIterator`.
        """
        if len(args) != 2:
            warn(WSGIWarning('Two arguments to WSGI app required'), stacklevel=2)
        if kwargs:
            warn(WSGIWarning('No keyword arguments to WSGI app allowed'),
                 stacklevel=2)
        environ, start_response = args

        self.check_environ(environ)
        environ['wsgi.input'] = InputStream(environ['wsgi.input'])
        environ['wsgi.errors'] = ErrorStream(environ['wsgi.errors'])

        # hook our own file wrapper in so that applications will always
        # iterate to the end and we can check the content length
        environ['wsgi.file_wrapper'] = FileWrapper

        headers_set = []
        chunks = []

        def checking_start_response(*args, **kwargs):
            if len(args) not in (2, 3):
                # stacklevel belongs to warn(), not to the warning class
                warn(WSGIWarning('Invalid number of arguments: %s, expected '
                                 '2 or 3' % len(args)), stacklevel=2)
            if kwargs:
                warn(WSGIWarning('no keyword arguments allowed.'))

            status, headers = args[:2]
            if len(args) == 3:
                exc_info = args[2]
            else:
                exc_info = None

            headers_set[:] = self.check_start_response(status, headers,
                                                       exc_info)
            return GuardedWrite(start_response(status, headers, exc_info),
                                chunks)

        app_iter = self.app(environ, checking_start_response)
        self.check_iterator(app_iter)
        return GuardedIterator(app_iter, headers_set, chunks)