added crop production test that fails when landcover input raster has no assigned nodata value. #BITBUCKET-3851.
This commit is contained in:
parent
b924484aff
commit
732613cb68
|
@ -3,8 +3,12 @@ import unittest
|
|||
import tempfile
|
||||
import shutil
|
||||
import os
|
||||
import cProfile
|
||||
|
||||
import numpy
|
||||
from osgeo import gdal
|
||||
import pandas
|
||||
import pygeoprocessing
|
||||
|
||||
MODEL_DATA_PATH = os.path.join(
|
||||
os.path.dirname(__file__), '..', 'data', 'invest-test-data',
|
||||
|
@ -29,7 +33,7 @@ class CropProductionTests(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
"""Overriding tearDown function to remove temporary directory."""
|
||||
shutil.rmtree(self.workspace_dir)
|
||||
|
||||
@unittest.skip("skip percentile")
|
||||
def test_crop_production_percentile(self):
|
||||
"""Crop Production: test crop production percentile regression."""
|
||||
from natcap.invest import crop_production_percentile
|
||||
|
@ -46,7 +50,10 @@ class CropProductionTests(unittest.TestCase):
|
|||
'model_data_path': MODEL_DATA_PATH,
|
||||
'n_workers': '-1'
|
||||
}
|
||||
import time
|
||||
starttime = time.time()
|
||||
crop_production_percentile.execute(args)
|
||||
print time.time() - starttime
|
||||
|
||||
agg_result_table_path = os.path.join(
|
||||
args['workspace_dir'], 'aggregate_results.csv')
|
||||
|
@ -126,6 +133,7 @@ class CropProductionTests(unittest.TestCase):
|
|||
with self.assertRaises(ValueError):
|
||||
crop_production_regression.execute(args)
|
||||
|
||||
@unittest.skip("skip percentile")
|
||||
def test_crop_production_regression(self):
|
||||
"""Crop Production: test crop production regression model."""
|
||||
from natcap.invest import crop_production_regression
|
||||
|
@ -147,7 +155,90 @@ class CropProductionTests(unittest.TestCase):
|
|||
'phosphorous_fertilization_rate': 8.4,
|
||||
'potassium_fertilization_rate': 14.2,
|
||||
}
|
||||
import time
|
||||
starttime = time.time()
|
||||
crop_production_regression.execute(args)
|
||||
print time.time() - starttime
|
||||
# cProfile.runctx(
|
||||
# 'crop_production_regression.execute(args)', globals(), locals())
|
||||
|
||||
agg_result_table_path = os.path.join(
|
||||
args['workspace_dir'], 'aggregate_results.csv')
|
||||
expected_agg_result_table_path = os.path.join(
|
||||
TEST_DATA_PATH, 'expected_regression_aggregate_results.csv')
|
||||
expected_agg_result_table = pandas.read_csv(
|
||||
expected_agg_result_table_path)
|
||||
agg_result_table = pandas.read_csv(
|
||||
agg_result_table_path)
|
||||
pandas.testing.assert_frame_equal(
|
||||
expected_agg_result_table, agg_result_table, check_dtype=False)
|
||||
|
||||
result_table_path = os.path.join(
|
||||
args['workspace_dir'], 'result_table.csv')
|
||||
expected_result_table_path = os.path.join(
|
||||
TEST_DATA_PATH, 'expected_regression_result_table.csv')
|
||||
expected_result_table = pandas.read_csv(
|
||||
expected_result_table_path)
|
||||
result_table = pandas.read_csv(
|
||||
result_table_path)
|
||||
pandas.testing.assert_frame_equal(
|
||||
expected_result_table, result_table, check_dtype=False)
|
||||
|
||||
def test_crop_production_regression_no_nodata(self):
|
||||
"""Crop Production: test crop production regression model.
|
||||
|
||||
Test with a landcover raster input that has no nodata value
|
||||
defined.
|
||||
"""
|
||||
from natcap.invest import crop_production_regression
|
||||
|
||||
args = {
|
||||
'workspace_dir': self.workspace_dir,
|
||||
'results_suffix': '',
|
||||
'landcover_raster_path': os.path.join(
|
||||
SAMPLE_DATA_PATH, 'landcover.tif'),
|
||||
'landcover_to_crop_table_path': os.path.join(
|
||||
SAMPLE_DATA_PATH, 'landcover_to_crop_table.csv'),
|
||||
'model_data_path': MODEL_DATA_PATH,
|
||||
'fertilization_rate_table_path': os.path.join(
|
||||
SAMPLE_DATA_PATH, 'crop_fertilization_rates.csv'),
|
||||
'nitrogen_fertilization_rate': 29.6,
|
||||
'phosphorous_fertilization_rate': 8.4,
|
||||
'potassium_fertilization_rate': 14.2,
|
||||
}
|
||||
|
||||
# Create a raster based off the test data geotransform, but smaller and
|
||||
# with no nodata value defined.
|
||||
base_lulc_info = pygeoprocessing.get_raster_info(args['landcover_raster_path'])
|
||||
base_geotransform = base_lulc_info['geotransform']
|
||||
origin_x = base_geotransform[0]
|
||||
origin_y = base_geotransform[3]
|
||||
|
||||
n = 9
|
||||
gtiff_driver = gdal.GetDriverByName('GTiff')
|
||||
raster_path = os.path.join(self.workspace_dir, 'small_raster.tif')
|
||||
new_raster = gtiff_driver.Create(
|
||||
raster_path, n, n, 1, gdal.GDT_Int32, options=[
|
||||
'TILED=YES', 'BIGTIFF=YES', 'COMPRESS=LZW',
|
||||
'BLOCKXSIZE=16', 'BLOCKYSIZE=16'])
|
||||
# new_raster.SetProjection(srs.ExportToWkt())
|
||||
new_raster.SetProjection(base_lulc_info['projection'])
|
||||
new_raster.SetGeoTransform([origin_x, 1.0, 0.0, origin_y, 0.0, -1.0])
|
||||
new_band = new_raster.GetRasterBand(1)
|
||||
array = numpy.array(range(n*n), dtype=numpy.int32).reshape((n, n))
|
||||
array.fill(20) # 20 is present in the landcover_to_crop_table
|
||||
new_band.WriteArray(array)
|
||||
new_raster.FlushCache()
|
||||
new_band = None
|
||||
new_raster = None
|
||||
args['landcover_raster_path'] = raster_path
|
||||
|
||||
import time
|
||||
starttime = time.time()
|
||||
crop_production_regression.execute(args)
|
||||
print time.time() - starttime
|
||||
# cProfile.runctx(
|
||||
# 'crop_production_regression.execute(args)', globals(), locals())
|
||||
|
||||
agg_result_table_path = os.path.join(
|
||||
args['workspace_dir'], 'aggregate_results.csv')
|
||||
|
|
Loading…
Reference in New Issue