import copy
import json
import os
import ssl
import sys
import subprocess
import urllib

import html5lib
import py
import pytest

from wptserver import WPTServer

HERE = os.path.dirname(os.path.abspath(__file__))
WPT_ROOT = os.path.normpath(os.path.join(HERE, '..', '..'))
HARNESS = os.path.join(HERE, 'harness.html')
TEST_TYPES = ('functional', 'unit')

sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools")))
import localpaths

sys.path.insert(0, os.path.normpath(os.path.join(WPT_ROOT, "tools", "webdriver")))
import webdriver


def pytest_addoption(parser):
    parser.addoption("--binary", action="store", default=None, help="path to browser binary")
    parser.addoption("--headless", action="store_true", default=False, help="run browser in headless mode")


def pytest_collect_file(file_path, path, parent):
    if file_path.suffix.lower() != '.html':
        return

    # Tests are organized in directories by type
    test_type = os.path.relpath(str(file_path), HERE)
    if os.path.sep not in test_type or ".." in test_type:
        # HTML files in this directory are not tests
        return
    test_type = test_type.split(os.path.sep)[1]

    # Handle the deprecation of Node construction in pytest6
    # https://docs.pytest.org/en/stable/deprecations.html#node-construction-changed-to-node-from-parent
    if hasattr(HTMLItem, "from_parent"):
        return HTMLItem.from_parent(parent, filename=str(file_path), test_type=test_type)
    return HTMLItem(parent, str(file_path), test_type)


def pytest_configure(config):
    config.proc = subprocess.Popen(["geckodriver"])
    config.add_cleanup(config.proc.kill)

    capabilities = {"alwaysMatch": {"acceptInsecureCerts": True, "moz:firefoxOptions": {}}}
    if config.getoption("--binary"):
        capabilities["alwaysMatch"]["moz:firefoxOptions"]["binary"] = config.getoption("--binary")
    if config.getoption("--headless"):
        capabilities["alwaysMatch"]["moz:firefoxOptions"]["args"] = ["--headless"]

    config.driver = webdriver.Session("localhost", 4444,
                                      capabilities=capabilities)
    config.add_cleanup(config.driver.end)

    # Although the name of the `_create_unverified_context` method suggests
    # that it is not intended for external consumption, the standard library's
    # documentation explicitly endorses its use:
    #
    # > To revert to the previous, unverified, behavior
    # > ssl._create_unverified_context() can be passed to the context
    # > parameter.
    #
    # https://docs.python.org/2/library/httplib.html#httplib.HTTPSConnection
    config.ssl_context = ssl._create_unverified_context()

    config.server = WPTServer(WPT_ROOT)
    config.server.start(config.ssl_context)
    config.add_cleanup(config.server.stop)


def resolve_uri(context, uri):
    if uri.startswith('/'):
        base = WPT_ROOT
        path = uri[1:]
    else:
        base = os.path.dirname(context)
        path = uri

    return os.path.exists(os.path.join(base, path))


