I. #143 linting tests, remove pgp.testing

Remove pgp.testing from fisheries_io.py.

Updating history

Lots of test linting
This commit is contained in:
Doug 2020-05-28 12:01:32 -04:00
parent 7361b6ac39
commit f71858b412
11 changed files with 290 additions and 234 deletions

View File

@ -1,19 +1,26 @@
.. :changelog:
Unreleased Changes (3.9)
------------------------
* Migrating over to GDAL 3 from GDAL 2.
* Updating requirements to reflect GDAL version >= 3.
* Updating crop production tests that varied slightly due to GDAL 3
differences in handling spatial references.
* Adding function in utils.py to handle InVEST coordindate transformations.
* Adding tests for new coordinate transformation function in utils.py.
* General
* Migrating over to GDAL 3 from GDAL 2.
* Updating requirements to reflect GDAL version >= 3.
* Updating crop production tests that varied slightly due to GDAL 3
differences in handling spatial references.
* Adding function in utils.py to handle InVEST coordindate transformations.
* Adding tests for new coordinate transformation function in utils.py.
* Making InVEST compatible with Pygeoprocessing 2.0 by updating:
* ``convolve_2d`` keyword ``ignore_nodata`` to ``ignore_nodata_and_edges``.
* ``get_raster_info`` / ``get_vector_info`` keyword ``projection`` to
``projection_wkt``.
* Updating InVEST tests that were dependent on the removed ``pgp.testing``
module.
Unreleased Changes
------------------
* SDR's compiled core now defines its own ``SQRT2`` instead of relying on an
available standard C library definition. This new definition helps to avoid
some compiler issues on Windows.
* sdr
* SDR's compiled core now defines its own ``SQRT2`` instead of relying on an
available standard C library definition. This new definition helps to avoid
some compiler issues on Windows.
3.8.2 (2020-05-15)
------------------

View File

