381 lines
14 KiB
Python
381 lines
14 KiB
Python
import os
|
|
import shutil
|
|
import tempfile
|
|
import types
|
|
import unittest
|
|
|
|
import geometamaker
|
|
from natcap.invest import spec
|
|
from natcap.invest.unit_registry import u
|
|
from osgeo import gdal
|
|
from osgeo import ogr
|
|
|
|
gdal.UseExceptions()
|
|
|
|
|
|
class SpecUtilsUnitTests(unittest.TestCase):
|
|
"""Unit tests for natcap.invest.spec."""
|
|
|
|
def test_format_unit(self):
|
|
"""spec: test converting units to strings with format_unit."""
|
|
for unit_name, expected in [
|
|
('meter', 'm'),
|
|
('meter / second', 'm/s'),
|
|
('foot * mm', 'ft · mm'),
|
|
('t * hr * ha / ha / MJ / mm', 't · h · ha / (ha · MJ · mm)'),
|
|
('mm^3 / year', 'mm³/year')
|
|
]:
|
|
unit = spec.u.Unit(unit_name)
|
|
actual = spec.format_unit(unit)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_format_unit_raises_error(self):
|
|
"""spec: format_unit raises TypeError if not a pint.Unit."""
|
|
with self.assertRaises(TypeError):
|
|
spec.format_unit({})
|
|
|
|
|
|
class TestDescribeArgFromSpec(unittest.TestCase):
|
|
"""Test building RST for various invest args specifications."""
|
|
|
|
def test_number_spec(self):
|
|
number_spec = spec.NumberInput(
|
|
name="Bar",
|
|
about="Description",
|
|
units=u.meter**3/u.month,
|
|
expression="value >= 0"
|
|
)
|
|
out = spec.describe_arg_from_spec(number_spec.name, number_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`number <input_types.html#number>`__, '
|
|
'units: **m³/month**, *required*): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_ratio_spec(self):
|
|
ratio_spec = spec.RatioInput(
|
|
name="Bar",
|
|
about="Description"
|
|
)
|
|
out = spec.describe_arg_from_spec(ratio_spec.name, ratio_spec)
|
|
expected_rst = (['**Bar** (`ratio <input_types.html#ratio>`__, '
|
|
'*required*): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_percent_spec(self):
|
|
percent_spec = spec.PercentInput(
|
|
name="Bar",
|
|
about="Description",
|
|
required=False
|
|
)
|
|
out = spec.describe_arg_from_spec(percent_spec.name, percent_spec)
|
|
expected_rst = (['**Bar** (`percent <input_types.html#percent>`__, '
|
|
'*optional*): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_integer_spec(self):
|
|
integer_spec = spec.IntegerInput(
|
|
name="Bar",
|
|
about="Description",
|
|
required=True
|
|
)
|
|
out = spec.describe_arg_from_spec(integer_spec.name, integer_spec)
|
|
expected_rst = (['**Bar** (`integer <input_types.html#integer>`__, '
|
|
'*required*): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_boolean_spec(self):
|
|
boolean_spec = spec.BooleanInput(
|
|
name="Bar",
|
|
about="Description"
|
|
)
|
|
out = spec.describe_arg_from_spec(boolean_spec.name, boolean_spec)
|
|
expected_rst = (['**Bar** (`true/false <input_types.html#truefalse>'
|
|
'`__): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_freestyle_string_spec(self):
|
|
string_spec = spec.StringInput(
|
|
name="Bar",
|
|
about="Description"
|
|
)
|
|
out = spec.describe_arg_from_spec(string_spec.name, string_spec)
|
|
expected_rst = (['**Bar** (`text <input_types.html#text>`__, '
|
|
'*required*): Description'])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_option_string_spec_dictionary(self):
|
|
option_spec = spec.OptionStringInput(
|
|
name="Bar",
|
|
about="Description",
|
|
options={
|
|
"option_a": {
|
|
"display_name": "A"
|
|
},
|
|
"Option_b": {
|
|
"description": "do something"
|
|
},
|
|
"option_c": {
|
|
"display_name": "c",
|
|
"description": "do something else"
|
|
}
|
|
}
|
|
)
|
|
# expect that option case is ignored
|
|
# otherwise, c would sort before A
|
|
out = spec.describe_arg_from_spec(option_spec.name, option_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`option <input_types.html#option>`__, *required*): Description',
|
|
'\tOptions:',
|
|
'\t- A',
|
|
'\t- c: do something else',
|
|
'\t- Option_b: do something'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_option_string_spec_list(self):
|
|
option_spec = spec.OptionStringInput(
|
|
name="Bar",
|
|
about="Description",
|
|
options=["option_a", "Option_b"]
|
|
)
|
|
out = spec.describe_arg_from_spec(option_spec.name, option_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`option <input_types.html#option>`__, *required*): Description',
|
|
'\tOptions: option_a, Option_b'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_raster_spec(self):
|
|
raster_spec = spec.SingleBandRasterInput(
|
|
data_type=int,
|
|
about="Description",
|
|
name="Bar"
|
|
)
|
|
out = spec.describe_arg_from_spec(raster_spec.name, raster_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`raster <input_types.html#raster>`__, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
raster_spec = spec.SingleBandRasterInput(
|
|
data_type=float,
|
|
units=u.millimeter/u.year,
|
|
about="Description",
|
|
name="Bar"
|
|
)
|
|
out = spec.describe_arg_from_spec(raster_spec.name, raster_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`raster <input_types.html#raster>`__, units: **mm/year**, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_vector_spec(self):
|
|
vector_spec = spec.VectorInput(
|
|
fields={},
|
|
geometry_types={"LINESTRING"},
|
|
about="Description",
|
|
name="Bar"
|
|
)
|
|
out = spec.describe_arg_from_spec(vector_spec.name, vector_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`vector <input_types.html#vector>`__, linestring, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
vector_spec = spec.VectorInput(
|
|
fields=[
|
|
spec.IntegerInput(
|
|
id="id",
|
|
about="Unique identifier for each feature"
|
|
),
|
|
spec.NumberInput(
|
|
id="precipitation",
|
|
units=u.millimeter/u.year,
|
|
about="Average annual precipitation over the area"
|
|
)
|
|
],
|
|
geometry_types={"POLYGON", "MULTIPOLYGON"},
|
|
about="Description",
|
|
name="Bar"
|
|
)
|
|
out = spec.describe_arg_from_spec(vector_spec.name, vector_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`vector <input_types.html#vector>`__, polygon/multipolygon, *required*): Description',
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_csv_spec(self):
|
|
csv_spec = spec.CSVInput(
|
|
about="Description.",
|
|
name="Bar"
|
|
)
|
|
out = spec.describe_arg_from_spec(csv_spec.name, csv_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`CSV <input_types.html#csv>`__, *required*): Description. '
|
|
'Please see the sample data table for details on the format.'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
# Test every type that can be nested in a CSV column:
|
|
# number, ratio, percent, code,
|
|
csv_spec = spec.CSVInput(
|
|
about="Description",
|
|
name="Bar",
|
|
columns=[
|
|
spec.RatioInput(
|
|
id="b",
|
|
about="description"
|
|
)
|
|
]
|
|
)
|
|
out = spec.describe_arg_from_spec(csv_spec.name, csv_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`CSV <input_types.html#csv>`__, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_directory_spec(self):
|
|
self.maxDiff = None
|
|
dir_spec = spec.DirectoryInput(
|
|
about="Description",
|
|
name="Bar",
|
|
contents={}
|
|
)
|
|
out = spec.describe_arg_from_spec(dir_spec.name, dir_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`directory <input_types.html#directory>`__, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_multi_type_spec(self):
|
|
multi_spec = spec.RasterOrVectorInput(
|
|
about="Description",
|
|
name="Bar",
|
|
data_type=int,
|
|
geometry_types={"POLYGON"},
|
|
fields={}
|
|
)
|
|
out = spec.describe_arg_from_spec(multi_spec.name, multi_spec)
|
|
expected_rst = ([
|
|
'**Bar** (`raster <input_types.html#raster>`__ or `vector <input_types.html#vector>`__, *required*): Description'
|
|
])
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
def test_real_model_spec(self):
|
|
from natcap.invest import carbon
|
|
out = spec.describe_arg_from_name(
|
|
'natcap.invest.carbon', 'carbon_pools_path', 'columns', 'lucode')
|
|
expected_rst = (
|
|
'.. _carbon-pools-path-columns-lucode:\n\n' +
|
|
'**lucode** (`integer <input_types.html#integer>`__, *required*): ' +
|
|
carbon.MODEL_SPEC.get_input('carbon_pools_path').columns.get('lucode').about
|
|
)
|
|
self.assertEqual(repr(out), repr(expected_rst))
|
|
|
|
|
|
def _generate_files_from_spec(output_spec, workspace):
|
|
"""A utility function to support the metadata test."""
|
|
for spec_data in output_spec:
|
|
if spec_data.__class__ is spec.DirectoryOutput:
|
|
os.mkdir(os.path.join(workspace, spec_data.id))
|
|
_generate_files_from_spec(
|
|
spec_data.contents, os.path.join(workspace, spec_data.id))
|
|
else:
|
|
filepath = os.path.join(workspace, spec_data.id)
|
|
if isinstance(spec_data, spec.SingleBandRasterOutput):
|
|
driver = gdal.GetDriverByName('GTIFF')
|
|
raster = driver.Create(filepath, 2, 2, 1, gdal.GDT_Byte)
|
|
band = raster.GetRasterBand(1)
|
|
band.SetNoDataValue(2)
|
|
elif isinstance(spec_data, spec.VectorOutput):
|
|
driver = gdal.GetDriverByName('GPKG')
|
|
target_vector = driver.CreateDataSource(filepath)
|
|
layer_name = os.path.basename(os.path.splitext(filepath)[0])
|
|
target_layer = target_vector.CreateLayer(
|
|
layer_name, geom_type=ogr.wkbPolygon)
|
|
for field_spec in spec_data.fields:
|
|
target_layer.CreateField(ogr.FieldDefn(field_spec.id, ogr.OFTInteger))
|
|
else:
|
|
# Such as taskgraph.db, just create the file.
|
|
with open(filepath, 'w') as file:
|
|
pass
|
|
|
|
|
|
class TestMetadataFromSpec(unittest.TestCase):
|
|
"""Tests for metadata-generation functions."""
|
|
|
|
def setUp(self):
|
|
"""Override setUp function to create temp workspace directory."""
|
|
self.workspace_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Override tearDown function to remove temporary directory."""
|
|
shutil.rmtree(self.workspace_dir)
|
|
|
|
def test_write_metadata_for_outputs(self):
|
|
"""Test writing metadata for an invest output workspace."""
|
|
|
|
# An example invest output spec
|
|
output_spec = [
|
|
spec.DirectoryOutput(
|
|
id='output',
|
|
contents=[
|
|
spec.SingleBandRasterOutput(
|
|
id="urban_nature_supply_percapita.tif",
|
|
about="The calculated supply per capita of urban nature.",
|
|
data_type=float,
|
|
units=u.m**2
|
|
),
|
|
spec.VectorOutput(
|
|
id="admin_boundaries.gpkg",
|
|
about=("A copy of the user's administrative boundaries "
|
|
"vector with a single layer."),
|
|
geometry_types=spec.POLYGONS,
|
|
fields=[
|
|
spec.NumberInput(
|
|
id="SUP_DEMadm_cap",
|
|
units=u.m**2/u.person,
|
|
about="The average urban nature supply/demand"
|
|
)
|
|
]
|
|
)
|
|
]
|
|
),
|
|
spec.DirectoryOutput(
|
|
id='intermediate',
|
|
contents=[
|
|
spec.build_output_spec('taskgraph_cache', spec.TASKGRAPH_DIR)
|
|
]
|
|
)
|
|
]
|
|
# Generate an output workspace with real files, without
|
|
# running an invest model.
|
|
_generate_files_from_spec(output_spec, self.workspace_dir)
|
|
|
|
model_module = types.SimpleNamespace(
|
|
__name__='urban_nature_access',
|
|
execute=lambda: None,
|
|
MODEL_SPEC=spec.ModelSpec(
|
|
model_id='urban_nature_access',
|
|
model_title='Urban Nature Access',
|
|
userguide='',
|
|
aliases=[],
|
|
input_field_order=[],
|
|
inputs={},
|
|
outputs=output_spec
|
|
)
|
|
)
|
|
args_dict = {'workspace_dir': self.workspace_dir}
|
|
spec.generate_metadata_for_outputs(model_module, args_dict)
|
|
|
|
files, messages = geometamaker.validate_dir(
|
|
self.workspace_dir, recursive=True)
|
|
self.assertEqual(len(files), 2)
|
|
self.assertFalse(any(messages))
|
|
|
|
resource = geometamaker.describe(
|
|
os.path.join(args_dict['workspace_dir'], 'output',
|
|
'urban_nature_supply_percapita.tif'))
|
|
self.assertCountEqual(resource.get_keywords(),
|
|
[model_module.MODEL_SPEC.model_id, 'InVEST'])
|