[Python][NFC] Run yapf on the entire repo.

This executes `yapf -i -r .` from the top-level directory. It is a
purely formatting change.
This commit is contained in:
Mike Urbach 2021-04-21 15:13:25 -06:00 committed by mikeurbach
parent b0e69ae120
commit 377c1042cf
12 changed files with 714 additions and 716 deletions

View File

@ -9,66 +9,62 @@ from mlir.ir import *
from mlir.dialects import builtin
from os import path
thisDir = path.dirname(__file__)
with Context() as ctxt, Location.unknown():
circt.register_dialects(ctxt)
sys = esi.System()
sys.load_mlir(path.join(thisDir, "esi_load1.mlir"))
sys.load_mlir(path.join(thisDir, "esi_load2.mlir"))
circt.register_dialects(ctxt)
sys = esi.System()
sys.load_mlir(path.join(thisDir, "esi_load1.mlir"))
sys.load_mlir(path.join(thisDir, "esi_load2.mlir"))
i1 = IntegerType.get_signless(1)
i32 = IntegerType.get_signless(32)
i32_chan = esi.channel_type(i32)
sys.print()
i1 = IntegerType.get_signless(1)
i32 = IntegerType.get_signless(32)
i32_chan = esi.channel_type(i32)
sys.print()
with InsertionPoint(sys.body):
op = rtl.RTLModuleOp(
name='MyWidget',
input_ports=[('foo', i32), ('foo_valid', i1)],
output_ports=[('foo_ready', i1)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[1]])
)
with InsertionPoint(sys.body):
op = rtl.RTLModuleOp(name='MyWidget',
input_ports=[('foo', i32), ('foo_valid', i1)],
output_ports=[('foo_ready', i1)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[1]]))
snoop = rtl.RTLModuleOp(
name='I32Snoop',
input_ports=[('foo_in', i32_chan)],
output_ports=[('foo_out', i32_chan)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[0]])
)
snoop = rtl.RTLModuleOp(name='I32Snoop',
input_ports=[('foo_in', i32_chan)],
output_ports=[('foo_out', i32_chan)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[0]]))
esi.buildWrapper(op.operation, ["foo"])
sys.print()
# CHECK-LABEL: rtl.module @MyWidget_esi(%foo: !esi.channel<i32>) {
# CHECK-NEXT: %rawOutput, %valid = esi.unwrap.vr %foo, %pearl.foo_ready : i32
# CHECK-NEXT: %pearl.foo_ready = rtl.instance "pearl" @MyWidget(%rawOutput, %valid) : (i32, i1) -> i1
# CHECK-NEXT: rtl.output
# CHECK-LABEL: rtl.module @MyWidget(%foo: i32, %foo_valid: i1) -> (%foo_ready: i1) {
# CHECK-NEXT: rtl.output %foo_valid : i1
# CHECK-LABEL: rtl.module @I32Snoop(%foo_in: !esi.channel<i32>) -> (%foo_out: !esi.channel<i32>) {
# CHECK-NEXT: rtl.output %foo_in : !esi.channel<i32>
esi.buildWrapper(op.operation, ["foo"])
sys.print()
# CHECK-LABEL: rtl.module @MyWidget_esi(%foo: !esi.channel<i32>) {
# CHECK-NEXT: %rawOutput, %valid = esi.unwrap.vr %foo, %pearl.foo_ready : i32
# CHECK-NEXT: %pearl.foo_ready = rtl.instance "pearl" @MyWidget(%rawOutput, %valid) : (i32, i1) -> i1
# CHECK-NEXT: rtl.output
# CHECK-LABEL: rtl.module @MyWidget(%foo: i32, %foo_valid: i1) -> (%foo_ready: i1) {
# CHECK-NEXT: rtl.output %foo_valid : i1
# CHECK-LABEL: rtl.module @I32Snoop(%foo_in: !esi.channel<i32>) -> (%foo_out: !esi.channel<i32>) {
# CHECK-NEXT: rtl.output %foo_in : !esi.channel<i32>
prod = sys.lookup("IntProducer")
assert (prod is not None)
prod.print()
print() # Newline.
# CHECK: rtl.module.extern @IntProducer(%clk: i1) -> (%ints: !esi.channel<i32>)
prod = sys.lookup("IntProducer")
assert (prod is not None)
prod.print()
print() # Newline.
# CHECK: rtl.module.extern @IntProducer(%clk: i1) -> (%ints: !esi.channel<i32>)
acc = sys.lookup("IntAccumulator")
assert (acc is not None)
acc.print()
print() # Newline.
# CHECK: rtl.module.extern @IntAccumulator(%clk: i1, %ints: i32, %ints_valid: i1) -> (%ints_ready: i1, %sum: i32)
acc = sys.lookup("IntAccumulator")
assert (acc is not None)
acc.print()
print() # Newline.
# CHECK: rtl.module.extern @IntAccumulator(%clk: i1, %ints: i32, %ints_valid: i1) -> (%ints_ready: i1, %sum: i32)
print("\n\n=== Verilog ===")
# CHECK-LABEL: === Verilog ===
# CHECK: interface IValidReady_i32;
# CHECK: // external module IntProducer
# CHECK: // external module IntAccumulator
# CHECK: module MyWidget_esi
# CHECK: module MyWidget
# CHECK: module I32Snoop
sys.print_verilog()
print("\n\n=== Verilog ===")
# CHECK-LABEL: === Verilog ===
# CHECK: interface IValidReady_i32;
# CHECK: // external module IntProducer
# CHECK: // external module IntAccumulator
# CHECK: module MyWidget_esi
# CHECK: module MyWidget
# CHECK: module I32Snoop
sys.print_verilog()

View File

@ -15,21 +15,19 @@ with Context() as ctx, Location.unknown():
with InsertionPoint(m.body):
# CHECK: rtl.module @MyWidget()
# CHECK: rtl.output
op = rtl.RTLModuleOp(
name='MyWidget',
input_ports=[],
output_ports=[],
body_builder=lambda module: rtl.OutputOp([])
)
top = rtl.RTLModuleOp(
name='top',
input_ports=[],
output_ports=[],
body_builder=lambda module: rtl.OutputOp([])
)
op = rtl.RTLModuleOp(name='MyWidget',
input_ports=[],
output_ports=[],
body_builder=lambda module: rtl.OutputOp([]))
top = rtl.RTLModuleOp(name='top',
input_ports=[],
output_ports=[],
body_builder=lambda module: rtl.OutputOp([]))
with InsertionPoint.at_block_terminator(top.body.blocks[0]):
inst = rtl.InstanceOp([], Attribute.parse('"widget"'), Attribute.parse("@MyWidget"), [], Attribute.parse("{}"))
inst = rtl.InstanceOp([], Attribute.parse('"widget"'),
Attribute.parse("@MyWidget"), [],
Attribute.parse("{}"))
msft.locate(inst.operation, "mem", devtype=msft.M20K, x=50, y=100, num=1)
# CHECK: rtl.instance "widget" @MyWidget() {"loc:mem" = #msft.physloc<M20K, 50, 100, 1>, parameters = {}} : () -> ()

View File

