12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3 |
- import re
- import io
- import os
- import sys
- import codecs
- from weakref import WeakKeyDictionary
# True when running on a Python 2 interpreter.
PY2 = sys.version_info[0] == 2
# True when the platform string identifies Cygwin.
CYGWIN = sys.platform.startswith('cygwin')
# Determine local App Engine environment, per Google's own suggestion.
# ``SERVER_SOFTWARE`` is read with a default so that an environment which
# sets ``APPENGINE_RUNTIME`` without it does not raise ``KeyError`` while
# this module is being imported.
APP_ENGINE = ('APPENGINE_RUNTIME' in os.environ and
              'Development/' in os.environ.get('SERVER_SOFTWARE', ''))
# Windows, except when running inside the local App Engine environment.
WIN = sys.platform.startswith('win') and not APP_ENGINE
# Fallback terminal width.
DEFAULT_COLUMNS = 80

# Matches an ANSI escape sequence: ESC, "[", digits/semicolons, one letter.
_ansi_re = re.compile(r'\033\[((?:\d|;)*)([a-zA-Z])')
def get_filesystem_encoding():
    """Return the filesystem encoding, or the default encoding when the
    interpreter does not report one.
    """
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding:
        return fs_encoding
    return sys.getdefaultencoding()
def _make_text_stream(stream, encoding, errors,
                      force_readable=False, force_writable=False):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    When ``encoding`` is ``None`` it is chosen with ``get_best_encoding``;
    when ``errors`` is ``None`` it defaults to ``'replace'``.  The two
    ``force_*`` flags are passed through to the wrapper unchanged.
    """
    chosen_encoding = encoding
    if chosen_encoding is None:
        chosen_encoding = get_best_encoding(stream)
    chosen_errors = errors
    if chosen_errors is None:
        chosen_errors = 'replace'
    return _NonClosingTextIOWrapper(
        stream, chosen_encoding, chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable)
def is_ascii_encoding(encoding):
    """Check whether the given encoding name resolves to the ascii codec.

    Unknown encoding names yield ``False``.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == 'ascii'
def get_best_encoding(stream):
    """Return the encoding to use for ``stream``.

    The stream's own ``encoding`` attribute is used when set, otherwise
    the interpreter default.  If the result is ascii, ``'utf-8'`` is
    returned instead.
    """
    candidate = getattr(stream, 'encoding', None) or sys.getdefaultencoding()
    return 'utf-8' if is_ascii_encoding(candidate) else candidate
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """Text wrapper around a ``_FixupStream`` that detaches from the
    wrapped stream when it is garbage collected.
    """

    def __init__(self, stream, encoding, errors,
                 force_readable=False, force_writable=False, **extra):
        fixed_stream = _FixupStream(stream, force_readable, force_writable)
        self._stream = fixed_stream
        io.TextIOWrapper.__init__(self, fixed_stream, encoding, errors,
                                  **extra)

    # The io module is a place where the Python 3 text behavior
    # was forced upon Python 2, so we need to unbreak
    # it to look like Python 2.
    if PY2:
        def write(self, x):
            """Write ``x``; byte-like values bypass the text layer."""
            if not (isinstance(x, str) or is_bytes(x)):
                return io.TextIOWrapper.write(self, x)
            # Best effort: push out pending text before writing to the
            # underlying buffer directly.
            try:
                self.flush()
            except Exception:
                pass
            return self.buffer.write(str(x))

        def writelines(self, lines):
            """Write every item of ``lines`` through :meth:`write`."""
            for item in lines:
                self.write(item)

    def __del__(self):
        # Detach on collection; failures are ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self):
        """Delegate to the wrapped stream's ``isatty``."""
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()
class _FixupStream(object):
    """The new io interface needs more from streams than streams
    traditionally implement.  As such, this fix-up code is necessary in
    some circumstances.

    The forcing of the readable and writable flags is there because some
    tools put badly patched objects on sys (one such offender being
    certain versions of jupyter notebook).
    """

    def __init__(self, stream, force_readable=False, force_writable=False):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name):
        # Anything not defined here is looked up on the wrapped stream.
        return getattr(self._stream, name)

    def read1(self, size):
        """Call the stream's own ``read1`` when it has one, otherwise
        fall back to ``readline`` (Python 2) or ``read``.
        """
        native_read1 = getattr(self._stream, 'read1', None)
        if native_read1 is not None:
            return native_read1(size)
        # We only dispatch to readline instead of read in Python 2 as we
        # do not want to cause problems with the different implementation
        # of line buffering.
        fallback = self._stream.readline if PY2 else self._stream.read
        return fallback(size)

    def readable(self):
        """Return whether the stream is (or is forced to be) readable."""
        if self._force_readable:
            return True
        probe = getattr(self._stream, 'readable', None)
        if probe is not None:
            return probe()
        # No ``readable`` method: try a zero-length read instead.
        try:
            self._stream.read(0)
        except Exception:
            return False
        else:
            return True

    def writable(self):
        """Return whether the stream is (or is forced to be) writable."""
        if self._force_writable:
            return True
        probe = getattr(self._stream, 'writable', None)
        if probe is not None:
            return probe()
        # No ``writable`` method: try an empty text write, then an empty
        # bytes write.
        for empty in ('', b''):
            try:
                self._stream.write(empty)
            except Exception:
                continue
            return True
        return False

    def seekable(self):
        """Return whether the stream supports seeking."""
        probe = getattr(self._stream, 'seekable', None)
        if probe is not None:
            return probe()
        # No ``seekable`` method: try seeking to the current position.
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        else:
            return True