@ -5,11 +5,11 @@ The Fisheries IO module contains functions for handling inputs and outputs
import logging
import os
import csv
import math
import numpy
from osgeo import ogr
from osgeo import gdal
import pygeoprocessing.testing
from .. import reporting
from .. import utils
@ -271,9 +271,8 @@ def read_population_csv(args, path):
"Region vector shapes do not match. %s" % path)
# Check that information is correct
assert pygeoprocessing.testing.isclose(
pop_dict['Larvaldispersal'].sum(), 1), (
"The Larvaldisperal vector does not sum exactly to one.. %s" % path)
assert math.isclose(pop_dict['Larvaldispersal'].sum(), 1), (
f"The Larvaldisperal vector does not sum exactly to one.. {path}")
# Check that certain attributes have fraction elements
Frac_Vectors = ['Survnaturalfrac', 'Vulnfishing',

View File

@ -13,8 +13,8 @@ import numpy
def make_simple_raster(base_raster_path, fill_val, nodata_val):
"""Create a 10x10 raster on designated path with fill value.
Parameters:
raster_path (str): the raster path for making the new raster.
Args:
base_raster_path (str): the raster path for making the new raster.
fill_val (int): the value used for filling the raster.
nodata_val (int or None): for defining a band's nodata value.
@ -50,7 +50,7 @@ def make_simple_raster(base_raster_path, fill_val, nodata_val):
def assert_raster_equal_value(base_raster_path, val_to_compare):
"""Assert that the entire output raster has the same value as specified.
Parameters:
Args:
base_raster_path (str): the filepath of the raster to be asserted.
val_to_compare (float): the value to be filled in the array to compare.
@ -70,7 +70,7 @@ def assert_raster_equal_value(base_raster_path, val_to_compare):
def make_pools_csv(pools_csv_path):
"""Create a carbon pools csv file with simplified land cover types.
Parameters:
Args:
pools_csv_path (str): the path of carbon pool csv.
Returns:

View File

@ -1,9 +1,10 @@
"""Module for Testing the InVEST cli framework."""
import sys
import os
import shutil
import tempfile
import unittest
from unittest.mock import patch
import unittest.mock
import contextlib
import json
@ -24,6 +25,7 @@ def redirect_stdout():
class CLIHeadlessTests(unittest.TestCase):
"""Headless Tests for CLI."""
def setUp(self):
"""Use a temporary workspace for all tests in this class."""
self.workspace_dir = tempfile.mkdtemp()
@ -149,9 +151,8 @@ class CLIHeadlessTests(unittest.TestCase):
os.path.dirname(__file__), '..', 'data', 'invest-test-data',
'coastal_blue_carbon', 'cbc_galveston_bay.invs.json')
with unittest.mock.patch(
'natcap.invest.coastal_blue_carbon.coastal_blue_carbon.execute',
return_value=None) as patched_model:
targ = 'natcap.invest.coastal_blue_carbon.coastal_blue_carbon.execute'
with unittest.mock.patch(targ, return_value=None) as patched_model:
cli.main([
'run',
'cbc', # uses an alias

View File

@ -6,7 +6,6 @@ import shutil
import csv
import logging
import tempfile
import functools
import copy
import pprint
@ -74,7 +73,7 @@ NODATA_INT = -9999
def _read_array(raster_path):
""""Read raster as array."""
"""Read raster as array."""
ds = gdal.Open(raster_path)
band = ds.GetRasterBand(1)
a = band.ReadAsArray()
@ -98,7 +97,7 @@ def _create_workspace():
def _get_args(workspace, num_transitions=2, valuation=True):
"""Create and return arguments for CBC main model.
Parameters:
Args:
workspace(string): A path to a folder on disk. Generated inputs will
be saved to this directory.
num_transitions=2 (int): The number of transitions to synthesize.
@ -112,7 +111,7 @@ def _get_args(workspace, num_transitions=2, valuation=True):
band_matrices_two = numpy.ones((2, 2)) * 2
band_matrices_with_nodata = numpy.ones((2, 2))
band_matrices_with_nodata[0][0] = NODATA_INT
srs = osr.SpatialReference()
srs.ImportFromEPSG(3157)
projection_wkt = srs.ExportToWkt()
@ -131,15 +130,15 @@ def _get_args(workspace, num_transitions=2, valuation=True):
carbon_pool_transient_list)
raster_0_path = os.path.join(workspace, 'raster_0.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices, NODATA_INT, (100, -100), origin, projection_wkt,
raster_0_path)
raster_1_path = os.path.join(workspace, 'raster_1.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_with_nodata, NODATA_INT, (100, -100), origin,
band_matrices_with_nodata, NODATA_INT, (100, -100), origin,
projection_wkt, raster_1_path)
raster_2_path = os.path.join(workspace, 'raster_2.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_two, NODATA_INT, (100, -100), origin,
band_matrices_two, NODATA_INT, (100, -100), origin,
projection_wkt, raster_2_path)
possible_transitions = [raster_1_path, raster_2_path]
@ -153,7 +152,8 @@ def _get_args(workspace, num_transitions=2, valuation=True):
'lulc_baseline_map_uri': raster_0_path,
'lulc_baseline_year': 1995,
'lulc_transition_maps_list': possible_transitions[:num_transitions+1],
'lulc_transition_years_list': possible_transition_years[:num_transitions+1],
'lulc_transition_years_list': possible_transition_years[
:num_transitions+1],
'analysis_year': 2010,
'carbon_pool_initial_uri': carbon_pool_initial_path,
'carbon_pool_transient_uri': carbon_pool_transient_path,
@ -199,27 +199,27 @@ def _get_preprocessor_args(args_choice, workspace):
raster_0_path = os.path.join(workspace, 'raster_0.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
raster_0_path)
raster_1_path = os.path.join(workspace, 'raster_1.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
raster_1_path)
raster_2_path = os.path.join(workspace, 'raster_2.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_ones, NODATA_INT, (100, -100), origin, projection_wkt,
raster_2_path)
raster_3_path = os.path.join(workspace, 'raster_3.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_zeros, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_zeros, NODATA_INT, (100, -100), origin, projection_wkt,
raster_3_path)
raster_4_path = os.path.join(workspace, 'raster_4.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_zeros, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_zeros, NODATA_INT, (100, -100), origin, projection_wkt,
raster_4_path)
raster_nodata_path = os.path.join(workspace, 'raster_4.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_nodata, NODATA_INT, (100, -100), origin, projection_wkt,
band_matrices_nodata, NODATA_INT, (100, -100), origin, projection_wkt,
raster_nodata_path)
args = {
@ -240,14 +240,16 @@ def _get_preprocessor_args(args_choice, workspace):
'workspace_dir': os.path.join(workspace, 'workspace'),
'results_suffix': 'test',
'lulc_lookup_uri': lulc_lookup_path,
'lulc_snapshot_list': [raster_0_path, raster_nodata_path, raster_3_path]
'lulc_snapshot_list': [
raster_0_path, raster_nodata_path, raster_3_path]
}
args4 = {
'workspace_dir': os.path.join(workspace, 'workspace'),
'results_suffix': 'test',
'lulc_lookup_uri': lulc_lookup_path,
'lulc_snapshot_list': [raster_0_path, raster_nodata_path, raster_4_path]
'lulc_snapshot_list': [
raster_0_path, raster_nodata_path, raster_4_path]
}
if args_choice == 1:
@ -274,9 +276,8 @@ class TestPreprocessor(unittest.TestCase):
def test_create_carbon_pool_transient_table_template(self):
"""Coastal Blue Carbon: Test creation of transient table template."""
from natcap.invest.coastal_blue_carbon import preprocessor
args = _get_preprocessor_args(1, self.workspace_dir)
filepath = os.path.join(self.workspace_dir,
'transient_temp.csv')
filepath = os.path.join(
self.workspace_dir, 'transient_temp.csv')
code_to_lulc_dict = {1: 'one', 2: 'two', 3: 'three'}
preprocessor._create_carbon_pool_transient_table_template(
filepath, code_to_lulc_dict)
@ -376,19 +377,19 @@ class TestPreprocessor(unittest.TestCase):
from natcap.invest.coastal_blue_carbon import preprocessor
args = _get_preprocessor_args(1, self.workspace_dir)
OTHER_NODATA = -1
srs = osr.SpatialReference()
srs.ImportFromEPSG(3157)
projection_wkt = srs.ExportToWkt()
origin = (443723.127327877911739, 4956546.905980412848294)
band_matrices_with_nodata = numpy.ones((2, 2)) * OTHER_NODATA
raster_wrong_nodata = os.path.join(
self.workspace_dir, 'raster_wrong_nodata.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_with_nodata, OTHER_NODATA, (100, -100), origin,
band_matrices_with_nodata, OTHER_NODATA, (100, -100), origin,
projection_wkt, raster_wrong_nodata)
args['lulc_snapshot_list'][0] = raster_wrong_nodata
with self.assertRaises(ValueError):
preprocessor.execute(args)
@ -415,7 +416,7 @@ class TestPreprocessor(unittest.TestCase):
raster_zeros = os.path.join(self.workspace_dir, 'raster_1.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_zero, NODATA_INT, (100, -100), origin,
band_matrices_zero, NODATA_INT, (100, -100), origin,
projection_wkt, raster_zeros)
args['lulc_snapshot_list'][0] = raster_zeros
@ -443,7 +444,7 @@ class TestPreprocessor(unittest.TestCase):
raster_zeros = os.path.join(self.workspace_dir, 'raster_1.tif')
pygeoprocessing.numpy_array_to_raster(
band_matrices_zero, NODATA_INT, (100, -100), origin,
band_matrices_zero, NODATA_INT, (100, -100), origin,
projection_wkt, raster_zeros)
args['lulc_snapshot_list'][0] = raster_zeros
@ -496,6 +497,7 @@ class TestIO(unittest.TestCase):
self.args = _get_args(self.workspace_dir)
def tearDown(self):
"""Clean up workspace when finished."""
shutil.rmtree(self.workspace_dir)
def test_get_inputs(self):
@ -629,7 +631,8 @@ class TestModel(unittest.TestCase):
self.args['analysis_year'] = None
self.args['lulc_baseline_year'] = 2000
self.args['lulc_transition_maps_list'] = [self.args['lulc_transition_maps_list'][0]]
self.args['lulc_transition_maps_list'] = [
self.args['lulc_transition_maps_list'][0]]
self.args['lulc_transition_years_list'] = [2005]
cbc.execute(self.args)
@ -714,7 +717,8 @@ class TestModel(unittest.TestCase):
'price_table_uri': os.path.join(
REGRESSION_DATA, 'inputs/Price_table_SCC3.csv'),
'lulc_transition_matrix_uri': os.path.join(
REGRESSION_DATA, 'outputs_preprocessor/transitions_sample.csv'),
REGRESSION_DATA,
'outputs_preprocessor/transitions_sample.csv'),
'price': 10.0,
'results_suffix': '150225'
}
@ -741,7 +745,8 @@ class TestModel(unittest.TestCase):
# walk through all files in the workspace and assert that outputs have
# the file suffix.
non_suffixed_files = []
for root_dir, dirnames, filenames in os.walk(self.args['workspace_dir']):
for root_dir, dirnames, filenames in os.walk(
self.args['workspace_dir']):
for filename in filenames:
if not filename.lower().endswith('.txt'): # ignore logfile
basename, extension = os.path.splitext(filename)
@ -778,10 +783,13 @@ class TestModel(unittest.TestCase):
class CBCRefactorTest(unittest.TestCase):
"""CBC Refactor Tests."""
def setUp(self):
"""Create a temporary workspace."""
self.workspace_dir = tempfile.mkdtemp()
def tearDown(self):
"""Remove temporary workspace when done."""
shutil.rmtree(self.workspace_dir)
@staticmethod
@ -792,15 +800,15 @@ class CBCRefactorTest(unittest.TestCase):
workspace (string): The path to the workspace directory on disk.
Files will be saved to this location.
transition_tuples (list or None): A list of tuples, where the first
element of the tuple is a numpy matrix of the transition values,
and the second element of the tuple is the year of the transition.
Provided years must be in chronological order.
element of the tuple is a numpy matrix of the transition
values, and the second element of the tuple is the year of the
transition. Provided years must be in chronological order.
If ``None``, the transition parameters will be ignored.
analysis_year (int or None): The year of the final analysis. If
provided, it must be greater than the last year within the
transition tuples (unless ``transition_tuples`` is None, in which
case ``analysis_year`` can be anything greater than 2000, the
baseline year).
transition tuples (unless ``transition_tuples`` is None, in
which case ``analysis_year`` can be anything greater than 2000,
the baseline year).
Returns:
A dict of the model arguments.
@ -810,12 +818,12 @@ class CBCRefactorTest(unittest.TestCase):
args = {
'workspace_dir': workspace,
'lulc_lookup_uri': os.path.join(workspace, 'lulc_lookup.csv'),
'lulc_transition_matrix_uri': os.path.join(workspace,
'transition_matrix.csv'),
'carbon_pool_initial_uri': os.path.join(workspace,
'carbon_pool_initial.csv'),
'carbon_pool_transient_uri': os.path.join(workspace,
'carbon_pool_transient.csv'),
'lulc_transition_matrix_uri': os.path.join(
workspace, 'transition_matrix.csv'),
'carbon_pool_initial_uri': os.path.join(
workspace, 'carbon_pool_initial.csv'),
'carbon_pool_transient_uri': os.path.join(
workspace, 'carbon_pool_transient.csv'),
'lulc_baseline_map_uri': os.path.join(workspace, 'lulc.tif'),
'lulc_baseline_year': 2000,
'do_economic_analysis': False,
@ -831,12 +839,11 @@ class CBCRefactorTest(unittest.TestCase):
args['carbon_pool_transient_uri'],
carbon_pool_transient_list)
srs = osr.SpatialReference()
srs.ImportFromEPSG(3157)
projection_wkt = srs.ExportToWkt()
origin = (443723.127327877911739, 4956546.905980412848294)
known_matrix_size = None
if transition_tuples:
args['lulc_transition_maps_list'] = []
@ -847,7 +854,7 @@ class CBCRefactorTest(unittest.TestCase):
filename = os.path.join(
workspace, 'transition_%s.tif' % transition_year)
pygeoprocessing.numpy_array_to_raster(
band_matrix, -1, (100, -100), origin, projection_wkt,
band_matrix, -1, (100, -100), origin, projection_wkt,
filename)
args['lulc_transition_maps_list'].append(filename)
@ -856,7 +863,7 @@ class CBCRefactorTest(unittest.TestCase):
# Make the lulc
lulc_shape = (10, 10) if not known_matrix_size else known_matrix_size
pygeoprocessing.numpy_array_to_raster(
numpy.ones(lulc_shape), -1, (100, -100), origin, projection_wkt,
numpy.ones(lulc_shape), -1, (100, -100), origin, projection_wkt,
args['lulc_baseline_map_uri'])
if analysis_year:
@ -990,7 +997,8 @@ class CBCValidationTests(unittest.TestCase):
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
from natcap.invest import validation
validation_errors = coastal_blue_carbon.validate({}) # empty args dict.
# empty args dict.
validation_errors = coastal_blue_carbon.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(self.base_required_keys)
self.assertEqual(invalid_keys, expected_missing_keys)

View File

@ -10,7 +10,8 @@ from osgeo import gdal, osr, ogr
import numpy.testing
import pandas.testing
import pygeoprocessing
from shapely.geometry import Point, Polygon, MultiPolygon, LineString, MultiLineString
from shapely.geometry import Point, Polygon, MultiPolygon
from shapely.geometry import LineString, MultiLineString
import shapely.wkb
import taskgraph
@ -100,7 +101,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# add the max_fetch_distance to the bounding box so we can use
# the clipped raster in the fetch-ray-depth routine.
model_resolution = 5000
fetch_buffer = max_fetch_distance + model_resolution # add the model resolution, to be safe
# add the model resolution, to be safe
fetch_buffer = max_fetch_distance + model_resolution
aoi_bounding_box[0] -= fetch_buffer
aoi_bounding_box[1] -= fetch_buffer
aoi_bounding_box[2] += fetch_buffer
@ -279,7 +281,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
def test_invalid_habitats_outside_search_radius(self):
"""CV: test habitat search when no valid habitat within range."""
workspace_dir = self.workspace_dir
habitat_vector_path = os.path.join(workspace_dir, 'invalid_habitat.gpkg')
habitat_vector_path = os.path.join(
workspace_dir, 'invalid_habitat.gpkg')
_ = make_vector_of_invalid_geoms(habitat_vector_path)
base_shore_point_vector_path = os.path.join(
@ -287,7 +290,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
search_radius = 1 # meter, we don't want to find any habitat.
habitat_rank = 1
habitat_id = 'foo'
target_habitat_pickle_path = os.path.join(workspace_dir, 'target.pickle')
target_habitat_pickle_path = os.path.join(
workspace_dir, 'target.pickle')
coastal_vulnerability.search_for_habitat(
base_shore_point_vector_path, search_radius, habitat_rank,
@ -335,7 +339,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
fields = {'shore_id': ogr.OFTInteger}
attrs = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
point, simple_points_path, projection_wkt, 'GPKG',
point, simple_points_path, projection_wkt, 'GPKG',
fields=fields, attribute_list=attrs, ogr_geom_type=ogr.wkbPoint)
target_pickle_path = os.path.join(
workspace_dir, 'geomorphology.pickle')
@ -375,7 +379,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
target_surge_pickle_path, expected_raw_values_path)
def test_no_shelf_contour_near_aoi(self):
"""CV: test ValueError raised if shelf contour is too far from shore."""
"""CV: test ValueError raised if shelf contour too far from shore."""
workspace_dir = self.workspace_dir
base_shore_point_vector_path = os.path.join(
INPUT_DATA, "wwiii_shore_points_5000m.gpkg")
@ -390,7 +394,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
(bbox[0] - 2e6, bbox[1] - 2e6),
(bbox[0] - 2e6 - 100, bbox[1] - 2e6 - 100)])
pygeoprocessing.shapely_geometry_to_vector(
[line_a], shelf_contour_path, srs_wkt, 'GeoJSON',
[line_a], shelf_contour_path, srs_wkt, 'GeoJSON',
ogr_geom_type=ogr.wkbLineString)
target_surge_pickle_path = os.path.join(workspace_dir, 'surge.pickle')
@ -418,8 +422,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Make a shore point in center of AOI bbox
shore_point_path = os.path.join(workspace_dir, 'shore_point.shp')
pygeoprocessing.shapely_geometry_to_vector(
geometries, shore_point_path, srs_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
geometries, shore_point_path, srs_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
# Make surge line as diagonal of AOI bounding box
@ -429,7 +433,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
line_b = LineString([(bbox[0], bbox[1]), (bbox[2], bbox[3])])
geometries = [MultiLineString([line_a, line_b])]
pygeoprocessing.shapely_geometry_to_vector(
geometries, shelf_contour_path, srs_wkt, 'GeoJSON',
geometries, shelf_contour_path, srs_wkt, 'GeoJSON',
ogr_geom_type=ogr.wkbMultiLineString)
target_surge_pickle_path = os.path.join(
@ -482,7 +486,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
expected_values_dict, invert_values=True)
expected_rank_dict = {
int(x[0]): int(x[1]) for x in expected_rank_dict.items()}
# the dict items need sorting by FID to match the pre-sorted pickled items
# the dict items need sorting by FID to match the pre-sorted pickled
# items
expected_ranks = [x[1] for x in sorted(expected_rank_dict.items())]
numpy.testing.assert_array_equal(
@ -524,13 +529,15 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Make an SLR point vector
slr_fieldname = 'Trend'
slr_point_vector_path = os.path.join(workspace_dir, 'simple_points.shp')
slr_point_vector_path = os.path.join(
workspace_dir, 'simple_points.shp')
out_driver = ogr.GetDriverByName('ESRI Shapefile')
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
shapely_feature = Point(-125.65, 49.0)
out_vector = out_driver.CreateDataSource(slr_point_vector_path)
layer_name = os.path.basename(os.path.splitext(slr_point_vector_path)[0])
layer_name = os.path.basename(
os.path.splitext(slr_point_vector_path)[0])
out_layer = out_vector.CreateLayer(layer_name, srs=srs)
field_defn = ogr.FieldDefn(slr_fieldname, ogr.OFTReal)
out_layer.CreateField(field_defn)
@ -565,11 +572,11 @@ class CoastalVulnerabilityTests(unittest.TestCase):
projection_wkt = srs.ExportToWkt()
shore_point_path = os.path.join(workspace_dir, 'shore_point.shp')
fields = {'shore_id': ogr.OFTInteger}
attributes=[{'shore_id': 0}]
attributes = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
[Point(0., 0.)], shore_point_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
[Point(0., 0.)], shore_point_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
slr_point_vector_path = os.path.join(workspace_dir, 'slr_point.shp')
@ -600,11 +607,11 @@ class CoastalVulnerabilityTests(unittest.TestCase):
projection_wkt = srs.ExportToWkt()
shore_point_path = os.path.join(workspace_dir, 'shore_point.shp')
fields = {'shore_id': ogr.OFTInteger}
attributes=[{'shore_id': 0}]
attributes = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
[Point(0., 0.)], shore_point_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
[Point(0., 0.)], shore_point_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
slr_point_vector_path = os.path.join(workspace_dir, 'slr_point.shp')
@ -623,7 +630,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
self.assertTrue(expected_message in actual_message)
def test_long_aggregate_radius(self):
"""CV: handle an unreasonably long search radius in raster aggregation."""
"""CV: handle unreasonably long search radius in raster aggregation."""
workspace_dir = self.workspace_dir
raster_path = os.path.join(workspace_dir, 'simple_raster.tif')
target_pickle_path = os.path.join(workspace_dir, 'target.pickle')
@ -654,7 +661,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
geometries = [
Point(0.1, -0.1), # pixel (0,0): kernel origin out of bounds
Point(1.25, -1.25), # pixel (2,2): kernel origin & extent out of bounds
Point(1.25, -1.25), # pixel (2,2): kernel origin & extent O.O.B
Point(2.1, -2.1), # pixel (4,4): kernel extent out of bounds
]
fields = {'shore_id': ogr.OFTInteger}
@ -662,8 +669,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Make a vector proximate to the raster
simple_points_path = os.path.join(workspace_dir, 'simple_points.shp')
pygeoprocessing.shapely_geometry_to_vector(
geometries, simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
geometries, simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
coastal_vulnerability._aggregate_raster_values_in_radius(
@ -679,7 +686,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
list(actual_values.values()), expected_values, decimal=4)
def test_complete_run(self):
"""CV: regression test for a complete run with all optional arguments."""
"""CV: regression test for a complete run w/ all optional arguments."""
args = CoastalVulnerabilityTests.generate_base_args(self.workspace_dir)
# these optional args aren't included in base_args:
args['geomorphology_vector_path'] = os.path.join(
@ -703,10 +710,12 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Also expect matching shore_id field in all tabular outputs:
intermediate_csv = pandas.read_csv(
os.path.join(
args['workspace_dir'], 'intermediate/intermediate_exposure.csv'))
args['workspace_dir'],
'intermediate/intermediate_exposure.csv'))
habitat_csv = pandas.read_csv(
os.path.join(
args['workspace_dir'], 'intermediate/habitats/habitat_protection.csv'))
args['workspace_dir'],
'intermediate/habitats/habitat_protection.csv'))
pandas.testing.assert_series_equal(
actual_values_df['shore_id'], intermediate_csv['shore_id'])
pandas.testing.assert_series_equal(
@ -723,12 +732,14 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Points with ranks for the final equation. Also includes a field
# without the R_ prefix, which final equation should ignore.
base_vector_path = os.path.join(REGRESSION_DATA, 'coastal_exposure.gpkg')
base_vector_path = os.path.join(
REGRESSION_DATA, 'coastal_exposure.gpkg')
# This input gets modified in place, so first copy to working dir
# I'm using GPKG driver to copy because that driver may have problems
# updating a file created by a different GPKG driver version, and the version
# used is dependent on GDAL version. https://gdal.org/drivers/vector/gpkg.html
# updating a file created by a different GPKG driver version, and the
# version used is dependent on GDAL version.
# https://gdal.org/drivers/vector/gpkg.html
base_shore_point_vector = ogr.Open(base_vector_path)
gpkg_driver = ogr.GetDriverByName('GPKG')
gpkg_driver.CopyDataSource(
@ -750,17 +761,23 @@ class CoastalVulnerabilityTests(unittest.TestCase):
self.assertTrue(numpy.isnan(result))
def test_final_risk_calc_with_missing_data(self):
"""CV: test missing data at feature propogates to empty field in output."""
"""CV: test final risk calculation w/ missing data.
Test missing data at feature propogates to empty field in output.
"""
target_vector_path = os.path.join(self.workspace_dir, 'target.gpkg')
target_csv_path = os.path.join(self.workspace_dir, 'target.csv')
# This gpkg has a feature with an empty field value for 'R_slr'
# The function modifies the file in place, so copy to test workspace first.
# The function modifies the file in place, so copy to test workspace
# first.
# I'm using GPKG driver to copy because that driver may have problems
# updating a file created by a different GPKG driver version, and the version
# used is dependent on GDAL version. https://gdal.org/drivers/vector/gpkg.html
base_vector_path = os.path.join(REGRESSION_DATA, 'test_missing_values.gpkg')
# updating a file created by a different GPKG driver version, and the
# version used is dependent on GDAL version.
# https://gdal.org/drivers/vector/gpkg.html
base_vector_path = os.path.join(
REGRESSION_DATA, 'test_missing_values.gpkg')
base_shore_point_vector = ogr.Open(base_vector_path)
gpkg_driver = ogr.GetDriverByName('GPKG')
gpkg_driver.CopyDataSource(
@ -833,8 +850,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
fields = {'shore_id': ogr.OFTInteger}
attributes = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
[Point(0., 0.)], simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
[Point(0., 0.)], simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
coastal_vulnerability._aggregate_raster_values_in_radius(
@ -955,7 +972,7 @@ class CoastalVulnerabilityTests(unittest.TestCase):
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # UTM Zone 10N
wkt = srs.ExportToWkt()
aoi_geometries = [Polygon([
(-200, -200), (200, -200), (200, 200), (-200, 200), (-200, -200)])]
pygeoprocessing.shapely_geometry_to_vector(
@ -963,13 +980,13 @@ class CoastalVulnerabilityTests(unittest.TestCase):
landmass_path = os.path.join(workspace_dir, 'landmass.geojson')
poly_a = Polygon([
(-200, -200), (-100, -200), (-100, -100), (-200, -100),
(-200, -200), (-100, -200), (-100, -100), (-200, -100),
(-200, -200)])
poly_b = Polygon([
(100, 100), (200, 100), (200, 200), (100, 200), (100, 100)])
landmass_geometries = [poly_a, poly_b, MultiPolygon([poly_a, poly_b])]
pygeoprocessing.shapely_geometry_to_vector(
landmass_geometries, landmass_path, wkt, 'GeoJSON',
landmass_geometries, landmass_path, wkt, 'GeoJSON',
ogr_geom_type=ogr.wkbUnknown)
args['aoi_vector_path'] = aoi_path
@ -1054,11 +1071,11 @@ class CoastalVulnerabilityTests(unittest.TestCase):
srs.ImportFromEPSG(26910) # UTM Zone 10N
wkt = srs.ExportToWkt()
poly_a = Polygon([
(-200, -200), (-100, -200), (-100, -100), (-200, -100),
(-200, -200), (-100, -200), (-100, -100), (-200, -100),
(-200, -200)])
poly_b = Polygon([
(100, 100), (200, 100), (200, 200), (100, 200), (100, 100)])
pygeoprocessing.shapely_geometry_to_vector(
[poly_a, poly_b], aoi_path, wkt, 'GeoJSON')
@ -1096,20 +1113,21 @@ class CoastalVulnerabilityTests(unittest.TestCase):
self.assertTrue(n_points == 6)
def test_no_wwiii_coverage(self):
"""CV: test exception when shore points are outside max wwiii distance."""
"""CV: test exception when shore points are outside max wwiii dist."""
args = CoastalVulnerabilityTests.generate_base_args(self.workspace_dir)
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # UTM Zone 10N
projection_wkt = srs.ExportToWkt()
simple_points_path = os.path.join(self.workspace_dir, 'simple_points.shp')
simple_points_path = os.path.join(
self.workspace_dir, 'simple_points.shp')
fields = {'shore_id': ogr.OFTInteger}
attributes = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
[Point(0.0, 0.0)], simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
[Point(0.0, 0.0)], simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
target_path = os.path.join(self.workspace_dir, 'target.gpkg')
@ -1142,8 +1160,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
attributes = [{'shore_id': 0}]
pygeoprocessing.shapely_geometry_to_vector(
geometries, simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
geometries, simple_points_path, projection_wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbPoint)
target_path = os.path.join(workspace_dir, 'target.gpkg')
@ -1278,7 +1296,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
base_point_vector_path, base_raster_path, sample_distance,
target_pickle_path, aggregation_mode)
actual_message = str(cm.exception)
expected_message = 'aggregation mode must be either "mean" or "density"'
expected_message = ('aggregation mode must be either "mean" or '
'"density"')
self.assertTrue(actual_message == expected_message)
def test_invalid_habitat_table_paths(self):
@ -1298,7 +1317,8 @@ class CoastalVulnerabilityTests(unittest.TestCase):
def test_polygon_to_lines(self):
"""CV: test a helper function that converts polygons to linestrings."""
# Test a polygon with inner rings to cover all paths through function.
donut_wkt = 'POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))'
donut_wkt = ('POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), '
'(20 30, 35 35, 30 20, 20 30))')
geometry = ogr.CreateGeometryFromWkt(donut_wkt)
shapely_geom = shapely.wkb.loads(geometry.ExportToWkb())
line_list = coastal_vulnerability.polygon_to_lines(shapely_geom)
@ -1359,7 +1379,8 @@ class CoastalVulnerabilityValidationTests(unittest.TestCase):
from natcap.invest import coastal_vulnerability
from natcap.invest import validation
validation_errors = coastal_vulnerability.validate({}) # empty args dict.
# empty args dict.
validation_errors = coastal_vulnerability.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(
self.base_required_keys)
@ -1498,8 +1519,9 @@ def make_vector_of_invalid_geoms(target_vector_path):
assert not invalid_shared_edge_polygon.IsValid()
# 3: Dangling edge - fixed by buffer
invalid_dangling_edge_polygon = ogr.CreateGeometryFromWkt(
'POLYGON((100 100, 110 100, 115 105, 110 100, 110 110, 100 110, 100 100))')
dangle_geom = ('POLYGON((100 100, 110 100, 115 105, 110 100, 110 110, '
'100 110, 100 100))')
invalid_dangling_edge_polygon = ogr.CreateGeometryFromWkt(dangle_geom)
assert not invalid_dangling_edge_polygon.IsValid()
# One invalid geom that cannot be loaded by shapely or fixed by buffer

View File

@ -319,7 +319,8 @@ class CropValidationTests(unittest.TestCase):
from natcap.invest import crop_production_percentile
from natcap.invest import validation
validation_errors = crop_production_percentile.validate({}) # empty args dict.
# empty args dict.
validation_errors = crop_production_percentile.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(self.base_required_keys)
self.assertEqual(invalid_keys, expected_missing_keys)
@ -329,7 +330,8 @@ class CropValidationTests(unittest.TestCase):
from natcap.invest import crop_production_regression
from natcap.invest import validation
validation_errors = crop_production_regression.validate({}) # empty args dict.
# empty args dict.
validation_errors = crop_production_regression.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(
self.base_required_keys +

View File

@ -1,3 +1,4 @@
"""Testing Module for Datastack."""
import os
import unittest
import tempfile
@ -44,13 +45,13 @@ def _assert_vectors_equal(
av, ev, decimal=tolerance_places)
else:
assert(ev is None)
expected_geom = feature.GetGeometryRef()
expected_geom_wkt = expected_geom.ExportToWkt()
actual_geom = feature.GetGeometryRef()
actual_geom_wkt = actual_geom.ExportToWkt()
assert(expected_geom_wkt == actual_geom_wkt)
feature = None
actual_feature = None
finally:
@ -61,13 +62,17 @@ def _assert_vectors_equal(
class DatastacksTest(unittest.TestCase):
"""Test Datastack."""
def setUp(self):
"""Create temporary workspace."""
self.workspace = tempfile.mkdtemp()
def tearDown(self):
"""Remove temporary workspace."""
shutil.rmtree(self.workspace)
def test_collect_simple_parameters(self):
"""Datastack: test collect simple parameters."""
from natcap.invest import datastack
params = {
'a': 1,
@ -93,6 +98,7 @@ class DatastacksTest(unittest.TestCase):
{'a': 1, 'b': 'hello there', 'c': 'plain bytestring', 'd': ''})
def test_collect_multipart_gdal_raster(self):
"""Datastack: test collect multipart gdal raster."""
from natcap.invest import datastack
params = {
'raster': os.path.join(DATA_DIR, 'dem'),
@ -119,6 +125,7 @@ class DatastacksTest(unittest.TestCase):
numpy.testing.assert_allclose(model_array, reg_array)
def test_collect_geotiff(self):
"""Datastack: test collect geotiff."""
# Necessary test, as this is proving to be an issue.
from natcap.invest import datastack
params = {
@ -136,6 +143,7 @@ class DatastacksTest(unittest.TestCase):
numpy.testing.assert_allclose(model_array, reg_array)
def test_collect_ogr_vector(self):
"""Datastack: test collect ogr vector."""
from natcap.invest import datastack
source_vector_path = os.path.join(DATA_DIR, 'watersheds.shp')
source_vector = ogr.Open(source_vector_path)
@ -169,12 +177,13 @@ class DatastacksTest(unittest.TestCase):
out_directory,
datastack.DATASTACK_PARAMETER_FILENAME)))['args']
_assert_vectors_equal(
params['vector'],
params['vector'],
os.path.join(out_directory, archived_params['vector']))
self.assertEqual(len(archived_params), 1) # sanity check
def test_collect_ogr_table(self):
"""Datastack: test collect ogr table."""
from natcap.invest import datastack
params = {
'table': os.path.join(DATA_DIR, 'carbon_pools_samp.csv'),
@ -193,14 +202,15 @@ class DatastacksTest(unittest.TestCase):
open(os.path.join(
out_directory,
datastack.DATASTACK_PARAMETER_FILENAME)))['args']
model_df = pandas.read_csv(params['table'])
reg_df = pandas.read_csv(
model_df = pandas.read_csv(params['table'])
reg_df = pandas.read_csv(
os.path.join(out_directory, archived_params['table']))
pandas.testing.assert_frame_equal(model_df, reg_df)
self.assertEqual(len(archived_params), 1) # sanity check
def test_nonspatial_single_file(self):
"""Datastack: test nonspatial single file."""
from natcap.invest import datastack
params = {
@ -222,13 +232,14 @@ class DatastacksTest(unittest.TestCase):
open(os.path.join(out_directory,
datastack.DATASTACK_PARAMETER_FILENAME)))['args']
self.assertTrue(filecmp.cmp(
params['some_file'],
os.path.join(out_directory, archived_params['some_file']),
params['some_file'],
os.path.join(out_directory, archived_params['some_file']),
shallow=False))
self.assertEqual(len(archived_params), 1) # sanity check
def test_data_dir(self):
"""Datastack: test data directory."""
from natcap.invest import datastack
params = {
'data_dir': os.path.join(self.workspace, 'data_dir')
@ -261,7 +272,7 @@ class DatastacksTest(unittest.TestCase):
self.assertEqual(len(archived_params), 1) # sanity check
common_files = ['foo.txt', 'bar.txt', 'baz.txt', 'nested/nested.txt']
matched_files, mismatch_files, error_files = filecmp.cmpfiles(
params['data_dir'],
params['data_dir'],
os.path.join(out_directory, archived_params['data_dir']),
common_files, shallow=False)
if mismatch_files or error_files:
@ -269,6 +280,7 @@ class DatastacksTest(unittest.TestCase):
f' {mismatch_files} ; and the errors are {error_files}')
def test_list_of_inputs(self):
"""Datastack: test list of inputs."""
from natcap.invest import datastack
params = {
'file_list': [
@ -292,8 +304,9 @@ class DatastacksTest(unittest.TestCase):
archived_params = json.load(
open(os.path.join(out_directory,
datastack.DATASTACK_PARAMETER_FILENAME)))['args']
archived_file_list = [
os.path.join(out_directory, filename) for filename in archived_params['file_list']]
archived_file_list = [
os.path.join(out_directory, filename)
for filename in archived_params['file_list']]
self.assertEqual(len(archived_params), 1) # sanity check
for expected_file, archive_file in zip(
@ -302,6 +315,7 @@ class DatastacksTest(unittest.TestCase):
self.fail(f'File mismatch: {expected_file} != {archive_file}')
def test_duplicate_filepaths(self):
"""Datastack: test duplicate filepaths."""
from natcap.invest import datastack
params = {
'foo': os.path.join(self.workspace, 'foo.txt'),
@ -335,6 +349,7 @@ class DatastacksTest(unittest.TestCase):
len(os.listdir(os.path.join(out_directory, 'data'))), 1)
def test_archive_extraction(self):
"""Datastack: test archive extraction."""
from natcap.invest import datastack
params = {
'blank': '',
@ -378,7 +393,7 @@ class DatastacksTest(unittest.TestCase):
numpy.testing.assert_allclose(model_array, reg_array)
_assert_vectors_equal(
archive_params['vector'], params['vector'])
model_df = pandas.read_csv(archive_params['table'])
model_df = pandas.read_csv(archive_params['table'])
reg_df = pandas.read_csv(params['table'])
pandas.testing.assert_frame_equal(model_df, reg_df)
for key in ('blank', 'a', 'b', 'c'):
@ -396,6 +411,7 @@ class DatastacksTest(unittest.TestCase):
filecmp.cmp(expected_file, archive_file, shallow=False))
def test_nested_args_keys(self):
"""Datastack: test nested argument keys."""
from natcap.invest import datastack
params = {
@ -412,6 +428,7 @@ class DatastacksTest(unittest.TestCase):
self.assertEqual(archive_params, params)
def test_datastack_parameter_set(self):
"""Datastack: test datastack parameter set."""
from natcap.invest import datastack, __version__
params = {
@ -455,6 +472,7 @@ class DatastacksTest(unittest.TestCase):
self.assertEqual(callable_name, modelname)
def test_relative_parameter_set(self):
"""Datastack: test relative parameter set."""
from natcap.invest import datastack, __version__
params = {
@ -566,6 +584,7 @@ class DatastacksTest(unittest.TestCase):
params, 'sample_model', natcap.invest.__version__))
def test_get_datatack_info_parameter_set(self):
"""Datastack: test get datastack info parameter set."""
import natcap.invest
from natcap.invest import datastack
@ -585,6 +604,7 @@ class DatastacksTest(unittest.TestCase):
params, 'sample_model', natcap.invest.__version__))
def test_get_datastack_info_logfile_new_style(self):
"""Datastack: test get datastack info logfile new style."""
import natcap.invest
from natcap.invest import datastack
args = {
@ -605,6 +625,7 @@ class DatastacksTest(unittest.TestCase):
args, 'some_modelname', natcap.invest.__version__))
def test_get_datastack_info_logfile_iui_style(self):
"""Datastack: test get datastack info logfile iui style."""
from natcap.invest import datastack
logfile_path = os.path.join(self.workspace, 'logfile.txt')
@ -701,16 +722,17 @@ class DatastacksTest(unittest.TestCase):
extraction_path)
expected_args = {
'windows_path': os.path.join(extraction_path, 'data',
'filepath1.txt'),
'linux_path': os.path.join(extraction_path, 'data',
'filepath2.txt'),
'windows_path': os.path.join(
extraction_path, 'data', 'filepath1.txt'),
'linux_path': os.path.join(
extraction_path, 'data', 'filepath2.txt'),
}
self.maxDiff = None # show whole exception on failure
self.assertEqual(extracted_args, expected_args)
class UtilitiesTest(unittest.TestCase):
"""Datastack Utilities Tests."""
def test_print_args(self):
"""Datastacks: verify that we format args correctly."""
from natcap.invest.datastack import format_args_dict, __version__

View File

@ -125,7 +125,6 @@ class DelineateItTests(unittest.TestCase):
def test_point_snapping(self):
"""DelineateIt: test point snapping."""
from natcap.invest import delineateit
srs = osr.SpatialReference()
@ -155,14 +154,14 @@ class DelineateItTests(unittest.TestCase):
Point(13, -5),
box(-2, -2, -1, -1), # Off the edge
]
fields={'foo': ogr.OFTInteger, 'bar': ogr.OFTString}
attributes=[
fields = {'foo': ogr.OFTInteger, 'bar': ogr.OFTString}
attributes = [
{'foo': 0, 'bar': 0.1}, {'foo': 1, 'bar': 1.1},
{'foo': 2, 'bar': 2.1}, {'foo': 3, 'bar': 3.1},
{'foo': 4, 'bar': 4.1}]
pygeoprocessing.shapely_geometry_to_vector(
source_features, source_points_path, wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
source_features, source_points_path, wkt, 'GeoJSON',
fields=fields, attribute_list=attributes,
ogr_geom_type=ogr.wkbUnknown)
snapped_points_path = os.path.join(self.workspace_dir,
@ -256,7 +255,7 @@ class DelineateItTests(unittest.TestCase):
dem_raster_path = os.path.join(self.workspace_dir, 'dem.tif')
# byte datatype
pygeoprocessing.numpy_array_to_raster(
dem_matrix, 255, (2, -2), (2, -2), projection_wkt,
dem_matrix, 255, (2, -2), (2, -2), projection_wkt,
dem_raster_path)
# empty geometry
@ -314,7 +313,8 @@ class DelineateItTests(unittest.TestCase):
outflow_layer = None
outflow_vector = None
target_vector_path = os.path.join(self.workspace_dir, 'checked_geometries.gpkg')
target_vector_path = os.path.join(
self.workspace_dir, 'checked_geometries.gpkg')
with self.assertRaises(ValueError) as cm:
delineateit.check_geometries(
outflow_vector_path, dem_raster_path, target_vector_path,
@ -335,7 +335,8 @@ class DelineateItTests(unittest.TestCase):
target_vector = gdal.OpenEx(target_vector_path, gdal.OF_VECTOR)
target_layer = target_vector.GetLayer()
self.assertEqual(target_layer.GetFeatureCount(), len(expected_geom_areas))
self.assertEqual(
target_layer.GetFeatureCount(), len(expected_geom_areas))
for feature in target_layer:
geom = feature.GetGeometryRef()

View File

@ -26,7 +26,7 @@ EXPECTED_FILE_LIST = [
def _make_harvest_shp(workspace_dir):
"""Within workspace, make an output folder with dummy Finfish_Harvest.shp.
Parameters:
Args:
workspace_dir: path to workspace for creating the output folder.
"""
output_path = os.path.join(workspace_dir, 'output')
@ -35,6 +35,7 @@ def _make_harvest_shp(workspace_dir):
with open(os.path.join(output_path, 'Finfish_Harvest.shp'), 'wb') as shp:
shp.write(b'')
def _assert_vectors_equal(
actual_vector_path, expected_vector_path, tolerance_places=3):
"""Assert fieldnames and values are equal with no respect to order."""
@ -63,13 +64,13 @@ def _assert_vectors_equal(
av, ev, decimal=tolerance_places)
else:
assert(ev is None)
expected_geom = feature.GetGeometryRef()
expected_geom_wkt = expected_geom.ExportToWkt()
actual_geom = feature.GetGeometryRef()
actual_geom_wkt = actual_geom.ExportToWkt()
assert(expected_geom_wkt == actual_geom_wkt)
feature = None
actual_feature = None
finally:
@ -78,6 +79,7 @@ def _assert_vectors_equal(
expected_layer = None
expected_vector = None
class FinfishTests(unittest.TestCase):
"""Tests for Finfish Aquaculture."""
@ -93,7 +95,7 @@ class FinfishTests(unittest.TestCase):
@staticmethod
def generate_base_args(workspace_dir):
"""Generate an args list that is consistent for both regression tests"""
"""Generate args list that is consistent for both regression tests."""
args = {
'farm_ID': 'FarmID',
'farm_op_tbl': os.path.join(SAMPLE_DATA, 'Farm_Operations.csv'),
@ -150,7 +152,7 @@ class FinfishTests(unittest.TestCase):
def _test_same_files(base_path_list, directory_path):
"""Assert files in `base_path_list` are in `directory_path`.
Parameters:
Args:
base_path_list (list): list of strings which are relative
file paths.
directory_path (string): a path to a directory whose contents will
@ -205,7 +207,8 @@ class FinfishValidationTests(unittest.TestCase):
from natcap.invest.finfish_aquaculture import finfish_aquaculture
from natcap.invest import validation
validation_errors = finfish_aquaculture.validate({}) # empty args dict.
# empty args dict.
validation_errors = finfish_aquaculture.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(
self.base_required_keys +

View File

@ -30,7 +30,7 @@ class FisheriesSampleDataTests(unittest.TestCase):
def get_harvest_info(workspace, filename='results_table.csv'):
"""Extract final harvest info from the results CSV.
Parameters:
Args:
workspace (string): The path to the output workspace. The file
*workspace*/output/results_table.csv must exist.
@ -73,16 +73,14 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 6050000.0,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'beta': 4.14e-08,
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': False,
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_shrimp',
'population_params.csv'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_shrimp', 'population_params.csv'),
'population_type': 'Stage-Based',
'recruitment_type': 'Fixed',
'sexsp': 'No',
@ -97,7 +95,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
}
validation_warnings = fisheries.validate(args)
self.assertEqual(len(validation_warnings), 1)
self.assertTrue('required but has no value' in validation_warnings[0][1])
self.assertTrue(
'required but has no value' in validation_warnings[0][1])
def test_validation_batch(self):
"""Fisheries: Batch parameters (full model validation)."""
@ -106,18 +105,16 @@ class FisheriesSampleDataTests(unittest.TestCase):
# enabled.
args = {
'alpha': 5.77e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_belize',
'Lob_Belize_Subregions.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_belize', 'Lob_Belize_Subregions.shp'),
'beta': 2.885e6,
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': True,
'migration_dir': os.path.join(SAMPLE_DATA,
'input_lobster', 'Migrations'),
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_lobster',
'population_params.csv'),
'migration_dir': os.path.join(
SAMPLE_DATA, 'input_lobster', 'Migrations'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_lobster', 'population_params.csv'),
'population_type': 'Age-Based',
'recruitment_type': 'Beverton-Holt',
'sexsp': 'No',
@ -169,7 +166,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
"""Fisheries: Validate AOI fields."""
from natcap.invest.fisheries import fisheries
args = {'aoi_vector_path': os.path.join(self.workspace_dir, 'aoi.gpkg')}
args = {
'aoi_vector_path': os.path.join(self.workspace_dir, 'aoi.gpkg')}
gpkg_driver = gdal.GetDriverByName('GPKG')
vector = gpkg_driver.Create(
args['aoi_vector_path'], 0, 0, 0, gdal.GDT_Unknown)
@ -179,7 +177,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
layer = None
vector = None
validation_warnings = fisheries.validate(args, limit_to='aoi_vector_path')
validation_warnings = fisheries.validate(
args, limit_to='aoi_vector_path')
self.assertEqual(len(validation_warnings), 1)
self.assertTrue('Fields are missing from the first layer' in
validation_warnings[0][1])
@ -200,16 +199,14 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 6050000.0,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'beta': 4.14e-08,
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': False,
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_shrimp',
'population_params.csv'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_shrimp', 'population_params.csv'),
'population_type': 'Stage-Based',
'recruitment_type': 'Fixed',
'sexsp': 'No',
@ -225,7 +222,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
fisheries.execute(args)
final_timestep_data = FisheriesSampleDataTests.get_harvest_info(
args['workspace_dir'], 'results_table_foo.csv')
self.assertEqual(final_timestep_data['spawners'], '(fixed recruitment)')
self.assertEqual(
final_timestep_data['spawners'], '(fixed recruitment)')
self.assertEqual(final_timestep_data['harvest'], 3120557.88)
def test_sampledata_lobster(self):
@ -233,18 +231,16 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 5.77e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_belize',
'Lob_Belize_Subregions.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_belize', 'Lob_Belize_Subregions.shp'),
'beta': 2.885e6,
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': True,
'migration_dir': os.path.join(SAMPLE_DATA,
'input_lobster', 'Migrations'),
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_lobster',
'population_params.csv'),
'migration_dir': os.path.join(
SAMPLE_DATA, 'input_lobster', 'Migrations'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_lobster', 'population_params.csv'),
'population_type': 'Age-Based',
'recruitment_type': 'Beverton-Holt',
'sexsp': 'No',
@ -269,16 +265,14 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 6.05e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'beta': 4.14e-08,
'do_batch': False,
'harvest_units': 'Individuals',
'migr_cont': False,
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_blue_crab',
'population_params.csv'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_blue_crab', 'population_params.csv'),
'population_type': 'Age-Based',
'recruitment_type': 'Ricker',
'sexsp': 'No',
@ -301,15 +295,13 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 6.05e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'beta': 4.14e-08,
'do_batch': True,
'harvest_units': 'Individuals',
'migr_cont': False,
'population_csv_dir': os.path.join(SAMPLE_DATA,
'input_blue_crab'),
'population_csv_dir': os.path.join(SAMPLE_DATA, 'input_blue_crab'),
'population_type': 'Age-Based',
'recruitment_type': 'Ricker',
'sexsp': 'No',
@ -332,16 +324,15 @@ class FisheriesSampleDataTests(unittest.TestCase):
from natcap.invest.fisheries import fisheries
args = {
'alpha': 2e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_hood_canal',
'DC_HoodCanal_Subregions.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_hood_canal',
'DC_HoodCanal_Subregions.shp'),
'beta': 3.09e-7,
'do_batch': False,
'harvest_units': 'Individuals',
'migr_cont': False,
'population_csv_path': os.path.join(SAMPLE_DATA,
'input_dungeness_crab',
'population_params.csv'),
'population_csv_path': os.path.join(
SAMPLE_DATA, 'input_dungeness_crab', 'population_params.csv'),
'population_type': 'Age-Based',
'recruitment_type': 'Ricker',
'sexsp': 'Yes',
@ -361,12 +352,11 @@ class FisheriesSampleDataTests(unittest.TestCase):
@staticmethod
def fecundity_args(workspace):
"""
Create a base set of args for the fecundity recruitment model.
"""Create a base set of args for the fecundity recruitment model.
The AOI is located in Belize.
Parameters:
Args:
workspace (string): The path to the workspace on disk.
Returns:
@ -374,15 +364,14 @@ class FisheriesSampleDataTests(unittest.TestCase):
"""
args = {
'alpha': 5.77e6,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_belize',
'Lob_Belize_Subregions.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_belize', 'Lob_Belize_Subregions.shp'),
'beta': 2.885e6,
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': False,
'population_csv_path': os.path.join(TEST_DATA,
'sample_fecundity_params.csv'),
'population_csv_path': os.path.join(
TEST_DATA, 'sample_fecundity_params.csv'),
'population_type': 'Age-Based',
'recruitment_type': 'Fecundity',
'sexsp': 'No',
@ -397,10 +386,9 @@ class FisheriesSampleDataTests(unittest.TestCase):
@staticmethod
def galveston_args(workspace):
"""
Create a base set of fecundity args for Galveston Bay.
"""Create a base set of fecundity args for Galveston Bay.
Parameters:
Args:
workspace (string): The path to the workspace on disk.
Returns:
@ -409,9 +397,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
args = FisheriesSampleDataTests.fecundity_args(workspace)
args.update({
'alpha': 6050000.0,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'beta': 4.14e-08,
'harvest_units': 'Individuals',
'spawn_units': 'Individuals',
@ -441,7 +428,7 @@ class FisheriesSampleDataTests(unittest.TestCase):
# This doesn't model anything real, but it will produce outputs as
# expected.
'recruitment_func': lambda x: (numpy.ones((9,)),
numpy.float64(100))
numpy.float64(100))
})
fisheries.execute(args)
@ -495,9 +482,8 @@ class FisheriesSampleDataTests(unittest.TestCase):
args = {
'alpha': 6050000.0,
'beta': 4.14e-08,
'aoi_vector_path': os.path.join(SAMPLE_DATA,
'shapefile_galveston',
'Galveston_Subregion.shp'),
'aoi_vector_path': os.path.join(
SAMPLE_DATA, 'shapefile_galveston', 'Galveston_Subregion.shp'),
'do_batch': False,
'harvest_units': 'Weight',
'migr_cont': False,
@ -518,15 +504,17 @@ class FisheriesSampleDataTests(unittest.TestCase):
fisheries.execute(args)
final_timestep_data = FisheriesSampleDataTests.get_harvest_info(
args['workspace_dir'], 'results_table_foo.csv')
self.assertEqual(final_timestep_data['spawners'], '(fixed recruitment)')
self.assertEqual(
final_timestep_data['spawners'], '(fixed recruitment)')
self.assertEqual(final_timestep_data['harvest'], 3120557.88)
# verify that two identical subregions were found.
in_subregion = False
subregions = {}
harvest_table_path = os.path.join(self.workspace_dir, 'output',
'results_table_foo.csv')
with io.open(harvest_table_path, 'r', newline=os.linesep) as harvest_table:
harvest_table_path = os.path.join(
self.workspace_dir, 'output', 'results_table_foo.csv')
with io.open(
harvest_table_path, 'r', newline=os.linesep) as harvest_table:
for line in harvest_table:
if in_subregion:
if line.lower().startswith('total'):
@ -560,10 +548,10 @@ class FisheriesHSTTest(unittest.TestCase):
args = {
'gamma': 0.5,
'hab_cont': False,
'habitat_chg_csv_path': os.path.join(HST_INPUTS,
'habitat_chg_params.csv'),
'habitat_dep_csv_path': os.path.join(HST_INPUTS,
'habitat_dep_params.csv'),
'habitat_chg_csv_path': os.path.join(
HST_INPUTS, 'habitat_chg_params.csv'),
'habitat_dep_csv_path': os.path.join(
HST_INPUTS, 'habitat_dep_params.csv'),
'pop_cont': False,
'population_csv_path': os.path.join(HST_INPUTS, 'pop_params.csv'),
'sexsp': 'No',
@ -572,7 +560,8 @@ class FisheriesHSTTest(unittest.TestCase):
fisheries_hst.execute(args)
actual_values_df = pandas.read_csv(
os.path.join(args['workspace_dir'], 'output', 'pop_params_modified.csv'))
os.path.join(
args['workspace_dir'], 'output', 'pop_params_modified.csv'))
expected_values_df = pandas.read_csv(
os.path.join(TEST_DATA, 'pop_params_modified.csv'))
pandas.testing.assert_frame_equal(actual_values_df, expected_values_df)
@ -583,13 +572,13 @@ class FisheriesHSTTest(unittest.TestCase):
args = {
'gamma': 0.5,
'hab_cont': False,
'habitat_chg_csv_path': os.path.join(HST_INPUTS,
'habitat_chg_params.csv'),
'habitat_dep_csv_path': os.path.join(HST_INPUTS,
'habitat_dep_params.csv'),
'habitat_chg_csv_path': os.path.join(
HST_INPUTS, 'habitat_chg_params.csv'),
'habitat_dep_csv_path': os.path.join(
HST_INPUTS, 'habitat_dep_params.csv'),
'pop_cont': False,
'population_csv_path': os.path.join(TEST_DATA,
'hst_pop_params_sexsp.csv'),
'population_csv_path': os.path.join(
TEST_DATA, 'hst_pop_params_sexsp.csv'),
'sexsp': 'Yes',
'workspace_dir': self.workspace_dir,
}
@ -597,7 +586,9 @@ class FisheriesHSTTest(unittest.TestCase):
fisheries_hst.execute(args)
actual_values_df = pandas.read_csv(
os.path.join(args['workspace_dir'], 'output', 'hst_pop_params_sexsp_modified.csv'))
os.path.join(
args['workspace_dir'], 'output',
'hst_pop_params_sexsp_modified.csv'))
expected_values_df = pandas.read_csv(
os.path.join(TEST_DATA, 'hst_pop_params_sexsp_modified.csv'))
pandas.testing.assert_frame_equal(actual_values_df, expected_values_df)