@ -10,46 +10,44 @@ from mlir.passmanager import PassManager
import sys
with Context() as ctx, Location.unknown():
circt.register_dialects(ctx)
circt.register_dialects(ctx)
i32 = IntegerType.get_signless(32)
i32 = IntegerType.get_signless(32)
m = Module.create()
with InsertionPoint(m.body):
# CHECK: rtl.module @MyWidget(%my_input: i32) -> (%my_output: i32)
# CHECK: rtl.output %my_input : i32
op = rtl.RTLModuleOp(
name='MyWidget',
input_ports=[('my_input', i32)],
output_ports=[('my_output', i32)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[0]])
)
m = Module.create()
with InsertionPoint(m.body):
# CHECK: rtl.module @MyWidget(%my_input: i32) -> (%my_output: i32)
# CHECK: rtl.output %my_input : i32
op = rtl.RTLModuleOp(name='MyWidget',
input_ports=[('my_input', i32)],
output_ports=[('my_output', i32)],
body_builder=lambda module: rtl.OutputOp(
[module.entry_block.arguments[0]]))
# CHECK: rtl.module @swap(%a: i32, %b: i32) -> (%{{.+}}: i32, %{{.+}}: i32)
# CHECK: rtl.output %b, %a : i32, i32
@rtl.RTLModuleOp.from_py_func(i32, i32)
def swap(a, b):
return b, a
# CHECK: rtl.module @swap(%a: i32, %b: i32) -> (%{{.+}}: i32, %{{.+}}: i32)
# CHECK: rtl.output %b, %a : i32, i32
@rtl.RTLModuleOp.from_py_func(i32, i32)
def swap(a, b):
return b, a
# CHECK: rtl.module @top(%a: i32, %b: i32) -> (%{{.+}}: i32, %{{.+}}: i32)
# CHECK: %[[a0:.+]], %[[b0:.+]] = rtl.instance "" @swap(%a, %b)
# CHECK: %[[a1:.+]], %[[b1:.+]] = rtl.instance "" @swap(%[[a0]], %[[b0]])
# CHECK: rtl.output %[[a1:.+]], %[[b1:.+]] : i32, i32
@rtl.RTLModuleOp.from_py_func(i32, i32)
def top(a, b):
a, b = swap(a, b)
a, b = swap(a, b)
return a, b
# CHECK: rtl.module @top(%a: i32, %b: i32) -> (%{{.+}}: i32, %{{.+}}: i32)
# CHECK: %[[a0:.+]], %[[b0:.+]] = rtl.instance "" @swap(%a, %b)
# CHECK: %[[a1:.+]], %[[b1:.+]] = rtl.instance "" @swap(%[[a0]], %[[b0]])
# CHECK: rtl.output %[[a1:.+]], %[[b1:.+]] : i32, i32
@rtl.RTLModuleOp.from_py_func(i32, i32)
def top(a, b):
a, b = swap(a, b)
a, b = swap(a, b)
return a, b
m.operation.print()
m.operation.print()
# CHECK-LABEL: === Verilog ===
print("=== Verilog ===")
# CHECK-LABEL: === Verilog ===
print("=== Verilog ===")
pm = PassManager.parse("rtl-legalize-names,rtl.module(rtl-cleanup)")
pm.run(m)
# CHECK: module MyWidget
# CHECK: module swap
# CHECK: module top
circt.export_verilog(m, sys.stdout)
pm = PassManager.parse("rtl-legalize-names,rtl.module(rtl-cleanup)")
pm.run(m)
# CHECK: module MyWidget
# CHECK: module swap
# CHECK: module top
circt.export_verilog(m, sys.stdout)

View File

@ -5,63 +5,66 @@ import cosim
class BasicSystemTester(cosim.CosimBase):
"""Provides methods to test the 'basic' simulation."""
"""Provides methods to test the 'basic' simulation."""
def testIntAcc(self, num_msgs):
ep = self.openEP(1, sendType=self.schema.I32,
recvType=self.schema.I32)
sum = 0
for _ in range(num_msgs):
i = random.randint(0, 77)
sum += i
print(f"Sending {i}")
ep.send(self.schema.I32.new_message(i=i))
result = self.readMsg(ep, self.schema.I32)
print(f"Got {result}")
assert (result.i == sum)
def testIntAcc(self, num_msgs):
ep = self.openEP(1, sendType=self.schema.I32, recvType=self.schema.I32)
sum = 0
for _ in range(num_msgs):
i = random.randint(0, 77)
sum += i
print(f"Sending {i}")
ep.send(self.schema.I32.new_message(i=i))
result = self.readMsg(ep, self.schema.I32)
print(f"Got {result}")
assert (result.i == sum)
def testVectorSum(self, num_msgs):
ep = self.openEP(2, sendType=self.schema.ArrayOf2xUi24,
recvType=self.schema.ArrayOf4xSi13)
for _ in range(num_msgs):
# Since the result is unsigned, we need to make sure the sum is
# never negative.
arr = [
random.randint(-468, 777),
random.randint(500, 1250),
random.randint(-468, 777),
random.randint(500, 1250)
]
print(f"Sending {arr}")
ep.send(self.schema.ArrayOf4xSi13.new_message(l=arr))
result = self.readMsg(ep, self.schema.ArrayOf2xUi24)
print(f"Got {result}")
assert (result.l[0] == arr[0] + arr[1])
assert (result.l[1] == arr[2] + arr[3])
def testVectorSum(self, num_msgs):
ep = self.openEP(2,
sendType=self.schema.ArrayOf2xUi24,
recvType=self.schema.ArrayOf4xSi13)
for _ in range(num_msgs):
# Since the result is unsigned, we need to make sure the sum is
# never negative.
arr = [
random.randint(-468, 777),
random.randint(500, 1250),
random.randint(-468, 777),
random.randint(500, 1250)
]
print(f"Sending {arr}")
ep.send(self.schema.ArrayOf4xSi13.new_message(l=arr))
result = self.readMsg(ep, self.schema.ArrayOf2xUi24)
print(f"Got {result}")
assert (result.l[0] == arr[0] + arr[1])
assert (result.l[1] == arr[2] + arr[3])
def testCrypto(self, num_msgs):
ep = self.openEP(3, sendType=self.schema.Struct12811887160382076992,
recvType=self.schema.Struct12811887160382076992)
cfg = self.openEP(4, sendType=self.schema.I1,
recvType=self.schema.Struct14590566522150786282)
def testCrypto(self, num_msgs):
ep = self.openEP(3,
sendType=self.schema.Struct12811887160382076992,
recvType=self.schema.Struct12811887160382076992)
cfg = self.openEP(4,
sendType=self.schema.I1,
recvType=self.schema.Struct14590566522150786282)
cfgWritten = False
for _ in range(num_msgs):
blob = [random.randint(0, 255) for x in range(32)]
print(f"Sending data {blob}")
ep.send(self.schema.Struct12811887160382076992.new_message(
encrypted=False, blob=blob))
cfgWritten = False
for _ in range(num_msgs):
blob = [random.randint(0, 255) for x in range(32)]
print(f"Sending data {blob}")
ep.send(
self.schema.Struct12811887160382076992.new_message(encrypted=False,
blob=blob))
if not cfgWritten:
# Check that messages queue up properly waiting for the config.
otp = [random.randint(0, 255) for x in range(32)]
cfg.send(self.schema.Struct14590566522150786282.new_message(
encrypt=True, otp=otp
))
cfgWritten = True
if not cfgWritten:
# Check that messages queue up properly waiting for the config.
otp = [random.randint(0, 255) for x in range(32)]
cfg.send(
self.schema.Struct14590566522150786282.new_message(encrypt=True,
otp=otp))
cfgWritten = True
result = self.readMsg(ep, self.schema.Struct12811887160382076992)
expectedResults = [x ^ y for (x, y) in zip(otp, blob)]
print(f"Got {blob}")
print(f"Exp {expectedResults}")
assert(list(result.blob) == expectedResults)
result = self.readMsg(ep, self.schema.Struct12811887160382076992)
expectedResults = [x ^ y for (x, y) in zip(otp, blob)]
print(f"Got {blob}")
print(f"Exp {expectedResults}")
assert (list(result.blob) == expectedResults)

