invest/tests/test_coastal_blue_carbon.py

1063 lines
44 KiB
Python

# -*- coding: utf-8 -*-
"""Tests for Coastal Blue Carbon Functions."""
import glob
import logging
import os
import pprint
import shutil
import tempfile
import textwrap
import unittest
import numpy
import pandas
import pygeoprocessing
from natcap.invest import utils
from natcap.invest import validation
from osgeo import gdal
from osgeo import osr
gdal.UseExceptions()
REGRESSION_DATA = os.path.join(
os.path.dirname(__file__), '..', 'data', 'invest-test-data',
'coastal_blue_carbon')
LOGGER = logging.getLogger(__name__)
class TestPreprocessor(unittest.TestCase):
"""Test Coastal Blue Carbon preprocessor functions."""
def setUp(self):
"""Create a temp directory for the workspace."""
self.workspace_dir = tempfile.mkdtemp()
def tearDown(self):
"""Remove workspace."""
shutil.rmtree(self.workspace_dir)
def test_constructed_data(self):
"""CBC Preprocessor: Test on constructed data."""
from natcap.invest.coastal_blue_carbon import preprocessor
pixelsize = (10, -10)
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
wkt = srs.ExportToWkt()
nodata = -1
origin_2000 = (30, -50)
lulc_2000_array = numpy.array([
[nodata, nodata, 1, 2],
[nodata, nodata, 2, 2],
[nodata, nodata, 2, 1]], dtype=numpy.int32)
origin_2010 = (50, -50)
lulc_2010_array = numpy.array([
[2, 2],
[2, 2],
[2, 2]], dtype=numpy.int32)
lulc_2000_path = os.path.join(self.workspace_dir, '2000.tif')
lulc_2010_path = os.path.join(self.workspace_dir, '2010.tif')
for (array, origin, target_path) in [
(lulc_2000_array, origin_2000, lulc_2000_path),
(lulc_2010_array, origin_2010, lulc_2010_path)]:
pygeoprocessing.geoprocessing.numpy_array_to_raster(
array, nodata, pixelsize, origin, wkt, target_path)
snapshot_csv_path = os.path.join(self.workspace_dir, 'snapshots.csv')
with open(snapshot_csv_path, 'w') as snapshots_table:
snapshots_table.write(textwrap.dedent(
"""\
snapshot_year,raster_path
2010,2010.tif
2000,2000.tif
"""))
lulc_attributes_path = os.path.join(self.workspace_dir,
'lulc-attributes.csv')
with open(lulc_attributes_path, 'w') as attributes_table:
attributes_table.write(textwrap.dedent(
"""\
lulc-class,code,is_coastal_blue_carbon_habitat
mangrove,1,TRUE
parkinglot,2,FALSE
"""))
args = {
'workspace_dir': os.path.join(self.workspace_dir, 'workspace'),
'lulc_lookup_table_path': lulc_attributes_path,
'landcover_snapshot_csv': snapshot_csv_path,
}
preprocessor.execute(args)
# Verify output raster dimensions.
dimensions = set()
for year in (2000, 2010):
lulc_path = os.path.join(
args['workspace_dir'], 'outputs_preprocessor',
f'aligned_lulc_{year}.tif')
dimensions.add(
pygeoprocessing.get_raster_info(lulc_path)['raster_size'])
self.assertEqual(dimensions, {(2, 3)})
transition_table_path = os.path.join(
args['workspace_dir'], 'outputs_preprocessor',
'carbon_pool_transition_template.csv')
with open(transition_table_path) as transition_table:
self.assertEqual(
transition_table.readline(),
'lulc-class,mangrove,parkinglot\n')
self.assertEqual(
transition_table.readline(),
'mangrove,,disturb\n')
self.assertEqual(
transition_table.readline(),
'parkinglot,,NCC\n')
# After the above lines is a blank line, then the legend.
# Deliberately not testing the legend.
self.assertEqual(transition_table.readline(), '\n')
def test_sample_data(self):
"""CBC Preprocessor: Test on sample data."""
from natcap.invest.coastal_blue_carbon import preprocessor
snapshot_csv_path = os.path.join(
REGRESSION_DATA, 'inputs', 'snapshots.csv')
args = {
'workspace_dir': os.path.join(self.workspace_dir, 'workspace'),
'results_suffix': '150225',
'lulc_lookup_table_path': os.path.join(
REGRESSION_DATA, 'inputs', 'lulc_lookup.csv'),
'landcover_snapshot_csv': snapshot_csv_path,
}
preprocessor.execute(args)
# walk through all files in the workspace and assert that outputs have
# the file suffix.
non_suffixed_files = []
outputs_dir = os.path.join(
args['workspace_dir'], 'outputs_preprocessor')
for root_dir, dirnames, filenames in os.walk(outputs_dir):
for filename in filenames:
if not filename.lower().endswith('.txt'): # ignore logfile
basename, extension = os.path.splitext(filename)
if not basename.endswith('_150225'):
path_rel_to_workspace = os.path.relpath(
os.path.join(root_dir, filename),
args['workspace_dir'])
non_suffixed_files.append(path_rel_to_workspace)
if non_suffixed_files:
self.fail('%s files are missing suffixes: %s' %
(len(non_suffixed_files),
pprint.pformat(non_suffixed_files)))
expected_landcover_codes = set(range(0, 24))
found_landcover_codes = set(pandas.read_csv(
os.path.join(outputs_dir, 'carbon_biophysical_table_template_150225.csv')
)['code'].values)
self.assertEqual(expected_landcover_codes, found_landcover_codes)
def test_transition_table(self):
"""CBC Preprocessor: Test creation of transition table."""
from natcap.invest.coastal_blue_carbon import preprocessor
srs = osr.SpatialReference()
srs.ImportFromEPSG(3157)
projection_wkt = srs.ExportToWkt()
origin = (443723.127327877911739, 4956546.905980412848294)
matrix_a = numpy.array([
[0, 1],
[0, 1],
[0, 1]], dtype=numpy.int16)
filename_a = os.path.join(self.workspace_dir, 'raster_a.tif')
pygeoprocessing.numpy_array_to_raster(
matrix_a, -1, (100, -100), origin, projection_wkt, filename_a)
matrix_b = numpy.array([
[0, 1],
[1, 0],
[-1, -1]], dtype=numpy.int16)
filename_b = os.path.join(self.workspace_dir, 'raster_b.tif')
pygeoprocessing.numpy_array_to_raster(
matrix_b, -1, (100, -100), origin, projection_wkt, filename_b)
landcover_table_path = os.path.join(self.workspace_dir,
'lulc_table.csv')
with open(landcover_table_path, 'w') as lulc_csv:
lulc_csv.write('code,lulc-class,is_coastal_blue_carbon_habitat\n')
lulc_csv.write('0,mangrove,True\n')
lulc_csv.write('1,parking lot,False\n')
landcover_df = validation.get_validated_dataframe(
landcover_table_path,
**preprocessor.MODEL_SPEC['args']['lulc_lookup_table_path'])
target_table_path = os.path.join(self.workspace_dir,
'transition_table.csv')
# Remove landcover code 1 from the table; expect error.
landcover_df = landcover_df.drop(1)
with self.assertRaises(ValueError) as context:
preprocessor._create_transition_table(
landcover_df, [filename_a, filename_b], target_table_path)
self.assertIn('missing a row with the landuse code 1',
str(context.exception))
# Re-load the landcover table
landcover_df = validation.get_validated_dataframe(
landcover_table_path,
**preprocessor.MODEL_SPEC['args']['lulc_lookup_table_path'])
preprocessor._create_transition_table(
landcover_df, [filename_a, filename_b], target_table_path)
with open(target_table_path) as transition_table:
self.assertEqual(
transition_table.readline(),
'lulc-class,mangrove,parking lot\n')
self.assertEqual(
transition_table.readline(),
'mangrove,accum,disturb\n')
self.assertEqual(
transition_table.readline(),
'parking lot,accum,NCC\n')
# After the above lines is a blank line, then the legend.
# Deliberately not testing the legend.
self.assertEqual(transition_table.readline(), '\n')
class TestCBC2(unittest.TestCase):
"""Test Coastal Blue Carbon main model functions."""
def setUp(self):
"""Create a temp directory for the workspace."""
self.workspace_dir = tempfile.mkdtemp()
def tearDown(self):
"""Remove workspace after each test function."""
shutil.rmtree(self.workspace_dir)
def test_read_invalid_transition_matrix(self):
"""CBC: Test exceptions in invalid transition structure."""
# The full biophysical table will have much, much more information. To
# keep the test simple, I'm only tracking the columns I know I'll need
# in this function.
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
biophysical_table = pandas.DataFrame({
1: {'lulc-class': 'a',
'soil-yearly-accumulation': 2,
'biomass-yearly-accumulation': 3,
'soil-high-impact-disturb': 4,
'biomass-high-impact-disturb': 5},
2: {'lulc-class': 'b',
'soil-yearly-accumulation': 6,
'biomass-yearly-accumulation': 7,
'soil-high-impact-disturb': 8,
'biomass-high-impact-disturb': 9},
3: {'lulc-class': 'c',
'soil-yearly-accumulation': 10,
'biomass-yearly-accumulation': 11,
'soil-high-impact-disturb': 12,
'biomass-high-impact-disturb': 13}
}).T
transition_csv_path = os.path.join(self.workspace_dir,
'transitions.csv')
with open(transition_csv_path, 'w') as transition_csv:
transition_csv.write('lulc-class,a,b,c\n')
transition_csv.write('missing code,NCC,accum,high-impact-disturb\n')
transition_csv.write('b,,NCC,accum\n')
transition_csv.write('c,accum,,NCC\n')
transition_csv.write(',,,\n')
transition_csv.write(',legend,,') # simulate legend
with self.assertRaises(ValueError) as cm:
disturbance_matrices, accumulation_matrices = (
coastal_blue_carbon._read_transition_matrix(
transition_csv_path, biophysical_table))
self.assertIn("'lulc-class' column has a value, 'missing code'",
str(cm.exception))
with open(transition_csv_path, 'w') as transition_csv:
transition_csv.write('lulc-class,missing code,b,c\n')
transition_csv.write('a,NCC,accum,high-impact-disturb\n')
transition_csv.write('b,,NCC,accum\n')
transition_csv.write('c,accum,,NCC\n')
transition_csv.write(',,,\n')
transition_csv.write(',legend,,') # simulate legend
with self.assertRaises(ValueError) as cm:
disturbance_matrices, accumulation_matrices = (
coastal_blue_carbon._read_transition_matrix(
transition_csv_path, biophysical_table))
self.assertIn("The transition table's header row has a column name, "
"'missing code',", str(cm.exception))
def test_read_transition_matrix(self):
"""CBC: Test transition matrix reading."""
# The full biophysical table will have much, much more information. To
# keep the test simple, I'm only tracking the columns I know I'll need
# in this function.
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
biophysical_table = pandas.DataFrame({
1: {'lulc-class': 'a',
'soil-yearly-accumulation': 2,
'biomass-yearly-accumulation': 3,
'soil-high-impact-disturb': 4,
'biomass-high-impact-disturb': 5},
2: {'lulc-class': 'b',
'soil-yearly-accumulation': 6,
'biomass-yearly-accumulation': 7,
'soil-high-impact-disturb': 8,
'biomass-high-impact-disturb': 9},
3: {'lulc-class': 'c',
'soil-yearly-accumulation': 10,
'biomass-yearly-accumulation': 11,
'soil-high-impact-disturb': 12,
'biomass-high-impact-disturb': 13}
}).T
transition_csv_path = os.path.join(self.workspace_dir,
'transitions.csv')
with open(transition_csv_path, 'w') as transition_csv:
transition_csv.write('lulc-class,a,b,c\n')
transition_csv.write('a,NCC,accum,high-impact-disturb\n')
transition_csv.write('b,,NCC,accum\n')
transition_csv.write('c,accum,,NCC\n')
transition_csv.write(',,,\n')
transition_csv.write(',legend,,') # simulate legend
disturbance_matrices, accumulation_matrices = (
coastal_blue_carbon._read_transition_matrix(
transition_csv_path, biophysical_table))
expected_biomass_disturbance = numpy.zeros((4, 4), dtype=numpy.float32)
expected_biomass_disturbance[1, 3] = (
biophysical_table['biomass-high-impact-disturb'][1])
numpy.testing.assert_allclose(
expected_biomass_disturbance,
disturbance_matrices['biomass'].toarray())
expected_soil_disturbance = numpy.zeros((4, 4), dtype=numpy.float32)
expected_soil_disturbance[1, 3] = (
biophysical_table['soil-high-impact-disturb'][1])
numpy.testing.assert_allclose(
expected_soil_disturbance,
disturbance_matrices['soil'].toarray())
expected_biomass_accumulation = numpy.zeros(
(4, 4), dtype=numpy.float32)
expected_biomass_accumulation[3, 1] = (
biophysical_table['biomass-yearly-accumulation'][1])
expected_biomass_accumulation[1, 2] = (
biophysical_table['biomass-yearly-accumulation'][2])
expected_biomass_accumulation[2, 3] = (
biophysical_table['biomass-yearly-accumulation'][3])
numpy.testing.assert_allclose(
expected_biomass_accumulation,
accumulation_matrices['biomass'].toarray())
expected_soil_accumulation = numpy.zeros((4, 4), dtype=numpy.float32)
expected_soil_accumulation[3, 1] = (
biophysical_table['soil-yearly-accumulation'][1])
expected_soil_accumulation[1, 2] = (
biophysical_table['soil-yearly-accumulation'][2])
expected_soil_accumulation[2, 3] = (
biophysical_table['soil-yearly-accumulation'][3])
numpy.testing.assert_allclose(
expected_soil_accumulation,
accumulation_matrices['soil'].toarray())
def test_emissions(self):
"""CBC: Check emissions calculations."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
volume_disturbed_carbon = numpy.array(
[[5.5, coastal_blue_carbon.NODATA_FLOAT32_MIN]], dtype=numpy.float32)
year_last_disturbed = numpy.array(
[[10, coastal_blue_carbon.NODATA_UINT16_MAX]], dtype=numpy.uint16)
half_life = numpy.array([[7.5, 7.5]], dtype=numpy.float32)
current_year = 15
result_matrix = coastal_blue_carbon._calculate_emissions(
volume_disturbed_carbon, year_last_disturbed, half_life,
current_year)
# Calculated by hand.
expected_array = numpy.array([
[0.3058625, coastal_blue_carbon.NODATA_FLOAT32_MIN]],
dtype=numpy.float32)
numpy.testing.assert_allclose(
result_matrix, expected_array, rtol=1E-6)
def test_add_rasters(self):
"""CBC: Check that we can sum rasters."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
srs = osr.SpatialReference()
srs.ImportFromEPSG(32731) # WGS84 / UTM zone 31 S
wkt = srs.ExportToWkt()
raster_a_path = os.path.join(self.workspace_dir, 'a.tif')
pygeoprocessing.numpy_array_to_raster(
numpy.array([[5, 15, 12, 15]], dtype=numpy.uint8),
15, (2, -2), (2, -2), wkt, raster_a_path)
raster_b_path = os.path.join(self.workspace_dir, 'b.tif')
pygeoprocessing.numpy_array_to_raster(
numpy.array([[3, 4, 5, 5]], dtype=numpy.uint8),
5, (2, -2), (2, -2), wkt, raster_b_path)
target_path = os.path.join(self.workspace_dir, 'output.tif')
coastal_blue_carbon._sum_n_rasters(
[raster_a_path, raster_b_path], target_path)
nodata = coastal_blue_carbon.NODATA_FLOAT32_MIN
try:
raster = gdal.OpenEx(target_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
numpy.array([[8, 4, 12, nodata]], dtype=numpy.float32))
finally:
raster = None
@staticmethod
def _create_model_args(target_dir):
srs = osr.SpatialReference()
srs.ImportFromEPSG(32731) # WGS84 / UTM zone 31 S
wkt = srs.ExportToWkt()
biophysical_table = [
['code', 'lulc-class', 'biomass-initial', 'soil-initial',
'litter-initial', 'biomass-half-life',
'biomass-low-impact-disturb', 'biomass-med-impact-disturb',
'biomass-high-impact-disturb', 'biomass-yearly-accumulation',
'soil-half-life', 'soil-low-impact-disturb',
'soil-med-impact-disturb', 'soil-high-impact-disturb',
'soil-yearly-accumulation', 'litter-yearly-accumulation'],
[1, 'mangrove',
64, 313, 3, # initial
15, 0.5, 0.5, 1, 2, # biomass
7.5, 0.3, 0.5, 0.66, 5.35, # soil
1], # litter accum.
[2, 'parking lot',
0, 0, 0, # initial
0, 0, 0, 0, 0, # biomass
0, 0, 0, 0, 0, # soil
0], # litter accum.
]
biophysical_table_path = os.path.join(
target_dir, 'biophysical.csv')
with open(biophysical_table_path, 'w') as bio_table:
for line_list in biophysical_table:
line = ','.join(str(field) for field in line_list)
bio_table.write(f'{line}\n')
transition_matrix = [
['lulc-class', 'mangrove', 'parking lot'],
['mangrove', 'NCC', 'high-impact-disturb'],
['parking lot', 'accum', 'NCC']
]
transition_matrix_path = os.path.join(
target_dir, 'transitions.csv')
with open(transition_matrix_path, 'w') as transition_table:
for line_list in transition_matrix:
line = ','.join(line_list)
transition_table.write(f'{line}\n')
baseline_landcover_raster_path = os.path.join(
target_dir, 'baseline_lulc.tif')
baseline_matrix = numpy.array([[1, 2]], dtype=numpy.uint8)
pygeoprocessing.numpy_array_to_raster(
baseline_matrix, 255, (2, -2), (2, -2), wkt,
baseline_landcover_raster_path)
snapshot_2010_raster_path = os.path.join(
target_dir, 'snapshot_2010.tif')
snapshot_2010_matrix = numpy.array([[2, 1]], dtype=numpy.uint8)
pygeoprocessing.numpy_array_to_raster(
snapshot_2010_matrix, 255, (2, -2), (2, -2), wkt,
snapshot_2010_raster_path)
snapshot_2020_raster_path = os.path.join(
target_dir, 'snapshot_2020.tif')
snapshot_2020_matrix = numpy.array([[1, 2]], dtype=numpy.uint8)
pygeoprocessing.numpy_array_to_raster(
snapshot_2020_matrix, 255, (2, -2), (2, -2), wkt,
snapshot_2020_raster_path)
snapshot_rasters_csv_path = os.path.join(
target_dir, 'snapshot_rasters.csv')
baseline_year = 2000
with open(snapshot_rasters_csv_path, 'w') as snapshot_rasters_csv:
snapshot_rasters_csv.write('snapshot_year,raster_path\n')
snapshot_rasters_csv.write(
f'{baseline_year},{baseline_landcover_raster_path}\n')
snapshot_rasters_csv.write(
f'2010,{snapshot_2010_raster_path}\n')
snapshot_rasters_csv.write(
f'2020,{snapshot_2020_raster_path}\n')
args = {
'landcover_transitions_table': transition_matrix_path,
'landcover_snapshot_csv': snapshot_rasters_csv_path,
'biophysical_table_path': biophysical_table_path,
'analysis_year': 2030,
'do_economic_analysis': True,
'use_price_table': True,
'price_table_path': os.path.join(target_dir,
'price_table.csv'),
'discount_rate': 4,
}
with open(args['price_table_path'], 'w') as price_table:
price_table.write('year,price\n')
prior_year_price = 1.0
for year in range(baseline_year,
args['analysis_year']+1):
price = prior_year_price * 1.04
price_table.write(f'{year},{price}\n')
return args
def test_duplicate_lulc_classes(self):
"""CBC: Raise an execption if duplicate lulc-classes."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = os.path.join(self.workspace_dir, 'workspace')
with open(args['biophysical_table_path'], 'r') as table:
lines = table.readlines()
with open(args['biophysical_table_path'], 'a') as table:
last_line_contents = lines[-1].strip().split(',')
last_line_contents[0] = '3' # assign a new code
table.write(','.join(last_line_contents))
with self.assertRaises(ValueError) as context:
coastal_blue_carbon.execute(args)
self.assertIn("`lulc-class` column must be unique",
str(context.exception))
def test_model_no_analysis_year_no_price_table(self):
"""CBC: Test the model's execution."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = os.path.join(self.workspace_dir, 'workspace')
del args['analysis_year'] # final year is 2020.
args['use_price_table'] = False
args['inflation_rate'] = 5
args['price'] = 10.0
coastal_blue_carbon.execute(args)
# Sample values calculated by hand. Pixel 0 only accumulates. Pixel 1
# has no accumulation (per the biophysical table) and also has no
# emissions.
expected_sequestration_2000_to_2010 = numpy.array(
[[83.5, 0]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2000-and-2010.tif'))
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2000_to_2010)
finally:
raster = None
expected_sequestration_2010_to_2020 = numpy.array(
[[-176.9792, 73.5]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2010-and-2020.tif'))
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2010_to_2020, rtol=1e-6)
finally:
raster = None
# Total sequestration is the sum of all the previous sequestration.
expected_total_sequestration = (
expected_sequestration_2000_to_2010 +
expected_sequestration_2010_to_2020)
raster_path = os.path.join(
args['workspace_dir'], 'output',
'total-net-carbon-sequestration.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_total_sequestration, rtol=1e-6)
finally:
raster = None
expected_net_present_value_at_2020 = numpy.array(
[[-20506.314, 16123.521]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output', 'net-present-value-at-2020.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_net_present_value_at_2020, rtol=1e-6)
finally:
raster = None
def test_model_one_transition_no_analysis_year(self):
"""CBC: Test the model on one transition with no analysis year.
This test came up while looking into an issue reported on the forums:
https://community.naturalcapitalproject.org/t/coastal-blue-carbon-negative-carbon-stocks/780/12
"""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = os.path.join(self.workspace_dir, 'workspace')
prior_snapshots = validation.get_validated_dataframe(
args['landcover_snapshot_csv'],
**coastal_blue_carbon.MODEL_SPEC['args']['landcover_snapshot_csv']
)['raster_path'].to_dict()
baseline_year = min(prior_snapshots.keys())
baseline_raster = prior_snapshots[baseline_year]
with open(args['landcover_snapshot_csv'], 'w') as snapshot_csv:
snapshot_csv.write('snapshot_year,raster_path\n')
snapshot_csv.write(f'{baseline_year},{baseline_raster}\n')
snapshot_csv.write(f'2010,{prior_snapshots[2010]}\n')
del args['analysis_year']
# Use valuation parameters rather than price table.
args['use_price_table'] = False
args['inflation_rate'] = 4
args['price'] = 1.0
coastal_blue_carbon.execute(args)
# Check sequestration raster
expected_sequestration_2000_to_2010 = numpy.array(
[[83.5, 0.]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2000-and-2010.tif'))
try:
raster= gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2000_to_2010)
finally:
raster = None
# Check valuation raster
# Discount rate here matches the inflation rate, so the value of the 10
# years' accumulation is just 1*(10 years of accumulation).
expected_net_present_value_at_2010 = numpy.array(
[[835.0, 0.]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output', 'net-present-value-at-2010.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_net_present_value_at_2010, rtol=1e-6)
finally:
raster = None
def test_model(self):
"""CBC: Test the model's execution."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = os.path.join(self.workspace_dir, 'workspace')
coastal_blue_carbon.execute(args)
# Sample values calculated by hand. Pixel 0 only accumulates. Pixel 1
# has no accumulation (per the biophysical table) and also has no
# emissions.
expected_sequestration_2000_to_2010 = numpy.array(
[[83.5, 0]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2000-and-2010.tif'))
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2000_to_2010)
finally:
raster = None
expected_sequestration_2010_to_2020 = numpy.array(
[[-176.9792, 73.5]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2010-and-2020.tif'))
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2010_to_2020, rtol=1e-6)
finally:
raster = None
expected_sequestration_2020_to_2030 = numpy.array(
[[73.5, -28.698004]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2020-and-2030.tif'))
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2020_to_2030, rtol=1e-6)
finally:
raster = None
# Total sequestration is the sum of all the previous sequestration.
expected_total_sequestration = (
expected_sequestration_2000_to_2010 +
expected_sequestration_2010_to_2020 +
expected_sequestration_2020_to_2030)
raster_path = os.path.join(
args['workspace_dir'], 'output',
'total-net-carbon-sequestration.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_total_sequestration, rtol=1e-6)
finally:
raster = None
expected_net_present_value_at_2030 = numpy.array(
[[-373.67245, 837.93445]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output', 'net-present-value-at-2030.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_net_present_value_at_2030, rtol=1e-6)
finally:
raster = None
# For emissions, make sure that each of the emissions sum rasters in
# the output folder match the sum of all annual emissions from the time
# period.
for emissions_raster_path in glob.glob(
os.path.join(args['workspace_dir'], 'output',
'carbon-emissions-between-*.tif')):
try:
raster = gdal.OpenEx(emissions_raster_path)
band = raster.GetRasterBand(1)
emissions_in_period = band.ReadAsArray()
finally:
band = None
raster = None
basename = os.path.splitext(
os.path.basename(emissions_raster_path))[0]
parts = basename.split('-')
start_year = int(parts[3])
end_year = int(parts[5])
summed_emissions_over_time_period = numpy.array(
[[0.0, 0.0]], dtype=numpy.float32)
for year in range(start_year, end_year):
for pool in ('soil', 'biomass'):
yearly_emissions_raster = os.path.join(
args['workspace_dir'], 'intermediate',
f'emissions-{pool}-{year}.tif')
try:
raster = gdal.OpenEx(yearly_emissions_raster)
band = raster.GetRasterBand(1)
summed_emissions_over_time_period += band.ReadAsArray()
finally:
band = None
raster = None
numpy.testing.assert_allclose(
emissions_in_period,
summed_emissions_over_time_period)
def test_model_no_transitions(self):
"""CBC: Test model without transitions.
When the model executes without transitions, we still evaluate carbon
sequestration (accumulation only) for the whole baseline period.
"""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = os.path.join(self.workspace_dir, 'workspace')
prior_snapshots = validation.get_validated_dataframe(
args['landcover_snapshot_csv'],
**coastal_blue_carbon.MODEL_SPEC['args']['landcover_snapshot_csv']
)['raster_path'].to_dict()
baseline_year = min(prior_snapshots.keys())
baseline_raster = prior_snapshots[baseline_year]
with open(args['landcover_snapshot_csv'], 'w') as snapshot_csv:
snapshot_csv.write('snapshot_year,raster_path\n')
snapshot_csv.write(f'{baseline_year},{baseline_raster}\n')
args['analysis_year'] = baseline_year + 10
# Use valuation parameters rather than price table.
args['use_price_table'] = False
args['inflation_rate'] = 4
args['price'] = 1.0
coastal_blue_carbon.execute(args)
# Check sequestration raster
expected_sequestration_2000_to_2010 = numpy.array(
[[83.5, 0.]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output',
('total-net-carbon-sequestration-between-'
'2000-and-2010.tif'))
try:
raster= gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_sequestration_2000_to_2010)
finally:
raster = None
# Check valuation raster
# Discount rate here matches the inflation rate, so the value of the 10
# years' accumulation is just 1*(10 years of accumulation).
expected_net_present_value_at_2010 = numpy.array(
[[835.0, 0.]], dtype=numpy.float32)
raster_path = os.path.join(
args['workspace_dir'], 'output', 'net-present-value-at-2010.tif')
try:
raster = gdal.OpenEx(raster_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_net_present_value_at_2010, rtol=1e-6)
finally:
raster = None
def test_validation(self):
"""CBC: Test custom validation."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = self.workspace_dir
# verify validation passes on basic set of arguments.
validation_warnings = coastal_blue_carbon.validate(args)
self.assertEqual([], validation_warnings)
# Now work through the extra validation warnings.
# test validation: invalid analysis year
prior_snapshots = validation.get_validated_dataframe(
args['landcover_snapshot_csv'],
**coastal_blue_carbon.MODEL_SPEC['args']['landcover_snapshot_csv']
)['raster_path'].to_dict()
baseline_year = min(prior_snapshots)
# analysis year must be >= the last transition year.
args['analysis_year'] = baseline_year
validation_warnings = coastal_blue_carbon.validate(args)
self.assertIn(
coastal_blue_carbon.INVALID_ANALYSIS_YEAR_MSG.format(
analysis_year=2000, latest_year=2020),
validation_warnings[0][1])
# Create an invalid transitions table.
invalid_raster_path = os.path.join(self.workspace_dir,
'invalid_raster.tif')
with open(invalid_raster_path, 'w') as raster:
raster.write('not a raster')
# Write over the landcover snapshot CSV
with open(args['landcover_snapshot_csv'], 'w') as snapshot_table:
snapshot_table.write('snapshot_year,raster_path\n')
snapshot_table.write(
f'{baseline_year},{prior_snapshots[baseline_year]}\n')
snapshot_table.write(
f"{baseline_year + 10},{invalid_raster_path}")
# Write invalid entries to landcover transition table
with open(args['landcover_transitions_table'], 'w') as transition_table:
transition_table.write('lulc-class,Developed,Forest,Water\n')
transition_table.write('Developed,NCC,,invalid\n')
transition_table.write('Forest,accum,disturb,low-impact-disturb\n')
transition_table.write('Water,disturb,med-impact-disturb,high-impact-disturb\n')
transition_options = [
'accum', 'high-impact-disturb', 'med-impact-disturb',
'low-impact-disturb', 'ncc']
validation_warnings = coastal_blue_carbon.validate(args)
self.assertEqual(len(validation_warnings), 2)
self.assertIn(
'File could not be opened as a GDAL raster',
validation_warnings[0][1])
self.assertIn(
coastal_blue_carbon.INVALID_TRANSITION_VALUES_MSG.format(
model_transitions=transition_options,
transition_values=['disturb', 'invalid']),
validation_warnings[1][1])
def test_track_first_disturbance(self):
"""CBC: Track disturbances over time."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
float32_nodata = coastal_blue_carbon.NODATA_FLOAT32_MIN
srs = osr.SpatialReference()
srs.ImportFromEPSG(32731) # WGS84 / UTM zone 31 S
wkt = srs.ExportToWkt()
disturbance_magnitude_path = os.path.join(
self.workspace_dir, 'disturbance_magnitude.tif')
disturbance_magnitude_matrix = numpy.array(
[[0.5, float32_nodata, 0.0]], dtype=numpy.float32)
pygeoprocessing.numpy_array_to_raster(
disturbance_magnitude_matrix, float32_nodata, (2, -2), (2, -2), wkt,
disturbance_magnitude_path)
stocks_path = os.path.join(
self.workspace_dir, 'stocks.tif')
stocks_matrix = numpy.array(
[[10, -1, 10]], dtype=numpy.float32)
pygeoprocessing.numpy_array_to_raster(
stocks_matrix, -1, (2, -2), (2, -2), wkt, stocks_path)
current_year = 2010
target_disturbance_path = os.path.join(
self.workspace_dir, 'disturbance_volume.tif')
target_year_of_disturbance = os.path.join(
self.workspace_dir, 'year_of_disturbance.tif')
coastal_blue_carbon._track_disturbance(
disturbance_magnitude_path, stocks_path,
None, # No prior disturbance volume
None, # No prior disturbance years
current_year, target_disturbance_path, target_year_of_disturbance)
try:
expected_disturbance = numpy.array(
[[5.0, float32_nodata, 0.0]], dtype=numpy.float32)
raster = gdal.OpenEx(target_disturbance_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_disturbance)
finally:
raster = None
try:
uint16_nodata = coastal_blue_carbon.NODATA_UINT16_MAX
expected_year_of_disturbance = numpy.array(
[[2010, uint16_nodata, uint16_nodata]], dtype=numpy.uint16)
raster = gdal.OpenEx(target_year_of_disturbance)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_year_of_disturbance)
finally:
raster = None
def test_track_later_disturbance(self):
"""CBC: Track disturbances over time."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
float32_nodata = coastal_blue_carbon.NODATA_FLOAT32_MIN
uint16_nodata = coastal_blue_carbon.NODATA_UINT16_MAX
srs = osr.SpatialReference()
srs.ImportFromEPSG(32731) # WGS84 / UTM zone 31 S
wkt = srs.ExportToWkt()
disturbance_magnitude_path = os.path.join(
self.workspace_dir, 'disturbance_magnitude.tif')
disturbance_magnitude_matrix = numpy.array(
[[0.5, float32_nodata, 0.0]], dtype=numpy.float32)
pygeoprocessing.numpy_array_to_raster(
disturbance_magnitude_matrix, float32_nodata, (2, -2), (2, -2), wkt,
disturbance_magnitude_path)
stocks_path = os.path.join(
self.workspace_dir, 'stocks.tif')
stocks_matrix = numpy.array(
[[10, -1, 10]], dtype=numpy.float32)
pygeoprocessing.numpy_array_to_raster(
stocks_matrix, -1, (2, -2), (2, -2), wkt, stocks_path)
prior_disturbance_volume_path = os.path.join(
self.workspace_dir, 'prior_disturbance_vol.tif')
prior_disturbance_vol_matrix = numpy.array(
[[700, float32_nodata, 300]], numpy.float32)
pygeoprocessing.numpy_array_to_raster(
prior_disturbance_vol_matrix, float32_nodata, (2, -2), (2, -2),
wkt, prior_disturbance_volume_path)
prior_year_disturbance_path = os.path.join(
self.workspace_dir, 'prior_disturbance_years.tif')
prior_year_disturbance_matrix = numpy.array(
[[2000, uint16_nodata, 1990]], numpy.uint16)
pygeoprocessing.numpy_array_to_raster(
prior_year_disturbance_matrix, uint16_nodata, (2, -2), (2, -2), wkt,
prior_year_disturbance_path)
target_disturbance_path = os.path.join(
self.workspace_dir, 'disturbance_volume.tif')
target_year_of_disturbance = os.path.join(
self.workspace_dir, 'year_of_disturbance.tif')
current_year = 2010
coastal_blue_carbon._track_disturbance(
disturbance_magnitude_path, stocks_path,
prior_disturbance_volume_path,
prior_year_disturbance_path,
current_year, target_disturbance_path, target_year_of_disturbance)
try:
expected_disturbance = numpy.array(
[[5.0, float32_nodata, 300]], dtype=numpy.float32)
raster = gdal.OpenEx(target_disturbance_path)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_disturbance)
finally:
raster = None
try:
expected_year_of_disturbance = numpy.array(
[[2010, uint16_nodata, 1990]], dtype=numpy.uint16)
raster = gdal.OpenEx(target_year_of_disturbance)
numpy.testing.assert_allclose(
raster.ReadAsArray(),
expected_year_of_disturbance)
finally:
raster = None
def test_validate_required_analysis_year(self):
"""CBC: analysis year validation (regression test for #1534)."""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
args = TestCBC2._create_model_args(self.workspace_dir)
args['workspace_dir'] = self.workspace_dir
args['analysis_year'] = None
# truncate the CSV so that it has only one snapshot year
with open(args['landcover_snapshot_csv'], 'r') as file:
lines = file.readlines()
with open(args['landcover_snapshot_csv'], 'w') as file:
file.writelines(lines[:2])
validation_warnings = coastal_blue_carbon.validate(args)
self.assertEqual(
validation_warnings,
[(['analysis_year'], coastal_blue_carbon.MISSING_ANALYSIS_YEAR_MSG)])
args['analysis_year'] = 2000 # set analysis year equal to snapshot year
validation_warnings = coastal_blue_carbon.validate(args)
self.assertEqual(
validation_warnings,
[(['analysis_year'],
coastal_blue_carbon.INVALID_ANALYSIS_YEAR_MSG.format(
analysis_year=2000, latest_year=2000))])