def _summarize(actual):
    def _scrub_stack(test_obj):
        copy = dict(test_obj)
        del copy['stack']
        return copy

    def _expand_status(status_obj):
        for key, value in [item for item in status_obj.items()]:
            # In "status" and "test" objects, the "status" value enum
            # definitions are interspersed with properties for unrelated
            # metadata. The following condition is a best-effort attempt to
            # ignore non-enum properties.
            if key != key.upper() or not isinstance(value, int):
                continue

            del status_obj[key]

            if status_obj['status'] == value:
                status_obj[u'status_string'] = key

        del status_obj['status']

        return status_obj

    def _summarize_test(test_obj):
        del test_obj['index']

        assert 'phase' in test_obj
        assert 'phases' in test_obj
        assert 'COMPLETE' in test_obj['phases']
        assert test_obj['phase'] == test_obj['phases']['COMPLETE']
        del test_obj['phases']
        del test_obj['phase']

        return _expand_status(_scrub_stack(test_obj))

    def _summarize_status(status_obj):
        return _expand_status(_scrub_stack(status_obj))


    summarized = {}

    summarized[u'summarized_status'] = _summarize_status(actual['status'])
    summarized[u'summarized_tests'] = [
        _summarize_test(test) for test in actual['tests']]
    summarized[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name'))
    summarized[u'summarized_asserts'] = [
        {"assert_name": assert_item["assert_name"],
        "test": assert_item["test"]["name"] if assert_item["test"] else None,
        "args": assert_item["args"],
        "status": assert_item["status"]} for assert_item in actual["asserts"]]
    summarized[u'type'] = actual['type']

    return summarized


class HTMLItem(pytest.Item, pytest.Collector):
    def __init__(self, parent, filename, test_type):
        self.url = parent.session.config.server.url(filename)
        self.type = test_type
        # Some tests are reliant on the WPT servers substitution functionality,
        # so tests must be retrieved from the server rather than read from the
        # file system directly.
        handle = urllib.request.urlopen(self.url,
                                        context=parent.session.config.ssl_context)
        try:
            markup = handle.read()
        finally:
            handle.close()

        if test_type not in TEST_TYPES:
            raise ValueError('Unrecognized test type: "%s"' % test_type)

        parsed = html5lib.parse(markup, namespaceHTMLElements=False)
        name = None
        self.expected = None

        for element in parsed.iter():
            if not name and element.tag == 'title':
                name = element.text
                continue
            if element.tag == 'script':
                if element.attrib.get('id') == 'expected':
                    try:
                        self.expected = json.loads(element.text)
                    except ValueError:
                        print("Failed parsing JSON in %s" % filename)
                        raise

        if not name:
            raise ValueError('No name found in %s add a <title> element' % filename)
        elif self.type == 'functional':
            if not self.expected:
                raise ValueError('Functional tests must specify expected report data')
        elif self.type == 'unit' and self.expected:
            raise ValueError('Unit tests must not specify expected report data')

        # Ensure that distinct items have distinct fspath attributes.
        # This is necessary because pytest has an internal cache keyed on it,
        # and only the first test with any given fspath will be run.
        #
        # This cannot use super(HTMLItem, self).__init__(..) because only the
        # Collector constructor takes the fspath argument.
        pytest.Item.__init__(self, name, parent)
        pytest.Collector.__init__(self, name, parent, fspath=py.path.local(filename))


    def reportinfo(self):
        return self.fspath, None, self.url

    def repr_failure(self, excinfo):
        return pytest.Collector.repr_failure(self, excinfo)

    def runtest(self):
        if self.type == 'unit':
            self._run_unit_test()
        elif self.type == 'functional':
            self._run_functional_test()
        else:
            raise NotImplementedError

    def _run_unit_test(self):
        driver = self.session.config.driver
        server = self.session.config.server

        driver.url = server.url(HARNESS)

        actual = driver.execute_async_script(
            'runTest("%s", "foo", arguments[0])' % self.url
        )

        summarized = _summarize(copy.deepcopy(actual))

        print(json.dumps(summarized, indent=2))

        assert summarized[u'summarized_status'][u'status_string'] == u'OK', summarized[u'summarized_status'][u'message']
        for test in summarized[u'summarized_tests']:
            msg = "%s\n%s" % (test[u'name'], test[u'message'])
            assert test[u'status_string'] == u'PASS', msg

    def _run_functional_test(self):
        driver = self.session.config.driver
        server = self.session.config.server

        driver.url = server.url(HARNESS)

        test_url = self.url
        actual = driver.execute_async_script('runTest("%s", "foo", arguments[0])' % test_url)

        print(json.dumps(actual, indent=2))

        summarized = _summarize(copy.deepcopy(actual))

        print(json.dumps(summarized, indent=2))

        # Test object ordering is not guaranteed. This weak assertion verifies
        # that the indices are unique and sequential
        indices = [test_obj.get('index') for test_obj in actual['tests']]
        self._assert_sequence(indices)

        self.expected[u'summarized_tests'].sort(key=lambda test_obj: test_obj.get('name'))

        # Make asserts opt-in for now
        if "summarized_asserts" not in self.expected:
            del summarized["summarized_asserts"]
        else:
            # We can't be sure of the order of asserts even within the same test
            # although we could also check for the failing assert being the final
            # one
            for obj in [summarized, self.expected]:
                obj["summarized_asserts"].sort(
                    key=lambda x: (x["test"] or "", x["status"], x["assert_name"], tuple(x["args"])))

        assert summarized == self.expected

    @staticmethod
    def _assert_sequence(nums):
        if nums and len(nums) > 0:
            assert nums == list(range(1, nums[-1] + 1))