- if PY2:
# Python 2 names for the types and helpers used by the rest of the module.
text_type = unicode
bytes = str
raw_input = raw_input
string_types = (str, unicode)
int_types = (int, long)
range_type = xrange


def iteritems(x):
    """Return ``x.iteritems()``."""
    return x.iteritems()
def is_bytes(x):
    """Return whether ``x`` is a ``buffer`` or ``bytearray`` instance."""
    byte_like = (buffer, bytearray)
    return isinstance(x, byte_like)
# A letter or underscore followed by any number of letters, digits or
# underscores, anchored at both ends.
_identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
# For Windows, we need to force stdout/stdin/stderr to binary if it's
# fetched for that.  This is not the most correct way to do it, as it
# changes global state, but there does not seem to be a clearly better
# way: just reopening the file in binary mode does not change anything.
#
# An option would be to do what Python 3 does: open the file as binary
# only, patch it back to the system, and then use a wrapper stream that
# converts newlines.  It's not quite clear what the correct option is.
#
# This code also lives in _winconsole for the fallback to the console
# emulation stream.
#
# There are also Windows environments where the `msvcrt` module is not
# available (which is why we use try/except instead of the WIN variable
# here), such as the Google App Engine development server on Windows.
# In those cases there is just nothing we can do.
def set_binary_mode(f):
    """Default implementation: return ``f`` unchanged."""
    return f


try:
    import msvcrt
except ImportError:
    pass
else:
    def set_binary_mode(f):
        """Set the file descriptor of ``f`` to binary mode via
        ``msvcrt``.  ``f`` is returned as-is when it has no usable
        ``fileno()``.
        """
        try:
            descriptor = f.fileno()
        except Exception:
            return f
        msvcrt.setmode(descriptor, os.O_BINARY)
        return f


try:
    import fcntl
except ImportError:
    pass
else:
    def set_binary_mode(f):
        """Clear ``O_NONBLOCK`` on the file descriptor of ``f`` via
        ``fcntl``.  ``f`` is returned as-is when it has no usable
        ``fileno()``.
        """
        try:
            descriptor = f.fileno()
        except Exception:
            return f
        current_flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)
        fcntl.fcntl(descriptor, fcntl.F_SETFL,
                    current_flags & ~os.O_NONBLOCK)
        return f
