invest/tests/test_forest_carbon_edge.py

808 lines
34 KiB
Python

"""Module for Regression Testing the InVEST Forest Carbon Edge model."""
import os
import pickle
import shutil
import tempfile
import unittest
import numpy
import pandas
import pygeoprocessing
import shapely
from osgeo import gdal
from osgeo import ogr
from osgeo import osr
from shapely.geometry import Polygon
gdal.UseExceptions()
REGRESSION_DATA = os.path.join(
os.path.dirname(__file__), '..', 'data', 'invest-test-data',
'forest_carbon_edge_effect')
def make_simple_vector(path_to_shp):
"""
Generate shapefile with one rectangular polygon
Args:
path_to_shp (str): path to target shapefile
Returns:
None
"""
# (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax), (xmin, ymin)
shapely_geometry_list = [
Polygon([(461251, 4923195), (461501, 4923195),
(461501, 4923445), (461251, 4923445),
(461251, 4923195)])
]
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
vector_format = "ESRI Shapefile"
fields = {"id": ogr.OFTReal}
attribute_list = [{"id": 0}]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)
def make_simple_raster(base_raster_path, array, nodata_val=-1):
"""Create a raster on designated path.
Args:
base_raster_path (str): the raster path for the new raster.
array (array): numpy array to convert to tif.
nodata_val (int or None): for defining a raster's nodata value.
Returns:
None
"""
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # UTM Zone 10N
projection_wkt = srs.ExportToWkt()
# origin hand-picked for this epsg:
origin = (461261, 4923265)
pixel_size = (1, -1)
pygeoprocessing.numpy_array_to_raster(
array, nodata_val, pixel_size, origin, projection_wkt,
base_raster_path)
class ForestCarbonEdgeTests(unittest.TestCase):
"""Tests for the Forest Carbon Edge Model."""
def setUp(self):
"""Overriding setUp function to create temp workspace directory."""
# this lets us delete the workspace after its done no matter the
# test result
self.workspace_dir = tempfile.mkdtemp()
def tearDown(self):
"""Overriding tearDown function to remove temporary directory."""
shutil.rmtree(self.workspace_dir)
def test_carbon_full(self):
"""Forest Carbon Edge: regression testing all functionality."""
from natcap.invest import forest_carbon_edge_effect
args = {
'aoi_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'small_aoi.shp'),
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input', 'forest_edge_carbon_lu_table.csv'),
'compute_forest_edge_effects': True,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'small_lulc.tif'),
'n_nearest_model_points': 10,
'pools_to_calculate': 'all',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
forest_carbon_edge_effect.execute(args)
ForestCarbonEdgeTests._test_same_files(
os.path.join(REGRESSION_DATA, 'file_list.txt'),
args['workspace_dir'])
self._assert_vector_results_close(
'id',
['c_sum', 'c_ha_mean'],
os.path.join(
args['workspace_dir'], 'aggregated_carbon_stocks.shp'),
os.path.join(REGRESSION_DATA, 'agg_results_base.shp'))
# Check raster output to make sure values are in Mg/ha.
raster_path = os.path.join(args['workspace_dir'], 'carbon_map.tif')
raster_info = pygeoprocessing.get_raster_info(raster_path)
nodata = raster_info['nodata'][0]
raster_sum = 0.0
for _, block in pygeoprocessing.iterblocks((raster_path, 1)):
raster_sum += numpy.sum(
block[~pygeoprocessing.array_equals_nodata(
block, nodata)], dtype=numpy.float64)
# expected_sum_per_pixel_values is in Mg, calculated from raster
# generated by the model when each pixel value was in Mg/px.
# Since pixel values are now Mg/ha, raster sum is (Mg•px)/ha.
# To convert expected_sum_per_pixel_values from Mg, multiply by px/ha.
expected_sum_per_pixel_values = 21414391.997192383
pixel_area = abs(numpy.prod(raster_info['pixel_size']))
pixels_per_hectare = 10000 / pixel_area
expected_sum = expected_sum_per_pixel_values * pixels_per_hectare
numpy.testing.assert_allclose(raster_sum, expected_sum)
def test_carbon_dup_output(self):
"""Forest Carbon Edge: test for existing output overlap."""
from natcap.invest import forest_carbon_edge_effect
args = {
'aoi_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'small_aoi.shp'),
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input', 'forest_edge_carbon_lu_table.csv'),
'compute_forest_edge_effects': True,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'small_lulc.tif'),
'n_nearest_model_points': 1,
'pools_to_calculate': 'above_ground',
'results_suffix': 'small',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
# explicitly testing that invoking twice doesn't cause the model to
# crash because of existing outputs
forest_carbon_edge_effect.execute(args)
forest_carbon_edge_effect.execute(args)
self.assertTrue(True) # explicit pass of the model
def test_carbon_no_forest_edge(self):
"""Forest Carbon Edge: test for no forest edge effects."""
from natcap.invest import forest_carbon_edge_effect
args = {
'aoi_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'small_aoi.shp'),
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input',
'no_forest_edge_carbon_lu_table.csv'),
'compute_forest_edge_effects': False,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'small_lulc.tif'),
'n_nearest_model_points': 1,
'pools_to_calculate': 'above_ground',
'results_suffix': 'small_no_edge_effect',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
forest_carbon_edge_effect.execute(args)
ForestCarbonEdgeTests._test_same_files(
os.path.join(
REGRESSION_DATA, 'file_list_no_edge_effect.txt'),
args['workspace_dir'])
self._assert_vector_results_close(
'id',
['c_sum', 'c_ha_mean'],
os.path.join(
args['workspace_dir'],
'aggregated_carbon_stocks_small_no_edge_effect.shp'),
os.path.join(
REGRESSION_DATA, 'agg_results_no_edge_effect.shp'))
def test_carbon_bad_pool_value(self):
"""Forest Carbon Edge: test with bad carbon pool value."""
from natcap.invest import forest_carbon_edge_effect
args = {
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input',
'no_forest_edge_carbon_lu_table_bad_pool_value.csv'),
'compute_forest_edge_effects': False,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'small_lulc.tif'),
'n_nearest_model_points': 1,
'pools_to_calculate': 'all',
'results_suffix': 'small_no_edge_effect',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
with self.assertRaises(ValueError) as cm:
forest_carbon_edge_effect.execute(args)
expected_message = 'Could not interpret carbon pool value'
actual_message = str(cm.exception)
self.assertTrue(expected_message in actual_message, actual_message)
def test_missing_lulc_value(self):
"""Forest Carbon Edge: test with missing LULC value."""
from natcap.invest import forest_carbon_edge_effect
import pandas
args = {
'aoi_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'small_aoi.shp'),
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input', 'forest_edge_carbon_lu_table.csv'),
'compute_forest_edge_effects': True,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'small_lulc.tif'),
'n_nearest_model_points': 10,
'pools_to_calculate': 'all',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
bad_biophysical_table_path = os.path.join(
self.workspace_dir, 'bad_biophysical_table.csv')
bio_df = pandas.read_csv(args['biophysical_table_path'])
bio_df = bio_df[bio_df['lucode'] != 4]
bio_df.to_csv(bad_biophysical_table_path)
bio_df = None
args['biophysical_table_path'] = bad_biophysical_table_path
with self.assertRaises(ValueError) as cm:
forest_carbon_edge_effect.execute(args)
expected_message = (
"The missing values found in the LULC raster but not the table"
" are: [4.]")
actual_message = str(cm.exception)
self.assertTrue(expected_message in actual_message, actual_message)
def test_carbon_nodata_lulc(self):
"""Forest Carbon Edge: ensure nodata lulc raster cause exception."""
from natcap.invest import forest_carbon_edge_effect
args = {
'aoi_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'small_aoi.shp'),
'biomass_to_carbon_conversion_factor': '0.47',
'biophysical_table_path': os.path.join(
REGRESSION_DATA, 'input', 'forest_edge_carbon_lu_table.csv'),
'compute_forest_edge_effects': True,
'lulc_raster_path': os.path.join(
REGRESSION_DATA, 'input', 'nodata_lulc.tif'),
'n_nearest_model_points': 10,
'pools_to_calculate': 'all',
'tropical_forest_edge_carbon_model_vector_path': os.path.join(
REGRESSION_DATA, 'input', 'core_data',
'forest_carbon_edge_regression_model_parameters.shp'),
'workspace_dir': self.workspace_dir,
'n_workers': -1
}
with self.assertRaises(ValueError) as cm:
forest_carbon_edge_effect.execute(args)
expected_message = 'The landcover raster '
actual_message = str(cm.exception)
self.assertTrue(expected_message in actual_message, actual_message)
def test_combine_carbon_maps(self):
"""Test `combine_carbon_maps`"""
from natcap.invest.forest_carbon_edge_effect import combine_carbon_maps
# note that NODATA_VALUE = -1
carbon_arr1 = numpy.array([[7, 2, -1], [0, -2, -1]])
carbon_arr2 = numpy.array([[-1, 900, -1], [1, 20, 0]])
expected_output = numpy.array([[7, 902, -1], [1, 18, 0]])
actual_output = combine_carbon_maps(carbon_arr1, carbon_arr2)
numpy.testing.assert_allclose(actual_output, expected_output)
def test_aggregate_carbon_map(self):
"""Test `_aggregate_carbon_map`"""
from natcap.invest.forest_carbon_edge_effect import \
_aggregate_carbon_map
aoi_vector_path = os.path.join(self.workspace_dir, "aoi.shp")
carbon_map_path = os.path.join(self.workspace_dir, "carbon.tif")
target_vector_path = os.path.join(self.workspace_dir,
"agg_carbon.shp")
# make data
make_simple_vector(aoi_vector_path)
carbon_array = numpy.array(([1, 2, 3], [4, 5, 6]))
make_simple_raster(carbon_map_path, carbon_array)
_aggregate_carbon_map(aoi_vector_path, carbon_map_path,
target_vector_path)
# Validate fields in the agg carbon results vector
with gdal.OpenEx(target_vector_path,
gdal.OF_VECTOR | gdal.GA_Update) as ws_ds:
ws_layer = ws_ds.GetLayer()
for field_name, expected_value in zip(['c_sum', 'c_ha_mean'],
[0.0021, 0.000336]):
actual_values = [ws_feat.GetField(field_name)
for ws_feat in ws_layer][0]
error_msg = f"Error with {field_name} in agg_carbon.shp"
self.assertEqual(actual_values, expected_value, msg=error_msg)
def test_calculate_lulc_carbon_map(self):
"""Test `_calculate_lulc_carbon_map`"""
from natcap.invest.forest_carbon_edge_effect import \
_calculate_lulc_carbon_map
# Make synthetic data
lulc_raster_path = os.path.join(self.workspace_dir, "lulc.tif")
lulc_array = numpy.array([[1, 2, 3], [3, 2, 1]], dtype=numpy.int16)
make_simple_raster(lulc_raster_path, lulc_array)
biophysical_table_path = os.path.join(self.workspace_dir,
"biophysical_table.csv")
data = {"lucode": [1, 2, 3]}
df = pandas.DataFrame(data).set_index("lucode")
df["is_tropical_forest"] = [0, 1, 0]
df["c_above"] = [100, 500, 200]
df.to_csv(biophysical_table_path)
carbon_pool_type = 'c_above'
ignore_tropical_type = False
compute_forest_edge_effects = True
carbon_map_path = os.path.join(self.workspace_dir, "output_carbon.tif")
_calculate_lulc_carbon_map(
lulc_raster_path, biophysical_table_path, carbon_pool_type,
ignore_tropical_type, compute_forest_edge_effects,
carbon_map_path)
actual_output = pygeoprocessing.raster_to_numpy_array(carbon_map_path)
expected_output = numpy.array([[100, 500, 200], [200, 500, 100]])
numpy.testing.assert_allclose(actual_output, expected_output)
def test_map_distance_from_tropical_forest_edge(self):
"""Test `_map_distance_from_tropical_forest_edge`"""
from natcap.invest.forest_carbon_edge_effect import \
_map_distance_from_tropical_forest_edge
# Make synthetic data
base_lulc_raster_path = os.path.join(self.workspace_dir, "lulc.tif")
lulc_array = numpy.array([
[2, 2, 3, 3, 3, 2, 2],
[2, 1, 1, 1, 1, 1, 2],
[3, 1, 1, 1, 1, 1, 3],
[2, 1, 1, 1, 1, 1, 2],
[2, 2, 3, 3, 3, 2, 2]
], dtype=numpy.int16)
make_simple_raster(base_lulc_raster_path, lulc_array)
biophysical_table_path = os.path.join(self.workspace_dir,
"biophysical_table.csv")
data = {"lucode": [1, 2, 3]}
df = pandas.DataFrame(data).set_index("lucode")
df["is_tropical_forest"] = [1, 0, 0]
df["c_above"] = [100, 500, 200]
df.to_csv(biophysical_table_path)
target_edge_distance_path = os.path.join(self.workspace_dir,
"edge_distance.tif")
target_mask_path = os.path.join(self.workspace_dir,
"non_forest_mask.tif")
_map_distance_from_tropical_forest_edge(
base_lulc_raster_path, biophysical_table_path,
target_edge_distance_path, target_mask_path)
# check forest mask
actual_output = pygeoprocessing.raster_to_numpy_array(target_mask_path)
expected_output = numpy.array([
[1, 1, 1, 1, 1, 1, 1],
[1, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 1],
[1, 1, 1, 1, 1, 1, 1]
], dtype=numpy.int16)
numpy.testing.assert_allclose(actual_output, expected_output)
# check edge distance map
actual_output = pygeoprocessing.raster_to_numpy_array(
target_edge_distance_path)
expected_output = numpy.array([
[0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 1, 1, 1, 0],
[0, 1, 2, 2, 2, 1, 0],
[0, 1, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0, 0]
], dtype=numpy.int16)
numpy.testing.assert_allclose(actual_output, expected_output)
def test_calculate_tropical_forest_edge_carbon_map(self):
"""Test `_calculate_tropical_forest_edge_carbon_map`"""
from natcap.invest.forest_carbon_edge_effect import \
_calculate_tropical_forest_edge_carbon_map
from scipy.spatial import cKDTree
edge_dist_array = numpy.array([
[0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 1, 1, 1, 0],
[0, 1, 2, 2, 2, 1, 0],
[0, 1, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0, 0]
], dtype=numpy.int16)
edge_distance_path = os.path.join(self.workspace_dir, "edge_dist.tif")
make_simple_raster(edge_distance_path, edge_dist_array)
spatial_index_pickle_path = os.path.join(self.workspace_dir,
"spatial_index.pkl")
def _create_spatial_index_pickle(spatial_index_pickle_path,
raster_path):
"""
Create and save a KD-tree.
This function reads the spatial extent and resolution from a raster
file, then generates a grid of sample points in geographic space
(one point every other row, all columns). It builds a KD-tree from
these points for fast spatial lookup, along with synthetic theta
and method model parameters, and saves the result as a pickle file.
Args:
spatial_index_pickle_path (str): Path to save the pickle file.
raster_path (string): Path to the raster used to extract
spatial metadata.
Return:
None
"""
# Get origin and pixel_size
raster_info = pygeoprocessing.get_raster_info(raster_path)
gt = raster_info['geotransform']#461261, 4923265
origin_x, origin_y = gt[0], gt[3]
pixel_size_x, pixel_size_y = raster_info['pixel_size']
cols, rows = raster_info['raster_size']
# only create a point every other row and every col (in raster)
row_col_pairs = [(r, c) for r in range(0, rows, 2) for c in range(cols)]
# Get spatial coordinates
points = []
for row, col in row_col_pairs:
x = origin_x + col * pixel_size_x
y = origin_y + row * pixel_size_y
points.append((y, x))
# note: row → y, col → x (so KD-tree works with (lat, lon))
theta_model_parameters = numpy.linspace(
100, 200, len(points)*3).reshape(len(points), 3) # Nx3
method_model_parameter = numpy.ones((len(points),))
# change method for one area
method_model_parameter[0] = 3
kd_tree = cKDTree(points)
# Save the data as a tuple in a pickle file
with open(spatial_index_pickle_path, 'wb') as f:
pickle.dump((kd_tree, theta_model_parameters,
method_model_parameter), f)
_create_spatial_index_pickle(spatial_index_pickle_path,
edge_distance_path)
n_nearest_model_points = 6
biomass_to_carbon_conversion_factor = 10
tropical_forest_edge_carbon_map_path = os.path.join(self.workspace_dir,
"output.tif")
_calculate_tropical_forest_edge_carbon_map(
edge_distance_path, spatial_index_pickle_path,
n_nearest_model_points, biomass_to_carbon_conversion_factor,
tropical_forest_edge_carbon_map_path)
actual_output = pygeoprocessing.raster_to_numpy_array(
tropical_forest_edge_carbon_map_path)
actual_output_sum = actual_output.sum()
# Comparing sums as 1 value in the output arrays is ~8% different
# on windows vs. mac, which seemed too high a relative tolerance to set
numpy.testing.assert_allclose(actual_output_sum, 3659.7915, rtol=0.009)
# Spot checking several values too
self.assertAlmostEqual(actual_output[1, 2], 142.47273)
self.assertAlmostEqual(actual_output[3, 5], 274.47815)
self.assertAlmostEqual(actual_output[0, 1], -1)
def test_invalid_geometries(self):
"""Forest Carbon: Check handling of invalid vector geometries."""
from natcap.invest import forest_carbon_edge_effect
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
lulc_matrix = numpy.ones((5,5), dtype=numpy.int8)
lulc_raster_path = os.path.join(self.workspace_dir, 'lulc.tif')
pygeoprocessing.numpy_array_to_raster(
lulc_matrix, 255, (1, -1), (461262, 4923269), projection_wkt,
lulc_raster_path)
fields_polys = {'geom_id': ogr.OFTInteger}
attrs_polys = [
{'geom_id': 1},
{'geom_id': 2},
{'geom_id': 3}
]
# valid polygon
valid_poly = Polygon([
(461265, 4923269), (461267, 4923269), (461267, 4923267),
(461265, 4923267), (461265, 4923269)])
self.assertTrue(shapely.is_valid(valid_poly))
# invalid bowtie polygon
invalid_bowtie_poly = Polygon([
(461262, 4923267), (461264, 4923267), (461262, 4923265),
(461264, 4923265), (461262, 4923267)])
self.assertFalse(shapely.is_valid(invalid_bowtie_poly))
# invalid bowtie polygon with vertex in the middle
invalid_alt_bowtie_poly = Polygon([
(461262, 4923267), (461264, 4923267), (461263, 4923266),
(461264, 4923265), (461262, 4923265), (461263, 4923266),
(461262, 4923267)])
self.assertFalse(shapely.is_valid(invalid_alt_bowtie_poly))
test_vector_path = os.path.join(self.workspace_dir, 'vector.gpkg')
pygeoprocessing.shapely_geometry_to_vector(
[valid_poly, invalid_bowtie_poly,
invalid_alt_bowtie_poly],
test_vector_path, projection_wkt, 'GPKG',
fields=fields_polys,
attribute_list=attrs_polys)
test_vector = gdal.OpenEx(
test_vector_path, gdal.OF_VECTOR | gdal.GA_ReadOnly)
test_layer = test_vector.GetLayer()
self.assertEqual(test_layer.GetFeatureCount(), 3)
test_layer = None
test_vector = None
target_vector_path = os.path.join(
self.workspace_dir, 'checked_geometries.gpkg')
forest_carbon_edge_effect._clip_global_regression_models_vector(
lulc_raster_path, test_vector_path, target_vector_path)
target_vector = gdal.OpenEx(target_vector_path, gdal.OF_VECTOR)
target_layer = target_vector.GetLayer()
self.assertEqual(
target_layer.GetFeatureCount(), 1)
# valid_polygon geom_id == 1
self.assertEqual(target_layer.GetFeature(0).GetField('geom_id'), 1)
target_layer = None
target_vector = None
def test_vector_clipping_buffer(self):
"""Forest Carbon: Check DISTANCE_UPPER_BOUND buffer."""
from natcap.invest import forest_carbon_edge_effect
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
lulc_matrix = numpy.ones((5,5), dtype=numpy.int8)
lulc_raster_path = os.path.join(self.workspace_dir, 'lulc.tif')
pygeoprocessing.numpy_array_to_raster(
lulc_matrix, 255, (1000, -1000), (461261, 4923270), projection_wkt,
lulc_raster_path)
# get LULC bouding box; will be used to construct polygons
raster_info = pygeoprocessing.get_raster_info(lulc_raster_path)
bbox_minx, bbox_miny, bbox_maxx, bbox_maxy = raster_info['bounding_box']
fields_polys = {'geom_id': ogr.OFTInteger}
attrs_polys = [
{'geom_id': 1},
{'geom_id': 2},
{'geom_id': 3},
{'geom_id': 4}
]
# polygon that intersects with the lulc
in_bbox_poly = shapely.geometry.box(
bbox_maxx - 150, bbox_maxy - 150,
bbox_maxx - 50, bbox_maxy - 50)
self.assertTrue(shapely.is_valid(in_bbox_poly))
# polygon outside of lulc bb, but within buffer distance
in_buffer_poly = shapely.geometry.box(
bbox_maxx + 50, bbox_maxy + 50,
bbox_maxx + 150, bbox_maxy + 150)
self.assertTrue(shapely.is_valid(in_buffer_poly))
buffer = forest_carbon_edge_effect.DISTANCE_UPPER_BOUND
# polygon on the edge of the buffer
buffer_edge_poly = shapely.geometry.box(
bbox_maxx + buffer + 50, bbox_maxy + buffer + 50,
bbox_maxx + buffer - 50, bbox_maxy + buffer - 50)
self.assertTrue(shapely.is_valid(buffer_edge_poly))
# polygon outside of buffer distance
out_of_buffer_poly = shapely.geometry.box(
bbox_maxx + buffer + 50, bbox_maxy + buffer + 50,
bbox_maxx + buffer + 150, bbox_maxy + buffer + 150)
self.assertTrue(shapely.is_valid(out_of_buffer_poly))
test_vector_path = os.path.join(self.workspace_dir, 'vector.gpkg')
pygeoprocessing.shapely_geometry_to_vector(
[in_bbox_poly, in_buffer_poly,
buffer_edge_poly, out_of_buffer_poly],
test_vector_path, projection_wkt, 'GPKG',
fields=fields_polys,
attribute_list=attrs_polys)
target_vector_path = os.path.join(
self.workspace_dir, 'checked_geometries.gpkg')
forest_carbon_edge_effect._clip_global_regression_models_vector(
lulc_raster_path, test_vector_path, target_vector_path)
vector = gdal.OpenEx(
target_vector_path, gdal.OF_VECTOR | gdal.GA_ReadOnly)
layer = vector.GetLayer()
self.assertEqual(layer.GetFeatureCount(), 3)
feat_ids = [feat.GetField('geom_id') for feat in layer]
self.assertEqual(feat_ids, [1, 2, 3])
@staticmethod
def _test_same_files(base_list_path, directory_path):
"""Assert files in `base_list_path` are in `directory_path`.
Args:
base_list_path (string): a path to a file that has one relative
file path per line.
directory_path (string): a path to a directory whose contents will
be checked against the files listed in `base_list_file`
Returns:
None
Raises:
AssertionError when there are files listed in `base_list_file`
that don't exist in the directory indicated by `path`
"""
missing_files = []
with open(base_list_path, 'r') as file_list:
for file_path in file_list:
full_path = os.path.join(directory_path, file_path.rstrip())
if full_path == '':
continue
if not os.path.isfile(full_path):
missing_files.append(full_path)
if len(missing_files) > 0:
raise AssertionError(
"The following files were expected but not found: " +
'\n'.join(missing_files))
def _assert_vector_results_close(
self, id_fieldname, field_list, result_vector_path,
expected_vector_path):
"""Test workspace state against expected aggregate results.
Args:
id_fieldname (string): fieldname of the unique ID.
field_list (list of string): list of fields to check
near-equality.
result_vector_path (string): path to the summary shapefile
produced by the Forest Carbon Edge model.
expected_vector_path (string): path to a vector that has the
same fields and values as `result_vector_path`.
Returns:
None
Raises:
AssertionError if results are not nearly equal or missing.
"""
result_vector = gdal.OpenEx(result_vector_path, gdal.OF_VECTOR)
try:
result_layer = result_vector.GetLayer()
result_lookup = {}
for feature in result_layer:
result_lookup[feature.GetField(id_fieldname)] = dict(
[(fieldname, feature.GetField(fieldname))
for fieldname in field_list])
expected_vector = gdal.OpenEx(
expected_vector_path, gdal.OF_VECTOR)
expected_layer = expected_vector.GetLayer()
expected_lookup = {}
for feature in expected_layer:
expected_lookup[feature.GetField(id_fieldname)] = dict(
[(fieldname, feature.GetField(fieldname))
for fieldname in field_list])
self.assertEqual(len(result_lookup), len(expected_lookup))
not_close_values_list = []
for feature_id in result_lookup:
for fieldname in field_list:
result = result_lookup[feature_id][fieldname]
expected_result = expected_lookup[feature_id][fieldname]
if not numpy.isclose(result, expected_result):
not_close_values_list.append(
'id: %d, %s: %f (actual) vs %f (expected)' % (
feature_id, fieldname, result,
expected_result))
if not_close_values_list:
raise AssertionError(
'Values do not match: %s' % not_close_values_list)
finally:
result_layer = None
if result_vector:
gdal.Dataset.__swig_destroy__(result_vector)
result_vector = None
class ForestCarbonEdgeValidationTests(unittest.TestCase):
"""Tests for the Forest Carbon Model MODEL_SPEC and validation."""
def setUp(self):
"""Create a temporary workspace."""
self.workspace_dir = tempfile.mkdtemp()
self.base_required_keys = [
'workspace_dir',
'biophysical_table_path',
'lulc_raster_path',
'pools_to_calculate',
'compute_forest_edge_effects',
]
def tearDown(self):
"""Remove the temporary workspace after a test."""
shutil.rmtree(self.workspace_dir)
def test_missing_keys(self):
"""Forest Carbon Validate: assert missing required keys."""
from natcap.invest import forest_carbon_edge_effect
from natcap.invest import validation
# empty args dict.
validation_errors = forest_carbon_edge_effect.validate({})
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(self.base_required_keys)
self.assertEqual(invalid_keys, expected_missing_keys)
def test_missing_keys_for_edge_effects(self):
"""Forest Carbon Validate: assert missing required for edge effects."""
from natcap.invest import forest_carbon_edge_effect
from natcap.invest import validation
args = {'compute_forest_edge_effects': True}
validation_errors = forest_carbon_edge_effect.validate(args)
invalid_keys = validation.get_invalid_keys(validation_errors)
expected_missing_keys = set(
self.base_required_keys +
['n_nearest_model_points',
'tropical_forest_edge_carbon_model_vector_path',
'biomass_to_carbon_conversion_factor'])
expected_missing_keys.difference_update(
{'compute_forest_edge_effects'})
self.assertEqual(invalid_keys, expected_missing_keys)