tasks for all the raster operations preceeding the report generation section. #BITBUCKET-3815.

This commit is contained in:
David Fisher 2018-11-09 15:32:03 -08:00
parent d80dce3528
commit b3aad28953
3 changed files with 172 additions and 72 deletions

View File

@ -8,6 +8,7 @@ import numpy
from osgeo import gdal
from osgeo import osr
import pygeoprocessing
import taskgraph
from . import utils
from . import validation
@ -108,6 +109,9 @@ def execute(args):
[cropname]_percentile_yield_table.csv files)
Please see the InVEST user's guide chapter on crop production for
details about how to download these data.
args['n_workers'] (int): (optional) The number of worker processes to
use for processing this model. If omitted, computation will take
place in the current process.
Returns:
None.
@ -146,6 +150,17 @@ def execute(args):
landcover_raster_info['projection'], wgs84srs.ExportToWkt(),
edge_samples=11)
# Initialize a TaskGraph
work_token_dir = os.path.join(_INTERMEDIATE_OUTPUT_DIR, '_tmp_work_tokens')
try:
n_workers = int(args['n_workers'])
except (KeyError, ValueError, TypeError):
# KeyError when n_workers is not present in args
# ValueError when n_workers is an empty string.
# TypeError when n_workers is None.
n_workers = -1 # Single process mode.
graph = taskgraph.TaskGraph(work_token_dir, n_workers)
crop_lucode = None
observed_yield_nodata = None
production_area = collections.defaultdict(float)
@ -164,11 +179,19 @@ def execute(args):
crop_name, file_suffix))
crop_climate_bin_raster_info = pygeoprocessing.get_raster_info(
crop_climate_bin_raster_path)
pygeoprocessing.warp_raster(
crop_climate_bin_raster_path,
crop_climate_bin_raster_info['pixel_size'],
clipped_climate_bin_raster_path, 'near',
target_bb=landcover_wgs84_bounding_box)
crop_climate_bin_task = graph.add_task(
func=pygeoprocessing.warp_raster,
args=(crop_climate_bin_raster_path,
crop_climate_bin_raster_info['pixel_size'],
clipped_climate_bin_raster_path, 'near'),
kwargs={'target_bb': landcover_wgs84_bounding_box},
target_path_list=[clipped_climate_bin_raster_path],
task_name='crop_climate_bin')
# pygeoprocessing.warp_raster(
# crop_climate_bin_raster_path,
# crop_climate_bin_raster_info['pixel_size'],
# clipped_climate_bin_raster_path, 'near',
# target_bb=landcover_wgs84_bounding_box)
climate_percentile_yield_table_path = os.path.join(
args['model_data_path'],
@ -195,20 +218,38 @@ def execute(args):
output_dir,
_COARSE_YIELD_PERCENTILE_FILE_PATTERN % (
crop_name, yield_percentile_id, file_suffix))
pygeoprocessing.reclassify_raster(
(clipped_climate_bin_raster_path, 1), bin_to_percentile_yield,
coarse_yield_percentile_raster_path, gdal.GDT_Float32,
_NODATA_YIELD)
create_coarse_yield_percentile_task = graph.add_task(
func=pygeoprocessing.reclassify_raster,
args=((clipped_climate_bin_raster_path, 1), bin_to_percentile_yield,
coarse_yield_percentile_raster_path, gdal.GDT_Float32,
_NODATA_YIELD),
target_path_list=[coarse_yield_percentile_raster_path],
dependent_task_list=[crop_climate_bin_task],
task_name='create_coarse_yield_percentile_%s_%s' % (crop_name, yield_percentile_id))
# pygeoprocessing.reclassify_raster(
# (clipped_climate_bin_raster_path, 1), bin_to_percentile_yield,
# coarse_yield_percentile_raster_path, gdal.GDT_Float32,
# _NODATA_YIELD)
LOGGER.info(
"Interpolate %s %s yield raster to landcover resolution.",
crop_name, yield_percentile_id)
pygeoprocessing.warp_raster(
coarse_yield_percentile_raster_path,
landcover_raster_info['pixel_size'],
interpolated_yield_percentile_raster_path, 'cubicspline',
target_sr_wkt=landcover_raster_info['projection'],
target_bb=landcover_raster_info['bounding_box'])
create_interpolated_yield_percentile_task = graph.add_task(
func=pygeoprocessing.warp_raster,
args=(coarse_yield_percentile_raster_path,
landcover_raster_info['pixel_size'],
interpolated_yield_percentile_raster_path, 'cubicspline'),
kwargs={'target_sr_wkt': landcover_raster_info['projection'],
'target_bb': landcover_raster_info['bounding_box']},
target_path_list=[interpolated_yield_percentile_raster_path],
dependent_task_list=[create_coarse_yield_percentile_task],
task_name='create_interpolated_yield_percentile_%s_%s' % (crop_name, yield_percentile_id))
# pygeoprocessing.warp_raster(
# coarse_yield_percentile_raster_path,
# landcover_raster_info['pixel_size'],
# interpolated_yield_percentile_raster_path, 'cubicspline',
# target_sr_wkt=landcover_raster_info['projection'],
# target_bb=landcover_raster_info['bounding_box'])
LOGGER.info(
"Calculate yield for %s at %s", crop_name,
@ -218,26 +259,27 @@ def execute(args):
_PERCENTILE_CROP_PRODUCTION_FILE_PATTERN % (
crop_name, yield_percentile_id, file_suffix))
def _crop_production_op(lulc_array, yield_array):
"""Mask in yields that overlap with `crop_lucode`."""
result = numpy.empty(lulc_array.shape, dtype=numpy.float32)
result[:] = _NODATA_YIELD
valid_mask = lulc_array != landcover_nodata
lulc_mask = lulc_array == crop_lucode
result[valid_mask] = 0
result[lulc_mask] = (
yield_array[lulc_mask] * pixel_area_ha)
return result
pygeoprocessing.raster_calculator(
[(args['landcover_raster_path'], 1),
(interpolated_yield_percentile_raster_path, 1)],
_crop_production_op, percentile_crop_production_raster_path,
gdal.GDT_Float32, _NODATA_YIELD)
create_percentile_production_task = graph.add_task(
func=pygeoprocessing.raster_calculator,
args=([(args['landcover_raster_path'], 1),
(interpolated_yield_percentile_raster_path, 1),
(landcover_nodata, 'raw'), (crop_lucode, 'raw'),
(pixel_area_ha, 'raw')],
_crop_production_op, percentile_crop_production_raster_path,
gdal.GDT_Float32, _NODATA_YIELD),
target_path_list=[percentile_crop_production_raster_path],
dependent_task_list=[create_interpolated_yield_percentile_task],
task_name='create_percentile_production_%s_%s' % (crop_name, yield_percentile_id))
# pygeoprocessing.raster_calculator(
# [(args['landcover_raster_path'], 1),
# (interpolated_yield_percentile_raster_path, 1)],
# _crop_production_op, percentile_crop_production_raster_path,
# gdal.GDT_Float32, _NODATA_YIELD)
# calculate the non-zero production area for that crop, assuming that
# all the percentile rasters have non-zero production so it's okay to
# use just one of the percentile rasters
# TODO: move this to report generation ara alongside production_lookup
LOGGER.info("Calculating production area.")
for _, band_values in pygeoprocessing.iterblocks(
percentile_crop_production_raster_path):
@ -256,11 +298,19 @@ def execute(args):
clipped_observed_yield_raster_path = os.path.join(
output_dir, _CLIPPED_OBSERVED_YIELD_FILE_PATTERN % (
crop_name, file_suffix))
pygeoprocessing.warp_raster(
global_observed_yield_raster_path,
global_observed_yield_raster_info['pixel_size'],
clipped_observed_yield_raster_path, 'near',
target_bb=landcover_wgs84_bounding_box)
clip_global_observed_yield_task = graph.add_task(
func=pygeoprocessing.warp_raster,
args=(global_observed_yield_raster_path,
global_observed_yield_raster_info['pixel_size'],
clipped_observed_yield_raster_path, 'near'),
kwargs={'target_bb': landcover_wgs84_bounding_box},
target_path_list=[clipped_observed_yield_raster_path],
task_name='clip_global_observed_yield_%s_' % crop_name)
# pygeoprocessing.warp_raster(
# global_observed_yield_raster_path,
# global_observed_yield_raster_info['pixel_size'],
# clipped_observed_yield_raster_path, 'near',
# target_bb=landcover_wgs84_bounding_box)
observed_yield_nodata = (
global_observed_yield_raster_info['nodata'][0])
@ -269,19 +319,19 @@ def execute(args):
output_dir, _ZEROED_OBSERVED_YIELD_FILE_PATTERN % (
crop_name, file_suffix))
def _zero_observed_yield_op(observed_yield_array):
"""Calculate observed 'actual' yield."""
result = numpy.empty(
observed_yield_array.shape, dtype=numpy.float32)
result[:] = 0.0
valid_mask = observed_yield_array != observed_yield_nodata
result[valid_mask] = observed_yield_array[valid_mask]
return result
pygeoprocessing.raster_calculator(
[(clipped_observed_yield_raster_path, 1)],
_zero_observed_yield_op, zeroed_observed_yield_raster_path,
gdal.GDT_Float32, observed_yield_nodata)
nodata_to_zero_for_observed_yield_task = graph.add_task(
func=pygeoprocessing.raster_calculator,
args=([(clipped_observed_yield_raster_path, 1),
(observed_yield_nodata, 'raw')],
_zero_observed_yield_op, zeroed_observed_yield_raster_path,
gdal.GDT_Float32, observed_yield_nodata),
target_path_list=[zeroed_observed_yield_raster_path],
dependent_task_list=[clip_global_observed_yield_task],
task_name='nodata_to_zero_for_observed_yield_%s_' % crop_name)
# pygeoprocessing.raster_calculator(
# [(clipped_observed_yield_raster_path, 1)],
# _zero_observed_yield_op, zeroed_observed_yield_raster_path,
# gdal.GDT_Float32, observed_yield_nodata)
interpolated_observed_yield_raster_path = os.path.join(
output_dir, _INTERPOLATED_OBSERVED_YIELD_FILE_PATTERN % (
@ -289,33 +339,44 @@ def execute(args):
LOGGER.info(
"Interpolating observed %s raster to landcover.", crop_name)
pygeoprocessing.warp_raster(
zeroed_observed_yield_raster_path,
landcover_raster_info['pixel_size'],
interpolated_observed_yield_raster_path, 'cubicspline',
target_sr_wkt=landcover_raster_info['projection'],
target_bb=landcover_raster_info['bounding_box'])
def _mask_observed_yield(lulc_array, observed_yield_array):
"""Mask total observed yield to crop lulc type."""
result = numpy.empty(lulc_array.shape, dtype=numpy.float32)
result[:] = observed_yield_nodata
valid_mask = lulc_array != landcover_nodata
lulc_mask = lulc_array == crop_lucode
result[valid_mask] = 0
result[lulc_mask] = (
observed_yield_array[lulc_mask] * pixel_area_ha)
return result
interpolate_observed_yield_task = graph.add_task(
func=pygeoprocessing.warp_raster,
args=(zeroed_observed_yield_raster_path,
landcover_raster_info['pixel_size'],
interpolated_observed_yield_raster_path, 'cubicspline'),
kwargs={'target_sr_wkt': landcover_raster_info['projection'],
'target_bb': landcover_raster_info['bounding_box']},
target_path_list=[interpolated_observed_yield_raster_path],
dependent_task_list=[nodata_to_zero_for_observed_yield_task],
task_name='interpolate_observed_yield_to_lulc_%s' % crop_name)
# pygeoprocessing.warp_raster(
# zeroed_observed_yield_raster_path,
# landcover_raster_info['pixel_size'],
# interpolated_observed_yield_raster_path, 'cubicspline',
# target_sr_wkt=landcover_raster_info['projection'],
# target_bb=landcover_raster_info['bounding_box'])
observed_production_raster_path = os.path.join(
output_dir, _OBSERVED_PRODUCTION_FILE_PATTERN % (
crop_name, file_suffix))
pygeoprocessing.raster_calculator(
[(args['landcover_raster_path'], 1),
(interpolated_observed_yield_raster_path, 1)],
_mask_observed_yield, observed_production_raster_path,
gdal.GDT_Float32, observed_yield_nodata)
calculate_observed_production_task = graph.add_task(
func=pygeoprocessing.raster_calculator,
args=([(args['landcover_raster_path'], 1),
(interpolated_observed_yield_raster_path, 1),
(observed_yield_nodata, 'raw'), (landcover_nodata, 'raw'),
(crop_lucode, 'raw'), (pixel_area_ha, 'raw')],
_mask_observed_yield_op, observed_production_raster_path,
gdal.GDT_Float32, observed_yield_nodata),
target_path_list=[observed_production_raster_path],
dependent_task_list=[interpolate_observed_yield_task],
task_name='calculate_observed_production_%s' % crop_name)
graph.join()
# pygeoprocessing.raster_calculator(
# [(args['landcover_raster_path'], 1),
# (interpolated_observed_yield_raster_path, 1)],
# _mask_observed_yield_op, observed_production_raster_path,
# gdal.GDT_Float32, observed_yield_nodata)
# both 'crop_nutrient.csv' and 'crop' are known data/header values for
# this model data.
@ -497,6 +558,43 @@ def execute(args):
aggregate_table.write('\n')
def _crop_production_op(
lulc_array, yield_array, landcover_nodata, crop_lucode, pixel_area_ha):
"""Mask in yields that overlap with `crop_lucode`."""
result = numpy.empty(lulc_array.shape, dtype=numpy.float32)
result[:] = _NODATA_YIELD
valid_mask = lulc_array != landcover_nodata
lulc_mask = lulc_array == crop_lucode
result[valid_mask] = 0
result[lulc_mask] = (
yield_array[lulc_mask] * pixel_area_ha)
return result
def _zero_observed_yield_op(observed_yield_array, observed_yield_nodata):
"""Calculate observed 'actual' yield."""
result = numpy.empty(
observed_yield_array.shape, dtype=numpy.float32)
result[:] = 0.0
valid_mask = observed_yield_array != observed_yield_nodata
result[valid_mask] = observed_yield_array[valid_mask]
return result
def _mask_observed_yield_op(
lulc_array, observed_yield_array, observed_yield_nodata,
landcover_nodata, crop_lucode, pixel_area_ha):
"""Mask total observed yield to crop lulc type."""
result = numpy.empty(lulc_array.shape, dtype=numpy.float32)
result[:] = observed_yield_nodata
valid_mask = lulc_array != landcover_nodata
lulc_mask = lulc_array == crop_lucode
result[valid_mask] = 0
result[lulc_mask] = (
observed_yield_array[lulc_mask] * pixel_area_ha)
return result
# This decorator ensures the input arguments are formatted for InVEST
@validation.invest_validator
def validate(args, limit_to=None):

View File

@ -91,6 +91,7 @@ class CropProductionPercentile(model.InVESTModel):
args = {
self.workspace.args_key: self.workspace.value(),
self.suffix.args_key: self.suffix.value(),
self.n_workers.args_key: self.n_workers.value(),
self.model_data_path.args_key: self.model_data_path.value(),
self.landcover_raster_path.args_key:
self.landcover_raster_path.value(),

View File

@ -44,7 +44,8 @@ class CropProductionTests(unittest.TestCase):
'aggregate_polygon_path': os.path.join(
SAMPLE_DATA_PATH, 'aggregate_shape.shp'),
'aggregate_polygon_id': 'id',
'model_data_path': MODEL_DATA_PATH
'model_data_path': MODEL_DATA_PATH,
'n_workers': '-1'
}
crop_production_percentile.execute(args)