def isidentifier(x):
    """Return whether ``x`` matches ``_identifier_re``."""
    found = _identifier_re.search(x)
    return found is not None
def get_binary_stdin():
    """Return ``sys.stdin`` after passing it through ``set_binary_mode``."""
    stdin = sys.stdin
    return set_binary_mode(stdin)
def get_binary_stdout():
    """Return ``sys.stdout`` after passing it through ``set_binary_mode``.

    ``_wrap_std_stream('stdout')`` is called first, so ``sys.stdout`` is
    read only after that call.
    """
    _wrap_std_stream('stdout')
    stdout = sys.stdout
    return set_binary_mode(stdout)
def get_binary_stderr():
    """Return ``sys.stderr`` after passing it through ``set_binary_mode``.

    ``_wrap_std_stream('stderr')`` is called first, so ``sys.stderr`` is
    read only after that call.
    """
    _wrap_std_stream('stderr')
    stderr = sys.stderr
    return set_binary_mode(stderr)
def get_text_stdin(encoding=None, errors=None):
    """Return a text stream for ``sys.stdin``.

    The Windows console stream is used when one is available; otherwise
    ``sys.stdin`` is wrapped with ``_make_text_stream`` and forced
    readable.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stdin, encoding, errors,
                                 force_readable=True)
    return console_stream
def get_text_stdout(encoding=None, errors=None):
    """Return a text stream for ``sys.stdout``.

    After ``_wrap_std_stream('stdout')`` the Windows console stream is
    used when one is available; otherwise ``sys.stdout`` is wrapped with
    ``_make_text_stream`` and forced writable.
    """
    _wrap_std_stream('stdout')
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stdout, encoding, errors,
                                 force_writable=True)
    return console_stream
def get_text_stderr(encoding=None, errors=None):
    """Return a text stream for ``sys.stderr``.

    After ``_wrap_std_stream('stderr')`` the Windows console stream is
    used when one is available; otherwise ``sys.stderr`` is wrapped with
    ``_make_text_stream`` and forced writable.
    """
    _wrap_std_stream('stderr')
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stderr, encoding, errors,
                                 force_writable=True)
    return console_stream
def filename_to_ui(value):
    """Return ``value`` as text for display.

    ``bytes`` values are decoded with the filesystem encoding, replacing
    undecodable bytes; anything else is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
- else:
import io

# Python 3 names for the types and helpers used by the rest of the module.
text_type = str
raw_input = input
string_types = (str,)
int_types = (int,)
range_type = range


def isidentifier(x):
    """Return ``x.isidentifier()``."""
    return x.isidentifier()


def iteritems(x):
    """Return an iterator over ``x.items()``."""
    return iter(x.items())
def is_bytes(x):
    """Return whether ``x`` is ``bytes``, ``memoryview`` or ``bytearray``."""
    byte_like = (bytes, memoryview, bytearray)
    return isinstance(x, byte_like)
def _is_binary_reader(stream, default=False):
    """Return whether reading from ``stream`` yields ``bytes``.

    A zero-length read is used as the probe.  If the probe raises,
    ``default`` is returned.
    """
    try:
        sample = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed.  In this case, we assume the default.
        return default
    return isinstance(sample, bytes)
def _is_binary_writer(stream, default=False):
    """Return whether ``stream`` accepts ``bytes`` on write.

    An empty bytes write is tried first; if it succeeds the stream is
    binary.  Otherwise an empty text write is tried; if that succeeds
    the stream is not binary.  If both fail, ``default`` is returned.
    """
    try:
        stream.write(b'')
    except Exception:
        pass
    else:
        return True
    try:
        stream.write('')
    except Exception:
        return default
    return False
