import os
from xml.sax.saxutils import quoteattr
from json import JSONEncoder

from lit.BooleanExpression import BooleanExpression


# Test result codes.

class ResultCode(object):
    """Test result codes."""

    # We override __new__ and __getnewargs__ to ensure that pickling still
    # provides unique ResultCode objects in any particular instance.
    _instances = {}

    def __new__(cls, name, isFailure):
        res = cls._instances.get(name)
        if res is None:
            cls._instances[name] = res = super(ResultCode, cls).__new__(cls)
        return res

    def __getnewargs__(self):
        return (self.name, self.isFailure)

    def __init__(self, name, isFailure):
        self.name = name
        self.isFailure = isFailure

    def __repr__(self):
        return '%s%r' % (self.__class__.__name__,
                         (self.name, self.isFailure))


PASS = ResultCode('PASS', False)
FLAKYPASS = ResultCode('FLAKYPASS', False)
XFAIL = ResultCode('XFAIL', False)
FAIL = ResultCode('FAIL', True)
XPASS = ResultCode('XPASS', True)
UNRESOLVED = ResultCode('UNRESOLVED', True)
UNSUPPORTED = ResultCode('UNSUPPORTED', False)
TIMEOUT = ResultCode('TIMEOUT', True)
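
# Illustrative note (not part of the original source): because __new__ interns
# one instance per name, result codes can be compared by identity and survive
# pickling, e.g. in an interactive session one could check:
#
#   import pickle
#   assert ResultCode('FAIL', True) is FAIL
#   assert pickle.loads(pickle.dumps(FAIL)) is FAIL
#   assert FAIL.isFailure and not PASS.isFailure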

# Test metric values.

class MetricValue(object):
    def format(self):
        """
        format() -> str

        Convert this metric to a string suitable for displaying as part of
        the console output.
        """
        raise RuntimeError("abstract method")

    def todata(self):
        """
        todata() -> json-serializable data

        Convert this metric to content suitable for serializing in the JSON
        test output.
        """
        raise RuntimeError("abstract method")

class IntMetricValue(MetricValue):
    def __init__(self, value):
        self.value = value

    def format(self):
        return str(self.value)

    def todata(self):
        return self.value


class RealMetricValue(MetricValue):
    def __init__(self, value):
        self.value = value

    def format(self):
        return '%.4f' % self.value

    def todata(self):
        return self.value

class JSONMetricValue(MetricValue):
    """
    JSONMetricValue is used for types that are representable in the output
    but that are otherwise uninterpreted.
    """
    def __init__(self, value):
        # Ensure the value is serializable by trying to encode it.
        # WARNING: The value may change before it is encoded again, and may
        #          not be encodable after the change.
        try:
            e = JSONEncoder()
            e.encode(value)
        except TypeError:
            raise
        self.value = value

    def format(self):
        e = JSONEncoder(indent=2, sort_keys=True)
        return e.encode(self.value)

    def todata(self):
        return self.value

def toMetricValue(value):
    if isinstance(value, MetricValue):
        return value
    elif isinstance(value, int):
        return IntMetricValue(value)
    elif isinstance(value, float):
        return RealMetricValue(value)
    else:
        # 'long' is only present in Python 2.
        try:
            if isinstance(value, long):
                return IntMetricValue(value)
        except NameError:
            pass

        # Try to create a JSONMetricValue and let the constructor throw
        # if value is not a valid type.
        return JSONMetricValue(value)
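
# Illustrative sketch (not in the original source): toMetricValue picks the
# wrapper class based on the Python type of the value, e.g.:
#
#   toMetricValue(42)              # -> IntMetricValue
#   toMetricValue(0.25)            # -> RealMetricValue
#   toMetricValue({'key': [1, 2]}) # -> JSONMetricValue
#   toMetricValue(object())        # -> raises TypeError (not JSON-encodable)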

# Test results.

class Result(object):
    """Wrapper for the results of executing an individual test."""

    def __init__(self, code, output='', elapsed=None):
        # The result code.
        self.code = code
        # The test output.
        self.output = output
        # The wall time it took to execute the test, if timed.
        self.elapsed = elapsed
        # The metrics reported by this test.
        self.metrics = {}
        # The micro-test results reported by this test.
        self.microResults = {}

    def addMetric(self, name, value):
        """
        addMetric(name, value)

        Attach a test metric to the test result, with the given name and
        value. It is an error to attempt to attach a metric with the same
        name multiple times.

        The value must be an instance of a MetricValue subclass.
        """
        if name in self.metrics:
            raise ValueError("result already includes metrics for %r" % (
                name,))
        if not isinstance(value, MetricValue):
            raise TypeError("unexpected metric value: %r" % (value,))
        self.metrics[name] = value

    def addMicroResult(self, name, microResult):
        """
        addMicroResult(name, microResult)

        Attach a micro-test result to the test result, with the given name
        and result. It is an error to attempt to attach a micro-test with
        the same name multiple times.

        Each micro-test result must be an instance of the Result class.
        """
        if name in self.microResults:
            raise ValueError("Result already includes microResult for %r" % (
                name,))
        if not isinstance(microResult, Result):
            raise TypeError("unexpected MicroResult value %r" % (microResult,))
        self.microResults[name] = microResult
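
# Illustrative usage sketch (not in the original source): a test format would
# typically build up a Result like this once a test has run (the metric and
# micro-test names below are hypothetical):
#
#   result = Result(PASS, output='...', elapsed=1.5)
#   result.addMetric('compile_time', toMetricValue(0.75))
#   micro = Result(FAIL, output='sub-benchmark failed')
#   result.addMicroResult('bench/sub1', micro)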

# Test classes.

class TestSuite:
    """TestSuite - Information on a group of tests.

    A test suite groups together a set of logically related tests.
    """

    def __init__(self, name, source_root, exec_root, config):
        self.name = name
        self.source_root = source_root
        self.exec_root = exec_root
        # The test suite configuration.
        self.config = config

    def getSourcePath(self, components):
        return os.path.join(self.source_root, *components)

    def getExecPath(self, components):
        return os.path.join(self.exec_root, *components)
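
# Illustrative note (not in the original source): getSourcePath/getExecPath
# join a sequence of path components onto the suite roots, e.g. on a POSIX
# system (suite name and paths below are hypothetical):
#
#   suite = TestSuite('mysuite', '/src/tests', '/build/tests', config)
#   suite.getSourcePath(('sub', 'foo.ll'))  # -> '/src/tests/sub/foo.ll'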

class Test:
    """Test - Information on a single test instance."""

    def __init__(self, suite, path_in_suite, config, file_path=None):
        self.suite = suite
        self.path_in_suite = path_in_suite
        self.config = config
        self.file_path = file_path

        # A list of conditions under which this test is expected to fail.
        # Each condition is a boolean expression of features and target
        # triple parts. These can optionally be provided by test format
        # handlers, and will be honored when the test result is supplied.
        self.xfails = []

        # A list of conditions that must be satisfied before running the test.
        # Each condition is a boolean expression of features. All of them
        # must be True for the test to run.
        # FIXME: should target triple parts count here too?
        self.requires = []

        # A list of conditions that prevent execution of the test.
        # Each condition is a boolean expression of features and target
        # triple parts. All of them must be False for the test to run.
        self.unsupported = []

        # The test result, once complete.
        self.result = None

    def setResult(self, result):
        if self.result is not None:
            raise ValueError("test result already set")
        if not isinstance(result, Result):
            raise ValueError("unexpected result type")

        self.result = result

        # Apply the XFAIL handling to resolve the result exit code.
        try:
            if self.isExpectedToFail():
                if self.result.code == PASS:
                    self.result.code = XPASS
                elif self.result.code == FAIL:
                    self.result.code = XFAIL
        except ValueError as e:
            # Syntax error in an XFAIL line.
            self.result.code = UNRESOLVED
            self.result.output = str(e)
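
    # Illustrative note (not in the original source): with an XFAIL condition
    # that matches the configuration (e.g. the '*' wildcard), setResult
    # rewrites the raw code, so a failing test is reported as expected:
    #
    #   test.xfails = ['*']
    #   test.setResult(Result(FAIL))
    #   assert test.result.code is XFAIL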

    def getFullName(self):
        return self.suite.config.name + ' :: ' + '/'.join(self.path_in_suite)

    def getFilePath(self):
        if self.file_path:
            return self.file_path
        return self.getSourcePath()

    def getSourcePath(self):
        return self.suite.getSourcePath(self.path_in_suite)

    def getExecPath(self):
        return self.suite.getExecPath(self.path_in_suite)

    def isExpectedToFail(self):
        """
        isExpectedToFail() -> bool

        Check whether this test is expected to fail in the current
        configuration. This check relies on the test xfails property, which
        some test formats may not compute until the test has first been
        executed.

        Throws ValueError if an XFAIL line has a syntax error.
        """

        features = self.config.available_features
        triple = getattr(self.suite.config, 'target_triple', "")

        # Check if any of the xfails match an available feature or the target.
        for item in self.xfails:
            # If this is the wildcard, it always fails.
            if item == '*':
                return True

            # If this is a True expression of features and target triple
            # parts, it fails.
            try:
                if BooleanExpression.evaluate(item, features, triple):
                    return True
            except ValueError as e:
                raise ValueError('Error in XFAIL list:\n%s' % str(e))

        return False

    def isWithinFeatureLimits(self):
        """
        isWithinFeatureLimits() -> bool

        A test is within the feature limits set by run_only_tests if
        1. the test's requirements ARE satisfied by the available features
        2. the test's requirements ARE NOT satisfied after the limiting
           features are removed from the available features

        Throws ValueError if a REQUIRES line has a syntax error.
        """

        if not self.config.limit_to_features:
            return True  # No limits. Run it.

        # Check the requirements as-is (#1).
        if self.getMissingRequiredFeatures():
            return False

        # Check the requirements after removing the limiting features (#2).
        featuresMinusLimits = [f for f in self.config.available_features
                               if f not in self.config.limit_to_features]
        if not self.getMissingRequiredFeaturesFromList(featuresMinusLimits):
            return False

        return True

    def getMissingRequiredFeaturesFromList(self, features):
        try:
            return [item for item in self.requires
                    if not BooleanExpression.evaluate(item, features)]
        except ValueError as e:
            raise ValueError('Error in REQUIRES list:\n%s' % str(e))

    def getMissingRequiredFeatures(self):
        """
        getMissingRequiredFeatures() -> list of strings

        Returns a list of features from REQUIRES that are not satisfied.

        Throws ValueError if a REQUIRES line has a syntax error.
        """

        features = self.config.available_features
        return self.getMissingRequiredFeaturesFromList(features)

    def getUnsupportedFeatures(self):
        """
        getUnsupportedFeatures() -> list of strings

        Returns a list of features from UNSUPPORTED that are present in the
        test configuration's features or target triple.

        Throws ValueError if an UNSUPPORTED line has a syntax error.
        """

        features = self.config.available_features
        triple = getattr(self.suite.config, 'target_triple', "")

        try:
            return [item for item in self.unsupported
                    if BooleanExpression.evaluate(item, features, triple)]
        except ValueError as e:
            raise ValueError('Error in UNSUPPORTED list:\n%s' % str(e))
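
    # Illustrative note (not in the original source): xfails, requires and
    # unsupported hold boolean expressions over feature names and target
    # triple parts, as understood by lit.BooleanExpression, e.g.:
    #
    #   test.requires = ['shell', 'x86_64 && !windows']
    #   missing = test.getMissingRequiredFeatures()
    #   # 'missing' lists every expression that does not evaluate to True
    #   # against config.available_features.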

    def isEarlyTest(self):
        """
        isEarlyTest() -> bool

        Check whether this test should be executed early in a particular run.
        This can be used for test suites with long running tests to maximize
        parallelism or where it is desirable to surface their failures early.
        """
        return self.suite.config.is_early

    def writeJUnitXML(self, fil):
        """Write the test's JUnit XML representation to a file handle."""
        test_name = quoteattr(self.path_in_suite[-1])
        test_path = self.path_in_suite[:-1]
        safe_test_path = [x.replace(".", "_") for x in test_path]
        safe_name = self.suite.name.replace(".", "-")

        if safe_test_path:
            class_name = safe_name + "." + "/".join(safe_test_path)
        else:
            class_name = safe_name + "." + safe_name
        class_name = quoteattr(class_name)
        testcase_template = '<testcase classname={class_name} name={test_name} time="{time:.2f}"'
        elapsed_time = self.result.elapsed if self.result.elapsed is not None else 0.0
        testcase_xml = testcase_template.format(class_name=class_name,
                                                test_name=test_name,
                                                time=elapsed_time)
        fil.write(testcase_xml)
        if self.result.code.isFailure:
            fil.write(">\n\t<failure ><![CDATA[")
            # In Python 2, 'str' and 'unicode' are distinct types; in
            # Python 3 there is no 'unicode', and 'bytes' is the distinct
            # type that must be decoded before it can be written.
            if isinstance(self.result.output, str):
                encoded_output = self.result.output
            elif isinstance(self.result.output, bytes):
                encoded_output = self.result.output.decode("utf-8", 'ignore')
            else:
                encoded_output = self.result.output.encode("utf-8", 'ignore')
            # In the unlikely case that the output contains the CDATA
            # terminator, we wrap it by creating a new CDATA block.
            fil.write(encoded_output.replace("]]>", "]]]]><![CDATA[>"))
            fil.write("]]></failure>\n</testcase>")
        elif self.result.code == UNSUPPORTED:
            unsupported_features = self.getMissingRequiredFeatures()
            if unsupported_features:
                skip_message = "Skipping because of: " + ", ".join(
                    unsupported_features)
            else:
                skip_message = "Skipping because of configuration."

            fil.write(">\n\t<skipped message={} />\n</testcase>\n".format(
                quoteattr(skip_message)))
        else:
            fil.write("/>")
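

# Illustrative usage sketch (not in the original source): a reporting driver
# could emit one <testcase> element per completed test by wrapping the calls
# in a <testsuites>/<testsuite> envelope ('completed_tests' is a hypothetical
# list of Test objects):
#
#   with open('results.xml', 'w') as fil:
#       fil.write('<testsuites>\n<testsuite name="lit">\n')
#       for test in completed_tests:
#           test.writeJUnitXML(fil)
#           fil.write('\n')
#       fil.write('</testsuite>\n</testsuites>\n')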