# -*- coding: utf-8 -*-

import re
import sys
import os
import glob
import operator
import traceback
import importlib

from contextlib import contextmanager
from datetime import timedelta
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

from gevent._util import Lazy

from . import util
from .resources import parse_resources
from .resources import setup_resources
from .resources import unparse_resources
from .sysinfo import RUNNING_ON_CI
from .sysinfo import PYPY
from .sysinfo import PY2
from .sysinfo import RESOLVER_ARES
from .sysinfo import RUN_LEAKCHECKS
from .sysinfo import OSX
from . import six
from . import travis

# Import this while we're probably single-threaded/single-processed
# to try to avoid issues with PyPy 5.10.
# See https://bitbucket.org/pypy/pypy/issues/2769/systemerror-unexpected-internal-exception
try:
    __import__('_testcapi')
except (ImportError, OSError):
    # This can raise a wide variety of errors
    pass

TIMEOUT = 100 # seconds
AVAIL_NWORKERS = cpu_count() - 1
DEFAULT_NWORKERS = int(os.environ.get('NWORKERS') or max(AVAIL_NWORKERS, 4))
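# Cap very large defaults: beyond a point, extra workers appear to just
# contend for resources (the specific numbers look like empirical tuning).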
if DEFAULT_NWORKERS > 15:
    DEFAULT_NWORKERS = 10


if RUN_LEAKCHECKS:
    # Capturing the stats takes time, and we run each
    # test at least twice
    TIMEOUT = 200

DEFAULT_RUN_OPTIONS = {
    'timeout': TIMEOUT
}


if RUNNING_ON_CI:
    # Too many and we get spurious timeouts
    DEFAULT_NWORKERS = 4 if not OSX else 2


def _package_relative_filename(filename, package):
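    """
    Return *filename* unchanged if it names an existing file (or if no
    *package* is given); otherwise resolve it inside *package*'s directory.
    """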
    if not os.path.isfile(filename) and package:
        # Ok, try to locate it as a module in the package
        package_dir = _dir_from_package_name(package)
        return os.path.join(package_dir, filename)
    return filename

def _dir_from_package_name(package):
    package_mod = importlib.import_module(package)
    package_dir = os.path.dirname(package_mod.__file__)
    return package_dir


class ResultCollector(object):
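    """
    Accumulates test results. Use ``+=`` to record a result, and
    ``<<=`` to record a rerun that shouldn't count toward the totals.
    """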

    def __init__(self):
        self.total = 0
        self.failed = {}
        self.passed = {}
        self.total_cases = 0
        self.total_skipped = 0
        # Every RunResult reported: failed, passed, rerun
        self._all_results = []
        self.reran = {}

    def __iadd__(self, result):
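        """
        collector += result

        Records *result*: a failing result is stored in ``self.failed``
        (keyed by name), a passing one in ``self.passed``; the case and
        skip counts are updated either way.
        """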
        self._all_results.append(result)

        if not result:
            self.failed[result.name] = result
        else:
            self.passed[result.name] = True
        self.total_cases += result.run_count
        self.total_skipped += result.skipped_count
        return self

    def __ilshift__(self, result):
        """
        collector <<= result

        Stores the result, but does not count it towards
        the number of cases run, skipped, passed or failed.
        """
        self._all_results.append(result)
        self.reran[result.name] = result
        return self

    @property
    def longest_running_tests(self):
        """
        A new list of RunResult objects, sorted from longest running
        to shortest running.
        """
        return sorted(self._all_results,
                      key=operator.attrgetter('run_duration'),
                      reverse=True)


class FailFast(Exception):
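    """Raised from a worker to stop the whole run after the first failure."""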

class Runner(object):

    TIME_WAIT_REAP = 0.1
    TIME_WAIT_SPAWN = 0.05

    def __init__(self,
                 tests,
                 *,
                 allowed_return_codes=(),
                 configured_failing_tests=(),
                 failfast=False,
                 quiet=False,
                 configured_run_alone_tests=(),
                 worker_count=DEFAULT_NWORKERS,
                 second_chance=False):
        """
        :keyword allowed_return_codes: Return codes other than
           0 that are counted as a success. Needed because some versions
           of Python give ``unittest`` weird return codes.
        :keyword quiet: Set to True or False to explicitly choose. Set to
            `None` to use the default, which may come from the environment variable
            ``GEVENTTEST_QUIET``.
        """
        self._tests = tests
        self._configured_failing_tests = configured_failing_tests
        self._quiet = quiet
        self._configured_run_alone_tests = configured_run_alone_tests

        assert not (failfast and second_chance)
        self._failfast = failfast
        self._second_chance = second_chance

        self.results = ResultCollector()
        self.results.total = len(self._tests)
        self._running_jobs = []

        self._worker_count = min(len(tests), worker_count) or 1
        self._allowed_return_codes = allowed_return_codes

    def _run_one(self, cmd, **kwargs):
        kwargs['allowed_return_codes'] = self._allowed_return_codes
        if self._quiet is not None:
            kwargs['quiet'] = self._quiet
        result = util.run(cmd, **kwargs)
        if not result and self._second_chance:
            self.results <<= result
            util.log("> %s", result.name, color='warning')
            result = util.run(cmd, **kwargs)
        if not result and self._failfast:
            # Under Python 3.9 (maybe older versions?), raising the
            # SystemExit here (a background thread belonging to the
            # pool) doesn't seem to work well. It gets stuck waiting
            # for a lock? The job never shows up as finished.
            raise FailFast(cmd)
        self.results += result

    def _reap(self):
        "Clean up the list of running jobs, returning how many are still outstanding."
        for r in self._running_jobs[:]:
            if not r.ready():
                continue
            if r.successful():
                self._running_jobs.remove(r)
            else:
                r.get()
                sys.exit('Internal error in testrunner.py: %r' % (r, ))
        return len(self._running_jobs)

    def _reap_all(self):
        util.log("Reaping %d jobs", len(self._running_jobs), color="debug")
        while self._running_jobs:
            if not self._reap():
                break
            util.sleep(self.TIME_WAIT_REAP)

    def _spawn(self, pool, cmd, options):
        while True:
            if self._reap() < self._worker_count:
                job = pool.apply_async(self._run_one, (cmd, ), options or {})
                self._running_jobs.append(job)
                return

            util.sleep(self.TIME_WAIT_SPAWN)

    def __call__(self):
        util.log("Running tests in parallel with concurrency %s %s." % (
            self._worker_count,
            util._colorize('number', '(concurrency available: %d)' % AVAIL_NWORKERS)
        ),)
        # This sets global state; in theory we can be used multiple times.
        # That's fine as long as we are single-threaded and these calls
        # happen sequentially.
        util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet

        start = util.perf_counter()
        try:
            self._run_tests()
        except KeyboardInterrupt:
            self._report(util.perf_counter() - start, exit=False)
            util.log('(partial results)\n')
            raise
        except:
            traceback.print_exc()
            raise

        self._reap_all()
        self._report(util.perf_counter() - start, exit=True)

    def _run_tests(self):
        "Runs the tests, produces no report."
        run_alone = []

        tests = self._tests
        pool = ThreadPool(self._worker_count)
        try:
            for cmd, options in tests:
                options = options or {}
                if matches(self._configured_run_alone_tests, cmd):
                    run_alone.append((cmd, options))
                else:
                    self._spawn(pool, cmd, options)
            pool.close()
            pool.join()

            if run_alone:
                util.log("Running tests marked standalone")
                for cmd, options in run_alone:
                    self._run_one(cmd, **options)
        except KeyboardInterrupt:
            try:
                util.log('Waiting for currently running jobs to finish...')
                self._reap_all()
            except KeyboardInterrupt:
                pool.terminate()
                raise
        except:
            pool.terminate()
            raise

    def _report(self, elapsed_time, exit=False):
        results = self.results
        report(
            results,
            exit=exit,
            took=elapsed_time,
            configured_failing_tests=self._configured_failing_tests,
        )


class TravisFoldingRunner(object):
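    """
    Wraps a :class:`Runner` so that the output of its test run is
    enclosed in Travis CI log-folding markers (via :mod:`.travis`).
    """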

    def __init__(self, runner, travis_fold_msg):
        self._runner = runner
        self._travis_fold_msg = travis_fold_msg
        self._travis_fold_name = str(int(util.perf_counter()))

        # A zope-style acquisition proxy would be convenient here.
        run_tests = runner._run_tests

        def _run_tests():
            self._begin_fold()
            try:
                run_tests()
            finally:
                self._end_fold()

        runner._run_tests = _run_tests

    def _begin_fold(self):
        travis.fold_start(self._travis_fold_name,
                          self._travis_fold_msg)

    def _end_fold(self):
        travis.fold_end(self._travis_fold_name)

    def __call__(self):
        return self._runner()


class Discovery(object):
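    """
    Discovers the test files to run and the commands to run them.

    Iterating a ``Discovery`` yields ``(cmd, options)`` pairs suitable
    for :class:`Runner`; ``len()`` gives the number of commands.
    """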
    package_dir = None
    package = None

    def __init__(
            self,
            tests=None,
            ignore_files=None,
            ignored=(),
            coverage=False,
            package=None,
            config=None,
            allow_combine=True,
    ):
        self.config = config or {}
        self.ignore = set(ignored or ())
        self.tests = tests
        self.configured_test_options = self.config.get('TEST_FILE_OPTIONS', {})
        self.allow_combine = allow_combine
        if ignore_files:
            ignore_files = ignore_files.split(',')
            for f in ignore_files:
                self.ignore.update(set(load_list_from_file(f, package)))

        if coverage:
            self.ignore.update(config.get('IGNORE_COVERAGE', set()))

        if package:
            self.package = package
            self.package_dir = _dir_from_package_name(package)

    class Discovered(object):
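        """
        The work list built up while visiting test files: commands to
        run, files that can be combined into shared processes, and
        modules whose ``TESTRUNNER`` must be imported and expanded.
        """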
        def __init__(self, package, configured_test_options, ignore, config, allow_combine):
            self.orig_dir = os.getcwd()
            self.configured_run_alone = config['RUN_ALONE']
            self.configured_failing_tests = config['FAILING_TESTS']
            self.package = package
            self.configured_test_options = configured_test_options
            self.allow_combine = allow_combine
            self.ignore = ignore

            self.to_import = []
            self.std_monkey_patch_files = []
            self.no_monkey_patch_files = []

            self.commands = []

        @staticmethod
        def __makes_simple_monkey_patch(
                contents,
                _patch_present=re.compile(br'[^#].*patch_all\(\)'),
                _patch_indented=re.compile(br'    .*patch_all\(\)')
        ):
            return (
                # A non-commented patch_all() call is present
                bool(_patch_present.search(contents))
                # that is not indented (indentation would imply it's not at
                # top level, so some preconditions are being established)
                and not _patch_indented.search(contents)
            )

        @staticmethod
        def __file_allows_monkey_combine(contents):
            return b'testrunner-no-monkey-combine' not in contents

        @staticmethod
        def __file_allows_combine(contents):
            return b'testrunner-no-combine' not in contents

        @staticmethod
        def __calls_unittest_main_toplevel(
                contents,
                _greentest_main=re.compile(br'    greentest.main\(\)'),
                _unittest_main=re.compile(br'    unittest.main\(\)'),
                _import_main=re.compile(br'from gevent.testing import.*main'),
                _main=re.compile(br'    main\(\)'),
        ):
            # TODO: Add a check that this comes on a line directly after
            # ``if __name__ == '__main__':``.
            return (
                _greentest_main.search(contents)
                or _unittest_main.search(contents)
                or (_import_main.search(contents) and _main.search(contents))
            )

        def __has_config(self, filename):
            return (
                RUN_LEAKCHECKS
                or filename in self.configured_test_options
                or filename in self.configured_run_alone
                or matches(self.configured_failing_tests, filename)
            )

        def __can_monkey_combine(self, filename, contents):
            return (
                self.allow_combine
                and not self.__has_config(filename)
                and self.__makes_simple_monkey_patch(contents)
                and self.__file_allows_monkey_combine(contents)
                and self.__file_allows_combine(contents)
                and self.__calls_unittest_main_toplevel(contents)
            )

        @staticmethod
        def __makes_no_monkey_patch(contents, _patch_present=re.compile(br'[^#].*patch_\w*\(')):
            return not _patch_present.search(contents)

        def __can_nonmonkey_combine(self, filename, contents):
            return (
                self.allow_combine
                and not self.__has_config(filename)
                and self.__makes_no_monkey_patch(contents)
                and self.__file_allows_combine(contents)
                and self.__calls_unittest_main_toplevel(contents)
            )

        def __begin_command(self):
            cmd = [sys.executable, '-u']
            # XXX: -X track-resources is broken. This happened when I updated to
            # PyPy 7.3.2. It started failing to even start inside the virtual environment
            # with
            #
            # debug: OperationError:
            # debug:  operror-type: ImportError
            # debug:  operror-value: No module named traceback
            #
            # I don't know if this is PyPy's problem or a problem in virtualenv:
            #
            # virtualenv==20.0.35
            # virtualenv-clone==0.5.4
            # virtualenvwrapper==4.8.4
            #
            # Deferring investigation until I need this...

            # if PYPY and PY2:
            #     # Doesn't seem to be an env var for this.
            #     # XXX: track-resources is broken in virtual environments
            #     # on 7.3.2.
            #     cmd.extend(('-X', 'track-resources'))
            return cmd


        def __add_test(self, qualified_name, filename, contents):
            if b'TESTRUNNER' in contents: # test__monkey_patching.py
                # XXX: Rework this to avoid importing.
                # XXX: Rework this to allow test combining (it could write the files out and return
                # them directly; we would use 'python -m gevent.monkey --module unittest ...')
                self.to_import.append(qualified_name)
            elif self.__can_monkey_combine(filename, contents):
                self.std_monkey_patch_files.append(qualified_name if self.package else filename)
            elif self.__can_nonmonkey_combine(filename, contents):
                self.no_monkey_patch_files.append(qualified_name if self.package else filename)
            else:
                # XXX: For simple python module tests, try this with
                # `runpy.run_module`, very similar to the way we run
                # things for monkey patching. The idea here is that we
                # can perform setup ahead of time (e.g.,
                # setup_resources()) in each test without having to do
                # it manually or force calls or modifications to those
                # tests.
                cmd = self.__begin_command()
                if self.package:
                    # Using a package is the best way to work with coverage 5
                    # when we specify 'source = <package>'
                    cmd.append('-m' + qualified_name)
                else:
                    cmd.append(filename)

                options = DEFAULT_RUN_OPTIONS.copy()
                options.update(self.configured_test_options.get(filename, {}))
                self.commands.append((cmd, options))

        @staticmethod
        def __remove_options(lst):
            return [x for x in lst if x and not x.startswith('-')]

        def __expand_imports(self):
            for qualified_name in self.to_import:
                module = importlib.import_module(qualified_name)
                for cmd, options in module.TESTRUNNER():
                    if self.__remove_options(cmd)[-1] in self.ignore:
                        continue
                    self.commands.append((cmd, options))
            del self.to_import[:]

        def __combine_commands(self, files, group_size=5):
            if not files:
                return

            from itertools import groupby
            cnt = [0, 0]
            def make_group(_):
                if cnt[0] > group_size:
                    cnt[0] = 0
                    cnt[1] += 1
                cnt[0] += 1
                return cnt[1]
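            # Passing this stateful key to groupby() chunks the file list
            # into consecutive runs of roughly ``group_size`` names; each
            # run becomes a single ``python -m unittest`` invocation below.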

            for _, group in groupby(files, make_group):

                cmd = self.__begin_command()
                cmd.append('-m')
                cmd.append('unittest')
                # cmd.append('-v')
                for name in group:
                    cmd.append(name)
                self.commands.insert(0, (cmd, DEFAULT_RUN_OPTIONS.copy()))

            del files[:]


        def visit_file(self, filename):
            # Support either 'gevent.tests.foo' or 'gevent/tests/foo.py'
            if filename.startswith('gevent.tests'):
                # XXX: How does this interact with 'package'? Probably not well
                qualified_name = module_name = filename
                filename = filename[len('gevent.tests') + 1:]
                filename = filename.replace('.', os.sep) + '.py'
            else:
                module_name = os.path.splitext(filename)[0]
                qualified_name = self.package + '.' + module_name if self.package else module_name

            # Also allow just 'foo' as a shortcut for 'gevent.tests.foo'
            abs_filename = os.path.abspath(filename)
            if (
                    not os.path.exists(abs_filename)
                    and not filename.endswith('.py')
                    and os.path.exists(abs_filename + '.py')
            ):
                abs_filename += '.py'

            with open(abs_filename, 'rb') as f:
                # Some of the test files (e.g., test__socket_dns) are
                # UTF8 encoded. Depending on the environment, Python 3 may
                # try to decode those as ASCII, which fails with UnicodeDecodeError.
                # Thus, be sure to open and compare in binary mode.
                # Open the absolute path to make errors clearer, but store
                # only the relative path: our configuration is keyed by
                # relative file names.
                contents = f.read()

            self.__add_test(qualified_name, filename, contents)

        def visit_files(self, filenames):
            for filename in filenames:
                self.visit_file(filename)
            with Discovery._in_dir(self.orig_dir):
                self.__expand_imports()
            self.__combine_commands(self.std_monkey_patch_files)
            self.__combine_commands(self.no_monkey_patch_files)

    @staticmethod
    @contextmanager
    def _in_dir(package_dir):
        olddir = os.getcwd()
        if package_dir:
            os.chdir(package_dir)
        try:
            yield
        finally:
            os.chdir(olddir)

    @Lazy
    def discovered(self):
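        """
        The :class:`Discovered` object holding the commands to run,
        computed on first access (``Lazy`` caches the value) by
        globbing and visiting the test files.
        """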
        tests = self.tests
        discovered = self.Discovered(self.package, self.configured_test_options,
                                     self.ignore, self.config, self.allow_combine)

        # We need to glob relative names; our config is still based on file names
        with self._in_dir(self.package_dir):
            if not tests:
                tests = set(glob.glob('test_*.py')) - set(['test_support.py'])
            else:
                tests = set(tests)

            if self.ignore:
                # Always ignore the designated list, even if tests
                # were specified on the command line. This fixes a
                # nasty interaction with
                # test__threading_vs_settrace.py being run under
                # coverage when 'grep -l subprocess test*py' is used
                # to list the tests to run.
                tests -= self.ignore
            tests = sorted(tests)
            discovered.visit_files(tests)

        return discovered

    def __iter__(self):
        return iter(self.discovered.commands) # pylint:disable=no-member

    def __len__(self):
        return len(self.discovered.commands) # pylint:disable=no-member

def load_list_from_file(filename, package):
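    """
    Return the list of non-blank, non-comment lines from *filename*
    (resolved relative to *package* if it isn't an existing path).
    """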
    result = []
    if filename:
        # pylint:disable=unspecified-encoding
        with open(_package_relative_filename(filename, package)) as f:
            for x in f:
                x = x.split('#', 1)[0].strip()
                if x:
                    result.append(x)
    return result


def matches(possibilities, command, include_flaky=True):
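    """
    Return whether *command* (a string or argv list) matches one of the
    configured test names in *possibilities*; entries there may carry a
    ``FLAKY `` prefix, which is honored only when *include_flaky* is true.

    For example::

        >>> matches(['FLAKY test__foo.py'], 'python -u test__foo.py')
        True
        >>> matches(['FLAKY test__foo.py'], 'python -u test__foo.py',
        ...         include_flaky=False)
        False
    """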
    if isinstance(command, list):
        command = ' '.join(command)
    for line in possibilities:
        if not include_flaky and line.startswith('FLAKY '):
            continue
        line = line.replace('FLAKY ', '')
        # Our configs are still mostly written in terms of file names,
        # but the non-monkey tests are now using package names.
        # Strip off '.py' from filenames to see if we match a module.
        # XXX: This could be much better. Our command needs better structure.
        if command.endswith(' ' + line) or command.endswith(line.replace(".py", '')):
            return True
        if ' ' not in command and command == line:
            return True
    return False


def format_seconds(seconds):
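    """
    Format a duration for display: short runs keep sub-second
    precision, longer ones render as [H:]MM:SS. For example::

        >>> format_seconds(3.14159)
        '3.1s'
        >>> format_seconds(125)
        '02:05'
        >>> format_seconds(3725)
        '1:02:05'
    """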
    if seconds < 20:
        return '%.1fs' % seconds
    seconds = str(timedelta(seconds=round(seconds)))
    if seconds.startswith('0:'):
        seconds = seconds[2:]
    return seconds


def _show_longest_running(result_collector, how_many=5):
    longest_running_tests = result_collector.longest_running_tests
    if not longest_running_tests:
        return
    # The only tricky part is handling repeats: we want to show them,
    # but not count them as distinct entries.

    util.log('\nLongest-running tests:')
    length_of_longest_formatted_decimal = len('%.1f' % longest_running_tests[0].run_duration)

    frmt = '%' + str(length_of_longest_formatted_decimal) + '.1f seconds: %s'
    seen_names = set()
    for result in longest_running_tests:
        util.log(frmt, result.run_duration, result.name)
        seen_names.add(result.name)
        if len(seen_names) >= how_many:
            break



def report(result_collector, # type: ResultCollector
           exit=True, took=None,
           configured_failing_tests=()):
    # pylint:disable=redefined-builtin,too-many-branches,too-many-locals
    total = result_collector.total
    failed = result_collector.failed
    passed = result_collector.passed
    total_cases = result_collector.total_cases
    total_skipped = result_collector.total_skipped

    _show_longest_running(result_collector)

    if took:
        took = ' in %s' % format_seconds(took)
    else:
        took = ''

    failed_expected = []
    failed_unexpected = []
    passed_unexpected = []

    for name in passed:
        if matches(configured_failing_tests, name, include_flaky=False):
            passed_unexpected.append(name)

    if passed_unexpected:
        util.log('\n%s/%s unexpected passes', len(passed_unexpected), total, color='error')
        print_list(passed_unexpected)

    if result_collector.reran:
        util.log('\n%s/%s tests rerun', len(result_collector.reran), total, color='warning')
        print_list(result_collector.reran)

    if failed:
        util.log('\n%s/%s tests failed%s', len(failed), total, took, color='warning')

        for name in failed:
            if matches(configured_failing_tests, name, include_flaky=True):
                failed_expected.append(name)
            else:
                failed_unexpected.append(name)

        if failed_expected:
            util.log('\n%s/%s expected failures', len(failed_expected), total, color='warning')
            print_list(failed_expected)

        if failed_unexpected:
            util.log('\n%s/%s unexpected failures', len(failed_unexpected), total, color='error')
            print_list(failed_unexpected)

    util.log(
        '\nRan %s tests%s in %s files%s',
        total_cases,
        util._colorize('skipped', " (skipped=%d)" % total_skipped) if total_skipped else '',
        total,
        took,
    )

    if exit:
        if failed_unexpected:
            sys.exit(min(100, len(failed_unexpected)))
        if passed_unexpected:
            sys.exit(101)
        if total <= 0:
            sys.exit('No tests found.')


def print_list(lst):
    for name in lst:
        util.log(' - %s', name)

def _setup_environ(debug=False):
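    """
    Establish default values for warning-, fault-handler-, and
    debugging-related environment variables, without overriding anything
    the caller has already set, then log the interesting ones.
    """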
    def not_set(key):
        return not bool(os.environ.get(key))

    if (not_set('PYTHONWARNINGS')
            and (not sys.warnoptions
                 # Python 3.7 reports ['default'] instead of [] when nothing is given
                 or sys.warnoptions == ['default'])):
        # action:message:category:module:line

        # - when a warning matches
        #   more than one option, the action for the last matching
        #   option is performed.
        # - action is one of: ignore, default, all, module, once, error

        # Enable default warnings such as ResourceWarning.
        # ResourceWarning doesn't exist on Py2, so don't put it
        # in there to avoid a warning.
        defaults = [
            'default',
            'default::DeprecationWarning',
        ]
        if not PY2:
            defaults.append('default::ResourceWarning')

        os.environ['PYTHONWARNINGS'] = ','.join(defaults + [
            # On Python 3[.6], the system site.py module has
            # "open(fullname, 'rU')" which produces the warning that
            # 'U' is deprecated, so ignore warnings from site.py
            'ignore:::site:',
            # pkgutil on Python 2 complains about missing __init__.py
            'ignore:::pkgutil:',
            # importlib/_bootstrap.py likes to spit out "ImportWarning:
            # can't resolve package from __spec__ or __package__, falling
            # back on __name__ and __path__". I have no idea what that means, but it seems harmless
            # and is annoying.
            'ignore:::importlib._bootstrap:',
            'ignore:::importlib._bootstrap_external:',
            # importing ABCs from collections, not collections.abc
            'ignore:::pkg_resources._vendor.pyparsing:',
            'ignore:::dns.namedict:',
            # dns.hash itself is being deprecated, importing it raises the warning;
            # we don't import it, but dnspython still does
            'ignore:::dns.hash:',
            # dns.zone uses some raw regular expressions
            # without the r'' syntax, leading to DeprecationWarning: invalid
            # escape sequence. This is fixed in 2.0 (Python 3 only).
            'ignore:::dns.zone:',
        ])

    if not_set('PYTHONFAULTHANDLER'):
        os.environ['PYTHONFAULTHANDLER'] = 'true'

    if not_set('GEVENT_DEBUG') and debug:
        os.environ['GEVENT_DEBUG'] = 'debug'

    if not_set('PYTHONTRACEMALLOC') and debug:
        # This slows the tests down quite a bit. Reserve
        # for debugging.
        os.environ['PYTHONTRACEMALLOC'] = '10'

    if not_set('PYTHONDEVMODE'):
        # Python 3.7 and above.
        os.environ['PYTHONDEVMODE'] = '1'

    if not_set('PYTHONMALLOC') and debug:
        # Python 3.6 and above.
        # This slows the tests down some, but
        # can detect memory corruption. Unfortunately
        # it can also be flaky, especially in pre-release
        # versions of Python (e.g., lots of crashes on Python 3.8b4).
        os.environ['PYTHONMALLOC'] = 'debug'

    if sys.version_info.releaselevel != 'final' and not debug:
        os.environ['PYTHONMALLOC'] = 'default'
        os.environ['PYTHONDEVMODE'] = ''

    # PYTHONSAFEPATH breaks the assumptions of some tests, notably test_interpreters.py
    os.environ.pop('PYTHONSAFEPATH', None)

    interesting_envs = {
        k: os.environ[k]
        for k in os.environ
        if k.startswith(('PYTHON', 'GEVENT'))
    }
    widest_k = max(len(k) for k in interesting_envs)
    for k, v in sorted(interesting_envs.items()):
        util.log('%*s\t=\t%s', widest_k, k, v, color="debug")


def main():
    # pylint:disable=too-many-locals,too-many-statements,too-many-branches
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--ignore')
    parser.add_argument(
        '--discover', action='store_true',
        help="Only print the tests found."
    )
    parser.add_argument(
        '--config', default='known_failures.py',
        help="The path to the config file containing "
        "FAILING_TESTS, IGNORED_TESTS and RUN_ALONE. "
        "Defaults to %(default)s."
    )
    parser.add_argument(
        "--coverage", action="store_true",
        help="Enable coverage recording with coverage.py."
    )
    # TODO: Quiet and verbose should be mutually exclusive
    parser.add_argument(
        "--quiet", action="store_true", default=True,
        help="Be quiet. Defaults to %(default)s. Also the "
        "GEVENTTEST_QUIET environment variable."
    )
    parser.add_argument("--verbose", action="store_false", dest='quiet')

    parser.add_argument(
        "--debug", action="store_true", default=False,
        help="Enable debug settings. If the GEVENT_DEBUG environment variable is not set, "
        "this sets it to 'debug'. This can also enable PYTHONTRACEMALLOC and the debug PYTHONMALLOC "
        "allocators, if not already set. Defaults to %(default)s."
     )

    parser.add_argument(
        "--package", default="gevent.tests",
        help="Load tests from the given package. Defaults to %(default)s."
    )
    parser.add_argument(
        "--processes", "-j", default=DEFAULT_NWORKERS, type=int,
        help="Use up to the given number of parallel processes to execute tests. "
        "Defaults to %(default)s."
    )
    parser.add_argument(
        '--no-combine', default=True, action='store_false',
        help="Do not combine tests into process groups."
    )
    parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
                        action='store', type=parse_resources,
                        help='specify which special resource-intensive tests '
                        'to run. "all" is the default; "none" may also be used. '
                        'Disable individual resources with a leading -. '
                        'For example, "-u-network". GEVENTTEST_USE_RESOURCES is used '
                        'if no argument is given. To use only one resource, specify '
                        '"-unone,resource".')
    parser.add_argument("--travis-fold", metavar="MSG",
                        help="Emit Travis CI log fold markers around the output.")

    fail_parser = parser.add_mutually_exclusive_group()
    fail_parser.add_argument(
        "--second-chance", action="store_true", default=False,
        help="Give failed tests a second chance.")
    fail_parser.add_argument(
        '--failfast', '-x', action='store_true', default=False,
        help="Stop running after the first failure.")

    parser.add_argument('tests', nargs='*')
    options = parser.parse_args()
    # options.use will be None if -u was not given, or else a list of
    # the resources from the last -u argument.
    # If not given, use the default, which we'll take from the environment, if set.
    options.use = list(set(parse_resources() if options.use is None else options.use))

    # Whether or not it came from the environment, put it in the
    # environment now.
    os.environ['GEVENTTEST_USE_RESOURCES'] = unparse_resources(options.use)
    setup_resources(options.use)


    # Set this before any test imports in case of 'from .util import QUIET'.
    # Not that it matters much, since we spawn tests in subprocesses:
    # it's the environment setting that counts.
    util.QUIET = options.quiet
    if 'GEVENTTEST_QUIET' not in os.environ:
        os.environ['GEVENTTEST_QUIET'] = str(options.quiet)

    FAILING_TESTS = []
    IGNORED_TESTS = []
    RUN_ALONE = []

    coverage = False
    if options.coverage or os.environ.get("GEVENTTEST_COVERAGE"):
        if PYPY and RUNNING_ON_CI:
            print("Ignoring coverage option on PyPy on CI; slow")
        else:
            coverage = True
            cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc")
            if PYPY:
                cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc-pypy")

            this_dir = os.path.dirname(__file__)
            site_dir = os.path.join(this_dir, 'coveragesite')
            site_dir = os.path.abspath(site_dir)
            os.environ['PYTHONPATH'] = site_dir + os.pathsep + os.environ.get("PYTHONPATH", "")
            # We change directory often; use an absolute path to keep all the
            # coverage files (which have distinct suffixes, thanks to
            # parallel=true in .coveragerc) in this directory, where they are
            # easier to combine and use with coverage report.
            os.environ['COVERAGE_FILE'] = os.path.abspath(".") + os.sep + ".coverage"
            # XXX: Log this with color. Right now, it interferes (buffering) with other early
            # output.
            print("Enabling coverage to", os.environ['COVERAGE_FILE'],
                  "with site", site_dir,
                  "and configuration file", cov_config)
            assert os.path.exists(cov_config)
            assert os.path.exists(os.path.join(site_dir, 'sitecustomize.py'))

    _setup_environ(debug=options.debug)

    config = {}
    if options.config:
        options.config = _package_relative_filename(options.config, options.package)
        with open(options.config) as f: # pylint:disable=unspecified-encoding
            config_data = f.read()
        six.exec_(config_data, config)
        FAILING_TESTS = config['FAILING_TESTS']
        IGNORED_TESTS = config['IGNORED_TESTS']
        RUN_ALONE = config['RUN_ALONE']

    tests = Discovery(
        options.tests,
        ignore_files=options.ignore,
        ignored=IGNORED_TESTS,
        coverage=coverage,
        package=options.package,
        config=config,
        allow_combine=options.no_combine,
    )
    if options.discover:
        for cmd, options in tests:
            print(util.getname(cmd, env=options.get('env'), setenv=options.get('setenv')))
        print('%s tests found.' % len(tests))
    else:
        if PYPY and RESOLVER_ARES:
            # XXX: Add a way to force these.
            print("Not running tests on pypy with c-ares; not a supported configuration")
            return

        if options.package:
            # Put this directory on the path so relative imports work.
            package_dir = _dir_from_package_name(options.package)
            os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', "") + os.pathsep + package_dir

        allowed_return_codes = ()
        if sys.version_info[:3] >= (3, 12, 1):
            # Starting with 3.12.1, unittest fails with this return
            # code if all the tests in a module are skipped.
            allowed_return_codes += (5,)

        runner = Runner(
            tests,
            allowed_return_codes=allowed_return_codes,
            configured_failing_tests=FAILING_TESTS,
            failfast=options.failfast,
            quiet=options.quiet,
            configured_run_alone_tests=RUN_ALONE,
            worker_count=options.processes,
            second_chance=options.second_chance,
        )

        if options.travis_fold:
            runner = TravisFoldingRunner(runner, options.travis_fold)

        runner()


if __name__ == '__main__':
    main()
