466 lines
18 KiB
Python
466 lines
18 KiB
Python
"""Module for Regression Testing the InVEST Carbon model."""
|
|
import collections
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
import numpy
|
|
from osgeo import gdal
|
|
from osgeo import osr
|
|
|
|
gdal.UseExceptions()
|
|
|
|
class RouteDEMTests(unittest.TestCase):
|
|
"""Tests for RouteDEM with Pygeoprocessing 1.x routing API."""
|
|
|
|
def setUp(self):
|
|
"""Overriding setUp function to create temp workspace directory."""
|
|
# this lets us delete the workspace after its done no matter the
|
|
# the rest result
|
|
self.workspace_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
"""Overriding tearDown function to remove temporary directory."""
|
|
shutil.rmtree(self.workspace_dir)
|
|
|
|
@staticmethod
|
|
def _make_dem(target_path):
|
|
# makes a 10x10 DEM with a valley in the middle that flows to row 0.
|
|
elevation = numpy.arange(1.1, 2, step=0.1).reshape((9, 1))
|
|
valley = numpy.concatenate((
|
|
numpy.flipud(numpy.arange(5)),
|
|
numpy.arange(1, 5)))
|
|
valley_with_sink = numpy.array([5, 4, 3, 2, 1.3, 1.3, 3, 4, 5])
|
|
|
|
dem_array = numpy.vstack((
|
|
valley_with_sink,
|
|
numpy.tile(valley, (9, 1)) + elevation))
|
|
nodata_value = -1
|
|
|
|
srs = osr.SpatialReference()
|
|
srs.ImportFromEPSG(32731)
|
|
srs_wkt = srs.ExportToWkt()
|
|
|
|
driver = gdal.GetDriverByName('GTiff')
|
|
dem_raster = driver.Create(
|
|
target_path, dem_array.shape[1], dem_array.shape[0],
|
|
2, gdal.GDT_Float32, options=(
|
|
'TILED=YES', 'BIGTIFF=YES', 'COMPRESS=LZW',
|
|
'BLOCKXSIZE=256', 'BLOCKYSIZE=256'))
|
|
dem_raster.SetProjection(srs_wkt)
|
|
ones_band = dem_raster.GetRasterBand(1)
|
|
ones_band.SetNoDataValue(nodata_value)
|
|
ones_band.WriteArray(numpy.ones(dem_array.shape))
|
|
|
|
dem_band = dem_raster.GetRasterBand(2)
|
|
dem_band.SetNoDataValue(nodata_value)
|
|
dem_band.WriteArray(dem_array)
|
|
dem_geotransform = [2, 2, 0, -2, 0, -2]
|
|
dem_raster.SetGeoTransform(dem_geotransform)
|
|
dem_raster = None
|
|
|
|
def test_routedem_no_options_default_band(self):
|
|
"""RouteDEM: default to band 1 when not specified."""
|
|
from natcap.invest import routedem
|
|
|
|
# Intentionally leaving out the dem_band_index parameter,
|
|
# should default to band 1.
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'dem.tif'),
|
|
'results_suffix': 'foo',
|
|
}
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
routedem.execute(args)
|
|
|
|
filled_raster_path = os.path.join(
|
|
args['workspace_dir'], 'filled_foo.tif')
|
|
self.assertTrue(
|
|
os.path.exists(filled_raster_path),
|
|
'Filled DEM not created.')
|
|
|
|
# The first band has only values of 1, no hydrological pits.
|
|
# So, the filled band should match the source band.
|
|
expected_filled_array = gdal.OpenEx(args['dem_path']).ReadAsArray()[0]
|
|
filled_array = gdal.OpenEx(filled_raster_path).ReadAsArray()
|
|
numpy.testing.assert_allclose(
|
|
expected_filled_array,
|
|
filled_array,
|
|
rtol=0, atol=1e-6)
|
|
|
|
def test_routedem_no_options(self):
|
|
"""RouteDEM: assert pitfilling when no other options given."""
|
|
from natcap.invest import routedem
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'dem.tif'),
|
|
'dem_band_index': 2,
|
|
'results_suffix': 'foo',
|
|
}
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
routedem.execute(args)
|
|
|
|
filled_raster_path = os.path.join(
|
|
args['workspace_dir'], 'filled_foo.tif')
|
|
self.assertTrue(
|
|
os.path.exists(filled_raster_path),
|
|
'Filled DEM not created.')
|
|
|
|
# The one sink in the array should have been filled to 1.3.
|
|
expected_filled_array = gdal.OpenEx(args['dem_path']).ReadAsArray()[1]
|
|
expected_filled_array[expected_filled_array < 1.3] = 1.3
|
|
|
|
# Filled rasters are copies of only the desired band of the input DEM,
|
|
# and then with pixels filled.
|
|
filled_array = gdal.OpenEx(filled_raster_path).ReadAsArray()
|
|
numpy.testing.assert_allclose(
|
|
expected_filled_array,
|
|
filled_array,
|
|
rtol=0, atol=1e-6)
|
|
|
|
def test_routedem_slope(self):
|
|
"""RouteDEM: assert slope option."""
|
|
from natcap.invest import routedem
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'dem.tif'),
|
|
'dem_band_index': 2,
|
|
'results_suffix': 'foo',
|
|
'calculate_slope': True,
|
|
}
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
routedem.execute(args)
|
|
|
|
for path in ('filled_foo.tif', 'slope_foo.tif'):
|
|
self.assertTrue(os.path.exists(
|
|
os.path.join(args['workspace_dir'], path)),
|
|
'File not found: %s' % path)
|
|
|
|
slope_array = gdal.OpenEx(
|
|
os.path.join(args['workspace_dir'], 'slope_foo.tif')).ReadAsArray()
|
|
# These were determined by inspection of the output array.
|
|
expected_unique_values = numpy.array(
|
|
[4.999998, 4.9999995, 5.000001, 5.0000043, 7.126098,
|
|
13.235317, 45.017357, 48.226353, 48.75, 49.56845,
|
|
50.249374, 50.24938, 50.249382, 55.17727, 63.18101],
|
|
dtype=numpy.float32).reshape((15,))
|
|
numpy.testing.assert_allclose(
|
|
expected_unique_values,
|
|
numpy.unique(slope_array),
|
|
rtol=0, atol=1e-6)
|
|
numpy.testing.assert_allclose(
|
|
numpy.sum(slope_array), 4088.7358, rtol=0, atol=1e-4)
|
|
|
|
def test_routedem_d8(self):
|
|
"""RouteDEM: test d8 routing."""
|
|
from natcap.invest import routedem
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'algorithm': 'd8',
|
|
'dem_path': os.path.join(self.workspace_dir, 'dem.tif'),
|
|
'dem_band_index': 2,
|
|
'results_suffix': 'foo',
|
|
'calculate_flow_direction': True,
|
|
'calculate_flow_accumulation': True,
|
|
'calculate_stream_threshold': True,
|
|
'calculate_downslope_distance': True,
|
|
'calculate_slope': True,
|
|
'calculate_stream_order': True,
|
|
'calculate_subwatersheds': True,
|
|
'threshold_flow_accumulation': 4,
|
|
}
|
|
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
routedem.execute(args)
|
|
|
|
for expected_file in (
|
|
'downslope_distance_foo.tif',
|
|
'filled_foo.tif',
|
|
'flow_accumulation_foo.tif',
|
|
'flow_direction_foo.tif',
|
|
'slope_foo.tif',
|
|
'stream_mask_foo.tif',
|
|
'strahler_stream_order_foo.gpkg',
|
|
'subwatersheds_foo.gpkg'):
|
|
self.assertTrue(
|
|
os.path.exists(
|
|
os.path.join(args['workspace_dir'], expected_file)),
|
|
'File not found: %s' % expected_file)
|
|
|
|
expected_stream_mask = numpy.array([
|
|
[0, 0, 0, 0, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
])
|
|
numpy.testing.assert_allclose(
|
|
expected_stream_mask,
|
|
gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'], 'stream_mask_foo.tif')).ReadAsArray(),
|
|
rtol=0, atol=1e-6)
|
|
|
|
expected_flow_accum = numpy.empty((10, 9), dtype=numpy.float64)
|
|
expected_flow_accum[:, 0:4] = numpy.arange(1, 5)
|
|
expected_flow_accum[:, 5:9] = numpy.flipud(numpy.arange(1, 5))
|
|
expected_flow_accum[:, 4] = numpy.array(
|
|
[82, 77, 72, 63, 54, 45, 36, 27, 18, 9])
|
|
expected_flow_accum[1, 5] = 1
|
|
expected_flow_accum[0, 5] = 8
|
|
|
|
numpy.testing.assert_allclose(
|
|
expected_flow_accum,
|
|
gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'],
|
|
'flow_accumulation_foo.tif')).ReadAsArray(),
|
|
rtol=0, atol=1e-6)
|
|
|
|
expected_flow_direction = numpy.empty((10, 9), dtype=numpy.uint8)
|
|
expected_flow_direction[:, 0:4] = 0
|
|
expected_flow_direction[:, 5:9] = 4
|
|
expected_flow_direction[:, 4] = 2
|
|
expected_flow_direction[0:2, 5] = 2
|
|
expected_flow_direction[1, 6] = 3
|
|
|
|
numpy.testing.assert_allclose(
|
|
expected_flow_direction,
|
|
gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'],
|
|
'flow_direction_foo.tif')).ReadAsArray(),
|
|
rtol=0, atol=1e-6)
|
|
|
|
expected_downslope_distance = numpy.empty(
|
|
(10, 9), dtype=numpy.float64)
|
|
expected_downslope_distance[:, 0:5] = numpy.flipud(numpy.arange(5))
|
|
expected_downslope_distance[2:, 5:] = numpy.arange(1, 5)
|
|
expected_downslope_distance[0, 5:] = numpy.arange(4)
|
|
expected_downslope_distance[1, 5] = 1
|
|
expected_downslope_distance[1, 6:] = numpy.arange(1, 4) + 0.41421356
|
|
|
|
numpy.testing.assert_allclose(
|
|
expected_downslope_distance,
|
|
gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'],
|
|
'downslope_distance_foo.tif')).ReadAsArray(),
|
|
rtol=0, atol=1e-6)
|
|
|
|
try:
|
|
vector = gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'], 'strahler_stream_order_foo.gpkg'))
|
|
layer = vector.GetLayer()
|
|
self.assertEqual(27, layer.GetFeatureCount())
|
|
features_per_order = collections.defaultdict(int)
|
|
for feature in layer:
|
|
order = feature.GetField('order')
|
|
features_per_order[order] += 1
|
|
self.assertEqual(dict(features_per_order), {1: 18, 2: 9})
|
|
finally:
|
|
layer = None
|
|
vector = None
|
|
|
|
try:
|
|
vector = gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'], 'subwatersheds_foo.gpkg'))
|
|
layer = vector.GetLayer()
|
|
self.assertEqual(26, layer.GetFeatureCount())
|
|
features_by_area = collections.defaultdict(int)
|
|
for feature in layer:
|
|
geometry = feature.GetGeometryRef()
|
|
area = geometry.GetArea()
|
|
features_by_area[area] += 1
|
|
self.assertEqual(dict(features_by_area), {16: 17, 4: 8, 24: 1})
|
|
finally:
|
|
layer = None
|
|
vector = None
|
|
|
|
def test_routedem_mfd(self):
|
|
"""RouteDEM: test mfd routing."""
|
|
from natcap.invest import routedem
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'algorithm': 'mfd',
|
|
'dem_path': os.path.join(self.workspace_dir, 'dem.tif'),
|
|
'dem_band_index': 2,
|
|
'results_suffix': 'foo',
|
|
'calculate_flow_direction': True,
|
|
'calculate_flow_accumulation': True,
|
|
'calculate_stream_threshold': True,
|
|
'calculate_downslope_distance': True,
|
|
'calculate_slope': False,
|
|
'calculate_stream_order': True, # make sure file not created
|
|
'calculate_subwatersheds': True, # make sure file not created
|
|
'threshold_flow_accumulation': 4,
|
|
}
|
|
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
routedem.execute(args)
|
|
|
|
expected_stream_mask = numpy.array([
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 1, 1, 1, 0, 0, 0],
|
|
[0, 0, 0, 0, 1, 0, 0, 0, 0],
|
|
])
|
|
numpy.testing.assert_allclose(
|
|
expected_stream_mask,
|
|
gdal.OpenEx(os.path.join(
|
|
args['workspace_dir'], 'stream_mask_foo.tif')).ReadAsArray(),
|
|
rtol=0, atol=1e-6)
|
|
|
|
# Raster sums are from manually-inspected outputs.
|
|
for filename, expected_sum in (
|
|
('flow_accumulation_foo.tif', 678.94551294),
|
|
('flow_direction_foo.tif', 40968303668.0),
|
|
('downslope_distance_foo.tif', 162.28624753707527)):
|
|
raster_path = os.path.join(args['workspace_dir'], filename)
|
|
raster = gdal.OpenEx(raster_path)
|
|
if raster is None:
|
|
self.fail('Could not open raster %s' % filename)
|
|
|
|
self.assertEqual(raster.RasterYSize, expected_stream_mask.shape[0])
|
|
self.assertEqual(raster.RasterXSize, expected_stream_mask.shape[1])
|
|
|
|
raster_sum = numpy.sum(raster.ReadAsArray(), dtype=numpy.float64)
|
|
numpy.testing.assert_allclose(
|
|
raster_sum, expected_sum, rtol=0, atol=1e-6)
|
|
|
|
self.assertFalse(os.path.exists(os.path.join(
|
|
args['workspace_dir'], 'strahler_stream_order_foo.gpkg')))
|
|
self.assertFalse(os.path.exists(os.path.join(
|
|
args['workspace_dir'], 'subwatersheds_foo.gpkg')))
|
|
|
|
def test_validation_required_args(self):
|
|
"""RouteDEM: test required args in validation."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
args = {}
|
|
|
|
required_keys = ['workspace_dir', 'dem_path']
|
|
|
|
validation_warnings = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_warnings)
|
|
self.assertTrue(set(required_keys).issubset(invalid_keys))
|
|
|
|
def test_validation_required_args_threshold(self):
|
|
"""RouteDEM: test required args in validation (with threshold)."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
args = {'calculate_stream_threshold': True}
|
|
required_keys = [
|
|
'workspace_dir', 'dem_path', 'algorithm',
|
|
|
|
# Required because calculate_stream_threshold
|
|
'threshold_flow_accumulation']
|
|
|
|
validation_warnings = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_warnings)
|
|
for key in required_keys:
|
|
self.assertTrue(key in invalid_keys)
|
|
|
|
def test_validation_required_args_none(self):
|
|
"""RouteDEM: test validation of a present but None args."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
required_keys = ['workspace_dir', 'dem_path', 'algorithm']
|
|
args = dict((k, None) for k in required_keys)
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
self.assertEqual(invalid_keys, set(required_keys))
|
|
|
|
def test_validation_required_args_empty(self):
|
|
"""RouteDEM: test validation of a present but empty args."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
required_keys = ['workspace_dir', 'dem_path', 'algorithm']
|
|
args = dict((k, '') for k in required_keys)
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
self.assertEqual(invalid_keys, set(required_keys))
|
|
|
|
def test_validation_invalid_raster(self):
|
|
"""RouteDEM: test validation of an invalid DEM."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'badraster.tif'),
|
|
}
|
|
|
|
with open(args['dem_path'], 'w') as bad_raster:
|
|
bad_raster.write('This is an invalid raster format.')
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
self.assertTrue('dem_path' in invalid_keys)
|
|
|
|
def test_validation_band_index_type(self):
|
|
"""RouteDEM: test validation of an invalid band index."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'notafile.txt'),
|
|
'dem_band_index': range(1, 5),
|
|
}
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
self.assertEqual(invalid_keys, set(['algorithm', 'dem_path',
|
|
'dem_band_index']))
|
|
|
|
def test_validation_band_index_negative_value(self):
|
|
"""RouteDEM: test validation of a negative band index."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'notafile.txt'),
|
|
'dem_band_index': -5,
|
|
}
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
self.assertEqual(invalid_keys, set(['dem_path', 'dem_band_index',
|
|
'algorithm']))
|
|
|
|
def test_validation_band_index_value_too_large(self):
|
|
"""RouteDEM: test validation of a too-large band index."""
|
|
from natcap.invest import routedem
|
|
from natcap.invest import validation
|
|
|
|
args = {
|
|
'workspace_dir': self.workspace_dir,
|
|
'dem_path': os.path.join(self.workspace_dir, 'raster.tif'),
|
|
'dem_band_index': 5,
|
|
}
|
|
|
|
# Has two bands, so band index 5 is too large.
|
|
RouteDEMTests._make_dem(args['dem_path'])
|
|
|
|
validation_errors = routedem.validate(args)
|
|
invalid_keys = validation.get_invalid_keys(validation_errors)
|
|
|
|
self.assertEqual(invalid_keys, set(['algorithm', 'dem_band_index']))
|