added test for when input rasters have undefined nodata. also replacing pgp.testing calls with plain gdal. #BITBUCKET-3851.

This commit is contained in:
David Fisher 2019-02-21 16:13:08 -08:00
parent a5558446de
commit d74414aebc
1 changed files with 61 additions and 15 deletions

View File

@ -8,15 +8,15 @@ import os
from osgeo import gdal
from osgeo import osr
import numpy
import pygeoprocessing.testing
def make_simple_raster(base_raster_path, fill_val):
def make_simple_raster(base_raster_path, fill_val, nodata_val):
"""Create a 10x10 raster on designated path with fill value.
Parameters:
raster_path (str): the raster path for making the new raster.
fill_val (int): the value used for filling the raster.
nodata_val (int or None): for defining a band's nodata value.
Returns:
lulc_path (str): the path of the raster file.
@ -25,16 +25,27 @@ def make_simple_raster(base_raster_path, fill_val):
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # UTM Zone 10N
projection_wkt = srs.ExportToWkt()
# origin hand-picked for this epsg:
geotransform = [461261, 1.0, 0.0, 4923265, 0.0, -1.0]
base_array = numpy.empty((10, 10))
base_array.fill(fill_val)
pygeoprocessing.testing.create_raster_on_disk(
[base_array],
(461261, 4923265), # Origin based on the projection
projection_wkt,
-1,
(1, -1),
filename=base_raster_path)
n = 10
gtiff_driver = gdal.GetDriverByName('GTiff')
# raster_path = os.path.join(self.workspace_dir, 'small_raster.tif')
new_raster = gtiff_driver.Create(
base_raster_path, n, n, 1, gdal.GDT_Int32, options=[
'TILED=YES', 'BIGTIFF=YES', 'COMPRESS=LZW',
'BLOCKXSIZE=16', 'BLOCKYSIZE=16'])
new_raster.SetProjection(projection_wkt)
new_raster.SetGeoTransform(geotransform)
new_band = new_raster.GetRasterBand(1)
array = numpy.empty((n, n))
array.fill(fill_val)
new_band.WriteArray(array)
if nodata_val is not None:
new_band.SetNoDataValue(nodata_val)
new_raster.FlushCache()
new_band = None
new_raster = None
def assert_raster_equal_value(base_raster_path, val_to_compare):
@ -107,7 +118,7 @@ class CarbonTests(unittest.TestCase):
for fill_val, lulc_name in enumerate(lulc_names, 1):
args[lulc_name] = os.path.join(args['workspace_dir'],
lulc_name + '.tif')
make_simple_raster(args[lulc_name], fill_val)
make_simple_raster(args[lulc_name], fill_val, -1)
args['carbon_pools_path'] = os.path.join(args['workspace_dir'],
'pools.csv')
@ -142,7 +153,7 @@ class CarbonTests(unittest.TestCase):
for fill_val, lulc_name in enumerate(lulc_names, 1):
args[lulc_name] = os.path.join(args['workspace_dir'],
lulc_name + '.tif')
make_simple_raster(args[lulc_name], fill_val)
make_simple_raster(args[lulc_name], fill_val, -1)
args['carbon_pools_path'] = os.path.join(args['workspace_dir'],
'pools.csv')
@ -175,7 +186,7 @@ class CarbonTests(unittest.TestCase):
for fill_val, lulc_name in enumerate(lulc_names, 1):
args[lulc_name] = os.path.join(args['workspace_dir'],
lulc_name + '.tif')
make_simple_raster(args[lulc_name], fill_val)
make_simple_raster(args[lulc_name], fill_val, -1)
args['carbon_pools_path'] = os.path.join(args['workspace_dir'],
'pools.csv')
@ -200,7 +211,7 @@ class CarbonTests(unittest.TestCase):
for fill_val, lulc_name in enumerate(lulc_names, 200):
args[lulc_name] = os.path.join(args['workspace_dir'],
lulc_name + '.tif')
make_simple_raster(args[lulc_name], fill_val)
make_simple_raster(args[lulc_name], fill_val, -1)
args['carbon_pools_path'] = os.path.join(args['workspace_dir'],
'pools.csv')
@ -209,3 +220,38 @@ class CarbonTests(unittest.TestCase):
# Value error should be raised with lulc code 200
with self.assertRaises(ValueError):
carbon.execute(args)
def test_carbon_full_undefined_nodata(self):
"""Carbon: full model run when input raster nodata is None."""
from natcap.invest import carbon
args = {
u'workspace_dir': self.workspace_dir,
u'do_valuation': True,
u'price_per_metric_ton_of_c': 43.0,
u'rate_change': 2.8,
u'lulc_cur_year': 2016,
u'lulc_fut_year': 2030,
u'discount_rate': -7.1,
u'n_workers': -1,
}
# Create LULC rasters and pools csv in workspace and add them to args.
lulc_names = ['lulc_cur_path', 'lulc_fut_path', 'lulc_redd_path']
for fill_val, lulc_name in enumerate(lulc_names, 1):
args[lulc_name] = os.path.join(args['workspace_dir'],
lulc_name + '.tif')
make_simple_raster(args[lulc_name], fill_val, None)
args['carbon_pools_path'] = os.path.join(args['workspace_dir'],
'pools.csv')
make_pools_csv(args['carbon_pools_path'])
carbon.execute(args)
# Add assertions for npv for future and REDD scenarios.
# The npv was calculated based on _calculate_npv in carbon.py.
assert_raster_equal_value(
os.path.join(args['workspace_dir'], 'npv_fut.tif'), -0.3422078)
assert_raster_equal_value(
os.path.join(args['workspace_dir'], 'npv_redd.tif'), -0.4602106)