419 lines
14 KiB
Python
419 lines
14 KiB
Python
import itertools
|
|
import os
|
|
from json import JSONEncoder
|
|
|
|
from lit.BooleanExpression import BooleanExpression
|
|
from lit.TestTimes import read_test_times
|
|
|
|
# Test result codes.
|
|
|
|
class ResultCode(object):
|
|
"""Test result codes."""
|
|
|
|
# All result codes (including user-defined ones) in declaration order
|
|
_all_codes = []
|
|
|
|
@staticmethod
|
|
def all_codes():
|
|
return ResultCode._all_codes
|
|
|
|
# We override __new__ and __getnewargs__ to ensure that pickling still
|
|
# provides unique ResultCode objects in any particular instance.
|
|
_instances = {}
|
|
|
|
def __new__(cls, name, label, isFailure):
|
|
res = cls._instances.get(name)
|
|
if res is None:
|
|
cls._instances[name] = res = super(ResultCode, cls).__new__(cls)
|
|
return res
|
|
|
|
def __getnewargs__(self):
|
|
return (self.name, self.label, self.isFailure)
|
|
|
|
def __init__(self, name, label, isFailure):
|
|
self.name = name
|
|
self.label = label
|
|
self.isFailure = isFailure
|
|
ResultCode._all_codes.append(self)
|
|
|
|
def __repr__(self):
|
|
return '%s%r' % (self.__class__.__name__,
|
|
(self.name, self.isFailure))
|
|
|
|
|
|
# Successes
|
|
EXCLUDED = ResultCode('EXCLUDED', 'Excluded', False)
|
|
SKIPPED = ResultCode('SKIPPED', 'Skipped', False)
|
|
UNSUPPORTED = ResultCode('UNSUPPORTED', 'Unsupported', False)
|
|
PASS = ResultCode('PASS', 'Passed', False)
|
|
FLAKYPASS = ResultCode('FLAKYPASS', 'Passed With Retry', False)
|
|
XFAIL = ResultCode('XFAIL', 'Expectedly Failed', False)
|
|
# Failures
|
|
UNRESOLVED = ResultCode('UNRESOLVED', 'Unresolved', True)
|
|
TIMEOUT = ResultCode('TIMEOUT', 'Timed Out', True)
|
|
FAIL = ResultCode('FAIL', 'Failed', True)
|
|
XPASS = ResultCode('XPASS', 'Unexpectedly Passed', True)
|
|
|
|
|
|
# Test metric values.
|
|
|
|
class MetricValue(object):
|
|
def format(self):
|
|
"""
|
|
format() -> str
|
|
|
|
Convert this metric to a string suitable for displaying as part of the
|
|
console output.
|
|
"""
|
|
raise RuntimeError("abstract method")
|
|
|
|
def todata(self):
|
|
"""
|
|
todata() -> json-serializable data
|
|
|
|
Convert this metric to content suitable for serializing in the JSON test
|
|
output.
|
|
"""
|
|
raise RuntimeError("abstract method")
|
|
|
|
class IntMetricValue(MetricValue):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def format(self):
|
|
return str(self.value)
|
|
|
|
def todata(self):
|
|
return self.value
|
|
|
|
class RealMetricValue(MetricValue):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def format(self):
|
|
return '%.4f' % self.value
|
|
|
|
def todata(self):
|
|
return self.value
|
|
|
|
class JSONMetricValue(MetricValue):
|
|
"""
|
|
JSONMetricValue is used for types that are representable in the output
|
|
but that are otherwise uninterpreted.
|
|
"""
|
|
def __init__(self, value):
|
|
# Ensure the value is a serializable by trying to encode it.
|
|
# WARNING: The value may change before it is encoded again, and may
|
|
# not be encodable after the change.
|
|
try:
|
|
e = JSONEncoder()
|
|
e.encode(value)
|
|
except TypeError:
|
|
raise
|
|
self.value = value
|
|
|
|
def format(self):
|
|
e = JSONEncoder(indent=2, sort_keys=True)
|
|
return e.encode(self.value)
|
|
|
|
def todata(self):
|
|
return self.value
|
|
|
|
def toMetricValue(value):
|
|
if isinstance(value, MetricValue):
|
|
return value
|
|
elif isinstance(value, int):
|
|
return IntMetricValue(value)
|
|
elif isinstance(value, float):
|
|
return RealMetricValue(value)
|
|
else:
|
|
# 'long' is only present in python2
|
|
try:
|
|
if isinstance(value, long):
|
|
return IntMetricValue(value)
|
|
except NameError:
|
|
pass
|
|
|
|
# Try to create a JSONMetricValue and let the constructor throw
|
|
# if value is not a valid type.
|
|
return JSONMetricValue(value)
|
|
|
|
|
|
# Test results.
|
|
|
|
class Result(object):
|
|
"""Wrapper for the results of executing an individual test."""
|
|
|
|
def __init__(self, code, output='', elapsed=None):
|
|
# The result code.
|
|
self.code = code
|
|
# The test output.
|
|
self.output = output
|
|
# The wall timing to execute the test, if timing.
|
|
self.elapsed = elapsed
|
|
self.start = None
|
|
self.pid = None
|
|
# The metrics reported by this test.
|
|
self.metrics = {}
|
|
# The micro-test results reported by this test.
|
|
self.microResults = {}
|
|
|
|
def addMetric(self, name, value):
|
|
"""
|
|
addMetric(name, value)
|
|
|
|
Attach a test metric to the test result, with the given name and list of
|
|
values. It is an error to attempt to attach the metrics with the same
|
|
name multiple times.
|
|
|
|
Each value must be an instance of a MetricValue subclass.
|
|
"""
|
|
if name in self.metrics:
|
|
raise ValueError("result already includes metrics for %r" % (
|
|
name,))
|
|
if not isinstance(value, MetricValue):
|
|
raise TypeError("unexpected metric value: %r" % (value,))
|
|
self.metrics[name] = value
|
|
|
|
def addMicroResult(self, name, microResult):
|
|
"""
|
|
addMicroResult(microResult)
|
|
|
|
Attach a micro-test result to the test result, with the given name and
|
|
result. It is an error to attempt to attach a micro-test with the
|
|
same name multiple times.
|
|
|
|
Each micro-test result must be an instance of the Result class.
|
|
"""
|
|
if name in self.microResults:
|
|
raise ValueError("Result already includes microResult for %r" % (
|
|
name,))
|
|
if not isinstance(microResult, Result):
|
|
raise TypeError("unexpected MicroResult value %r" % (microResult,))
|
|
self.microResults[name] = microResult
|
|
|
|
|
|
# Test classes.
|
|
|
|
class TestSuite:
|
|
"""TestSuite - Information on a group of tests.
|
|
|
|
A test suite groups together a set of logically related tests.
|
|
"""
|
|
|
|
def __init__(self, name, source_root, exec_root, config):
|
|
self.name = name
|
|
self.source_root = source_root
|
|
self.exec_root = exec_root
|
|
# The test suite configuration.
|
|
self.config = config
|
|
|
|
self.test_times = read_test_times(self)
|
|
|
|
def getSourcePath(self, components):
|
|
return os.path.join(self.source_root, *components)
|
|
|
|
def getExecPath(self, components):
|
|
return os.path.join(self.exec_root, *components)
|
|
|
|
class Test:
|
|
"""Test - Information on a single test instance."""
|
|
|
|
def __init__(self, suite, path_in_suite, config, file_path = None):
|
|
self.suite = suite
|
|
self.path_in_suite = path_in_suite
|
|
self.config = config
|
|
self.file_path = file_path
|
|
|
|
# A list of conditions under which this test is expected to fail.
|
|
# Each condition is a boolean expression of features and target
|
|
# triple parts. These can optionally be provided by test format
|
|
# handlers, and will be honored when the test result is supplied.
|
|
self.xfails = []
|
|
|
|
# If true, ignore all items in self.xfails.
|
|
self.xfail_not = False
|
|
|
|
# A list of conditions that must be satisfied before running the test.
|
|
# Each condition is a boolean expression of features. All of them
|
|
# must be True for the test to run.
|
|
# FIXME should target triple parts count here too?
|
|
self.requires = []
|
|
|
|
# A list of conditions that prevent execution of the test.
|
|
# Each condition is a boolean expression of features and target
|
|
# triple parts. All of them must be False for the test to run.
|
|
self.unsupported = []
|
|
|
|
# An optional number of retries allowed before the test finally succeeds.
|
|
# The test is run at most once plus the number of retries specified here.
|
|
self.allowed_retries = getattr(config, 'test_retry_attempts', 0)
|
|
|
|
# The test result, once complete.
|
|
self.result = None
|
|
|
|
# The previous test failure state, if applicable.
|
|
self.previous_failure = False
|
|
|
|
# The previous test elapsed time, if applicable.
|
|
self.previous_elapsed = 0.0
|
|
|
|
if '/'.join(path_in_suite) in suite.test_times:
|
|
time = suite.test_times['/'.join(path_in_suite)]
|
|
self.previous_elapsed = abs(time)
|
|
self.previous_failure = time < 0
|
|
|
|
|
|
def setResult(self, result):
|
|
assert self.result is None, "result already set"
|
|
assert isinstance(result, Result), "unexpected result type"
|
|
try:
|
|
expected_to_fail = self.isExpectedToFail()
|
|
except ValueError as err:
|
|
# Syntax error in an XFAIL line.
|
|
result.code = UNRESOLVED
|
|
result.output = str(err)
|
|
else:
|
|
if expected_to_fail:
|
|
# pass -> unexpected pass
|
|
if result.code is PASS:
|
|
result.code = XPASS
|
|
# fail -> expected fail
|
|
elif result.code is FAIL:
|
|
result.code = XFAIL
|
|
self.result = result
|
|
|
|
def isFailure(self):
|
|
assert self.result
|
|
return self.result.code.isFailure
|
|
|
|
def getFullName(self):
|
|
return self.suite.config.name + ' :: ' + '/'.join(self.path_in_suite)
|
|
|
|
def getFilePath(self):
|
|
if self.file_path:
|
|
return self.file_path
|
|
return self.getSourcePath()
|
|
|
|
def getSourcePath(self):
|
|
return self.suite.getSourcePath(self.path_in_suite)
|
|
|
|
def getExecPath(self):
|
|
return self.suite.getExecPath(self.path_in_suite)
|
|
|
|
def isExpectedToFail(self):
|
|
"""
|
|
isExpectedToFail() -> bool
|
|
|
|
Check whether this test is expected to fail in the current
|
|
configuration. This check relies on the test xfails property which by
|
|
some test formats may not be computed until the test has first been
|
|
executed.
|
|
Throws ValueError if an XFAIL line has a syntax error.
|
|
"""
|
|
|
|
if self.xfail_not:
|
|
return False
|
|
|
|
features = self.config.available_features
|
|
triple = getattr(self.suite.config, 'target_triple', "")
|
|
|
|
# Check if any of the xfails match an available feature or the target.
|
|
for item in self.xfails:
|
|
# If this is the wildcard, it always fails.
|
|
if item == '*':
|
|
return True
|
|
|
|
# If this is a True expression of features and target triple parts,
|
|
# it fails.
|
|
try:
|
|
if BooleanExpression.evaluate(item, features, triple):
|
|
return True
|
|
except ValueError as e:
|
|
raise ValueError('Error in XFAIL list:\n%s' % str(e))
|
|
|
|
return False
|
|
|
|
def isWithinFeatureLimits(self):
|
|
"""
|
|
isWithinFeatureLimits() -> bool
|
|
|
|
A test is within the feature limits set by run_only_tests if
|
|
1. the test's requirements ARE satisfied by the available features
|
|
2. the test's requirements ARE NOT satisfied after the limiting
|
|
features are removed from the available features
|
|
|
|
Throws ValueError if a REQUIRES line has a syntax error.
|
|
"""
|
|
|
|
if not self.config.limit_to_features:
|
|
return True # No limits. Run it.
|
|
|
|
# Check the requirements as-is (#1)
|
|
if self.getMissingRequiredFeatures():
|
|
return False
|
|
|
|
# Check the requirements after removing the limiting features (#2)
|
|
featuresMinusLimits = [f for f in self.config.available_features
|
|
if not f in self.config.limit_to_features]
|
|
if not self.getMissingRequiredFeaturesFromList(featuresMinusLimits):
|
|
return False
|
|
|
|
return True
|
|
|
|
def getMissingRequiredFeaturesFromList(self, features):
|
|
try:
|
|
return [item for item in self.requires
|
|
if not BooleanExpression.evaluate(item, features)]
|
|
except ValueError as e:
|
|
raise ValueError('Error in REQUIRES list:\n%s' % str(e))
|
|
|
|
def getMissingRequiredFeatures(self):
|
|
"""
|
|
getMissingRequiredFeatures() -> list of strings
|
|
|
|
Returns a list of features from REQUIRES that are not satisfied."
|
|
Throws ValueError if a REQUIRES line has a syntax error.
|
|
"""
|
|
|
|
features = self.config.available_features
|
|
return self.getMissingRequiredFeaturesFromList(features)
|
|
|
|
def getUnsupportedFeatures(self):
|
|
"""
|
|
getUnsupportedFeatures() -> list of strings
|
|
|
|
Returns a list of features from UNSUPPORTED that are present
|
|
in the test configuration's features or target triple.
|
|
Throws ValueError if an UNSUPPORTED line has a syntax error.
|
|
"""
|
|
|
|
features = self.config.available_features
|
|
triple = getattr(self.suite.config, 'target_triple', "")
|
|
|
|
try:
|
|
return [item for item in self.unsupported
|
|
if BooleanExpression.evaluate(item, features, triple)]
|
|
except ValueError as e:
|
|
raise ValueError('Error in UNSUPPORTED list:\n%s' % str(e))
|
|
|
|
def getUsedFeatures(self):
|
|
"""
|
|
getUsedFeatures() -> list of strings
|
|
|
|
Returns a list of all features appearing in XFAIL, UNSUPPORTED and
|
|
REQUIRES annotations for this test.
|
|
"""
|
|
import lit.TestRunner
|
|
parsed = lit.TestRunner._parseKeywords(self.getSourcePath(), require_script=False)
|
|
feature_keywords = ('UNSUPPORTED:', 'REQUIRES:', 'XFAIL:')
|
|
boolean_expressions = itertools.chain.from_iterable(
|
|
parsed[k] or [] for k in feature_keywords
|
|
)
|
|
tokens = itertools.chain.from_iterable(
|
|
BooleanExpression.tokenize(expr) for expr in
|
|
boolean_expressions if expr != '*'
|
|
)
|
|
matchExpressions = set(filter(BooleanExpression.isMatchExpression, tokens))
|
|
return matchExpressions
|