# invest/tests/test_crop_production.py (Python, 434 lines, 18 KiB)

"""Module for Regression Testing the InVEST Crop Production models."""
import unittest
import tempfile
import shutil
import os
import numpy
from osgeo import gdal
import pandas
import pygeoprocessing
# Ask the GDAL bindings to raise Python exceptions when an operation fails,
# so that raster problems surface as test errors.
gdal.UseExceptions()
# Root directory of the crop production test fixtures, resolved relative to
# this test module. Expected-result tables live directly inside it.
TEST_DATA_PATH = os.path.join(
    os.path.dirname(__file__), '..', 'data', 'invest-test-data',
    'crop_production_model')
# Model data directory passed to the models as 'model_data_path'.
MODEL_DATA_PATH = os.path.join(TEST_DATA_PATH, 'model_data')
# Sample user inputs (landcover raster, lookup tables, aggregate polygons).
SAMPLE_DATA_PATH = os.path.join(TEST_DATA_PATH, 'sample_user_data')
class CropProductionTests(unittest.TestCase):
    """Tests for the Crop Production model.

    Every test runs one of the crop production models inside a fresh
    temporary workspace and either compares the CSV tables it writes against
    expected tables stored under ``TEST_DATA_PATH`` or asserts that invalid
    input raises ``ValueError``.
    """

    def setUp(self):
        """Overriding setUp function to create temp workspace directory."""
        # this lets us delete the workspace after it's done no matter the
        # test result
        self.workspace_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Overriding tearDown function to remove temporary directory."""
        shutil.rmtree(self.workspace_dir)

    @staticmethod
    def _assert_csv_tables_equal(expected_path, actual_path, **kwargs):
        """Assert that two CSV tables hold equal data, ignoring dtypes.

        Args:
            expected_path (str): path to the CSV with the expected values.
            actual_path (str): path to the CSV written by the model.
            **kwargs: extra keyword arguments forwarded to
                ``pandas.testing.assert_frame_equal``.

        Raises:
            AssertionError: if the two tables differ.
        """
        expected_table = pandas.read_csv(expected_path)
        actual_table = pandas.read_csv(actual_path)
        pandas.testing.assert_frame_equal(
            expected_table, actual_table, check_dtype=False, **kwargs)

    @staticmethod
    def _write_bad_crop_table(table_path):
        """Write a landcover-to-crop table that names a nonexistent crop.

        Args:
            table_path (str): where to write the CSV.
        """
        with open(table_path, 'w') as landcover_crop_table:
            landcover_crop_table.write(
                'crop_name,lucode\nfakecrop,20\n')

    def _make_lulc_raster_without_nodata(self, base_raster_path):
        """Create a small landcover raster with no nodata value defined.

        The new raster borrows the projection and origin of
        ``base_raster_path``, is 9x9 pixels and is filled with landcover
        code 20.

        Args:
            base_raster_path (str): raster whose projection and origin are
                reused.

        Returns:
            str: path to the new raster inside the test workspace.
        """
        base_lulc_info = pygeoprocessing.get_raster_info(base_raster_path)
        base_geotransform = base_lulc_info['geotransform']
        origin_x = base_geotransform[0]
        origin_y = base_geotransform[3]

        n = 9
        raster_path = os.path.join(self.workspace_dir, 'small_raster.tif')
        gtiff_driver = gdal.GetDriverByName('GTiff')
        new_raster = gtiff_driver.Create(
            raster_path, n, n, 1, gdal.GDT_Int32, options=[
                'TILED=YES', 'BIGTIFF=YES', 'COMPRESS=LZW',
                'BLOCKXSIZE=16', 'BLOCKYSIZE=16'])
        new_raster.SetProjection(base_lulc_info['projection_wkt'])
        new_raster.SetGeoTransform([origin_x, 1.0, 0.0, origin_y, 0.0, -1.0])
        new_band = new_raster.GetRasterBand(1)
        # 20 is present in the landcover_to_crop_table
        new_band.WriteArray(numpy.full((n, n), 20, dtype=numpy.int32))
        new_raster.FlushCache()
        # Drop the GDAL references so the dataset is closed before it is read.
        new_band = None
        new_raster = None
        return raster_path

    def _copy_model_data_without_wheat_bin(
            self, table_dirname, table_basename, missing_bin=40):
        """Copy the model data and drop one climate bin from a wheat table.

        The model data directory is copied into the workspace so that the
        hard coded data paths can be altered without touching the originals.

        Args:
            table_dirname (str): subdirectory of the model data directory
                that holds the wheat yield table.
            table_basename (str): file name of the wheat yield table.
            missing_bin (int): ``climate_bin`` value whose row is removed.

        Returns:
            str: path to the altered copy of the model data directory.
        """
        tmp_copy_model_data_path = os.path.join(
            self.workspace_dir, 'tmp_model_data')
        shutil.copytree(MODEL_DATA_PATH, tmp_copy_model_data_path)

        climate_bin_wheat_table_path = os.path.join(
            MODEL_DATA_PATH, table_dirname, table_basename)
        bad_climate_bin_wheat_table_path = os.path.join(
            tmp_copy_model_data_path, table_dirname, table_basename)

        os.remove(bad_climate_bin_wheat_table_path)
        table_df = pandas.read_csv(climate_bin_wheat_table_path)
        table_df = table_df[table_df['climate_bin'] != missing_bin]
        # index=False keeps the copy's columns identical to the original
        # table instead of prepending an unnamed index column.
        table_df.to_csv(bad_climate_bin_wheat_table_path, index=False)
        return tmp_copy_model_data_path

    def test_crop_production_percentile(self):
        """Crop Production: test crop production percentile regression."""
        from natcap.invest import crop_production_percentile

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'model_data_path': MODEL_DATA_PATH,
            'n_workers': '-1'
        }
        crop_production_percentile.execute(args)

        self._assert_csv_tables_equal(
            os.path.join(TEST_DATA_PATH, 'expected_aggregate_results.csv'),
            os.path.join(args['workspace_dir'], 'aggregate_results.csv'),
            check_exact=False)
        self._assert_csv_tables_equal(
            os.path.join(TEST_DATA_PATH, 'expected_result_table.csv'),
            os.path.join(args['workspace_dir'], 'result_table.csv'))

    def test_crop_production_percentile_no_nodata(self):
        """Crop Production: test percentile model with undefined nodata raster.

        Test with a landcover raster input that has no nodata value
        defined.
        """
        from natcap.invest import crop_production_percentile

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'model_data_path': MODEL_DATA_PATH,
            'n_workers': '-1'
        }

        # Create a raster based on the test data geotransform, but smaller and
        # with no nodata value defined.
        args['landcover_raster_path'] = (
            self._make_lulc_raster_without_nodata(
                args['landcover_raster_path']))
        crop_production_percentile.execute(args)

        self._assert_csv_tables_equal(
            os.path.join(
                TEST_DATA_PATH, 'expected_result_table_no_nodata.csv'),
            os.path.join(args['workspace_dir'], 'result_table.csv'))

    def test_crop_production_percentile_bad_crop(self):
        """Crop Production: test crop production with a bad crop name."""
        from natcap.invest import crop_production_percentile

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                self.workspace_dir, 'landcover_to_badcrop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'model_data_path': MODEL_DATA_PATH,
            'n_workers': '-1'
        }
        self._write_bad_crop_table(args['landcover_to_crop_table_path'])

        with self.assertRaises(ValueError):
            crop_production_percentile.execute(args)

    def test_crop_production_percentile_missing_climate_bin(self):
        """Crop Production: test crop percentile with a missing climate bin."""
        from natcap.invest import crop_production_percentile

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'model_data_path': MODEL_DATA_PATH,
            'n_workers': '-1'
        }

        # remove a row from the wheat percentile yield table so that a wheat
        # climate bin value is missing
        args['model_data_path'] = self._copy_model_data_without_wheat_bin(
            'climate_percentile_yield_tables',
            'wheat_percentile_yield_table.csv')

        with self.assertRaises(ValueError) as context:
            crop_production_percentile.execute(args)
        self.assertIn(
            "The missing values found in the wheat Climate Bin raster but not"
            " the table are: [40]", str(context.exception))

    def test_crop_production_regression_bad_crop(self):
        """Crop Production: test crop regression with a bad crop name."""
        from natcap.invest import crop_production_regression

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            # Written into the workspace (not the sample data directory) so
            # the generated table is removed by tearDown.
            'landcover_to_crop_table_path': os.path.join(
                self.workspace_dir, 'landcover_to_badcrop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'aggregate_polygon_id': 'id',
            'model_data_path': MODEL_DATA_PATH,
            'fertilization_rate_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'crop_fertilization_rates.csv'),
            'nitrogen_fertilization_rate': 29.6,
            'phosphorus_fertilization_rate': 8.4,
            'potassium_fertilization_rate': 14.2,
            'n_workers': '-1'
        }
        self._write_bad_crop_table(args['landcover_to_crop_table_path'])

        with self.assertRaises(ValueError):
            crop_production_regression.execute(args)

    def test_crop_production_regression_missing_climate_bin(self):
        """Crop Production: test crop regression with a missing climate bin."""
        from natcap.invest import crop_production_regression

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'aggregate_polygon_id': 'id',
            'model_data_path': MODEL_DATA_PATH,
            'fertilization_rate_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'crop_fertilization_rates.csv'),
            'nitrogen_fertilization_rate': 29.6,
            'phosphorus_fertilization_rate': 8.4,
            'potassium_fertilization_rate': 14.2,
            'n_workers': '-1'
        }

        # remove a row from the wheat regression yield table so that a wheat
        # climate bin value is missing
        args['model_data_path'] = self._copy_model_data_without_wheat_bin(
            'climate_regression_yield_tables',
            'wheat_regression_yield_table.csv')

        with self.assertRaises(ValueError) as context:
            crop_production_regression.execute(args)
        self.assertIn(
            "The missing values found in the wheat Climate Bin raster but not"
            " the table are: [40]", str(context.exception))

    def test_crop_production_regression(self):
        """Crop Production: test crop production regression model."""
        from natcap.invest import crop_production_regression

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'aggregate_polygon_path': os.path.join(
                SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
            'aggregate_polygon_id': 'id',
            'model_data_path': MODEL_DATA_PATH,
            'fertilization_rate_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'crop_fertilization_rates.csv'),
            'nitrogen_fertilization_rate': 29.6,
            'phosphorus_fertilization_rate': 8.4,
            'potassium_fertilization_rate': 14.2,
        }
        crop_production_regression.execute(args)

        self._assert_csv_tables_equal(
            os.path.join(
                TEST_DATA_PATH, 'expected_regression_aggregate_results.csv'),
            os.path.join(args['workspace_dir'], 'aggregate_results.csv'),
            check_exact=False)
        self._assert_csv_tables_equal(
            os.path.join(
                TEST_DATA_PATH, 'expected_regression_result_table.csv'),
            os.path.join(args['workspace_dir'], 'result_table.csv'))

    def test_crop_production_regression_no_nodata(self):
        """Crop Production: test regression model with undefined nodata raster.

        Test with a landcover raster input that has no nodata value
        defined.
        """
        from natcap.invest import crop_production_regression

        args = {
            'workspace_dir': self.workspace_dir,
            'results_suffix': '',
            'landcover_raster_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover.tif'),
            'landcover_to_crop_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
            'model_data_path': MODEL_DATA_PATH,
            'fertilization_rate_table_path': os.path.join(
                SAMPLE_DATA_PATH, 'crop_fertilization_rates.csv'),
            'nitrogen_fertilization_rate': 29.6,
            'phosphorus_fertilization_rate': 8.4,
            'potassium_fertilization_rate': 14.2,
        }

        # Create a raster based on the test data geotransform, but smaller and
        # with no nodata value defined.
        args['landcover_raster_path'] = (
            self._make_lulc_raster_without_nodata(
                args['landcover_raster_path']))
        crop_production_regression.execute(args)

        self._assert_csv_tables_equal(
            os.path.join(
                TEST_DATA_PATH,
                'expected_regression_result_table_no_nodata.csv'),
            os.path.join(args['workspace_dir'], 'result_table.csv'))
class CropValidationTests(unittest.TestCase):
    """Checks that the Crop Production models report missing required args."""

    def setUp(self):
        """Make a scratch workspace and record the shared required keys."""
        self.workspace_dir = tempfile.mkdtemp()
        # Keys both models are expected to flag when given an empty args dict.
        self.base_required_keys = [
            'workspace_dir',
            'landcover_raster_path',
            'landcover_to_crop_table_path',
            'model_data_path',
        ]

    def tearDown(self):
        """Delete the scratch workspace once the test has finished."""
        shutil.rmtree(self.workspace_dir)

    def test_missing_keys_percentile(self):
        """Crop Percentile Validate: assert missing required keys."""
        from natcap.invest import crop_production_percentile, validation

        # Validate an empty args dict and compare the flagged keys against
        # the shared required keys.
        warnings = crop_production_percentile.validate({})
        self.assertEqual(
            validation.get_invalid_keys(warnings),
            set(self.base_required_keys))

    def test_missing_keys_regression(self):
        """Crop Regression Validate: assert missing required keys."""
        from natcap.invest import crop_production_regression, validation

        # The regression model is additionally expected to flag the
        # fertilization rate table.
        warnings = crop_production_regression.validate({})
        self.assertEqual(
            validation.get_invalid_keys(warnings),
            set(self.base_required_keys) | {'fertilization_rate_table_path'})