def _find_binary_reader(stream):
    """Return the binary reader behind ``stream``, or ``None``.

    ``stream`` itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is returned when that reads bytes.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_reader(stream, False):
        return stream

    underlying = getattr(stream, 'buffer', None)
    if underlying is None:
        return None

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if _is_binary_reader(underlying, True):
        return underlying
    return None
def _find_binary_writer(stream):
    """Return the binary writer behind ``stream``, or ``None``.

    ``stream`` itself is returned when it already accepts bytes;
    otherwise its ``buffer`` attribute is returned when that accepts
    bytes.
    """
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_writer(stream, False):
        return stream

    underlying = getattr(stream, 'buffer', None)
    if underlying is None:
        return None

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    if _is_binary_writer(underlying, True):
        return underlying
    return None
def _stream_is_misconfigured(stream):
    """A stream is misconfigured if its encoding is ASCII."""
    # If the stream does not have an encoding set, we assume it's set
    # to ASCII.  This appears to happen in certain unittest
    # environments.  It's not quite clear what the correct behavior is
    # but this at least will force Click to recover somehow.
    declared = getattr(stream, 'encoding', None)
    return is_ascii_encoding(declared or 'ascii')
def _is_compatible_text_stream(stream, encoding, errors):
    """Return whether ``stream`` can be used as-is for the requested
    ``encoding`` and ``errors``.
    """
    actual_encoding = getattr(stream, 'encoding', None)
    actual_errors = getattr(stream, 'errors', None)

    # Perfect match.
    if actual_encoding == encoding and actual_errors == errors:
        return True

    # Otherwise, it's only a compatible stream if we did not ask for
    # an encoding and the stream has one.
    return encoding is None and actual_encoding is not None
def _force_correct_text_reader(text_reader, encoding, errors,
                               force_readable=False):
    """Return a text reader for ``text_reader`` with the requested
    ``encoding`` and ``errors``.

    The reader is returned unchanged when it is already suitable or when
    no underlying binary reader can be found; otherwise the binary
    reader is re-wrapped with ``_make_text_stream``.
    """
    if _is_binary_reader(text_reader, False):
        source = text_reader
    else:
        # If there is no target encoding set, we need to verify that the
        # reader is not actually misconfigured.
        if encoding is None and not _stream_is_misconfigured(text_reader):
            return text_reader

        if _is_compatible_text_stream(text_reader, encoding, errors):
            return text_reader

        # If the reader has no encoding, we try to find the underlying
        # binary reader for it.  If that fails because the environment is
        # misconfigured, we silently go with the same reader because this
        # is too common to happen.  In that case, mojibake is better than
        # exceptions.
        source = _find_binary_reader(text_reader)
        if source is None:
            return text_reader

    # At this point, we default the errors to replace instead of strict
    # because nobody handles those errors anyway and the stream
    # configuration is already too broken to be repaired.
    effective_errors = 'replace' if errors is None else errors
    return _make_text_stream(source, encoding, effective_errors,
                             force_readable=force_readable)
def _force_correct_text_writer(text_writer, encoding, errors,
                               force_writable=False):
    """Return a text writer for ``text_writer`` with the requested
    ``encoding`` and ``errors``.

    The writer is returned unchanged when it is already suitable or when
    no underlying binary writer can be found; otherwise the binary
    writer is re-wrapped with ``_make_text_stream``.
    """
    if _is_binary_writer(text_writer, False):
        sink = text_writer
    else:
        # If there is no target encoding set, we need to verify that the
        # writer is not actually misconfigured.
        if encoding is None and not _stream_is_misconfigured(text_writer):
            return text_writer

        if _is_compatible_text_stream(text_writer, encoding, errors):
            return text_writer

        # If the writer has no encoding, we try to find the underlying
        # binary writer for it.  If that fails because the environment is
        # misconfigured, we silently go with the same writer because this
        # is too common to happen.  In that case, mojibake is better than
        # exceptions.
        sink = _find_binary_writer(text_writer)
        if sink is None:
            return text_writer

    # At this point, we default the errors to replace instead of strict
    # because nobody handles those errors anyway and the stream
    # configuration is already too broken to be repaired.
    effective_errors = 'replace' if errors is None else errors
    return _make_text_stream(sink, encoding, effective_errors,
                             force_writable=force_writable)
def get_binary_stdin():
    """Return the binary reader behind ``sys.stdin``.

    Raises ``RuntimeError`` when no binary reader can be determined.
    """
    binary_stdin = _find_binary_reader(sys.stdin)
    if binary_stdin is not None:
        return binary_stdin
    raise RuntimeError('Was not able to determine binary '
                       'stream for sys.stdin.')
def get_binary_stdout():
    """Return the binary writer behind ``sys.stdout``.

    Raises ``RuntimeError`` when no binary writer can be determined.
    """
    binary_stdout = _find_binary_writer(sys.stdout)
    if binary_stdout is not None:
        return binary_stdout
    raise RuntimeError('Was not able to determine binary '
                       'stream for sys.stdout.')
def get_binary_stderr():
    """Return the binary writer behind ``sys.stderr``.

    Raises ``RuntimeError`` when no binary writer can be determined.
    """
    binary_stderr = _find_binary_writer(sys.stderr)
    if binary_stderr is not None:
        return binary_stderr
    raise RuntimeError('Was not able to determine binary '
                       'stream for sys.stderr.')
def get_text_stdin(encoding=None, errors=None):
    """Return a text stream for ``sys.stdin``.

    The Windows console stream is used when one is available; otherwise
    ``sys.stdin`` goes through ``_force_correct_text_reader`` and is
    forced readable.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console_stream is None:
        return _force_correct_text_reader(sys.stdin, encoding, errors,
                                          force_readable=True)
    return console_stream