View File

@ -5,43 +5,40 @@ import time
class CosimBase:
"""Provides a base class for cosim tests"""
"""Provides a base class for cosim tests"""
def __init__(self, schemaPath, hostPort):
"""Load the schema and connect to the RPC server"""
self.schema = capnp.load(schemaPath)
self.rpc_client = capnp.TwoPartyClient(hostPort)
self.cosim = self.rpc_client.bootstrap().cast_as(
self.schema.CosimDpiServer)
def __init__(self, schemaPath, hostPort):
"""Load the schema and connect to the RPC server"""
self.schema = capnp.load(schemaPath)
self.rpc_client = capnp.TwoPartyClient(hostPort)
self.cosim = self.rpc_client.bootstrap().cast_as(self.schema.CosimDpiServer)
def openEP(self, epNum=1, sendType=None, recvType=None):
"""Open the endpoint, optionally checking the send and recieve types"""
ifaces = self.cosim.list().wait().ifaces
for iface in ifaces:
if iface.endpointID == epNum:
# Optionally check that the type IDs match.
print(f"SendTypeId: {iface.sendTypeID:x}")
print(f"RecvTypeId: {iface.recvTypeID:x}")
if sendType is not None:
assert (iface.sendTypeID ==
sendType.schema.node.id)
if recvType is not None:
assert (iface.recvTypeID ==
recvType.schema.node.id)
def openEP(self, epNum=1, sendType=None, recvType=None):
"""Open the endpoint, optionally checking the send and recieve types"""
ifaces = self.cosim.list().wait().ifaces
for iface in ifaces:
if iface.endpointID == epNum:
# Optionally check that the type IDs match.
print(f"SendTypeId: {iface.sendTypeID:x}")
print(f"RecvTypeId: {iface.recvTypeID:x}")
if sendType is not None:
assert (iface.sendTypeID == sendType.schema.node.id)
if recvType is not None:
assert (iface.recvTypeID == recvType.schema.node.id)
openResp = self.cosim.open(iface).wait()
assert openResp.iface is not None
return openResp.iface
assert False, "Could not find specified EndpointID"
openResp = self.cosim.open(iface).wait()
assert openResp.iface is not None
return openResp.iface
assert False, "Could not find specified EndpointID"
def readMsg(self, ep, expectedType):
"""Cosim doesn't currently support blocking reads. Implement a blocking
def readMsg(self, ep, expectedType):
"""Cosim doesn't currently support blocking reads. Implement a blocking
read via polling."""
while True:
recvResp = ep.recv(False).wait()
if recvResp.hasData:
break
else:
time.sleep(0.01)
assert recvResp.resp is not None
return recvResp.resp.as_struct(expectedType)
while True:
recvResp = ep.recv(False).wait()
if recvResp.hasData:
break
else:
time.sleep(0.01)
assert recvResp.resp is not None
return recvResp.resp.as_struct(expectedType)

View File

@ -6,53 +6,52 @@ import cosim
class LoopbackTester(cosim.CosimBase):
"""Provides methods to test the loopback simulations."""
"""Provides methods to test the loopback simulations."""
def test_list(self):
ifaces = self.cosim.list().wait().ifaces
assert len(ifaces) > 0
def test_list(self):
ifaces = self.cosim.list().wait().ifaces
assert len(ifaces) > 0
def test_open_close(self):
ifaces = self.cosim.list().wait().ifaces
openResp = self.cosim.open(ifaces[0]).wait()
assert openResp.iface is not None
ep = openResp.iface
ep.close().wait()
def test_open_close(self):
ifaces = self.cosim.list().wait().ifaces
openResp = self.cosim.open(ifaces[0]).wait()
assert openResp.iface is not None
ep = openResp.iface
ep.close().wait()
def test_i32(self, num_msgs):
ep = self.openEP(sendType=self.schema.I32,
recvType=self.schema.I32)
for _ in range(num_msgs):
data = random.randint(0, 2**32)
print(f"Sending {data}")
ep.send(self.schema.I32.new_message(i=data))
result = self.readMsg(ep, self.schema.I32)
print(f"Got {result}")
assert (result.i == data)
def test_i32(self, num_msgs):
ep = self.openEP(sendType=self.schema.I32, recvType=self.schema.I32)
for _ in range(num_msgs):
data = random.randint(0, 2**32)
print(f"Sending {data}")
ep.send(self.schema.I32.new_message(i=data))
result = self.readMsg(ep, self.schema.I32)
print(f"Got {result}")
assert (result.i == data)
def write_3bytes(self, ep):
r = random.randrange(0, 2**24)
data = r.to_bytes(3, 'big')
print(f'Sending: {binascii.hexlify(data)}')
ep.send(self.schema.UntypedData.new_message(data=data)).wait()
return data
def write_3bytes(self, ep):
r = random.randrange(0, 2**24)
data = r.to_bytes(3, 'big')
print(f'Sending: {binascii.hexlify(data)}')
ep.send(self.schema.UntypedData.new_message(data=data)).wait()
return data
def read_3bytes(self, ep):
dataMsg = self.readMsg(ep, self.schema.UntypedData)
data = dataMsg.data
print(binascii.hexlify(data))
return data
def read_3bytes(self, ep):
dataMsg = self.readMsg(ep, self.schema.UntypedData)
data = dataMsg.data
print(binascii.hexlify(data))
return data
def test_3bytes(self, num_msgs=50):
ep = self.openEP()
print("Testing writes")
dataSent = list()
for _ in range(num_msgs):
dataSent.append(self.write_3bytes(ep))
print()
print("Testing reads")
dataRecv = list()
for _ in range(num_msgs):
dataRecv.append(self.read_3bytes(ep))
ep.close().wait()
assert dataSent == dataRecv
def test_3bytes(self, num_msgs=50):
ep = self.openEP()
print("Testing writes")
dataSent = list()
for _ in range(num_msgs):
dataSent.append(self.write_3bytes(ep))
print()
print("Testing reads")
dataRecv = list()
for _ in range(num_msgs):
dataRecv.append(self.read_3bytes(ep))
ep.close().wait()
assert dataSent == dataRecv

View File

@ -37,8 +37,7 @@ config.substitutions.append(('%shlibdir', config.circt_shlib_dir))
config.substitutions.append(('%INC%', config.circt_include_dir))
config.substitutions.append(('%PYTHON%', config.python_executable))
llvm_config.with_system_environment(
['HOME', 'INCLUDE', 'LIB', 'TMP', 'TEMP'])
llvm_config.with_system_environment(['HOME', 'INCLUDE', 'LIB', 'TMP', 'TEMP'])
llvm_config.use_default_substitutions()
@ -50,8 +49,8 @@ if config.timeout is not None and config.timeout != "":
# subdirectories contain auxiliary inputs for various tests in their parent
# directories.
config.excludes = [
'Inputs', 'CMakeLists.txt', 'README.txt', 'LICENSE.txt', 'lit.cfg.py',
'lit.local.cfg.py'
'Inputs', 'CMakeLists.txt', 'README.txt', 'LICENSE.txt', 'lit.cfg.py',
'lit.local.cfg.py'
]
# test_source_root: The root path where tests are located.
@ -68,16 +67,15 @@ llvm_config.with_environment('PATH', config.llvm_tools_dir, append_path=True)
if config.bindings_python_enabled:
llvm_config.with_environment('PYTHONPATH', [
os.path.join(config.llvm_obj_root, 'python'),
os.path.join(config.circt_obj_root, 'python')],
append_path=True)
os.path.join(config.circt_obj_root, 'python')
],
append_path=True)
tool_dirs = [config.circt_tools_dir,
config.mlir_tools_dir, config.llvm_tools_dir]
tool_dirs = [
config.circt_tools_dir, config.mlir_tools_dir, config.llvm_tools_dir
]
tools = [
'circt-opt',
'circt-translate',
'firtool',
'circt-rtl-sim.py',
'circt-opt', 'circt-translate', 'firtool', 'circt-rtl-sim.py',
'esi-cosim-runner.py'
]
@ -93,8 +91,7 @@ if config.verilator_path != "":
tools.append('verilator')
config.available_features.add('verilator')
config.available_features.add('rtl-sim')
llvm_config.with_environment(
'VERILATOR_PATH', config.verilator_path)
llvm_config.with_environment('VERILATOR_PATH', config.verilator_path)
# Enable Questa if it has been detected.
if config.quartus_path != "":
@ -112,8 +109,8 @@ if config.vivado_path != "":
config.available_features.add('vivado')
config.substitutions.append(
('%ieee-sim', os.path.join(config.vivado_path, "xsim")))
config.substitutions.append(
('%xsim%', os.path.join(config.vivado_path, "xsim")))
config.substitutions.append(('%xsim%', os.path.join(config.vivado_path,
"xsim")))
# Enable Questa if it has been detected.
if config.questa_path != "":
@ -121,8 +118,8 @@ if config.questa_path != "":
config.available_features.add('ieee-sim')
config.available_features.add('rtl-sim')
if 'LM_LICENSE_FILE' in os.environ:
llvm_config.with_environment(
'LM_LICENSE_FILE', os.environ['LM_LICENSE_FILE'])
llvm_config.with_environment('LM_LICENSE_FILE',
os.environ['LM_LICENSE_FILE'])
tool_dirs.append(config.questa_path)
tools.append('vlog')
@ -135,12 +132,15 @@ if config.questa_path != "":
ieee_sims = list(filter(lambda x: x[0] == '%ieee-sim', config.substitutions))
if len(ieee_sims) > 1:
warnings.warn(f"You have multiple ieee-sim simulators configured, choosing: {ieee_sims[-1][1]}")
warnings.warn(
f"You have multiple ieee-sim simulators configured, choosing: {ieee_sims[-1][1]}"
)
# Enable ESI cosim tests if they have been built.
if config.esi_cosim_path != "":
config.available_features.add('esi-cosim')
config.substitutions.append(('%ESIINC%', f'{config.circt_include_dir}/circt/Dialect/ESI/'))
config.substitutions.append(
('%ESIINC%', f'{config.circt_include_dir}/circt/Dialect/ESI/'))
config.substitutions.append(('%ESICOSIM%', f'{config.esi_cosim_path}'))
# Enable ESI's Capnp tests if they're supported.

View File

@ -4,6 +4,7 @@ import inspect
from mlir.ir import *
class RTLModuleOp:
"""Specialization for the RTL module op class."""
@ -45,12 +46,15 @@ class RTLModuleOp:
output_names.append(StringAttr.get(str(port_name)))
attributes['resultNames'] = ArrayAttr.get(output_names)
attributes["type"] = TypeAttr.get(FunctionType.get(
inputs=input_types, results=output_types))
attributes["type"] = TypeAttr.get(
FunctionType.get(inputs=input_types, results=output_types))
super().__init__(self.build_generic(
attributes=attributes, results=results, operands=operands,
loc=loc, ip=ip))
super().__init__(
self.build_generic(attributes=attributes,
results=results,
operands=operands,
loc=loc,
ip=ip))
if body_builder:
entry_block = self.add_entry_block()
@ -122,8 +126,8 @@ class RTLModuleOp:
if param.kind == param.VAR_KEYWORD:
has_arg_module_op = True
if param.name == "module_op" and (param.kind
== param.POSITIONAL_OR_KEYWORD or
param.kind == param.KEYWORD_ONLY):
== param.POSITIONAL_OR_KEYWORD or
param.kind == param.KEYWORD_ONLY):
has_arg_module_op = True
# Emit the RTLModuleOp.
@ -140,8 +144,8 @@ class RTLModuleOp:
output_ports = zip([None] * len(result_types), result_types)
module_op = RTLModuleOp(name=symbol_name,
input_ports=input_ports,
output_ports=output_ports)
input_ports=input_ports,
output_ports=output_ports)
with InsertionPoint(module_op.add_entry_block()):
module_args = module_op.entry_block.arguments
module_kwargs = {}
@ -167,14 +171,16 @@ class RTLModuleOp:
function_type = FunctionType.get(inputs=inputs, results=return_types)
module_op.attributes["type"] = TypeAttr.get(function_type)
# Set required resultNames attribute. Could we infer real names here?
resultNames = [StringAttr.get('result' + str(i))
for i in range(len(return_values))]
resultNames = [
StringAttr.get('result' + str(i))
for i in range(len(return_values))
]
module_op.attributes["resultNames"] = ArrayAttr.get(resultNames)
def emit_instance_op(*call_args):
call_op = rtl.InstanceOp(return_types, StringAttr.get(''),
FlatSymbolRefAttr.get(symbol_name),
call_args, DictAttr.get({}))
FlatSymbolRefAttr.get(symbol_name), call_args,
DictAttr.get({}))
if return_types is None:
return None
elif len(return_types) == 1:

View File

@ -11,15 +11,13 @@ import circt
import sys
import os
class System (CppSystem):
class System(CppSystem):
mod = None
passes = [
"lower-esi-ports",
"lower-esi-to-physical",
"lower-esi-to-rtl",
"rtl-legalize-names",
"rtl.module(rtl-cleanup)"
"lower-esi-ports", "lower-esi-to-physical", "lower-esi-to-rtl",
"rtl-legalize-names", "rtl.module(rtl-cleanup)"
]
passed = False

View File

@ -34,8 +34,7 @@ config.substitutions.append(('%PATH%', config.environment['PATH']))
config.substitutions.append(('%shlibext', config.llvm_shlib_ext))
config.substitutions.append(('%shlibdir', config.circt_shlib_dir))
llvm_config.with_system_environment(
['HOME', 'INCLUDE', 'LIB', 'TMP', 'TEMP'])
llvm_config.with_system_environment(['HOME', 'INCLUDE', 'LIB', 'TMP', 'TEMP'])
llvm_config.use_default_substitutions()
@ -53,16 +52,12 @@ config.test_exec_root = os.path.join(config.circt_obj_root, 'test')
# Tweak the PATH to include the tools dir.
llvm_config.with_environment('PATH', config.llvm_tools_dir, append_path=True)
tool_dirs = [config.circt_tools_dir,
config.mlir_tools_dir, config.llvm_tools_dir]
tool_dirs = [
config.circt_tools_dir, config.mlir_tools_dir, config.llvm_tools_dir
]
tools = [
'firtool',
'handshake-runner',
'circt-opt',
'circt-translate',
'circt-capi-ir-test',
'esi-tester',
'llhd-sim'
'firtool', 'handshake-runner', 'circt-opt', 'circt-translate',
'circt-capi-ir-test', 'esi-tester', 'llhd-sim'
]
# Enable Verilator if it has been detected.

View File

@ -21,232 +21,246 @@ ThisFileDir = os.path.dirname(__file__)
class Questa:
"""Run and compile funcs for Questasim."""
"""Run and compile funcs for Questasim."""
DefaultDriver = "driver.sv"
DefaultDriver = "driver.sv"
def __init__(self, path, args):
self.args = args
def __init__(self, path, args):
self.args = args
# Find Questa
if os.path.exists(path):
if os.path.isfile(path):
self.path = os.path.dirname(path)
else:
self.path = path
elif "@QUESTA_PATH@" != "":
self.path = "@QUESTA_PATH@"
if os.path.isfile(self.path):
self.path = os.path.dirname(self.path)
elif "QUESTA_PATH" in os.environ:
self.path = os.environ["QUESTA_PATH"]
# Find Questa
if os.path.exists(path):
if os.path.isfile(path):
self.path = os.path.dirname(path)
else:
self.path = path
elif "@QUESTA_PATH@" != "":
self.path = "@QUESTA_PATH@"
if os.path.isfile(self.path):
self.path = os.path.dirname(self.path)
elif "QUESTA_PATH" in os.environ:
self.path = os.environ["QUESTA_PATH"]
def compile(self, sources):
self.dpiLibs = filter(
lambda fn: fn.endswith(".so") or fn.endswith(".dll"),
sources)
sources = filter(
lambda fn: not (fn.endswith(".so") or fn.endswith(".dll")),
sources)
vlog = os.path.join(self.path, "vlog")
args = [vlog, "-sv"] + list(sources)
if self.args.gui:
args.append('+acc')
return subprocess.run(args)
def compile(self, sources):
self.dpiLibs = filter(lambda fn: fn.endswith(".so") or fn.endswith(".dll"),
sources)
sources = filter(lambda fn: not (fn.endswith(".so") or fn.endswith(".dll")),
sources)
vlog = os.path.join(self.path, "vlog")
args = [vlog, "-sv"] + list(sources)
if self.args.gui:
args.append('+acc')
return subprocess.run(args)
def run(self, cycles, simargs):
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
def run(self, cycles, simargs):
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
vsim = os.path.join(self.path, "vsim")
# Note: vsim exit codes say nothing about the test run's pass/fail even
# if $fatal is encountered in the simulation.
if self.args.gui:
cmd = [vsim, top, "-gui", "-voptargs=\"+acc\""]
else:
cmd = [vsim, top, "-batch", "-do", "run -all"]
if cycles >= 0:
cmd.append(f"+cycles={cycles}")
for lib in self.dpiLibs:
svLib = os.path.splitext(lib)[0]
cmd.append("-sv_lib")
cmd.append(svLib)
if self.dpiLibs:
cmd.append("-cpppath")
cmd.append("@CMAKE_CXX_COMPILER@")
return subprocess.run(cmd + simargs.split())
vsim = os.path.join(self.path, "vsim")
# Note: vsim exit codes say nothing about the test run's pass/fail even
# if $fatal is encountered in the simulation.
if self.args.gui:
cmd = [vsim, top, "-gui", "-voptargs=\"+acc\""]
else:
cmd = [vsim, top, "-batch", "-do", "run -all"]
if cycles >= 0:
cmd.append(f"+cycles={cycles}")
for lib in self.dpiLibs:
svLib = os.path.splitext(lib)[0]
cmd.append("-sv_lib")
cmd.append(svLib)
if self.dpiLibs:
cmd.append("-cpppath")
cmd.append("@CMAKE_CXX_COMPILER@")
return subprocess.run(cmd + simargs.split())
class Vivado:
"""Run and compile funcs for Vivado."""
"""Run and compile funcs for Vivado."""
DefaultDriver = "driver.sv"
DefaultDriver = "driver.sv"
def __init__(self, path, args):
self.path = path
self.args = args
def __init__(self, path, args):
self.path = path
self.args = args
def compile(self, sources):
sources = filter(
lambda fn: not (fn.endswith(".so") or fn.endswith(".dll")),
sources)
vlog = os.path.join(self.path, "xvlog")
work = self.libname + "=" + self.libname + ".dir"
args = [vlog, "-sv", "-work", work] + list(sources)
rc = subprocess.run(args)
if rc.returncode != 0:
return rc
def compile(self, sources):
sources = filter(lambda fn: not (fn.endswith(".so") or fn.endswith(".dll")),
sources)
vlog = os.path.join(self.path, "xvlog")
work = self.libname + "=" + self.libname + ".dir"
args = [vlog, "-sv", "-work", work] + list(sources)
rc = subprocess.run(args)
if rc.returncode != 0:
return rc
elab = os.path.join(self.path, "xelab")
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
top = self.libname + "." + top
args = [elab, top, "-lib", work, "-debug", "typical"]
return subprocess.run(args)
elab = os.path.join(self.path, "xelab")
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
top = self.libname + "." + top
args = [elab, top, "-lib", work, "-debug", "typical"]
return subprocess.run(args)
def run(self, cycles, simargs):
xsim = os.path.join(self.path, "xsim")
def run(self, cycles, simargs):
xsim = os.path.join(self.path, "xsim")
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
top = self.libname + "." + top
args = [xsim, top]
if self.args.no_default_driver:
top = self.args.top
else:
top = "driver"
top = self.libname + "." + top
args = [xsim, top]
script = "script.tcl"
with open(script, "w") as f:
commands = "run -all\n"
if not self.args.gui:
commands += "quit\n"
f.write(commands)
args += ["-t", script]
script = "script.tcl"
with open(script, "w") as f:
commands = "run -all\n"
if not self.args.gui:
commands += "quit\n"
f.write(commands)
args += ["-t", script]
if self.args.cycles >= 0:
args += ["-testplusarg", f"cycles={cycles}"]
if self.args.cycles >= 0:
args += ["-testplusarg", f"cycles={cycles}"]
if self.args.gui:
args.append("-gui")
if self.args.gui:
args.append("-gui")
return subprocess.run(args + simargs.split())
return subprocess.run(args + simargs.split())
@property
def libname(self):
name = self.args.sources[0].split("/")[0].split('.')[0]
if not name:
name = "work"
return name
@property
def libname(self):
name = self.args.sources[0].split("/")[0].split('.')[0]
if not name:
name = "work"
return name
class Verilator:
"""Run and compile funcs for Verilator."""
"""Run and compile funcs for Verilator."""
DefaultDriver = "driver.cpp"
DefaultDriver = "driver.cpp"
def __init__(self, args):
# Find Verilator.
if os.path.exists(args.sim):
self.verilator = args.sim
elif "@VERILATOR_PATH@" != "":
self.verilator = "@VERILATOR_PATH@"
elif "VERILATOR_PATH" in os.environ:
self.verilator = os.environ["VERILATOR_PATH"]
def __init__(self, args):
# Find Verilator.
if os.path.exists(args.sim):
self.verilator = args.sim
elif "@VERILATOR_PATH@" != "":
self.verilator = "@VERILATOR_PATH@"
elif "VERILATOR_PATH" in os.environ:
self.verilator = os.environ["VERILATOR_PATH"]
self.top = args.top
if args.objdir != "":
self.ObjDir = args.objdir
else:
self.ObjDir = os.path.basename(args.sources[0]) + ".obj_dir"
self.top = args.top
if args.objdir != "":
self.ObjDir = args.objdir
else:
self.ObjDir = os.path.basename(args.sources[0]) + ".obj_dir"
def compile(self, sources):
dpiLibs = filter(lambda fn: fn.endswith(
".so") or fn.endswith(".dll"), sources)
self.ldPaths = ":".join([os.path.dirname(x) for x in dpiLibs])
return subprocess.run([self.verilator, "--cc", "--top-module",
self.top, "-sv", "--build", "--exe",
"--Mdir", self.ObjDir, "--assert"] + sources)
def compile(self, sources):
dpiLibs = filter(lambda fn: fn.endswith(".so") or fn.endswith(".dll"),
sources)
self.ldPaths = ":".join([os.path.dirname(x) for x in dpiLibs])
return subprocess.run([
self.verilator, "--cc", "--top-module", self.top, "-sv", "--build",
"--exe", "--Mdir", self.ObjDir, "--assert"
] + sources)
def run(self, cycles, args):
exe = os.path.join(self.ObjDir, "V" + self.top)
cmd = [exe]
if cycles >= 0:
cmd.append("--cycles")
cmd.append(str(cycles))
cmd += args.split()
print(f"Running: {cmd}")
sys.stdout.flush()
os.environ["LD_LIBRARY_PATH"] = self.ldPaths
return subprocess.run(cmd)
def run(self, cycles, args):
exe = os.path.join(self.ObjDir, "V" + self.top)
cmd = [exe]
if cycles >= 0:
cmd.append("--cycles")
cmd.append(str(cycles))
cmd += args.split()
print(f"Running: {cmd}")
sys.stdout.flush()
os.environ["LD_LIBRARY_PATH"] = self.ldPaths
return subprocess.run(cmd)
def __main__(args):
argparser = argparse.ArgumentParser(
description="RTL simulation runner for CIRCT")
argparser = argparse.ArgumentParser(
description="RTL simulation runner for CIRCT")
argparser.add_argument("--sim", type=str, default="verilator",
help="Name of the RTL simulator (if in PATH) to " +
"use or path to an executable.")
argparser.add_argument("--no-compile", dest="no_compile",
action='store_true',
help="Don't compile the simulation.")
argparser.add_argument("--no-run", dest="no_run", action='store_true',
help="Don't run the simulation.")
argparser.add_argument("--gui", dest="gui", action='store_true',
help="Bring up the GUI to run.")
argparser.add_argument("--top", type=str, default="top",
help="Name of top module to run.")
argparser.add_argument("--objdir", type=str, default="",
help="(Verilator) Select an 'obj_dir' to use." +
" Must be different from other tests in the same" +
" directory. Defaults to 'sources[0].obj_dir'.")
argparser.add_argument("--simargs", type=str, default="",
help="Simulation arguments string.")
argparser.add_argument("--no-default-driver", dest="no_default_driver",
action='store_true',
help="Do not use the standard top module/drivers.")
argparser.add_argument("--cycles", type=int, default=-1,
help="Number of cycles to run the simulator. " +
" -1 means don't stop.")
argparser.add_argument("--sim",
type=str,
default="verilator",
help="Name of the RTL simulator (if in PATH) to " +
"use or path to an executable.")
argparser.add_argument("--no-compile",
dest="no_compile",
action='store_true',
help="Don't compile the simulation.")
argparser.add_argument("--no-run",
dest="no_run",
action='store_true',
help="Don't run the simulation.")
argparser.add_argument("--gui",
dest="gui",
action='store_true',
help="Bring up the GUI to run.")
argparser.add_argument("--top",
type=str,
default="top",
help="Name of top module to run.")
argparser.add_argument("--objdir",
type=str,
default="",
help="(Verilator) Select an 'obj_dir' to use." +
" Must be different from other tests in the same" +
" directory. Defaults to 'sources[0].obj_dir'.")
argparser.add_argument("--simargs",
type=str,
default="",
help="Simulation arguments string.")
argparser.add_argument("--no-default-driver",
dest="no_default_driver",
action='store_true',
help="Do not use the standard top module/drivers.")
argparser.add_argument("--cycles",
type=int,
default=-1,
help="Number of cycles to run the simulator. " +
" -1 means don't stop.")
argparser.add_argument("sources", nargs="+",
help="The list of source files to be included.")
argparser.add_argument("sources",
nargs="+",
help="The list of source files to be included.")
if len(args) <= 1:
argparser.print_help()
return
args = argparser.parse_args(args[1:])
if len(args) <= 1:
argparser.print_help()
return
args = argparser.parse_args(args[1:])
# Break up simulator string
simParts = os.path.split(args.sim)
simName = simParts[1]
# Break up simulator string
simParts = os.path.split(args.sim)
simName = simParts[1]
if simName in ["questa", "vsim", "vlog", "vopt"]:
sim = Questa(simParts[0], args)
elif simName == "xsim":
sim = Vivado(simParts[0], args)
elif simName == "verilator":
sim = Verilator(args)
else:
print(f"Could not determine simulator from '{args.sim}'",
file=sys.stderr)
return 1
if simName in ["questa", "vsim", "vlog", "vopt"]:
sim = Questa(simParts[0], args)
elif simName == "xsim":
sim = Vivado(simParts[0], args)
elif simName == "verilator":
sim = Verilator(args)
else:
print(f"Could not determine simulator from '{args.sim}'", file=sys.stderr)
return 1
if not args.no_default_driver:
args.sources.append(os.path.join(ThisFileDir, sim.DefaultDriver))
if not args.no_default_driver:
args.sources.append(os.path.join(ThisFileDir, sim.DefaultDriver))
if not args.no_compile:
rc = sim.compile(args.sources)
if rc.returncode != 0:
return rc
if not args.no_run:
rc = sim.run(args.cycles, args.simargs)
return rc.returncode
return 0
if not args.no_compile:
rc = sim.compile(args.sources)
if rc.returncode != 0:
return rc
if not args.no_run:
rc = sim.run(args.cycles, args.simargs)
return rc.returncode
return 0
if __name__ == '__main__':
sys.exit(__main__(sys.argv))
sys.exit(__main__(sys.argv))

View File

@ -25,290 +25,284 @@ ThisFileDir = os.path.dirname(__file__)
class CosimTestRunner:
"""The main class responsible for running a cosim test. We use a separate
"""The main class responsible for running a cosim test. We use a separate
class to allow for per-test mutable state variables."""
def __init__(self, testFile, schema, addlArgs):
"""Parse a test file. Look for comments we recognize anywhere in the
def __init__(self, testFile, schema, addlArgs):
"""Parse a test file. Look for comments we recognize anywhere in the
file. Assemble a list of sources."""
self.args = addlArgs
self.file = testFile
self.runs = list()
self.srcdir = os.path.dirname(self.file)
self.sources = list()
self.top = "top"
self.args = addlArgs
self.file = testFile
self.runs = list()
self.srcdir = os.path.dirname(self.file)
self.sources = list()
self.top = "top"
if "@ESI_COSIM_PATH@" == "" or not os.path.exists("@ESI_COSIM_PATH@"):
raise Exception("The ESI cosimulation DPI library must be " +
"enabled to run cosim tests.")
if "@ESI_COSIM_PATH@" == "" or not os.path.exists("@ESI_COSIM_PATH@"):
raise Exception("The ESI cosimulation DPI library must be " +
"enabled to run cosim tests.")
self.simRunScript = os.path.join(
"@CIRCT_TOOLS_DIR@", "circt-rtl-sim.py")
self.simRunScript = os.path.join("@CIRCT_TOOLS_DIR@", "circt-rtl-sim.py")
if schema == "":
schema = os.path.join(
"@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect", "ESI",
"cosim", "CosimDpi.capnp")
self.schema = schema
if schema == "":
schema = os.path.join("@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect",
"ESI", "cosim", "CosimDpi.capnp")
self.schema = schema
fileReader = open(self.file, "r")
sources = []
for line in fileReader:
# Arguments to circt-rtl-sim, except for source files list
m = re.match(r"^//\s*ARGS:(.*)$", line)
if m:
self.args.extend(m.group(1).split())
# SOURCES are the additional source files (if any). If specified,
# must include the current file. These files are either absolute or
# relative to the current file.
m = re.match(r"^//\s*SOURCES:(.*)$", line)
if m:
sources.extend(m.group(1).split())
# Run this Python line.
m = re.match(r"^//\s*PY:(.*)$", line)
if m:
self.runs.append(m.group(1).strip())
fileReader.close()
fileReader = open(self.file, "r")
sources = []
for line in fileReader:
# Arguments to circt-rtl-sim, except for source files list
m = re.match(r"^//\s*ARGS:(.*)$", line)
if m:
self.args.extend(m.group(1).split())
# SOURCES are the additional source files (if any). If specified,
# must include the current file. These files are either absolute or
# relative to the current file.
m = re.match(r"^//\s*SOURCES:(.*)$", line)
if m:
sources.extend(m.group(1).split())
# Run this Python line.
m = re.match(r"^//\s*PY:(.*)$", line)
if m:
self.runs.append(m.group(1).strip())
fileReader.close()
self.sources = [(src if os.path.isabs(src) else os.path.join(
self.srcdir, src)) for src in self.sources]
self.sources = [
(src if os.path.isabs(src) else os.path.join(self.srcdir, src))
for src in self.sources
]
# Include the cosim DPI SystemVerilog files.
esiInclude = os.path.join(
"@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect", "ESI")
cosimInclude = os.path.join(esiInclude, "cosim")
self.sources.insert(0, os.path.join(cosimInclude, "Cosim_DpiPkg.sv"))
self.sources.insert(1, os.path.join(cosimInclude, "Cosim_Endpoint.sv"))
self.sources.insert(2, os.path.join(esiInclude, "ESIPrimitives.sv"))
self.sources.append("@ESI_COSIM_PATH@")
# Include the cosim DPI SystemVerilog files.
esiInclude = os.path.join("@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect",
"ESI")
cosimInclude = os.path.join(esiInclude, "cosim")
self.sources.insert(0, os.path.join(cosimInclude, "Cosim_DpiPkg.sv"))
self.sources.insert(1, os.path.join(cosimInclude, "Cosim_Endpoint.sv"))
self.sources.insert(2, os.path.join(esiInclude, "ESIPrimitives.sv"))
self.sources.append("@ESI_COSIM_PATH@")
def compile(self):
"""Compile with circt-rtl-sim.py"""
start = time.time()
def compile(self):
"""Compile with circt-rtl-sim.py"""
start = time.time()
# Run the simulation compilation step. Requires a simulator to be
# installed and working.
cmd = [self.simRunScript, "--no-run",
"--objdir", "o"] \
+ self.sources + self.args
print("[INFO] Compile command: " + " ".join(cmd))
vrun = subprocess.run(
cmd,
capture_output=True,
text=True)
# cwd=self.execdir)
output = vrun.stdout + "\n----- STDERR ------\n" + vrun.stderr
if vrun.returncode != 0:
print("====== Compilation failure:")
print(output)
print(f"[INFO] Compile time: {time.time()-start}")
return vrun.returncode
# Run the simulation compilation step. Requires a simulator to be
# installed and working.
cmd = [self.simRunScript, "--no-run",
"--objdir", "o"] \
+ self.sources + self.args
print("[INFO] Compile command: " + " ".join(cmd))
vrun = subprocess.run(cmd, capture_output=True, text=True)
# cwd=self.execdir)
output = vrun.stdout + "\n----- STDERR ------\n" + vrun.stderr
if vrun.returncode != 0:
print("====== Compilation failure:")
print(output)
print(f"[INFO] Compile time: {time.time()-start}")
return vrun.returncode
def writeScript(self, port):
"""Write out the test script."""
def writeScript(self, port):
"""Write out the test script."""
with open("script.py", "w") as script:
# Include a bunch of config variables at the beginning of the
# script for use by the test code.
vars = {
"srcdir": self.srcdir,
"srcfile": self.file,
# 'rpcSchemaPath' points to the CapnProto schema for RPC and is
# the one that nearly all scripts are going to need.
"rpcschemapath": self.schema
}
script.writelines(f"{name} = \"{value}\"\n" for (
name, value) in vars.items())
script.write("\n\n")
with open("script.py", "w") as script:
# Include a bunch of config variables at the beginning of the
# script for use by the test code.
vars = {
"srcdir": self.srcdir,
"srcfile": self.file,
# 'rpcSchemaPath' points to the CapnProto schema for RPC and is
# the one that nearly all scripts are going to need.
"rpcschemapath": self.schema
}
script.writelines(
f"{name} = \"{value}\"\n" for (name, value) in vars.items())
script.write("\n\n")
# Add the test files directory and this files directory to the
# pythonpath.
script.write("import os\n")
script.write("import sys\n")
script.write(
f"sys.path.append(\"{os.path.dirname(self.file)}\")\n")
script.write(
f"sys.path.append(\"{os.path.dirname(__file__)}\")\n")
script.write("\n\n")
script.write(
"simhostport = f'{os.uname()[1]}:" + str(port) + "'\n")
# Add the test files directory and this files directory to the
# pythonpath.
script.write("import os\n")
script.write("import sys\n")
script.write(f"sys.path.append(\"{os.path.dirname(self.file)}\")\n")
script.write(f"sys.path.append(\"{os.path.dirname(__file__)}\")\n")
script.write("\n\n")
script.write("simhostport = f'{os.uname()[1]}:" + str(port) + "'\n")
# Run the lines specified in the test file.
script.writelines(
f"{x}\n" for x in self.runs)
# Run the lines specified in the test file.
script.writelines(f"{x}\n" for x in self.runs)
def run(self):
"""Run the test by creating a Python script, starting the simulation,
def run(self):
"""Run the test by creating a Python script, starting the simulation,
running the Python script, then stopping the simulation. Use
circt-rtl-sim.py to run the sim."""
# These two variables are accessed in the finally block. Declare them
# here to avoid syntax errors in that block.
testProc = None
simProc = None
# These two variables are accessed in the finally block. Declare them
# here to avoid syntax errors in that block.
testProc = None
simProc = None
try:
start = time.time()
# Open log files
simStdout = open("sim_stdout.log", "w")
simStderr = open("sim_stderr.log", "w")
testStdout = open("test_stdout.log", "w")
testStderr = open("test_stderr.log", "w")
# Erase the config file if it exists. We don't want to read
# an old config.
portFileName = "cosim.cfg"
if os.path.exists(portFileName):
os.remove(portFileName)
# Run the simulation.
simEnv = os.environ.copy()
if "@CMAKE_BUILD_TYPE@" == "Debug":
simEnv["COSIM_DEBUG_FILE"] = "cosim_debug.log"
cmd = [self.simRunScript, "--objdir", "o"] + \
self.sources + self.args
print("[INFO] Sim run command: " + " ".join(cmd))
simProc = subprocess.Popen(cmd,
stdout=simStdout,
stderr=simStderr,
env=simEnv,
preexec_fn=os.setsid)
simStderr.close()
simStdout.close()
# Get the port which the simulation RPC selected.
checkCount = 0
while (not os.path.exists(portFileName)) and \
simProc.poll() is None:
time.sleep(0.05)
checkCount += 1
if checkCount > 200:
raise Exception(f"Cosim never wrote cfg file: {portFileName}")
portFile = open(portFileName, "r")
for line in portFile.readlines():
m = re.match("port: (\\d+)", line)
if m is not None:
port = int(m.group(1))
portFile.close()
# Wait for the simulation to start accepting RPC connections.
checkCount = 0
while not isPortOpen(port):
checkCount += 1
if checkCount > 200:
raise Exception(f"Cosim RPC port ({port}) never opened")
if simProc.poll() is not None:
raise Exception("Simulation exited early")
time.sleep(0.05)
# Write the test script.
self.writeScript(port)
# Pycapnp complains if the PWD environment var doesn't match the
# actual CWD.
testEnv = os.environ.copy()
testEnv["PWD"] = os.getcwd()
# Run the test script.
cmd = [sys.executable, "-u", "script.py"]
print("[INFO] Test run command: " + " ".join(cmd))
testProc = subprocess.run(cmd,
stdout=testStdout,
stderr=testStderr,
cwd=os.getcwd(),
env=testEnv)
testStdout.close()
testStderr.close()
finally:
# Make sure to stop the simulation no matter what.
if simProc:
os.killpg(os.getpgid(simProc.pid), signal.SIGINT)
# simProc.send_signal(signal.SIGINT)
# Allow the simulation time to flush its outputs.
try:
start = time.time()
simProc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
simProc.kill()
# Open log files
simStdout = open("sim_stdout.log", "w")
simStderr = open("sim_stderr.log", "w")
testStdout = open("test_stdout.log", "w")
testStderr = open("test_stderr.log", "w")
print(f"[INFO] Run time: {time.time()-start}")
# Erase the config file if it exists. We don't want to read
# an old config.
portFileName = "cosim.cfg"
if os.path.exists(portFileName):
os.remove(portFileName)
# Read the output log files and return the proper result.
err, logs = self.readLogs()
if testProc is not None:
logs += f"---- Test process exit code: {testProc.returncode}\n"
passed = testProc.returncode == 0 and not err
else:
passed = False
if not passed:
print(logs)
# Run the simulation.
simEnv = os.environ.copy()
if "@CMAKE_BUILD_TYPE@" == "Debug":
simEnv["COSIM_DEBUG_FILE"] = "cosim_debug.log"
cmd = [self.simRunScript, "--objdir", "o"] + \
self.sources + self.args
print("[INFO] Sim run command: " + " ".join(cmd))
simProc = subprocess.Popen(
cmd,
stdout=simStdout, stderr=simStderr, env=simEnv,
preexec_fn=os.setsid)
simStderr.close()
simStdout.close()
return 0 if passed else 1
# Get the port which the simulation RPC selected.
checkCount = 0
while (not os.path.exists(portFileName)) and \
simProc.poll() is None:
time.sleep(0.05)
checkCount += 1
if checkCount > 200:
raise Exception(
f"Cosim never wrote cfg file: {portFileName}")
portFile = open(portFileName, "r")
for line in portFile.readlines():
m = re.match("port: (\\d+)", line)
if m is not None:
port = int(m.group(1))
portFile.close()
# Wait for the simulation to start accepting RPC connections.
checkCount = 0
while not isPortOpen(port):
checkCount += 1
if checkCount > 200:
raise Exception(f"Cosim RPC port ({port}) never opened")
if simProc.poll() is not None:
raise Exception("Simulation exited early")
time.sleep(0.05)
# Write the test script.
self.writeScript(port)
# Pycapnp complains if the PWD environment var doesn't match the
# actual CWD.
testEnv = os.environ.copy()
testEnv["PWD"] = os.getcwd()
# Run the test script.
cmd = [sys.executable, "-u", "script.py"]
print("[INFO] Test run command: " + " ".join(cmd))
testProc = subprocess.run(cmd,
stdout=testStdout, stderr=testStderr,
cwd=os.getcwd(), env=testEnv)
testStdout.close()
testStderr.close()
finally:
# Make sure to stop the simulation no matter what.
if simProc:
os.killpg(os.getpgid(simProc.pid), signal.SIGINT)
# simProc.send_signal(signal.SIGINT)
# Allow the simulation time to flush its outputs.
try:
simProc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
simProc.kill()
print(f"[INFO] Run time: {time.time()-start}")
# Read the output log files and return the proper result.
err, logs = self.readLogs()
if testProc is not None:
logs += f"---- Test process exit code: {testProc.returncode}\n"
passed = testProc.returncode == 0 and not err
else:
passed = False
if not passed:
print(logs)
return 0 if passed else 1
def readLogs(self):
"""Read the log files from the simulation and the test script. Only add
def readLogs(self):
"""Read the log files from the simulation and the test script. Only add
the stderr logs if they contain something. Also return a flag
indicating that one of the stderr logs has content."""
foundErr = False
ret = "----- Simulation stdout -----\n"
with open("sim_stdout.log") as f:
ret += f.read()
foundErr = False
ret = "----- Simulation stdout -----\n"
with open("sim_stdout.log") as f:
ret += f.read()
with open("sim_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Simulation stderr -----\n"
ret += stderr
foundErr = True
with open("sim_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Simulation stderr -----\n"
ret += stderr
foundErr = True
ret += "\n----- Test stdout -----\n"
with open("test_stdout.log") as f:
ret += f.read()
ret += "\n----- Test stdout -----\n"
with open("test_stdout.log") as f:
ret += f.read()
with open("test_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Test stderr -----\n"
ret += stderr
foundErr = True
with open("test_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Test stderr -----\n"
ret += stderr
foundErr = True
return (foundErr, ret)
return (foundErr, ret)
def isPortOpen(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
return True if result == 0 else False
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
return True if result == 0 else False
def __main__(args):
argparser = argparse.ArgumentParser(
description="RTL cosimulation runner for ESI")
argparser.add_argument("--schema", default="",
help="The schema file to use.")
argparser.add_argument("source",
help="The source run spec file")
argparser.add_argument("addlArgs", nargs=argparse.REMAINDER,
help="Additional arguments to pass through to " +
"'circt-rtl-sim.py'")
argparser = argparse.ArgumentParser(
description="RTL cosimulation runner for ESI")
argparser.add_argument("--schema", default="", help="The schema file to use.")
argparser.add_argument("source", help="The source run spec file")
argparser.add_argument("addlArgs",
nargs=argparse.REMAINDER,
help="Additional arguments to pass through to " +
"'circt-rtl-sim.py'")
if len(args) <= 1:
argparser.print_help()
return
args = argparser.parse_args(args[1:])
if len(args) <= 1:
argparser.print_help()
return
args = argparser.parse_args(args[1:])
# Create and cd into a test directory before running
sourceName = os.path.basename(args.source)
testDir = f"{sourceName}.d"
if not os.path.exists(testDir):
os.mkdir(testDir)
os.chdir(testDir)
# Create and cd into a test directory before running
sourceName = os.path.basename(args.source)
testDir = f"{sourceName}.d"
if not os.path.exists(testDir):
os.mkdir(testDir)
os.chdir(testDir)
runner = CosimTestRunner(args.source, args.schema, args.addlArgs)
rc = runner.compile()
if rc != 0:
return rc
return runner.run()
runner = CosimTestRunner(args.source, args.schema, args.addlArgs)
rc = runner.compile()
if rc != 0:
return rc
return runner.run()
if __name__ == '__main__':
sys.exit(__main__(sys.argv))
sys.exit(__main__(sys.argv))