summaryrefslogtreecommitdiff
path: root/utils/lit/lit/run.py
blob: c4f9eb2d0fc4c994c69b28db3b40a328d274adb8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import os
import sys
import threading
import time
import traceback
try:
    import Queue as queue
except ImportError:
    import queue

try:
    import win32api
except ImportError:
    win32api = None

import multiprocessing
import lit.Test

def abort_now():
    """Abort the current process without doing any exception teardown"""
    sys.stdout.flush()
    if win32api:
        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
    else:
        os.kill(0, 9)

class _Display(object):
    def __init__(self, display, provider, maxFailures):
        self.display = display
        self.provider = provider
        self.maxFailures = maxFailures or object()
        self.failedCount = 0
    def update(self, test):
        self.display.update(test)
        self.failedCount += (test.result.code == lit.Test.FAIL)
        if self.failedCount == self.maxFailures:
            self.provider.cancel()

class Run(object):
    """
    This class represents a concrete, configured testing run.
    """

    def __init__(self, lit_config, tests):
        self.lit_config = lit_config
        self.tests = tests
        # Set up semaphores to limit parallelism of certain classes of tests.
        # For example, some ASan tests require lots of virtual memory and run
        # faster with less parallelism on OS X.
        self.parallelism_semaphores = \
                {k: multiprocessing.Semaphore(v) for k, v in
                 self.lit_config.parallelism_groups.items()}

    def execute_test(self, test):
        return _execute_test_impl(test, self.lit_config,
                                  self.parallelism_semaphores)

    def execute_tests_in_pool(self, jobs, max_time):
        # We need to issue many wait calls, so compute the final deadline and
        # subtract time.time() from that as we go along.
        deadline = None
        if max_time:
            deadline = time.time() + max_time

        # Start a process pool. Copy over the data shared between all test runs.
        # FIXME: Find a way to capture the worker process stderr. If the user
        # interrupts the workers before we make it into our task callback, they
        # will each raise a KeyboardInterrupt exception and print to stderr at
        # the same time.
        pool = multiprocessing.Pool(jobs, worker_initializer,
                                    (self.lit_config,
                                     self.parallelism_semaphores))

        # Install a console-control signal handler on Windows.
        if win32api is not None:
            def console_ctrl_handler(type):
                print('\nCtrl-C detected, terminating.')
                pool.terminate()
                pool.join()
                abort_now()
                return True
            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        try:
            async_results = [pool.apply_async(worker_run_one_test,
                                              args=(test_index, test),
                                              callback=self.consume_test_result)
                             for test_index, test in enumerate(self.tests)]
            pool.close()

            # Wait for all results to come in. The callback that runs in the
            # parent process will update the display.
            for a in async_results:
                if deadline:
                    a.wait(deadline - time.time())
                else:
                    # Python condition variables cannot be interrupted unless
                    # they have a timeout. This can make lit unresponsive to
                    # KeyboardInterrupt, so do a busy wait with a timeout.
                    while not a.ready():
                        a.wait(1)
                if not a.successful():
                    a.get() # Exceptions raised here come from the worker.
                if self.hit_max_failures:
                    break
        except:
            # Stop the workers and wait for any straggling results to come in
            # if we exited without waiting on every async result.
            pool.terminate()
            raise
        finally:
            pool.join()

    def execute_tests(self, display, jobs, max_time=None):
        """
        execute_tests(display, jobs, [max_time])

        Execute each of the tests in the run, using up to jobs number of
        parallel tasks, and inform the display of each individual result. The
        provided tests should be a subset of the tests available in this run
        object.

        If max_time is non-None, it should be a time in seconds after which to
        stop executing tests.

        The display object will have its update method called with each test as
        it is completed. The calls are guaranteed to be locked with respect to
        one another, but are *not* guaranteed to be called on the same thread as
        this method was invoked on.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """
        # Don't do anything if we aren't going to run any tests.
        if not self.tests or jobs == 0:
            return

        # Save the display object on the runner so that we can update it from
        # our task completion callback.
        self.display = display

        self.failure_count = 0
        self.hit_max_failures = False
        if self.lit_config.singleProcess:
            global child_lit_config
            child_lit_config = self.lit_config
            for test_index, test in enumerate(self.tests):
                result = worker_run_one_test(test_index, test)
                self.consume_test_result(result)
        else:
            self.execute_tests_in_pool(jobs, max_time)

        # Mark any tests that weren't run as UNRESOLVED.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

    def consume_test_result(self, pool_result):
        """Test completion callback for worker_run_one_test

        Updates the test result status in the parent process. Each task in the
        pool returns the test index and the result, and we use the index to look
        up the original test object. Also updates the progress bar as tasks
        complete.
        """
        # Don't add any more test results after we've hit the maximum failure
        # count.  Otherwise we're racing with the main thread, which is going
        # to terminate the process pool soon.
        if self.hit_max_failures:
            return

        (test_index, test_with_result) = pool_result
        # Update the parent process copy of the test. This includes the result,
        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
        assert self.tests[test_index].file_path == test_with_result.file_path, \
                "parent and child disagree on test path"
        self.tests[test_index] = test_with_result
        self.display.update(test_with_result)

        # If we've finished all the tests or too many tests have failed, notify
        # the main thread that we've stopped testing.
        self.failure_count += (test_with_result.result.code == lit.Test.FAIL)
        if self.lit_config.maxFailures and \
                self.failure_count == self.lit_config.maxFailures:
            self.hit_max_failures = True

def _execute_test_impl(test, lit_config, parallelism_semaphores):
    """Execute one test"""
    pg = test.config.parallelism_group
    if callable(pg):
        pg = pg(test)

    result = None
    semaphore = None
    try:
        if pg:
            semaphore = parallelism_semaphores[pg]
        if semaphore:
            semaphore.acquire()
        start_time = time.time()
        result = test.config.test_format.execute(test, lit_config)
        # Support deprecated result from execute() which returned the result
        # code and additional output as a tuple.
        if isinstance(result, tuple):
            code, output = result
            result = lit.Test.Result(code, output)
        elif not isinstance(result, lit.Test.Result):
            raise ValueError("unexpected result from test execution")
        result.elapsed = time.time() - start_time
    except KeyboardInterrupt:
        raise
    except:
        if lit_config.debug:
            raise
        output = 'Exception during script execution:\n'
        output += traceback.format_exc()
        output += '\n'
        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
    finally:
        if semaphore:
            semaphore.release()

    test.setResult(result)

child_lit_config = None
child_parallelism_semaphores = None

def worker_initializer(lit_config, parallelism_semaphores):
    """Copy expensive repeated data into worker processes"""
    global child_lit_config
    child_lit_config = lit_config
    global child_parallelism_semaphores
    child_parallelism_semaphores = parallelism_semaphores

def worker_run_one_test(test_index, test):
    """Run one test in a multiprocessing.Pool

    Side effects in this function and functions it calls are not visible in the
    main lit process.

    Arguments and results of this function are pickled, so they should be cheap
    to copy. For efficiency, we copy all data needed to execute all tests into
    each worker and store it in the child_* global variables. This reduces the
    cost of each task.

    Returns an index and a Result, which the parent process uses to update
    the display.
    """
    try:
        _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
        return (test_index, test)
    except KeyboardInterrupt as e:
        # If a worker process gets an interrupt, abort it immediately.
        abort_now()
    except:
        traceback.print_exc()