def get_text_stdout(encoding=None, errors=None):
    """Return a text stream for ``sys.stdout``.

    The Windows console stream is used when one is available; otherwise
    ``sys.stdout`` goes through ``_force_correct_text_writer`` and is
    forced writable.
    """
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(sys.stdout, encoding, errors,
                                          force_writable=True)
    return console_stream
def get_text_stderr(encoding=None, errors=None):
    """Return a text stream for ``sys.stderr``.

    The Windows console stream is used when one is available; otherwise
    ``sys.stderr`` goes through ``_force_correct_text_writer`` and is
    forced writable.
    """
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(sys.stderr, encoding, errors,
                                          force_writable=True)
    return console_stream
def filename_to_ui(value):
    """Return ``value`` as text that is safe to display.

    ``bytes`` are decoded with the filesystem encoding.  Text is
    round-tripped through UTF-8 so that surrogate-escaped bytes are
    turned into replacement characters.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), 'replace')
    raw = value.encode('utf-8', 'surrogateescape')
    return raw.decode('utf-8', 'replace')
def get_streerror(e, default=None):
    """Return a text error message for the exception ``e``.

    The exception's ``strerror`` is used when it is set.  Otherwise the
    message is ``default`` when given, else ``str(e)``.  An exception
    whose ``strerror`` attribute exists but is ``None`` (for example
    ``OSError('boom')``) is treated as having no ``strerror``, so this
    function does not return ``None`` for it.

    A ``bytes`` message is decoded as UTF-8 with replacement.
    """
    msg = getattr(e, 'strerror', None)
    if msg is None:
        msg = default if default is not None else str(e)
    if isinstance(msg, bytes):
        msg = msg.decode('utf-8', 'replace')
    return msg
def open_stream(filename, mode='r', encoding=None, errors='strict',
                atomic=False):
    """Open ``filename`` and return ``(stream, should_close)``.

    ``'-'`` selects a standard stream: stdout for modes containing
    ``w``, ``a`` or ``x``, stdin otherwise, binary or text depending on
    ``b`` in ``mode``.  Standard streams are returned with
    ``should_close`` set to ``False`` and ``atomic`` is ignored for them.

    Regular files are returned with ``should_close`` set to ``True``.
    With ``atomic`` set, data is written to a temporary file in the
    target's directory and wrapped in ``_AtomicFile``.

    Raises ``ValueError`` when ``atomic`` is combined with a mode that
    contains ``a`` or ``x`` or lacks ``w``.
    """
    # Standard streams first.  These are simple because they don't need
    # special handling for the atomic flag.  It's entirely ignored.
    if filename == '-':
        binary = 'b' in mode
        if any(m in mode for m in ['w', 'a', 'x']):
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        if encoding is None:
            return open(filename, mode), True
        return io.open(filename, mode, encoding=encoding, errors=errors), True

    # Some usability stuff for atomic writes
    if 'a' in mode:
        raise ValueError(
            'Appending to an existing file is not supported, because that '
            'would involve an expensive `copy`-operation to a temporary '
            'file. Open the file in normal `w`-mode and copy explicitly '
            'if that\'s what you\'re after.'
        )
    if 'x' in mode:
        raise ValueError('Use the `overwrite`-parameter instead.')
    if 'w' not in mode:
        raise ValueError('Atomic writes only make sense with `w`-mode.')

    # Atomic writes are more complicated.  They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file.  Then we wrap it in an
    # atomic file that moves the file over on close.
    import tempfile
    fd, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename),
                                        prefix='.__atomic-write')
    wrapped = False
    try:
        if encoding is not None:
            f = io.open(fd, mode, encoding=encoding, errors=errors)
        else:
            f = os.fdopen(fd, mode)
        wrapped = True
    finally:
        if not wrapped:
            # Wrapping the descriptor failed (e.g. unknown encoding or
            # invalid mode).  Do not leak the descriptor or leave the
            # temporary file behind.  The descriptor may already have
            # been closed by the failed open call, hence the guard.
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.remove(tmp_filename)
            except OSError:
                pass
    return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True
# Used in a destructor call, needs extra protection from interpreter cleanup.
# ``os.replace`` is preferred when it exists; otherwise ``os.rename`` is
# used, which is only treated as able to replace a file outside Windows.
_replace = os.replace if hasattr(os, 'replace') else os.rename
_can_replace = hasattr(os, 'replace') or not WIN
class _AtomicFile(object):
    """File wrapper that writes to a temporary file and moves it over
    the real file on close.

    ``f`` is the open file object for ``tmp_filename``.  Attributes not
    defined here are looked up on ``f``.
    """

    def __init__(self, f, tmp_filename, real_filename):
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self):
        """The name of the real (target) file."""
        return self._real_filename

    def close(self, delete=False):
        """Close the temporary file and finish the write.

        With ``delete`` false the temporary file is moved over the real
        file.  With ``delete`` true the temporary file is removed and
        the real file is left untouched; ``__exit__`` uses this when the
        ``with`` body raised, so a partial write never replaces the
        target.  Calling ``close`` again is a no-op.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Discard the partial write; a missing temp file is not an
            # error here.
            try:
                os.remove(self._tmp_filename)
            except OSError:
                pass
        else:
            if not _can_replace:
                # The rename fallback cannot overwrite an existing file
                # here, so remove the target first (best effort).
                try:
                    os.remove(self._real_filename)
                except OSError:
                    pass
            _replace(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name):
        return getattr(self._f, name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close(delete=exc_type is not None)

    def __repr__(self):
        return repr(self._f)
# Placeholders for optional features; they stay ``None`` unless a later
# part of the module rebinds them.
auto_wrap_for_ansi = None
colorama = None
get_winterm_size = None
def strip_ansi(value):
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    stripped = _ansi_re.sub('', value)
    return stripped
def should_strip_ansi(stream=None, color=None):
    """Return whether ANSI codes should be stripped.

    An explicit ``color`` decides directly (strip when it is false).
    Otherwise codes are stripped when ``stream`` -- ``sys.stdin`` if not
    given -- is not a tty.
    """
    if color is not None:
        return not color
    target = sys.stdin if stream is None else stream
    return not isatty(target)
# If we're on Windows, we provide transparent integration through
# colorama.  This will make ANSI colors through the echo function
# work automatically.
if WIN:
    # Windows has a smaller terminal
    DEFAULT_COLUMNS = 79

    from ._winconsole import _get_windows_console_stream, _wrap_std_stream

    def _get_argv_encoding():
        """Return the locale's preferred encoding."""
        import locale
        return locale.getpreferredencoding()

    if PY2:
        def raw_input(prompt=''):
            """Write ``prompt`` to the default text stdout and read one
            line from the default text stdin, without its line ending.
            """
            sys.stderr.flush()
            if prompt:
                out = _default_text_stdout()
                out.write(prompt)
            inp = _default_text_stdin()
            return inp.readline().rstrip('\r\n')

    try:
        import colorama
    except ImportError:
        pass
    else:
        # Maps an original stream to its wrapped counterpart.
        _ansi_stream_wrappers = WeakKeyDictionary()

        def auto_wrap_for_ansi(stream, color=None):
            """This function wraps a stream so that calls through colorama
            are issued to the win32 console API to recolor on demand.  It
            also ensures to reset the colors if a write call is interrupted
            to not destroy the console afterwards.
            """
            try:
                existing = _ansi_stream_wrappers.get(stream)
            except Exception:
                existing = None
            if existing is not None:
                return existing

            wrapper = colorama.AnsiToWin32(
                stream, strip=should_strip_ansi(stream, color))
            wrapped = wrapper.stream
            original_write = wrapped.write

            def _safe_write(s):
                # Reset the console colors on any interruption, then let
                # the exception propagate.
                try:
                    return original_write(s)
                except:
                    wrapper.reset_all()
                    raise

            wrapped.write = _safe_write
            # Caching is best effort; failures to store are ignored.
            try:
                _ansi_stream_wrappers[stream] = wrapped
            except Exception:
                pass
            return wrapped

        def get_winterm_size():
            """Return ``(Right - Left, Bottom - Top)`` of the console
            window reported for stdout.
            """
            info = colorama.win32.GetConsoleScreenBufferInfo(
                colorama.win32.STDOUT)
            window = info.srWindow
            return window.Right - window.Left, window.Bottom - window.Top
else:
    def _get_argv_encoding():
        """Return the encoding of ``sys.stdin`` when it has one, else
        the filesystem encoding.
        """
        stdin_encoding = getattr(sys.stdin, 'encoding', None)
        return stdin_encoding or get_filesystem_encoding()

    def _get_windows_console_stream(*x):
        """No console stream outside Windows: always ``None``."""
        return None

    def _wrap_std_stream(*x):
        """Nothing to wrap outside Windows: always ``None``."""
        return None
def term_len(x):
    """Return the length of ``x`` after ``strip_ansi`` has been applied."""
    visible = strip_ansi(x)
    return len(visible)
def isatty(stream):
    """Return ``stream.isatty()``, or ``False`` when that call fails."""
    try:
        result = stream.isatty()
    except Exception:
        result = False
    return result
def _make_cached_stream_func(src_func, wrapper_func):
    """Return a zero-argument function that caches ``wrapper_func()``
    per source stream.

    ``src_func`` returns the current source stream, which is the cache
    key; entries are held in a ``WeakKeyDictionary``.  Cache lookup and
    store failures are ignored, in which case ``wrapper_func`` is simply
    called again on the next use.
    """
    memo = WeakKeyDictionary()

    def func():
        source = src_func()
        try:
            cached = memo.get(source)
        except Exception:
            cached = None
        if cached is not None:
            return cached

        wrapped = wrapper_func()
        try:
            # Look the source up again in case wrapper_func() modified
            # the stream.
            memo[src_func()] = wrapped
        except Exception:
            pass
        return wrapped

    return func
# Cached accessors for the default text streams, keyed on the current
# ``sys`` stream object.
_default_text_stdin = _make_cached_stream_func(
    lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(
    lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(
    lambda: sys.stderr, get_text_stderr)


# Lookup tables from a standard stream name to its getter function.
binary_streams = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)

text_streams = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)
|