Merge pull request #1778 from natcap/main

auto merge main into release/*
Claire Simpson 2025-02-10 12:11:34 -08:00 committed by GitHub
commit 20e98f376e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 871 additions and 3 deletions

View File

@@ -5,15 +5,49 @@ import tempfile
import unittest
import numpy
from shapely.geometry import Polygon
import pandas
import pygeoprocessing
from osgeo import gdal
from osgeo import gdal, ogr, osr
REGRESSION_DATA = os.path.join(
os.path.dirname(__file__), '..', 'data', 'invest-test-data', 'annual_water_yield')
SAMPLE_DATA = os.path.join(REGRESSION_DATA, 'input')
gdal.UseExceptions()
def make_watershed_vector(path_to_shp):
"""
Generate a watershed results shapefile with two polygons.
Args:
path_to_shp (str): path to store watershed results vector
Outputs:
None
"""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"hp_energy": ogr.OFTReal, "hp_val": ogr.OFTReal,
"ws_id": ogr.OFTReal, "rsupply_vl": ogr.OFTReal,
"wyield_mn": ogr.OFTReal, "wyield_vol": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [
{"hp_energy": 1, "hp_val": 1, "ws_id": 0, "rsupply_vl": 2},
{"hp_energy": 11, "hp_val": 3, "ws_id": 1, "rsupply_vl": 52}
]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)
class AnnualWaterYieldTests(unittest.TestCase):
"""Regression Tests for Annual Water Yield Model."""
@@ -367,7 +401,6 @@ class AnnualWaterYieldTests(unittest.TestCase):
self.assertTrue(
'but are not found in the valuation table' in
actual_message, actual_message)
# if the demand table is missing but the valuation table is present,
# make sure we have a validation error.
args_missing_demand_table = args.copy()
@@ -380,3 +413,112 @@ class AnnualWaterYieldTests(unittest.TestCase):
self.assertEqual(
validation_warnings[0],
(['demand_table_path'], 'Input is required but has no value'))
def test_fractp_op(self):
"""Test `fractp_op`"""
from natcap.invest.annual_water_yield import fractp_op
# generate fake data
kc = numpy.array([[1, .1, .1], [.6, .6, .1]])
eto = numpy.array([[1000, 900, 900], [1100, 1005, 1000]])
precip = numpy.array([[100, 1000, 10], [500, 800, 1100]])
root = numpy.array([[99, 300, 400], [5, 500, 800]])
soil = numpy.array([[600, 700, 700], [800, 900, 600]])
pawc = numpy.array([[.11, .11, .12], [.55, .55, .19]])
veg = numpy.array([[1, 1, 0], [0, 1, 0]])
nodata_dict = {'eto': None, 'precip': None, 'depth_root': None,
'pawc': None, 'out_nodata': None}
seasonality_constant = 6
actual_fractp = fractp_op(kc, eto, precip, root, soil, pawc, veg,
nodata_dict, seasonality_constant)
# generated by running fractp_op
expected_fractp = numpy.array([[0.9345682, 0.06896508, 1.],
[1., 0.6487423, 0.09090909]],
dtype=numpy.float32)
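# spot check (an assumption inferred from the expected values above, not
# from the model source): for non-vegetated pixels (veg == 0) the result
# matches min(kc * eto / precip, 1), e.g. 0.1 * 1000 / 1100 ~= 0.0909.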
numpy.testing.assert_allclose(actual_fractp, expected_fractp,
err_msg="Fractp does not match expected")
def test_compute_watershed_valuation(self):
"""Test `compute_watershed_valuation`, `compute_rsupply_volume`
and `compute_water_yield_volume`"""
from natcap.invest import annual_water_yield
def _create_watershed_results_vector(path_to_shp):
"""Generate a fake watershed results vector file."""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"ws_id": ogr.OFTReal, "wyield_mn": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [{"ws_id": 0, "wyield_mn": 990000,
"consum_mn": 500, "consum_vol": 50},
{"ws_id": 1, "wyield_mn": 800000,
"consum_mn": 600, "consum_vol": 70}]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp,
projection_wkt,
vector_format, fields,
attribute_list)
def _validate_fields(vector_path, field_name, expected_values, error_msg):
"""
Validate a specific field in the watershed results vector
by comparing actual to expected values. Expected values generated
by running the function.
Args:
vector_path (str): path to the watershed results shapefile
field_name (str): attribute field to check
expected_values (list): list of expected values for the field
error_msg (str): what to print if assertion fails
Returns:
None
"""
with gdal.OpenEx(vector_path, gdal.OF_VECTOR | gdal.GA_Update) as ws_ds:
ws_layer = ws_ds.GetLayer()
actual_values = [ws_feat.GetField(field_name)
for ws_feat in ws_layer]
self.assertEqual(actual_values, expected_values, msg=error_msg)
# generate fake watershed results vector
watershed_results_vector_path = os.path.join(self.workspace_dir,
"watershed_results.shp")
_create_watershed_results_vector(watershed_results_vector_path)
# generate fake val_df
val_df = pandas.DataFrame({'efficiency': [.7, .8], 'height': [12, 50],
'fraction': [.9, .7], 'discount': [60, 20],
'time_span': [10, 10], 'cost': [100, 200],
'kw_price': [15, 20]})
# test water yield volume
annual_water_yield.compute_water_yield_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "wyield_vol",
[990.0, 800.0],
"Error with water yield volume calculation.")
# test rsupply volume
annual_water_yield.compute_rsupply_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "rsupply_vl",
[940.0, 730.0],
"Error calculating total realized water supply volume.")
# test compute watershed valuation
annual_water_yield.compute_watershed_valuation(
watershed_results_vector_path, val_df)
_validate_fields(watershed_results_vector_path, "hp_energy",
[19.329408, 55.5968],
"Error calculating energy.")
_validate_fields(watershed_results_vector_path, "hp_val",
[501.9029748723, 4587.91946857059],
"Error calculating net present value.")

View File

@@ -268,6 +268,69 @@ class CarbonTests(unittest.TestCase):
assert_raster_equal_value(
os.path.join(args['workspace_dir'], 'npv_redd.tif'), -0.4602106)
def test_generate_carbon_map(self):
"""Test `_generate_carbon_map`"""
from natcap.invest.carbon import _generate_carbon_map
def _make_simple_lulc_raster(base_raster_path):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
Returns:
None.
"""
array = numpy.array([[1, 1], [2, 3]], dtype=numpy.int32)
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461251, 4923245)
pixel_size = (1, 1)
no_data = -999
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
# generate a fake lulc raster
lulc_path = os.path.join(self.workspace_dir, "lulc.tif")
_make_simple_lulc_raster(lulc_path)
# make fake carbon pool dict
carbon_pool_by_type = {1: 5000, 2: 60, 3: 120}
out_carbon_stock_path = os.path.join(self.workspace_dir,
"carbon_stock.tif")
_generate_carbon_map(lulc_path, carbon_pool_by_type,
out_carbon_stock_path)
# open output carbon stock raster and check values
actual_carbon_stock = gdal.Open(out_carbon_stock_path)
band = actual_carbon_stock.GetRasterBand(1)
actual_carbon_stock = band.ReadAsArray()
expected_carbon_stock = numpy.array([[0.5, 0.5], [0.006, 0.012]],
dtype=numpy.float32)
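# the expected values follow from the carbon pools scaled by pixel area in
# hectares: each 1 m x 1 m pixel is 0.0001 ha, so lucode 1 -> 5000 * 0.0001
# = 0.5, lucode 2 -> 0.006, lucode 3 -> 0.012.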
numpy.testing.assert_array_equal(actual_carbon_stock,
expected_carbon_stock)
def test_calculate_valuation_constant(self):
"""Test `_calculate_valuation_constant`"""
from natcap.invest.carbon import _calculate_valuation_constant
valuation_constant = _calculate_valuation_constant(lulc_cur_year=2010,
lulc_fut_year=2012,
discount_rate=50,
rate_change=5,
price_per_metric_ton_of_c=50)
expected_valuation = 40.87302
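# the expected constant can be reproduced by hand (assuming the usual
# discounted-sum form): 50 / (2012 - 2010) * (1 + 1 / (1.5 * 1.05))
# = 25 * 1.6349206... ~= 40.87302.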
self.assertEqual(round(valuation_constant, 5), expected_valuation)
class CarbonValidationTests(unittest.TestCase):
"""Tests for the Carbon Model MODEL_SPEC and validation."""

View File

@@ -11,6 +11,7 @@ import unittest
import numpy
import pandas
from scipy.sparse import dok_matrix
import pygeoprocessing
from natcap.invest import utils
from natcap.invest import validation
@@ -24,6 +25,26 @@ REGRESSION_DATA = os.path.join(
LOGGER = logging.getLogger(__name__)
def make_raster_from_array(base_raster_path, array):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
array (numpy.ndarray): the values to write to the new raster.
Returns:
None.
"""
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461261, 4923265)
pixel_size = (1, 1)
no_data = -1
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
class TestPreprocessor(unittest.TestCase):
"""Test Coastal Blue Carbon preprocessor functions."""
@@ -1060,3 +1081,149 @@ class TestCBC2(unittest.TestCase):
[(['analysis_year'],
coastal_blue_carbon.INVALID_ANALYSIS_YEAR_MSG.format(
analysis_year=2000, latest_year=2000))])
def test_calculate_npv(self):
"""Test `_calculate_npv`"""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
# make fake data
net_sequestration_rasters = {
2010: os.path.join(self.workspace_dir, "carbon_seq_2010.tif"),
2011: os.path.join(self.workspace_dir, "carbon_seq_2011.tif"),
2012: os.path.join(self.workspace_dir, "carbon_seq_2012.tif")
}
for year, path in net_sequestration_rasters.items():
array = numpy.array([[year*.5, year*.25], [year-1, 50]])  # arbitrary values
make_raster_from_array(path, array)
prices_by_year = {
2010: 50,
2011: 80,
2012: 95
}
discount_rate = 0.1
baseline_year = 2010
target_raster_years_and_paths = {
2010: os.path.join(self.workspace_dir, "tgt_carbon_seq_2010.tif"),
2011: os.path.join(self.workspace_dir, "tgt_carbon_seq_2011.tif"),
2012: os.path.join(self.workspace_dir, "tgt_carbon_seq_2012.tif")
}
coastal_blue_carbon._calculate_npv(net_sequestration_rasters,
prices_by_year, discount_rate,
baseline_year,
target_raster_years_and_paths)
# read in the created target rasters
actual_2011 = gdal.Open(target_raster_years_and_paths[2011])
band = actual_2011.GetRasterBand(1)
actual_2011 = band.ReadAsArray()
actual_2012 = gdal.Open(target_raster_years_and_paths[2012])
band = actual_2012.GetRasterBand(1)
actual_2012 = band.ReadAsArray()
# compare actual rasters to expected (based on running `_calculate_npv`)
expected_2011 = numpy.array([[100525, 50262.5], [200950, 5000]])
expected_2012 = numpy.array([[370206.818182, 185103.409091],
[740045.454545, 18409.090909]])
numpy.testing.assert_allclose(actual_2011, expected_2011)
numpy.testing.assert_allclose(actual_2012, expected_2012)
def test_calculate_accumulation_over_time(self):
"""Test `_calculate_accumulation_over_time`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_calculate_accumulation_over_time
# generate fake data with nodata values
nodata = float(numpy.finfo(numpy.float32).min)
annual_biomass_matrix = numpy.array([[1, 2], [3, nodata]])
annual_soil_matrix = numpy.array([[11, 12], [13, 14]])
annual_litter_matrix = numpy.array([[.5, .9], [4, .9]])
n_years = 3
actual_accumulation = _calculate_accumulation_over_time(
annual_biomass_matrix, annual_soil_matrix, annual_litter_matrix,
n_years)
expected_accumulation = numpy.array([[37.5, 44.7], [60, nodata]])
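# the expected values are consistent with
# (biomass + soil + litter) * n_years, with the nodata pixel passed
# through, e.g. (1 + 11 + 0.5) * 3 = 37.5.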
numpy.testing.assert_allclose(actual_accumulation, expected_accumulation)
def test_calculate_net_sequestration(self):
"""test `_calculate_net_sequestration`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_calculate_net_sequestration
# make fake rasters that contain nodata pixels (-1)
accumulation_raster_path = os.path.join(self.workspace_dir,
"accumulation_raster.tif")
accumulation_array = numpy.array([[40, -1], [70, -1]])
make_raster_from_array(accumulation_raster_path, accumulation_array)
emissions_raster_path = os.path.join(self.workspace_dir,
"emissions_raster.tif")
emissions_array = numpy.array([[-1, 8], [7, -1]])
make_raster_from_array(emissions_raster_path, emissions_array)
target_raster_path = os.path.join(self.workspace_dir,
"target_raster.tif")
# run `_calculate_net_sequestration`
_calculate_net_sequestration(accumulation_raster_path,
emissions_raster_path, target_raster_path)
# compare actual to expected output net sequestration raster
actual_sequestration = gdal.Open(target_raster_path)
band = actual_sequestration.GetRasterBand(1)
actual_sequestration = band.ReadAsArray()
# calculated by running `_calculate_net_sequestration`
nodata = float(numpy.finfo(numpy.float32).min)
expected_sequestration = numpy.array([[40, -8], [-7, nodata]])
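# expected behavior (inferred from these values): where emissions are valid
# they take precedence and are negated (8 -> -8, 7 -> -7); otherwise valid
# accumulation passes through (40); pixels with neither are set to nodata.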
numpy.testing.assert_allclose(actual_sequestration,
expected_sequestration)
def test_reclassify_accumulation_transition(self):
"""Test `_reclassify_accumulation_transition`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_reclassify_accumulation_transition, _reclassify_disturbance_magnitude
# make fake raster data
landuse_transition_from_raster = os.path.join(self.workspace_dir,
"landuse_transition_from.tif")
landuse_transition_from_array = numpy.array([[1, 2], [3, 2]])
make_raster_from_array(landuse_transition_from_raster,
landuse_transition_from_array)
landuse_transition_to_raster = os.path.join(self.workspace_dir,
"landuse_transition_to.tif")
landuse_transition_to_array = numpy.array([[1, 1], [2, 3]])
make_raster_from_array(landuse_transition_to_raster,
landuse_transition_to_array)
# make fake accumulation_rate_matrix
accumulation_rate_matrix = dok_matrix((4, 4), dtype=numpy.float32)
accumulation_rate_matrix[1, 2] = 0.5 # Forest -> Grassland
accumulation_rate_matrix[1, 3] = 0.3 # Forest -> Agriculture
accumulation_rate_matrix[2, 1] = 0.2 # Grassland -> Forest
accumulation_rate_matrix[2, 3] = 0.4 # Grassland -> Agriculture
accumulation_rate_matrix[3, 1] = 0.1 # Agriculture -> Forest
accumulation_rate_matrix[3, 2] = 0.3 # Agriculture -> Grassland
target_raster_path = os.path.join(self.workspace_dir, "output.tif")
_reclassify_accumulation_transition(
landuse_transition_from_raster, landuse_transition_to_raster,
accumulation_rate_matrix, target_raster_path)
# compare actual and expected target_raster
actual_accumulation = gdal.Open(target_raster_path)
band = actual_accumulation.GetRasterBand(1)
actual_accumulation = band.ReadAsArray()
expected_accumulation = numpy.array([[0, .2], [.3, .4]])
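# the expected output is the accumulation rate looked up per pixel from the
# (from, to) transition pair, e.g. 2 -> 1 gives 0.2 and 1 -> 1 (no matrix
# entry) defaults to 0.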
numpy.testing.assert_allclose(actual_accumulation, expected_accumulation)

View File

@@ -8,6 +8,7 @@ import unittest
import numpy.testing
import pandas.testing
import pandas
import pygeoprocessing
import shapely.wkb
import taskgraph
@@ -1553,6 +1554,100 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Polygon has 4 sides on exterior, 3 on interior, expect 7 lines
self.assertTrue(len(line_list) == 7)
def test_assemble_results_and_calculate_exposure(self):
"""Test that assemble_results_and_calculate_exposure correctly
calculates exposure"""
from natcap.invest.coastal_vulnerability import \
assemble_results_and_calculate_exposure
def _make_shore_points_vector(shore_points_path):
# create 4 points, each with a unique 'shore_id' in [0..3].
shore_geometries = [Point(0, 0), Point(1, 0), Point(2, 1), Point(3, 2)]
shore_fields = {'shore_id': ogr.OFTInteger}
shore_attributes = [{'shore_id': i} for i in range(len(shore_geometries))]
# Create a spatial reference (projected or geographic)
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # e.g. "NAD83 / UTM zone 10N"
pygeoprocessing.shapely_geometry_to_vector(
shore_geometries, shore_points_path, srs.ExportToWkt(),
vector_format='GPKG',
fields=shore_fields,
attribute_list=shore_attributes,
ogr_geom_type=ogr.wkbPoint
)
def _make_habitat_csv(habitat_csv_path):
# Example: one habitat column named 'kelp', plus 'R_hab'
# We have 4 shore IDs, so we add 4 rows. Values are arbitrary.
habitat_df = pandas.DataFrame(
{'shore_id': [0, 1, 2, 3], 'kelp': [5, 3, 5, 4],
'seagrass': [4, 1, 2, 4], 'R_hab': [5, 2, 5, 3]})
habitat_df.to_csv(habitat_csv_path, index=False)
def _make_risk_id_path_list():
# Create pickles for risk data
relief_pkl = os.path.join(self.workspace_dir, 'relief.pickle')
slr_pkl = os.path.join(self.workspace_dir, 'slr.pickle')
population_pkl = os.path.join(self.workspace_dir, 'population.pickle')
relief_data = {0: 10.0, 1: 50.0, 2: 30.0, 3: 80.0} # arbitrary data
slr_data = {0: 0.1, 1: 0.2, 2: 0.9, 3: 0.5}
population_data = {0: 123.0, 1: 999.0, 2: 55.0, 3: 0.0}
for file_path, data_dict in zip([relief_pkl, slr_pkl, population_pkl],
[relief_data, slr_data, population_data]):
with open(file_path, 'wb') as f:
pickle.dump(data_dict, f)
risk_id_path_list = [
(relief_pkl, True, "R_relief"), # "True" => bin to 1..5
(slr_pkl, True, "R_slr"),
(population_pkl, False, "population")
]
return risk_id_path_list
shore_points_path = os.path.join(self.workspace_dir, "shore_points.gpkg")
_make_shore_points_vector(shore_points_path)
habitat_csv_path = os.path.join(self.workspace_dir, 'habitat_protection.csv')
_make_habitat_csv(habitat_csv_path)
risk_id_path_list = _make_risk_id_path_list()
intermediate_vector_path = os.path.join(self.workspace_dir,
'intermediate_exposure.gpkg')
intermediate_csv_path = os.path.join(self.workspace_dir,
'intermediate_exposure.csv')
output_vector_path = os.path.join(self.workspace_dir,
'coastal_exposure.gpkg')
output_csv_path = os.path.join(self.workspace_dir,
'coastal_exposure.csv')
# call function
assemble_results_and_calculate_exposure(
risk_id_path_list,
habitat_csv_path,
shore_points_path,
intermediate_vector_path,
intermediate_csv_path,
output_vector_path,
output_csv_path
)
# read field values in output vector and compare
actual_df = pandas.read_csv(
output_csv_path,
usecols=["exposure", "habitat_role", "exposure_no_habitats"])
expected_df = pandas.DataFrame({
"exposure": [2.924018, 2.0, 4.641589, 2.289428],
"habitat_role": [0, 0.714418, 0, 0.424989],
"exposure_no_habitats": [2.924018, 2.714418, 4.641589, 2.714418]})
pandas.testing.assert_frame_equal(
actual_df, expected_df, check_dtype=False)
def assert_pickled_arrays_almost_equal(
actual_values_pickle_path, expected_values_json_path):

View File

@@ -5,9 +5,10 @@ import shutil
import os
import numpy
from osgeo import gdal
from osgeo import gdal, ogr, osr
import pandas
import pygeoprocessing
from shapely.geometry import Polygon
gdal.UseExceptions()
MODEL_DATA_PATH = os.path.join(
@@ -21,6 +22,108 @@ TEST_DATA_PATH = os.path.join(
'crop_production_model')
def make_aggregate_vector(path_to_shp):
"""
Generate a shapefile with two overlapping polygons.
Args:
path_to_shp (str): path to store the aggregate vector
Outputs:
None
"""
# (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax)
shapely_geometry_list = [
Polygon([(461151, 4923265-50), (461261+50, 4923265-50),
(461261+50, 4923265), (461151, 4923265)]),
Polygon([(461261, 4923265-35), (461261+60, 4923265-35),
(461261+60, 4923265+50), (461261, 4923265+50)])
]
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
vector_format = "ESRI Shapefile"
fields = {"id": ogr.OFTReal}
attribute_list = [
{"id": 0},
{"id": 1},
]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)
def make_simple_raster(base_raster_path, array):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
array (numpy.ndarray): the values to write to the new raster.
Returns:
None.
"""
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461251, 4923245)
pixel_size = (30, 30)
no_data = -1
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
def create_nutrient_df():
"""Creates a nutrient DataFrame for testing."""
return pandas.DataFrame([
{'crop': 'corn', 'area (ha)': 21.0, 'production_observed': 0.2,
'percentrefuse': 7, 'protein': 42., 'lipid': 8, 'energy': 476.,
'ca': 27.0, 'fe': 15.7, 'mg': 280.0, 'ph': 704.0, 'k': 1727.0,
'na': 2.0, 'zn': 4.9, 'cu': 1.9, 'fl': 8, 'mn': 2.9, 'se': 0.1,
'vita': 3.0, 'betac': 16.0, 'alphac': 2.30, 'vite': 0.8,
'crypto': 1.6, 'lycopene': 0.36, 'lutein': 63.0, 'betat': 0.5,
'gammat': 2.1, 'deltat': 1.9, 'vitc': 6.8, 'thiamin': 0.4,
'riboflavin': 1.8, 'niacin': 8.2, 'pantothenic': 0.9,
'vitb6': 1.4, 'folate': 385.0, 'vitb12': 2.0, 'vitk': 41.0},
{'crop': 'soybean', 'area (ha)': 5., 'production_observed': 4.,
'percentrefuse': 9, 'protein': 33., 'lipid': 2., 'energy': 99.,
'ca': 257., 'fe': 15.7, 'mg': 280., 'ph': 704.0, 'k': 197.0,
'na': 2., 'zn': 4.9, 'cu': 1.6, 'fl': 3., 'mn': 5.2, 'se': 0.3,
'vita': 3.0, 'betac': 16.0, 'alphac': 1.0, 'vite': 0.8,
'crypto': 0.6, 'lycopene': 0.3, 'lutein': 61.0, 'betat': 0.5,
'gammat': 2.3, 'deltat': 1.2, 'vitc': 3.0, 'thiamin': 0.42,
'riboflavin': 0.82, 'niacin': 12.2, 'pantothenic': 0.92,
'vitb6': 5.4, 'folate': 305., 'vitb12': 3., 'vitk': 42.},
]).set_index('crop')
def _create_crop_rasters(output_dir, crop_names, file_suffix):
"""Creates raster files for test setup."""
_OBSERVED_PRODUCTION_FILE_PATTERN = os.path.join(
'.', '%s_observed_production%s.tif')
_CROP_PRODUCTION_FILE_PATTERN = os.path.join(
'.', '%s_regression_production%s.tif')
for i, crop in enumerate(crop_names):
observed_yield_path = os.path.join(
output_dir,
_OBSERVED_PRODUCTION_FILE_PATTERN % (crop, file_suffix))
crop_production_raster_path = os.path.join(
output_dir,
_CROP_PRODUCTION_FILE_PATTERN % (crop, file_suffix))
# Create arbitrary raster arrays
observed_array = numpy.array([[4, i], [i*3, 4]], dtype=numpy.int16)
crop_array = numpy.array([[i, 1], [i*2, 3]], dtype=numpy.int16)
make_simple_raster(observed_yield_path, observed_array)
make_simple_raster(crop_production_raster_path, crop_array)
class CropProductionTests(unittest.TestCase):
"""Tests for the Crop Production model."""
@@ -390,6 +493,304 @@ class CropProductionTests(unittest.TestCase):
pandas.testing.assert_frame_equal(
expected_result_table, result_table, check_dtype=False)
def test_x_yield_op(self):
"""Test `_x_yield_op"""
from natcap.invest.crop_production_regression import _x_yield_op
# make fake data
y_max = numpy.array([[-1, 3, 2], [4, 5, 3]])
b_x = numpy.array([[4, 3, 2], [2, 0, 3]])
c_x = numpy.array([[4, 1, 2], [3, 0, 3]])
lulc_array = numpy.array([[3, 3, 2], [3, -1, 3]])
fert_rate = 0.6
crop_lucode = 3
pixel_area_ha = 10
actual_result = _x_yield_op(y_max, b_x, c_x, lulc_array, fert_rate,
crop_lucode, pixel_area_ha)
expected_result = numpy.array([[-1, -19.393047, -1],
[26.776089, -1, 15.1231]])
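# the expected values are consistent with (assumption inferred from these
# inputs) pixel_area_ha * y_max * (1 - b_x * exp(-c_x * fert_rate)) on
# pixels where lulc == crop_lucode, e.g. 10 * 4 * (1 - 2 * e**-1.8) ~= 26.776,
# and -1 elsewhere.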
numpy.testing.assert_allclose(actual_result, expected_result)
def test_zero_observed_yield_op(self):
"""Test `_zero_observed_yield_op`"""
from natcap.invest.crop_production_regression import \
_zero_observed_yield_op
# make fake data
observed_yield_array = numpy.array([[0, 1, -1], [5, 6, -1]])
observed_yield_nodata = -1
actual_result = _zero_observed_yield_op(observed_yield_array,
observed_yield_nodata)
expected_result = numpy.array([[0, 1, 0], [5, 6, 0]])
numpy.testing.assert_allclose(actual_result, expected_result)
def test_mask_observed_yield_op(self):
"""Test `_mask_observed_yield_op`"""
from natcap.invest.crop_production_regression import \
_mask_observed_yield_op
# make fake data
lulc_array = numpy.array([[3, 5, -9999], [3, 3, -1]])
observed_yield_array = numpy.array([[-1, 5, 4], [8, -9999, 91]])
observed_yield_nodata = -1
# note: this observed_yield_nodata value becomes the nodata value in
# the output array but the values in the observed_yield_array with
# this value are NOT treated as no data within this function
landcover_nodata = -9999
crop_lucode = 3
pixel_area_ha = 10
actual_result = _mask_observed_yield_op(
lulc_array, observed_yield_array, observed_yield_nodata,
landcover_nodata, crop_lucode, pixel_area_ha)
expected_result = numpy.array([[-10, 0, -1], [80, -99990, 0]])
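# expected behavior: on crop pixels the observed yield is scaled by
# pixel_area_ha (8 * 10 = 80, including the -1 that is not masked), other
# valid lulc pixels become 0, and landcover nodata becomes
# observed_yield_nodata.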
numpy.testing.assert_allclose(actual_result, expected_result)
def test_tabulate_regression_results(self):
"""Test `tabulate_regression_results`"""
from natcap.invest.crop_production_regression import \
tabulate_regression_results
def _create_expected_results():
"""Creates the expected results DataFrame."""
return pandas.DataFrame([
{'crop': 'corn', 'area (ha)': 20.0,
'production_observed': 8.0, 'production_modeled': 4.0,
'protein_modeled': 1562400.0, 'protein_observed': 3124800.0,
'lipid_modeled': 297600.0, 'lipid_observed': 595200.0,
'energy_modeled': 17707200.0, 'energy_observed': 35414400.0,
'ca_modeled': 1004400.0, 'ca_observed': 2008800.0,
'fe_modeled': 584040.0, 'fe_observed': 1168080.0,
'mg_modeled': 10416000.0, 'mg_observed': 20832000.0,
'ph_modeled': 26188800.0, 'ph_observed': 52377600.0,
'k_modeled': 64244400.0, 'k_observed': 128488800.0,
'na_modeled': 74400.0, 'na_observed': 148800.0,
'zn_modeled': 182280.0, 'zn_observed': 364560.0,
'cu_modeled': 70680.0, 'cu_observed': 141360.0,
'fl_modeled': 297600.0, 'fl_observed': 595200.0,
'mn_modeled': 107880.0, 'mn_observed': 215760.0,
'se_modeled': 3720.0, 'se_observed': 7440.0,
'vita_modeled': 111600.0, 'vita_observed': 223200.0,
'betac_modeled': 595200.0, 'betac_observed': 1190400.0,
'alphac_modeled': 85560.0, 'alphac_observed': 171120.0,
'vite_modeled': 29760.0, 'vite_observed': 59520.0,
'crypto_modeled': 59520.0, 'crypto_observed': 119040.0,
'lycopene_modeled': 13392.0, 'lycopene_observed': 26784.0,
'lutein_modeled': 2343600.0, 'lutein_observed': 4687200.0,
'betat_modeled': 18600.0, 'betat_observed': 37200.0,
'gammat_modeled': 78120.0, 'gammat_observed': 156240.0,
'deltat_modeled': 70680.0, 'deltat_observed': 141360.0,
'vitc_modeled': 252960.0, 'vitc_observed': 505920.0,
'thiamin_modeled': 14880.0, 'thiamin_observed': 29760.0,
'riboflavin_modeled': 66960.0, 'riboflavin_observed': 133920.0,
'niacin_modeled': 305040.0, 'niacin_observed': 610080.0,
'pantothenic_modeled': 33480.0, 'pantothenic_observed': 66960.0,
'vitb6_modeled': 52080.0, 'vitb6_observed': 104160.0,
'folate_modeled': 14322000.0, 'folate_observed': 28644000.0,
'vitb12_modeled': 74400.0, 'vitb12_observed': 148800.0,
'vitk_modeled': 1525200.0, 'vitk_observed': 3050400.0},
{'crop': 'soybean', 'area (ha)': 40.0,
'production_observed': 12.0, 'production_modeled': 7.0,
'protein_modeled': 2102100.0, 'protein_observed': 3603600.0,
'lipid_modeled': 127400.0, 'lipid_observed': 218400.0,
'energy_modeled': 6306300.0, 'energy_observed': 10810800.0,
'ca_modeled': 16370900.0, 'ca_observed': 28064400.0,
'fe_modeled': 1000090.0, 'fe_observed': 1714440.0,
'mg_modeled': 17836000.0, 'mg_observed': 30576000.0,
'ph_modeled': 44844800.0, 'ph_observed': 76876800.0,
'k_modeled': 12548900.0, 'k_observed': 21512400.0,
'na_modeled': 127400.0, 'na_observed': 218400.0,
'zn_modeled': 312130.0, 'zn_observed': 535080.0,
'cu_modeled': 101920.0, 'cu_observed': 174720.0,
'fl_modeled': 191100.0, 'fl_observed': 327600.0,
'mn_modeled': 331240.0, 'mn_observed': 567840.0,
'se_modeled': 19110.0, 'se_observed': 32760.0,
'vita_modeled': 191100.0, 'vita_observed': 327600.0,
'betac_modeled': 1019200.0, 'betac_observed': 1747200.0,
'alphac_modeled': 63700.0, 'alphac_observed': 109200.0,
'vite_modeled': 50960.0, 'vite_observed': 87360.0,
'crypto_modeled': 38220.0, 'crypto_observed': 65520.0,
'lycopene_modeled': 19110.0, 'lycopene_observed': 32760.0,
'lutein_modeled': 3885700.0, 'lutein_observed': 6661200.0,
'betat_modeled': 31850.0, 'betat_observed': 54600.0,
'gammat_modeled': 146510.0, 'gammat_observed': 251160.0,
'deltat_modeled': 76440.0, 'deltat_observed': 131040.0,
'vitc_modeled': 191100.0, 'vitc_observed': 327600.0,
'thiamin_modeled': 26754.0, 'thiamin_observed': 45864.0,
'riboflavin_modeled': 52234.0, 'riboflavin_observed': 89544.0,
'niacin_modeled': 777140.0, 'niacin_observed': 1332240.0,
'pantothenic_modeled': 58604.0, 'pantothenic_observed': 100464.0,
'vitb6_modeled': 343980.0, 'vitb6_observed': 589680.0,
'folate_modeled': 19428500.0, 'folate_observed': 33306000.0,
'vitb12_modeled': 191100.0, 'vitb12_observed': 327600.0,
'vitk_modeled': 2675400.0, 'vitk_observed': 4586400.0}])
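# spot check on the expected nutrient totals (assumption inferred from
# create_nutrient_df, not from the model source): protein_modeled for corn
# = production_modeled * (1 - percentrefuse / 100) * protein * 1e4
# = 4 * 0.93 * 42 * 1e4 = 1562400.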
nutrient_df = create_nutrient_df()
pixel_area_ha = 10
workspace_dir = self.workspace_dir
output_dir = os.path.join(workspace_dir, "OUTPUT")
os.makedirs(output_dir, exist_ok=True)
landcover_raster_path = os.path.join(workspace_dir, "landcover.tif")
landcover_nodata = -1
make_simple_raster(landcover_raster_path,
numpy.array([[2, 1], [2, 3]], dtype=numpy.int16))
file_suffix = "v1"
target_table_path = os.path.join(workspace_dir, "output_table.csv")
crop_names = ["corn", "soybean"]
_create_crop_rasters(output_dir, crop_names, file_suffix)
tabulate_regression_results(
nutrient_df, crop_names, pixel_area_ha,
landcover_raster_path, landcover_nodata,
output_dir, file_suffix, target_table_path
)
# Read only the first 2 crops' data (skipping the total-area row)
actual_result_table = pandas.read_csv(target_table_path, nrows=2,
header=0)
expected_result_table = _create_expected_results()
# Compare expected vs actual
pandas.testing.assert_frame_equal(actual_result_table,
expected_result_table)
def test_aggregate_regression_results_to_polygons(self):
"""Test `aggregate_regression_results_to_polygons`"""
from natcap.invest.crop_production_regression import \
aggregate_regression_results_to_polygons
def _create_expected_agg_table():
"""Create expected output results"""
# Define the new values manually
return pandas.DataFrame([
{"FID": 0, "corn_modeled": 1, "corn_observed": 4,
"soybean_modeled": 2, "soybean_observed": 5,
"protein_modeled": 991200, "protein_observed": 3063900,
"lipid_modeled": 110800, "lipid_observed": 388600,
"energy_modeled": 6228600, "energy_observed": 22211700,
"ca_modeled": 4928500, "ca_observed": 12697900,
"fe_modeled": 431750, "fe_observed": 1298390,
"mg_modeled": 7700000, "mg_observed": 23156000,
"ph_modeled": 19360000, "ph_observed": 58220800,
"k_modeled": 19646500, "k_observed": 73207900,
"na_modeled": 55000, "na_observed": 165400,
"zn_modeled": 134750, "zn_observed": 405230,
"cu_modeled": 46790, "cu_observed": 143480,
"fl_modeled": 129000, "fl_observed": 434100,
"mn_modeled": 121610, "mn_observed": 344480,
"se_modeled": 6390, "se_observed": 17370,
"vita_modeled": 82500, "vita_observed": 248100,
"betac_modeled": 440000, "betac_observed": 1323200,
"alphac_modeled": 39590, "alphac_observed": 131060,
"vite_modeled": 22000, "vite_observed": 66160,
"crypto_modeled": 25800, "crypto_observed": 86820,
"lycopene_modeled": 8808, "lycopene_observed": 27042,
"lutein_modeled": 1696100, "lutein_observed": 5119100,
"betat_modeled": 13750, "betat_observed": 41350,
"gammat_modeled": 61390, "gammat_observed": 182770,
"deltat_modeled": 39510, "deltat_observed": 125280,
"vitc_modeled": 117840, "vitc_observed": 389460,
"thiamin_modeled": 11364, "thiamin_observed": 33990,
"riboflavin_modeled": 31664, "riboflavin_observed": 104270,
"niacin_modeled": 298300, "niacin_observed": 860140,
"pantothenic_modeled": 25114, "pantothenic_observed": 75340,
"vitb6_modeled": 111300, "vitb6_observed": 297780,
"folate_modeled": 9131500, "folate_observed": 28199500,
"vitb12_modeled": 73200, "vitb12_observed": 210900,
"vitk_modeled": 1145700, "vitk_observed": 3436200},
{"FID": 1, "corn_modeled": 4, "corn_observed": 8,
"soybean_modeled": 7, "soybean_observed": 12,
"protein_modeled": 3664500, "protein_observed": 6728400,
"lipid_modeled": 425000, "lipid_observed": 813600,
"energy_modeled": 24013500, "energy_observed": 46225200,
"ca_modeled": 17375300, "ca_observed": 30073200,
"fe_modeled": 1584130, "fe_observed": 2882520,
"mg_modeled": 28252000, "mg_observed": 51408000,
"ph_modeled": 71033600, "ph_observed": 129254400,
"k_modeled": 76793300, "k_observed": 150001200,
"na_modeled": 201800, "na_observed": 367200,
"zn_modeled": 494410, "zn_observed": 899640,
"cu_modeled": 172600, "cu_observed": 316080,
"fl_modeled": 488700, "fl_observed": 922800,
"mn_modeled": 439120, "mn_observed": 783600,
"se_modeled": 22830, "se_observed": 40200,
"vita_modeled": 302700, "vita_observed": 550800,
"betac_modeled": 1614400, "betac_observed": 2937600,
"alphac_modeled": 149260, "alphac_observed": 280320,
"vite_modeled": 80720, "vite_observed": 146880,
"crypto_modeled": 97740, "crypto_observed": 184560,
"lycopene_modeled": 32502, "lycopene_observed": 59544,
"lutein_modeled": 6229300, "lutein_observed": 11348400,
"betat_modeled": 50450, "betat_observed": 91800,
"gammat_modeled": 224630, "gammat_observed": 407400,
"deltat_modeled": 147120, "deltat_observed": 272400,
"vitc_modeled": 444060, "vitc_observed": 833520,
"thiamin_modeled": 41634, "thiamin_observed": 75624,
"riboflavin_modeled": 119194, "riboflavin_observed": 223464,
"niacin_modeled": 1082180, "niacin_observed": 1942320,
"pantothenic_modeled": 92084, "pantothenic_observed": 167424,
"vitb6_modeled": 396060, "vitb6_observed": 693840,
"folate_modeled": 33750500, "folate_observed": 61950000,
"vitb12_modeled": 265500, "vitb12_observed": 476400,
"vitk_modeled": 4200600, "vitk_observed": 7636800}
], dtype=float)
workspace = self.workspace_dir
base_aggregate_vector_path = os.path.join(workspace,
"agg_vector.shp")
make_aggregate_vector(base_aggregate_vector_path)
target_aggregate_vector_path = os.path.join(workspace,
"agg_vector_prj.shp")
spatial_ref = osr.SpatialReference()
spatial_ref.ImportFromEPSG(26910)  # NAD83 / UTM zone 10N
landcover_raster_projection = spatial_ref.ExportToWkt()
crop_names = ['corn', 'soybean']
nutrient_df = create_nutrient_df()
output_dir = os.path.join(workspace, "OUTPUT")
os.makedirs(output_dir, exist_ok=True)
file_suffix = 'test'
target_aggregate_table_path = '' # unused
_create_crop_rasters(output_dir, crop_names, file_suffix)
aggregate_regression_results_to_polygons(
base_aggregate_vector_path, target_aggregate_vector_path,
landcover_raster_projection, crop_names,
nutrient_df, output_dir, file_suffix,
target_aggregate_table_path)
_AGGREGATE_TABLE_FILE_PATTERN = os.path.join(
'.', 'aggregate_results%s.csv')
aggregate_table_path = os.path.join(
output_dir, _AGGREGATE_TABLE_FILE_PATTERN % file_suffix)
actual_aggregate_table = pandas.read_csv(aggregate_table_path,
dtype=float)
print(actual_aggregate_table)
expected_aggregate_table = _create_expected_agg_table()
pandas.testing.assert_frame_equal(
actual_aggregate_table, expected_aggregate_table)
class CropValidationTests(unittest.TestCase):
"""Tests for the Crop Productions' MODEL_SPEC and validation."""