Merge latest from release branch; update Crop Production & Carbon tests

This commit is contained in:
Emily Davis 2025-02-10 18:05:47 -07:00
commit 3a13f1012b
20 changed files with 1628 additions and 18 deletions

View File

@ -430,21 +430,20 @@ jobs:
WORKBENCH_BINARY=$(find "$(pwd)/workbench/dist" -type f -name 'invest_*.dmg' | head -n 1)
make WORKBENCH_BIN_TO_SIGN="$WORKBENCH_BINARY" codesign_mac
#- name: Sign binaries (Windows)
# if: github.event_name != 'pull_request' && matrix.os == 'windows-latest' # secrets not available in PR
# env:
# CERT_FILE: Stanford-natcap-code-signing-cert-expires-2024-01-26.p12
# CERT_PASS: ${{ secrets.WINDOWS_CODESIGN_CERT_PASS }}
# run: |
# # figure out the path to signtool.exe (it keeps changing with SDK updates)
# SIGNTOOL_PATH=$(find 'C:\\Program Files (x86)\\Windows Kits\\10' -type f -name 'signtool.exe*' | head -n 1)
# WORKBENCH_BINARY=$(find "$(pwd)/workbench/dist" -type f -name 'invest_*.exe' | head -n 1)
# make WORKBENCH_BIN_TO_SIGN="$WORKBENCH_BINARY" SIGNTOOL="$SIGNTOOL_PATH" codesign_windows
- name: Deploy artifacts to GCS
if: github.event_name != 'pull_request'
run: make deploy
# This relies on the file existing on GCP, so it must be run after `make
# deploy` is called.
- name: Queue windows binaries for signing
if: github.event_name != 'pull_request' && matrix.os == 'windows-latest' # secrets not available in PR
env:
ACCESS_TOKEN: ${{ secrets.CODESIGN_QUEUE_ACCESS_TOKEN }}
run: |
cd codesigning
bash enqueue-current-windows-installer.sh
- name: Upload workbench binary artifact
if: always()
uses: actions/upload-artifact@v4

View File

@ -72,14 +72,16 @@ jobs:
rm -rf artifacts/Wheel*
# download each artifact separately so that the command will fail if any is missing
for artifact in Workbench-Windows-binary \
Workbench-macOS-binary \
for artifact in Workbench-macOS-binary \
InVEST-sample-data \
InVEST-user-guide
do
gh run download $RUN_ID --dir artifacts --name "$artifact"
done
# download the signed windows workbench file from GCS
wget --directory-prefix=artifacts https://storage.googleapis.com/releases.naturalcapitalproject.org/invest/${{ env.VERSION }}/workbench/invest_${{ env.VERSION }}_workbench_win32_x64.exe
# We build one sdist per combination of OS and python version, so just
# download and unzip all of them into an sdists directory so we can
# just grab the first one. This approach is more flexible to changes

View File

@ -45,6 +45,9 @@ Unreleased Changes
* Now testing and building against Python 3.13.
No longer testing and building with Python 3.8, which reached EOL.
https://github.com/natcap/invest/issues/1755
* InVEST's Windows binaries are once again distributed with a valid
signature, signed by Stanford University.
https://github.com/natcap/invest/issues/1580
* Workbench
* Auto-scrolling of log output is halted on user-initiated scrolling,
enabling easier inspection of log output while a model is running

View File

@ -356,10 +356,8 @@ codesign_mac:
codesign --timestamp --verbose --sign Stanford $(WORKBENCH_BIN_TO_SIGN)
codesign_windows:
$(GSUTIL) cp gs://stanford_cert/$(CERT_FILE) $(BUILD_DIR)/$(CERT_FILE)
"$(SIGNTOOL)" sign -fd SHA256 -f $(BUILD_DIR)/$(CERT_FILE) -p $(CERT_PASS) $(WORKBENCH_BIN_TO_SIGN)
"$(SIGNTOOL)" timestamp -tr http://timestamp.sectigo.com -td SHA256 $(WORKBENCH_BIN_TO_SIGN)
$(RM) $(BUILD_DIR)/$(CERT_FILE)
@echo "Installer was signed with signtool"
deploy:

21
codesigning/Makefile Normal file
View File

@ -0,0 +1,21 @@
.PHONY: deploy-cloudfunction deploy-worker
deploy-cloudfunction:
gcloud functions deploy \
--project natcap-servers \
codesigning-queue \
--memory=256Mi \
--trigger-http \
--gen2 \
--region us-west1 \
--allow-unauthenticated \
--entry-point main \
--runtime python312 \
--source gcp-cloudfunc/
# NOTE: This must be executed from a computer that has SSH access to ncp-inkwell.
deploy-worker:
cd signing-worker && ansible-playbook \
--ask-become-pass \
--inventory-file inventory.ini \
playbook.yml

74
codesigning/README.md Normal file
View File

@ -0,0 +1,74 @@
# InVEST Codesigning Service
This directory contains all of the functional code and configuration (minus a
few secrets) that are needed to deploy our code-signing service. There are
three key components to this service:
1. A cloud function (`gcp-cloudfunc/`) that operates as a high-latency queue
backed by Google Cloud Storage (the `queue.json` object it manages is
sketched below).
2. A script (`enqueue-binary.py`) that enqueues a binary that already
exists in one of our GCS buckets.
3. A `systemd` service that runs on a debian:bookworm machine and periodically
polls the cloud function to dequeue the next item to sign.
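Concretely, the queue is a single `queue.json` object in the `natcap-codesigning`
GCS bucket, managed by the cloud function. A minimal sketch of its shape (the
URL shown is illustrative):
```python
# Shape of the queue.json object written by gcp-cloudfunc/main.py: a single
# "queue" key holding the list of public HTTPS URLs waiting to be signed.
queue_json = {
    "queue": [
        "https://storage.googleapis.com/releases.naturalcapitalproject.org/"
        "invest/<version>/workbench/invest_<version>_workbench_win32_x64.exe",
    ],
}
```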
## Deploying the Cloud Function
The necessary `gcloud` deployment configuration can be executed with
```bash
$ make deploy-cloudfunction
```
### Secrets
The current deployment process requires you to manually create an environment
variable, ``ACCESS_TOKEN``, that contains the secret token shared by the cloud
function, systemd service and enqueue script.
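For reference, a minimal sketch of how that token is presented to the queue
endpoint by the enqueue script and the signing worker (the token value and the
binary URL below are placeholders):
```python
import requests

ENDPOINT = (
    'https://us-west1-natcap-servers.cloudfunctions.net/codesigning-queue')

# Enqueue a binary for signing (this is what enqueue-binary.py does).
requests.post(ENDPOINT, json={
    'token': '<ACCESS_TOKEN>',
    'action': 'enqueue',
    'url': ('https://storage.googleapis.com/releases.naturalcapitalproject.org/'
            'invest/<version>/workbench/invest_<version>_workbench_win32_x64.exe'),
})

# Dequeue the next binary to sign (this is what the systemd worker does).
# A successful response body contains 'https-url', 'basename' and 'gs-uri'.
response = requests.post(ENDPOINT, json={
    'token': '<ACCESS_TOKEN>',
    'action': 'dequeue',
})
```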
## Deploying the Systemd Service
To deploy the systemd service, you will need to be on a computer with SSH
access to `ncp-inkwell`, the machine with the YubiKey installed. This machine
is assumed to run debian:bookworm at this time. To deploy (non-secret) changes
to ncp-inkwell, run this in an environment where `ansible-playbook` is
available (`pip install ansible` to install):
```bash
$ make deploy-worker
```
### Secrets
The systemd service requires several secrets to be available in the codesigning
workspace, which is located at `/opt/natcap-codesign`:
* `/opt/natcap-codesign/pass.txt` is a plain text file containing only the PIN
for the yubikey
* `/opt/natcap-codesign/access_token.txt` is a plain text file containing the
access token shared with the cloud function, systemd service and enqueue script.
* `/opt/natcap-codesign/slack_token.txt` is a plain text file containing the
slack token used to post messages to our slack workspace.
* `/opt/natcap-codesign/natcap-servers-1732552f0202.json` is a GCP service
account key used to authenticate to google cloud storage. This file must be
available in the `gcp-cloudfunc/` directory at the time of deployment.
## Future Work
### Authenticate to the function with Identity Federation
Access to the cloud function is controlled by a secret token, which is not ideal.
Instead, we should be using GitHub/GCP identity federation to control access.
### Trigger the function with GCS Events
GCP Cloud Functions can subscribe to bucket events, which should allow us to
subscribe specifically to just those `finalize` events that apply to the
Windows workbench binaries. Doing so will require reworking this cloud function into two cloud functions:
1. An endpoint for ncp-inkwell to poll for the next binary to sign
2. A cloud function that subscribes to GCS bucket events and enqueues the binary to sign.
Relevant docs include:
* https://cloud.google.com/functions/docs/writing/write-event-driven-functions#cloudevent-example-python
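A rough sketch of what the event-driven half could look like, following the
pattern in the linked docs (the function name and the final enqueue step are
placeholders, not part of the current deployment):
```python
import functions_framework


@functions_framework.cloud_event
def enqueue_on_finalize(cloud_event):
    """Fired by GCS object `finalize` events."""
    data = cloud_event.data
    # Only act on Windows workbench installers.
    if not data["name"].endswith(".exe"):
        return
    url = f"https://storage.googleapis.com/{data['bucket']}/{data['name']}"
    # ...append `url` to queue.json here, as the 'enqueue' action does today.
```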

View File

@ -0,0 +1,28 @@
"""Enqueue a windows binary for signing.
To call this script, you need to set the ACCESS_TOKEN environment variable from
the software team secrets store.
Example invocation:
$ ACCESS_TOKEN=abcs1234 python3 enqueue-binary.py <gs:// uri to binary on gcs>
"""
import os
import sys
import requests
DATA = {
'token': os.environ['ACCESS_TOKEN'],
'action': 'enqueue',
'url': sys.argv[1].replace(
'gs://', 'https://storage.googleapis.com/'),
}
response = requests.post(
'https://us-west1-natcap-servers.cloudfunctions.net/codesigning-queue',
json=DATA
)
if response.status_code >= 400:
print(response.text)
sys.exit(1)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env sh
#
# Run this script to enqueue the current version of the InVEST Windows
# workbench installer for code signing.
#
# NOTE: this script must be run from the directory containing this script.
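#
# Example usage (ACCESS_TOKEN comes from the software team secrets store):
#   ACCESS_TOKEN=<token> sh ./enqueue-current-windows-installer.sh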
version=$(python -m setuptools_scm)
url_base=$(make -C .. --no-print-directory print-DIST_URL_BASE | awk ' { print $3 } ')
url="${url_base}/workbench/invest_${version}_workbench_win32_x64.exe"
echo "Enqueuing URL ${url}"
python enqueue-binary.py "${url}"

View File

@ -0,0 +1,180 @@
import contextlib
import datetime
import json
import logging
import os
import time
from urllib.parse import unquote
import functions_framework
import google.cloud.logging # pip install google-cloud-logging
import requests
from flask import jsonify
from google.cloud import storage # pip install google-cloud-storage
GOOGLE_PREFIX = 'https://storage.googleapis.com'
CODESIGN_DATA_BUCKET = 'natcap-codesigning'
LOG_CLIENT = google.cloud.logging.Client()
LOG_CLIENT.setup_logging()
@contextlib.contextmanager
def get_lock():
"""Acquire a GCS-based mutex.
This requires that the bucket we are using has versioning.
"""
storage_client = storage.Client()
bucket = storage_client.bucket(CODESIGN_DATA_BUCKET)
lock_obtained = False
n_tries = 100
for i in range(n_tries):
lockfile = bucket.blob('mutex.lock')
if not lockfile.generation:
lockfile.upload_from_string(
f"Lock acquired {datetime.datetime.now().isoformat()}")
lock_obtained = True
break
else:
time.sleep(0.1)
if not lock_obtained:
raise RuntimeError(f'Could not obtain lock after {n_tries} tries')
try:
yield
finally:
lockfile.delete()
@functions_framework.http
def main(request):
"""Handle requests to this GCP Cloud Function.
All requests must be POST requests and have a JSON body with the following
attributes:
* token: a secret token that matches the ACCESS_TOKEN environment
variable that is defined in the cloud function configuration.
* action: either 'enqueue' or 'dequeue'
If the action is 'enqueue', the request must also have a 'url' attribute.
The 'url' attribute, when provided, must be a URL to a file that meets
these requirements:
* The URL must be a publicly accessible URL
* The URL must be a file that ends in '.exe'
* The URL must be located in either the releases bucket, or else
in the dev builds bucket. It doesn't necessarily have to be an
InVEST binary.
* The URL must be a file that is not older than June 1, 2024
* The URL must be a file that is not already in the queue
* The URL should be a file that is not already signed (if the file has
already been signed, its signature will be overwritten)
"""
data = request.get_json()
if data['token'] != os.environ['ACCESS_TOKEN']:
logging.info('Rejecting request due to invalid token')
return jsonify('Invalid token'), 403
if request.method != 'POST':
logging.info('Rejecting request due to invalid HTTP method')
return jsonify('Invalid request method'), 405
storage_client = storage.Client()
bucket = storage_client.bucket(CODESIGN_DATA_BUCKET)
logging.debug(f'Data POSTed: {data}')
if data['action'] == 'dequeue':
with get_lock():
queuefile = bucket.blob('queue.json')
queue_dict = json.loads(queuefile.download_as_string())
try:
next_file_url = queue_dict['queue'].pop(0)
except IndexError:
# No items in the queue!
logging.info('No binaries are currently queued for signing')
return jsonify('No items in the queue'), 204
queuefile.upload_from_string(json.dumps(queue_dict))
data = {
'https-url': next_file_url,
'basename': os.path.basename(next_file_url),
'gs-uri': unquote(next_file_url.replace(
f'{GOOGLE_PREFIX}/', 'gs://')),
}
logging.info(f'Dequeued {next_file_url}')
return jsonify(data)
elif data['action'] == 'enqueue':
url = data['url']
logging.info(f'Attempting to enqueue url {url}')
if not url.endswith('.exe'):
logging.info("Rejecting URL because it doesn't end in .exe")
return jsonify('Invalid URL to sign'), 400
if not url.startswith(GOOGLE_PREFIX):
logging.info(f'Rejecting URL because it does not start with {GOOGLE_PREFIX}')
return jsonify('Invalid host'), 400
if not url.startswith((
f'{GOOGLE_PREFIX}/releases.naturalcapitalproject.org/',
f'{GOOGLE_PREFIX}/natcap-dev-build-artifacts/')):
logging.info('Rejecting URL because the bucket is incorrect')
return jsonify("Invalid target bucket"), 400
# Remove http character quoting
url = unquote(url)
binary_bucket_name, *binary_obj_paths = url.replace(
GOOGLE_PREFIX + '/', '').split('/')
codesign_bucket = storage_client.bucket(CODESIGN_DATA_BUCKET)
# If the file does not exist at this URL, reject it.
response = requests.head(url)
if response.status_code >= 400:
logging.info('Rejecting URL because it does not exist')
return jsonify('Requested file does not exist'), 403
# If the file is too old, reject it. Trying to avoid a
# denial-of-service by invoking the service with very old files.
# I just pulled June 1 out of thin air as a date that is a little while
# ago, but not so long ago that we could suddenly have many files
# enqueued.
mday, mmonth, myear = response.headers['Last-Modified'].split(' ')[1:4]
modified_time = datetime.datetime.strptime(
' '.join((mday, mmonth, myear)), '%d %b %Y')
if modified_time < datetime.datetime(year=2024, month=6, day=1):
logging.info('Rejecting URL because it is too old')
return jsonify('File is too old'), 400
response = requests.head(f'{url}.signature')
if response.status_code != 404:
logging.info('Rejecting URL because it has already been signed.')
return jsonify('File has already been signed'), 204
with get_lock():
# Since the file has not already been signed, add the file to the
# queue
queuefile = codesign_bucket.blob('queue.json')
if not queuefile.exists():
queue_dict = {'queue': []}
else:
queue_dict = json.loads(queuefile.download_as_string())
if url not in queue_dict['queue']:
queue_dict['queue'].append(url)
else:
return jsonify('File is already in the queue'), 200
queuefile.upload_from_string(json.dumps(queue_dict))
logging.info(f'Enqueued {url}')
return jsonify("OK"), 200
else:
return jsonify('Invalid action request'), 405

View File

@ -0,0 +1,5 @@
google-cloud-storage
google-cloud-logging
functions-framework==3.*
flask
requests

5
codesigning/signing-worker/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
# This key is copied from GCP. It is listed in .gitignore to prevent it from
# being committed and pushed to git, while still allowing it to live where
# ansible expects it so that ansible can copy the file to the remote server.
natcap-servers-1732552f0202.json

View File

@ -0,0 +1,5 @@
# This is an ansible inventory file. If we had more hostnames to list here, we
# could group them into functional groups (e.g. codesign-workers).
[ncp-inkwell]
ncp-inkwell

View File

@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Service script to sign InVEST windows binaries."""
import logging
import os
import shutil
import subprocess
import sys
import textwrap
import time
import traceback
import pexpect # apt install python3-pexpect
import requests # apt install python3-requests
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
CERTIFICATE = sys.argv[1]
FILE_DIR = os.path.dirname(__file__)
QUEUE_TOKEN_FILE = os.path.join(FILE_DIR, "access_token.txt")
with open(QUEUE_TOKEN_FILE) as token_file:
ACCESS_TOKEN = token_file.read().strip()
SLACK_TOKEN_FILE = os.path.join(FILE_DIR, "slack_token.txt")
with open(SLACK_TOKEN_FILE) as token_file:
SLACK_ACCESS_TOKEN = token_file.read().strip()
SLACK_NOTIFICATION_SUCCESS = textwrap.dedent(
"""\
:lower_left_fountain_pen: Successfully signed and uploaded `{filename}` to
<{url}|google cloud>
""")
SLACK_NOTIFICATION_ALREADY_SIGNED = textwrap.dedent(
"""\
:lower_left_fountain_pen: `{filename}` is already signed!
<{url}|google cloud>
""")
SLACK_NOTIFICATION_FAILURE = textwrap.dedent(
"""\
:red-flag: Something went wrong while signing {filename}:
```
{traceback}
```
Please investigate on ncp-inkwell using:
```
sudo journalctl -u natcap-codesign.service
```
""")
def post_to_slack(message):
"""Post a message to the slack channel.
Args:
message (str): The message to post.
Returns:
``None``
"""
resp = requests.post(
"https://slack.com/api/chat.postMessage",
headers={
"Authorization": f"Bearer {SLACK_ACCESS_TOKEN}",
"Content-Type": "application/json; charset=utf-8"
},
json={
"channel": "CESG428BH", # sw-invest
"text": message
})
resp.raise_for_status()
def get_from_queue():
"""Get an item to sign from the queue.
Returns:
``None`` if there are no items in the queue, the JSON response dict
otherwise.
"""
response = requests.post(
"https://us-west1-natcap-servers.cloudfunctions.net/codesigning-queue",
headers={"Content-Type": "application/json"},
json={
"token": ACCESS_TOKEN,
"action": "dequeue"
})
if response.status_code == 204:
return None
else:
return response.json()
def download_file(url):
"""Download an arbitrarily large file.
Adapted from https://stackoverflow.com/a/16696317
Args:
url (str): The URL to download.
Returns:
``None``
"""
local_filename = url.split('/')[-1]
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return local_filename
def upload_to_bucket(filename, path_on_bucket):
"""Upload a file to a GCS bucket.
Args:
filename (str): The local file to upload.
path_on_bucket (str): The path to the file on the GCS bucket, including
the ``gs://`` prefix.
Returns:
``None``
"""
subprocess.run(['gsutil', 'cp', filename, path_on_bucket], check=True)
def sign_file(file_to_sign):
"""Sign a local .exe file.
Uses ``osslsigncode`` to sign the file using the private key stored on a
Yubikey, and the corresponding certificate that has been exported from the
PIV slot 9c.
Args:
file_to_sign (str): The local filepath to the file to sign.
Returns:
``None``
"""
signed_file = f"{file_to_sign}.signed"
pass_file = os.path.join(FILE_DIR, 'pass.txt')
signcode_command = textwrap.dedent(f"""\
osslsigncode sign \
-pkcs11engine /usr/lib/x86_64-linux-gnu/engines-3/pkcs11.so \
-pkcs11module /usr/lib/x86_64-linux-gnu/libykcs11.so \
-key "pkcs11:id=%02;type=private" \
-certs {CERTIFICATE} \
-h sha256 \
-ts http://timestamp.sectigo.com \
-readpass {pass_file} \
-verbose \
-in {file_to_sign} \
-out {signed_file}""")
process = pexpect.spawnu(signcode_command)
process.expect('Enter PKCS#11 key PIN for Private key for Digital Signature:')
with open(pass_file) as passfile:
process.sendline(passfile.read().strip())
# print remainder of program output for our logging.
print(process.read())
shutil.move(signed_file, file_to_sign)
def note_signature_complete(local_filepath, target_gs_uri):
"""Create a small file next to the signed file to indicate signature.
Args:
local_filepath (str): The local path to the signed file.
target_gs_uri (str): The GCS URI of the signed file; the signature note
is uploaded alongside it with a `.signature` suffix.
"""
# Using osslsigncode to verify the output always fails for me, even though
# the signature is clearly valid when checked on Windows.
process = subprocess.run(
['osslsigncode', 'verify', '-in', local_filepath], check=False,
capture_output=True)
temp_filepath = f'/tmp/{os.path.basename(local_filepath)}.signed'
with open(temp_filepath, 'w') as signature_file:
signature_file.write(process.stdout.decode())
try:
# Upload alongside the original file
subprocess.run(
['gsutil', 'cp', temp_filepath, f'{target_gs_uri}.signature'],
check=True)
finally:
os.remove(temp_filepath)
def has_signature(filename):
"""Check if a file is already signed.
Args:
filename (str): The local filepath to the file to check.
Returns:
``True`` if the file is signed, ``False`` otherwise.
"""
process = subprocess.run(
['osslsigncode', 'verify', '-in', filename], capture_output=True,
check=False)
# Handle the case where it's possible there might not be any stdout or
# stderr to decode.
process_output = ""
for output in (process.stdout, process.stderr):
if output is not None:
process_output += output.decode()
if 'No signature found' in process_output:
return False
return True
def main():
while True:
try:
file_to_sign = get_from_queue()
if file_to_sign is None:
LOGGER.info('No items in the queue')
else:
LOGGER.info(f"Dequeued and downloading {file_to_sign['https-url']}")
filename = download_file(file_to_sign['https-url'])
LOGGER.info(f"Checking if {filename} is already signed")
if has_signature(filename):
LOGGER.info(f"{filename} is already signed, skipping")
post_to_slack(
SLACK_NOTIFICATION_ALREADY_SIGNED.format(
filename=filename,
url=file_to_sign['https-url']))
note_signature_complete(filename, file_to_sign['gs-uri'])
else:
LOGGER.info(f"Signing {filename}")
sign_file(filename)
LOGGER.info(f"Uploading signed file to {file_to_sign['gs-uri']}")
upload_to_bucket(filename, file_to_sign['gs-uri'])
LOGGER.info(
f"Adding {file_to_sign['https-url']} to signed files list")
note_signature_complete(filename, file_to_sign['gs-uri'])
LOGGER.info(f"Removing {filename}")
post_to_slack(
SLACK_NOTIFICATION_SUCCESS.format(
filename=filename,
url=file_to_sign['https-url']))
LOGGER.info("Signing complete.")
os.remove(filename)
except Exception as e:
LOGGER.exception(f"Unexpected error signing file: {e}")
post_to_slack(
SLACK_NOTIFICATION_FAILURE.format(
filename=file_to_sign['https-url'],
traceback=traceback.format_exc()))
time.sleep(60)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,33 @@
# Systemd service for debian:bookworm for signing InVEST windows binaries.
#
# To install this service, copy this onto the host as /etc/systemd/system/natcap-codesign.service
#
# To use, run (for example):
# # On modifying the service file, run:
# $ sudo systemctl daemon-reload
#
# # enable the service
# $ sudo systemctl enable natcap-codesign.service
#
# # start the service
# $ sudo systemctl start natcap-codesign
#
# # check the service status
# $ sudo systemctl status natcap-codesign
#
# This service is built to run in the foreground.
#
# See https://wiki.debian.org/systemd/Services for background info about systemd services.
[Unit]
Description=NatCap Code Signing for Windows EXE Binaries

[Service]
User=natcap-codesign
Group=natcap-codesign
WorkingDirectory=/tmp
# Run in the foreground
Type=simple
Restart=always
ExecStart=python3 /opt/natcap-codesign/natcap-codesign.py /opt/natcap-codesign/codesign-cert-chain.pem

View File

@ -0,0 +1,112 @@
---
- name: Set up everything needed on NCP-Inkwell
hosts: all
become: true
become_method: sudo
tasks:
- name: Install GCP SDK dependencies
ansible.builtin.apt:
update_cache: true
pkg:
- apt-transport-https
- ca-certificates
- gnupg
- curl
- name: Download the Google Cloud SDK package repository signing key
ansible.builtin.shell:
cmd: curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg
creates: /usr/share/keyrings/cloud.google.gpg
- name: Add Google Cloud SDK package repository source
ansible.builtin.apt_repository:
update_cache: true
filename: google-cloud-sdk.list
repo: "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main"
- name: Install packages
ansible.builtin.apt:
update_cache: true
pkg:
- python3
- python3-pexpect
- python3-requests
- wget
- vim-nox
- yubico-piv-tool
- libengine-pkcs11-openssl
- ykcs11
- libssl-dev
- google-cloud-sdk
- google-cloud-cli
- yubikey-manager
- name: Add bookworm-backports repository
ansible.builtin.apt_repository:
update_cache: true
repo: "deb http://deb.debian.org/debian {{ ansible_distribution_release }}-backports main"
filename: bookworm-backports.list
- name: Install osslsigncode from backports
ansible.builtin.apt:
update_cache: true
default_release: "{{ ansible_distribution_release }}-backports"
pkg:
# The normal debian:bookworm repos have osslsigncode 2.5, which has a
# bug in it that prevents it from signing our binaries. This was
# fixed in osslsigncode 2.6. The version available in
# bookworm-backports is 2.9. The issue (and solution) was similar to
# https://stackoverflow.com/a/78308879
- osslsigncode
- name: Create the codesign directory
ansible.builtin.file:
state: directory
path: /opt/natcap-codesign
- name: Install the certificate
ansible.builtin.shell:
cmd: ykman piv certificates export 9c /opt/natcap-codesign/codesign-cert-chain.pem
creates: /opt/natcap-codesign/codesign-cert-chain.pem
- name: Create codesigning group
ansible.builtin.group:
name: natcap-codesign
state: present
- name: Create codesigning user
ansible.builtin.user:
name: natcap-codesign
group: natcap-codesign
shell: /bin/bash
createhome: true
- name: Install the service account key
ansible.builtin.copy:
src: natcap-servers-1732552f0202.json
dest: /opt/natcap-codesign/natcap-servers-1732552f0202.json
mode: 0600
- name: Set up application credentials
ansible.builtin.shell:
cmd: gcloud auth activate-service-account --key-file=/opt/natcap-codesign/natcap-servers-1732552f0202.json
- name: Install codesigning python script
ansible.builtin.copy:
src: natcap-codesign.py
dest: /opt/natcap-codesign/natcap-codesign.py
mode: 0755
- name: Install the codesign service
ansible.builtin.copy:
src: natcap-codesign.service
dest: /etc/systemd/system/natcap-codesign.service
mode: 0644
- name: Enable the natcap-codesign service
ansible.builtin.systemd_service:
name: natcap-codesign
daemon_reload: true # reload in case there are any config changes
state: restarted
enabled: true

View File

@ -5,15 +5,49 @@ import tempfile
import unittest
import numpy
from shapely.geometry import Polygon
import pandas
import pygeoprocessing
from osgeo import gdal
from osgeo import gdal, ogr, osr
REGRESSION_DATA = os.path.join(
os.path.dirname(__file__), '..', 'data', 'invest-test-data', 'annual_water_yield')
SAMPLE_DATA = os.path.join(REGRESSION_DATA, 'input')
gdal.UseExceptions()
def make_watershed_vector(path_to_shp):
"""
Generate watershed results shapefile with two polygons
Args:
path_to_shp (str): path to store watershed results vector
Outputs:
None
"""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"hp_energy": ogr.OFTReal, "hp_val": ogr.OFTReal,
"ws_id": ogr.OFTReal, "rsupply_vl": ogr.OFTReal,
"wyield_mn": ogr.OFTReal, "wyield_vol": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [
{"hp_energy": 1, "hp_val": 1, "ws_id": 0, "rsupply_vl": 2},
{"hp_energy": 11, "hp_val": 3, "ws_id": 1, "rsupply_vl": 52}
]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)
class AnnualWaterYieldTests(unittest.TestCase):
"""Regression Tests for Annual Water Yield Model."""
@ -367,7 +401,6 @@ class AnnualWaterYieldTests(unittest.TestCase):
self.assertTrue(
'but are not found in the valuation table' in
actual_message, actual_message)
# if the demand table is missing but the valuation table is present,
# make sure we have a validation error.
args_missing_demand_table = args.copy()
@ -380,3 +413,112 @@ class AnnualWaterYieldTests(unittest.TestCase):
self.assertEqual(
validation_warnings[0],
(['demand_table_path'], 'Input is required but has no value'))
def test_fractp_op(self):
"""Test `fractp_op`"""
from natcap.invest.annual_water_yield import fractp_op
# generate fake data
kc = numpy.array([[1, .1, .1], [.6, .6, .1]])
eto = numpy.array([[1000, 900, 900], [1100, 1005, 1000]])
precip = numpy.array([[100, 1000, 10], [500, 800, 1100]])
root = numpy.array([[99, 300, 400], [5, 500, 800]])
soil = numpy.array([[600, 700, 700], [800, 900, 600]])
pawc = numpy.array([[.11, .11, .12], [.55, .55, .19]])
veg = numpy.array([[1, 1, 0], [0, 1, 0]])
nodata_dict = {'eto': None, 'precip': None, 'depth_root': None,
'pawc': None, 'out_nodata': None}
seasonality_constant = 6
actual_fractp = fractp_op(kc, eto, precip, root, soil, pawc, veg,
nodata_dict, seasonality_constant)
# generated by running fractp_op
expected_fractp = numpy.array([[0.9345682, 0.06896508, 1.],
[1., 0.6487423, 0.09090909]],
dtype=numpy.float32)
numpy.testing.assert_allclose(actual_fractp, expected_fractp,
err_msg="Fractp does not match expected")
def test_compute_watershed_valuation(self):
"""Test `compute_watershed_valuation`, `compute_rsupply_volume`
and `compute_water_yield_volume`"""
from natcap.invest import annual_water_yield
def _create_watershed_results_vector(path_to_shp):
"""Generate a fake watershed results vector file."""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"ws_id": ogr.OFTReal, "wyield_mn": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [{"ws_id": 0, "wyield_mn": 990000,
"consum_mn": 500, "consum_vol": 50},
{"ws_id": 1, "wyield_mn": 800000,
"consum_mn": 600, "consum_vol": 70}]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp,
projection_wkt,
vector_format, fields,
attribute_list)
def _validate_fields(vector_path, field_name, expected_values, error_msg):
"""
Validate a specific field in the watershed results vector
by comparing actual to expected values. Expected values generated
by running the function.
Args:
vector_path (str): path to watershed shapefile
field_name (str): attribute field to check
expected_values (list): list of expected values for field
error_msg (str): what to print if assertion fails
Returns:
None
"""
with gdal.OpenEx(vector_path, gdal.OF_VECTOR | gdal.GA_Update) as ws_ds:
ws_layer = ws_ds.GetLayer()
actual_values = [ws_feat.GetField(field_name)
for ws_feat in ws_layer]
self.assertEqual(actual_values, expected_values, msg=error_msg)
# generate fake watershed results vector
watershed_results_vector_path = os.path.join(self.workspace_dir,
"watershed_results.shp")
_create_watershed_results_vector(watershed_results_vector_path)
# generate fake val_df
val_df = pandas.DataFrame({'efficiency': [.7, .8], 'height': [12, 50],
'fraction': [.9, .7], 'discount': [60, 20],
'time_span': [10, 10], 'cost': [100, 200],
'kw_price': [15, 20]})
# test water yield volume
annual_water_yield.compute_water_yield_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "wyield_vol",
[990.0, 800.0],
"Error with water yield volume calculation.")
# test rsupply volume
annual_water_yield.compute_rsupply_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "rsupply_vl",
[940.0, 730.0],
"Error calculating total realized water supply volume.")
# test compute watershed valuation
annual_water_yield.compute_watershed_valuation(
watershed_results_vector_path, val_df)
_validate_fields(watershed_results_vector_path, "hp_energy",
[19.329408, 55.5968],
"Error calculating energy.")
_validate_fields(watershed_results_vector_path, "hp_val",
[501.9029748723, 4587.91946857059],
"Error calculating net present value.")

View File

@ -12,6 +12,8 @@ import numpy
import numpy.random
import numpy.testing
import pygeoprocessing
gdal.UseExceptions()
@ -330,6 +332,69 @@ class CarbonTests(unittest.TestCase):
assert_raster_equal_value(
os.path.join(args['workspace_dir'], 'npv_redd.tif'), -4602.106)
def test_generate_carbon_map(self):
"""Test `_generate_carbon_map`"""
from natcap.invest.carbon import _generate_carbon_map
def _make_simple_lulc_raster(base_raster_path):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
Returns:
None.
"""
array = numpy.array([[1, 1], [2, 3]], dtype=numpy.int32)
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461251, 4923245)
pixel_size = (1, 1)
no_data = -999
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
# generate a fake lulc raster
lulc_path = os.path.join(self.workspace_dir, "lulc.tif")
_make_simple_lulc_raster(lulc_path)
# make fake carbon pool dict
carbon_pool_by_type = {1: 5000, 2: 60, 3: 120}
out_carbon_stock_path = os.path.join(self.workspace_dir,
"carbon_stock.tif")
_generate_carbon_map(lulc_path, carbon_pool_by_type,
out_carbon_stock_path)
# open output carbon stock raster and check values
actual_carbon_stock = gdal.Open(out_carbon_stock_path)
band = actual_carbon_stock.GetRasterBand(1)
actual_carbon_stock = band.ReadAsArray()
expected_carbon_stock = numpy.array([[5000, 5000], [60, 120]],
dtype=numpy.float32)
numpy.testing.assert_array_equal(actual_carbon_stock,
expected_carbon_stock)
def test_calculate_valuation_constant(self):
"""Test `_calculate_valuation_constant`"""
from natcap.invest.carbon import _calculate_valuation_constant
valuation_constant = _calculate_valuation_constant(lulc_cur_year=2010,
lulc_fut_year=2012,
discount_rate=50,
rate_change=5,
price_per_metric_ton_of_c=50)
expected_valuation = 40.87302
self.assertEqual(round(valuation_constant, 5), expected_valuation)
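# Hand check of the expected constant (a sketch assuming geometric
# discounting of the carbon price over the 2 years between the current and
# future LULC years, not the library's exact code):
#   ratio = 1 / ((1 + 0.50) * (1 + 0.05)) ~= 0.6349206
#   constant = (50 / 2) * (ratio**0 + ratio**1) ~= 40.87302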
class CarbonValidationTests(unittest.TestCase):
"""Tests for the Carbon Model MODEL_SPEC and validation."""

View File

@ -11,6 +11,7 @@ import unittest
import numpy
import pandas
from scipy.sparse import dok_matrix
import pygeoprocessing
from natcap.invest import utils
from natcap.invest import validation
@ -24,6 +25,26 @@ REGRESSION_DATA = os.path.join(
LOGGER = logging.getLogger(__name__)
def make_raster_from_array(base_raster_path, array):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
Returns:
None.
"""
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461261, 4923265)
pixel_size = (1, 1)
no_data = -1
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
class TestPreprocessor(unittest.TestCase):
"""Test Coastal Blue Carbon preprocessor functions."""
@ -1060,3 +1081,149 @@ class TestCBC2(unittest.TestCase):
[(['analysis_year'],
coastal_blue_carbon.INVALID_ANALYSIS_YEAR_MSG.format(
analysis_year=2000, latest_year=2000))])
def test_calculate_npv(self):
"""Test `_calculate_npv`"""
from natcap.invest.coastal_blue_carbon import coastal_blue_carbon
# make fake data
net_sequestration_rasters = {
2010: os.path.join(self.workspace_dir, "carbon_seq_2010.tif"),
2011: os.path.join(self.workspace_dir, "carbon_seq_2011.tif"),
2012: os.path.join(self.workspace_dir, "carbon_seq_2012.tif")
}
for year, path in net_sequestration_rasters.items():
array = numpy.array([[year*.5, year*.25], [year-1, 50]]) # random array
make_raster_from_array(path, array)
prices_by_year = {
2010: 50,
2011: 80,
2012: 95
}
discount_rate = 0.1
baseline_year = 2010
target_raster_years_and_paths = {
2010: os.path.join(self.workspace_dir, "tgt_carbon_seq_2010.tif"),
2011: os.path.join(self.workspace_dir, "tgt_carbon_seq_2011.tif"),
2012: os.path.join(self.workspace_dir, "tgt_carbon_seq_2012.tif")
}
coastal_blue_carbon._calculate_npv(net_sequestration_rasters,
prices_by_year, discount_rate,
baseline_year,
target_raster_years_and_paths)
# read in the created target rasters
actual_2011 = gdal.Open(target_raster_years_and_paths[2011])
band = actual_2011.GetRasterBand(1)
actual_2011 = band.ReadAsArray()
actual_2012 = gdal.Open(target_raster_years_and_paths[2012])
band = actual_2012.GetRasterBand(1)
actual_2012 = band.ReadAsArray()
# compare actual rasters to expected (based on running `_calculate_npv`)
expected_2011 = numpy.array([[100525, 50262.5], [200950, 5000]])
expected_2012 = numpy.array([[370206.818182, 185103.409091],
[740045.454545, 18409.090909]])
numpy.testing.assert_allclose(actual_2011, expected_2011)
numpy.testing.assert_allclose(actual_2012, expected_2012)
def test_calculate_accumulation_over_time(self):
"""Test `_calculate_accumulation_over_time`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_calculate_accumulation_over_time
# generate fake data with nodata values
nodata = float(numpy.finfo(numpy.float32).min)
annual_biomass_matrix = numpy.array([[1, 2], [3, nodata]])
annual_soil_matrix = numpy.array([[11, 12], [13, 14]])
annual_litter_matrix = numpy.array([[.5, .9], [4, .9]])
n_years = 3
actual_accumulation = _calculate_accumulation_over_time(
annual_biomass_matrix, annual_soil_matrix, annual_litter_matrix,
n_years)
expected_accumulation = numpy.array([[37.5, 44.7], [60, nodata]])
numpy.testing.assert_allclose(actual_accumulation, expected_accumulation)
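# Hand check of the expected values above: each cell is
# (biomass + soil + litter) * n_years, e.g. (1 + 11 + 0.5) * 3 = 37.5 and
# (2 + 12 + 0.9) * 3 = 44.7, with nodata propagated from the biomass matrix.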
def test_calculate_net_sequestration(self):
"""test `_calculate_net_sequestration`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_calculate_net_sequestration
# make fake rasters that contain nodata pixels (-1)
accumulation_raster_path = os.path.join(self.workspace_dir,
"accumulation_raster.tif")
accumulation_array = numpy.array([[40, -1], [70, -1]])
make_raster_from_array(accumulation_raster_path, accumulation_array)
emissions_raster_path = os.path.join(self.workspace_dir,
"emissions_raster.tif")
emissions_array = numpy.array([[-1, 8], [7, -1]])
make_raster_from_array(emissions_raster_path, emissions_array)
target_raster_path = os.path.join(self.workspace_dir,
"target_raster.tif")
# run `_calculate_net_sequestration`
_calculate_net_sequestration(accumulation_raster_path,
emissions_raster_path, target_raster_path)
# compare actual to expected output net sequestration raster
actual_sequestration = gdal.Open(target_raster_path)
band = actual_sequestration.GetRasterBand(1)
actual_sequestration = band.ReadAsArray()
# calculated by running `_calculate_net_sequestration`
nodata = float(numpy.finfo(numpy.float32).min)
expected_sequestration = numpy.array([[40, -8], [-7, nodata]])
numpy.testing.assert_allclose(actual_sequestration,
expected_sequestration)
def test_reclassify_accumulation_transition(self):
"""Test `_reclassify_accumulation_transition`"""
from natcap.invest.coastal_blue_carbon.coastal_blue_carbon import \
_reclassify_accumulation_transition, _reclassify_disturbance_magnitude
# make fake raster data
landuse_transition_from_raster = os.path.join(self.workspace_dir,
"landuse_transition_from.tif")
landuse_transition_from_array = numpy.array([[1, 2], [3, 2]])
make_raster_from_array(landuse_transition_from_raster,
landuse_transition_from_array)
landuse_transition_to_raster = os.path.join(self.workspace_dir,
"landuse_transition_to.tif")
landuse_transition_to_array = numpy.array([[1, 1], [2, 3]])
make_raster_from_array(landuse_transition_to_raster,
landuse_transition_to_array)
#make fake accumulation_rate_matrix
accumulation_rate_matrix = dok_matrix((4, 4), dtype=numpy.float32)
accumulation_rate_matrix[1, 2] = 0.5 # Forest -> Grassland
accumulation_rate_matrix[1, 3] = 0.3 # Forest -> Agriculture
accumulation_rate_matrix[2, 1] = 0.2 # Grassland -> Forest
accumulation_rate_matrix[2, 3] = 0.4 # Grassland -> Agriculture
accumulation_rate_matrix[3, 1] = 0.1 # Agriculture -> Forest
accumulation_rate_matrix[3, 2] = 0.3 # Agriculture -> Grassland
target_raster_path = os.path.join(self.workspace_dir, "output.tif")
_reclassify_accumulation_transition(
landuse_transition_from_raster, landuse_transition_to_raster,
accumulation_rate_matrix, target_raster_path)
# compare actual and expected target_raster
actual_accumulation = gdal.Open(target_raster_path)
band = actual_accumulation.GetRasterBand(1)
actual_accumulation = band.ReadAsArray()
expected_accumulation = numpy.array([[0, .2], [.3, .4]])
numpy.testing.assert_allclose(actual_accumulation, expected_accumulation)
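# Hand check of the expected values above: each pixel looks up
# accumulation_rate_matrix[from, to], so 1->1 = 0 (unset), 2->1 = 0.2,
# 3->2 = 0.3 and 2->3 = 0.4, matching expected_accumulation.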

View File

@ -8,6 +8,7 @@ import unittest
import numpy.testing
import pandas.testing
import pandas
import pygeoprocessing
import shapely.wkb
import taskgraph
@ -1553,6 +1554,100 @@ class CoastalVulnerabilityTests(unittest.TestCase):
# Polygon has 4 sides on exterior, 3 on interior, expect 7 lines
self.assertTrue(len(line_list) == 7)
def test_assemble_results_and_calculate_exposure(self):
"""Test that assemble_results_and_calculate_exposure correctly
calculates exposure"""
from natcap.invest.coastal_vulnerability import \
assemble_results_and_calculate_exposure
def _make_shore_points_vector(shore_points_path):
# create 4 points, each with a unique 'shore_id' in [0..3].
shore_geometries = [Point(0, 0), Point(1, 0), Point(2, 1), Point(3, 2)]
shore_fields = {'shore_id': ogr.OFTInteger}
shore_attributes = [{'shore_id': i} for i in range(len(shore_geometries))]
# Create a spatial reference (projected or geographic)
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910) # e.g. "NAD83 / UTM zone 10N"
pygeoprocessing.shapely_geometry_to_vector(
shore_geometries, shore_points_path, srs.ExportToWkt(),
vector_format='GPKG',
fields=shore_fields,
attribute_list=shore_attributes,
ogr_geom_type=ogr.wkbPoint
)
def _make_habitat_csv(habitat_csv_path):
# Example: one habitat column named 'kelp', plus 'R_hab'
# We have 4 shore IDs, so we add 4 rows. Values are arbitrary.
habitat_df = pandas.DataFrame(
{'shore_id': [0, 1, 2, 3], 'kelp': [5, 3, 5, 4],
'seagrass': [4, 1, 2, 4], 'R_hab': [5, 2, 5, 3]})
habitat_df.to_csv(habitat_csv_path, index=False)
def _make_risk_id_path_list():
# Create pickles for risk data
relief_pkl = os.path.join(self.workspace_dir, 'relief.pickle')
slr_pkl = os.path.join(self.workspace_dir, 'slr.pickle')
population_pkl = os.path.join(self.workspace_dir, 'population.pickle')
relief_data = {0: 10.0, 1: 50.0, 2: 30.0, 3: 80.0} # arbitrary data
slr_data = {0: 0.1, 1: 0.2, 2: 0.9, 3: 0.5}
population_data = {0: 123.0, 1: 999.0, 2: 55.0, 3: 0.0}
for file_path, data_dict in zip([relief_pkl, slr_pkl, population_pkl],
[relief_data, slr_data, population_data]):
with open(file_path, 'wb') as f:
pickle.dump(data_dict, f)
risk_id_path_list = [
(relief_pkl, True, "R_relief"), # "True" => bin to 1..5
(slr_pkl, True, "R_slr"),
(population_pkl, False, "population")
]
return risk_id_path_list
shore_points_path = os.path.join(self.workspace_dir, "shore_points.gpkg")
_make_shore_points_vector(shore_points_path)
habitat_csv_path = os.path.join(self.workspace_dir, 'habitat_protection.csv')
_make_habitat_csv(habitat_csv_path)
risk_id_path_list = _make_risk_id_path_list()
intermediate_vector_path = os.path.join(self.workspace_dir,
'intermediate_exposure.gpkg')
intermediate_csv_path = os.path.join(self.workspace_dir,
'intermediate_exposure.csv')
output_vector_path = os.path.join(self.workspace_dir,
'coastal_exposure.gpkg')
output_csv_path = os.path.join(self.workspace_dir,
'coastal_exposure.csv')
# call function
assemble_results_and_calculate_exposure(
risk_id_path_list,
habitat_csv_path,
shore_points_path,
intermediate_vector_path,
intermediate_csv_path,
output_vector_path,
output_csv_path
)
# read field values in output vector and compare
actual_df = pandas.read_csv(
output_csv_path,
usecols=["exposure", "habitat_role", "exposure_no_habitats"])
expected_df = pandas.DataFrame({
"exposure": [2.924018, 2.0, 4.641589, 2.289428],
"habitat_role": [0, 0.714418, 0, 0.424989],
"exposure_no_habitats": [2.924018, 2.714418, 4.641589, 2.714418]})
pandas.testing.assert_frame_equal(
actual_df, expected_df, check_dtype=False)
def assert_pickled_arrays_almost_equal(
actual_values_pickle_path, expected_values_json_path):

View File

@ -5,9 +5,10 @@ import shutil
import os
import numpy
from osgeo import gdal
from osgeo import gdal, ogr, osr
import pandas
import pygeoprocessing
from shapely.geometry import Polygon
gdal.UseExceptions()
MODEL_DATA_PATH = os.path.join(
@ -35,6 +36,108 @@ def _get_pixels_per_hectare(raster_path):
return 10000 / pixel_area
def make_aggregate_vector(path_to_shp):
"""
Generate a shapefile with two overlapping polygons.
Args:
path_to_shp (str): path to store the aggregate vector
Outputs:
None
"""
# (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax)
shapely_geometry_list = [
Polygon([(461151, 4923265-50), (461261+50, 4923265-50),
(461261+50, 4923265), (461151, 4923265)]),
Polygon([(461261, 4923265-35), (461261+60, 4923265-35),
(461261+60, 4923265+50), (461261, 4923265+50)])
]
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
vector_format = "ESRI Shapefile"
fields = {"id": ogr.OFTReal}
attribute_list = [
{"id": 0},
{"id": 1},
]
pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)
def make_simple_raster(base_raster_path, array):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
Returns:
None.
"""
# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()
origin = (461251, 4923245)
pixel_size = (30, 30)
no_data = -1
pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)
def create_nutrient_df():
"""Creates a nutrient DataFrame for testing."""
return pandas.DataFrame([
{'crop': 'corn', 'area (ha)': 21.0, 'production_observed': 0.2,
'percentrefuse': 7, 'protein': 42., 'lipid': 8, 'energy': 476.,
'ca': 27.0, 'fe': 15.7, 'mg': 280.0, 'ph': 704.0, 'k': 1727.0,
'na': 2.0, 'zn': 4.9, 'cu': 1.9, 'fl': 8, 'mn': 2.9, 'se': 0.1,
'vita': 3.0, 'betac': 16.0, 'alphac': 2.30, 'vite': 0.8,
'crypto': 1.6, 'lycopene': 0.36, 'lutein': 63.0, 'betat': 0.5,
'gammat': 2.1, 'deltat': 1.9, 'vitc': 6.8, 'thiamin': 0.4,
'riboflavin': 1.8, 'niacin': 8.2, 'pantothenic': 0.9,
'vitb6': 1.4, 'folate': 385.0, 'vitb12': 2.0, 'vitk': 41.0},
{'crop': 'soybean', 'area (ha)': 5., 'production_observed': 4.,
'percentrefuse': 9, 'protein': 33., 'lipid': 2., 'energy': 99.,
'ca': 257., 'fe': 15.7, 'mg': 280., 'ph': 704.0, 'k': 197.0,
'na': 2., 'zn': 4.9, 'cu': 1.6, 'fl': 3., 'mn': 5.2, 'se': 0.3,
'vita': 3.0, 'betac': 16.0, 'alphac': 1.0, 'vite': 0.8,
'crypto': 0.6, 'lycopene': 0.3, 'lutein': 61.0, 'betat': 0.5,
'gammat': 2.3, 'deltat': 1.2, 'vitc': 3.0, 'thiamin': 0.42,
'riboflavin': 0.82, 'niacin': 12.2, 'pantothenic': 0.92,
'vitb6': 5.4, 'folate': 305., 'vitb12': 3., 'vitk': 42.},
]).set_index('crop')
def _create_crop_rasters(output_dir, crop_names, file_suffix):
"""Creates raster files for test setup."""
_OBSERVED_PRODUCTION_FILE_PATTERN = os.path.join(
'.', '%s_observed_production%s.tif')
_CROP_PRODUCTION_FILE_PATTERN = os.path.join(
'.', '%s_regression_production%s.tif')
for i, crop in enumerate(crop_names):
observed_yield_path = os.path.join(
output_dir,
_OBSERVED_PRODUCTION_FILE_PATTERN % (crop, file_suffix))
crop_production_raster_path = os.path.join(
output_dir,
_CROP_PRODUCTION_FILE_PATTERN % (crop, file_suffix))
# Create arbitrary raster arrays
observed_array = numpy.array([[4, i], [i*3, 4]], dtype=numpy.int16)
crop_array = numpy.array([[i, 1], [i*2, 3]], dtype=numpy.int16)
make_simple_raster(observed_yield_path, observed_array)
make_simple_raster(crop_production_raster_path, crop_array)
class CropProductionTests(unittest.TestCase):
"""Tests for the Crop Production model."""
@ -460,6 +563,301 @@ class CropProductionTests(unittest.TestCase):
pandas.testing.assert_frame_equal(
expected_result_table, result_table, check_dtype=False)
def test_x_yield_op(self):
"""Test `_x_yield_op"""
from natcap.invest.crop_production_regression import _x_yield_op
# make fake data
y_max = numpy.array([[-1, 3, 2], [4, 5, 3]])
b_x = numpy.array([[4, 3, 2], [2, 0, 3]])
c_x = numpy.array([[4, 1, 2], [3, 0, 3]])
lulc_array = numpy.array([[3, 3, 2], [3, -1, 3]])
fert_rate = 0.6
crop_lucode = 3
actual_result = _x_yield_op(y_max, b_x, c_x, lulc_array, fert_rate,
crop_lucode)
expected_result = numpy.array([[-1, -1.9393047, -1],
[2.6776089, -1, 1.51231]])
numpy.testing.assert_allclose(actual_result, expected_result)
def test_zero_observed_yield_op(self):
"""Test `_zero_observed_yield_op`"""
from natcap.invest.crop_production_regression import \
_zero_observed_yield_op
# make fake data
observed_yield_array = numpy.array([[0, 1, -1], [5, 6, -1]])
observed_yield_nodata = -1
actual_result = _zero_observed_yield_op(observed_yield_array,
observed_yield_nodata)
expected_result = numpy.array([[0, 1, 0], [5, 6, 0]])
numpy.testing.assert_allclose(actual_result, expected_result)
def test_mask_observed_yield_op(self):
"""Test `_mask_observed_yield_op`"""
from natcap.invest.crop_production_regression import \
_mask_observed_yield_op
# make fake data
lulc_array = numpy.array([[3, 5, -9999], [3, 3, -1]])
observed_yield_array = numpy.array([[-1, 5, 4], [8, -9999, 91]])
observed_yield_nodata = -1
# note: this observed_yield_nodata value becomes the nodata value in
# the output array but the values in the observed_yield_array with
# this value are NOT treated as no data within this function
landcover_nodata = -9999
crop_lucode = 3
actual_result = _mask_observed_yield_op(
lulc_array, observed_yield_array, observed_yield_nodata,
landcover_nodata, crop_lucode)
expected_result = numpy.array([[-1, 0, -1], [8, -9999, 0]])
numpy.testing.assert_allclose(actual_result, expected_result)
def test_tabulate_regression_results(self):
"""Test `tabulate_regression_results`"""
from natcap.invest.crop_production_regression import \
tabulate_regression_results
def _create_expected_results():
"""Creates the expected results DataFrame."""
return pandas.DataFrame([
{'crop': 'corn', 'area (ha)': 20.0,
'production_observed': 80.0, 'production_modeled': 40.0,
'protein_modeled': 15624000.0, 'protein_observed': 31248000.0,
'lipid_modeled': 2976000.0, 'lipid_observed': 5952000.0,
'energy_modeled': 177072000.0, 'energy_observed': 354144000.0,
'ca_modeled': 10044000.0, 'ca_observed': 20088000.0,
'fe_modeled': 5840400.0, 'fe_observed': 11680800.0,
'mg_modeled': 104160000.0, 'mg_observed': 208320000.0,
'ph_modeled': 261888000.0, 'ph_observed': 523776000.0,
'k_modeled': 642444000.0, 'k_observed': 1284888000.0,
'na_modeled': 744000.0, 'na_observed': 1488000.0,
'zn_modeled': 1822800.0, 'zn_observed': 3645600.0,
'cu_modeled': 706800.0, 'cu_observed': 1413600.0,
'fl_modeled': 2976000.0, 'fl_observed': 5952000.0,
'mn_modeled': 1078800.0, 'mn_observed': 2157600.0,
'se_modeled': 37200.0, 'se_observed': 74400.0,
'vita_modeled': 1116000.0, 'vita_observed': 2232000.0,
'betac_modeled': 5952000.0, 'betac_observed': 11904000.0,
'alphac_modeled': 855600.0, 'alphac_observed': 1711200.0,
'vite_modeled': 297600.0, 'vite_observed': 595200.0,
'crypto_modeled': 595200.0, 'crypto_observed': 1190400.0,
'lycopene_modeled': 133920.0, 'lycopene_observed': 267840.0,
'lutein_modeled': 23436000.0, 'lutein_observed': 46872000.0,
'betat_modeled': 186000.0, 'betat_observed': 372000.0,
'gammat_modeled': 781200.0, 'gammat_observed': 1562400.0,
'deltat_modeled': 706800.0, 'deltat_observed': 1413600.0,
'vitc_modeled': 2529600.0, 'vitc_observed': 5059200.0,
'thiamin_modeled': 148800.0, 'thiamin_observed': 297600.0,
'riboflavin_modeled': 669600.0, 'riboflavin_observed': 1339200.0,
'niacin_modeled': 3050400.0, 'niacin_observed': 6100800.0,
'pantothenic_modeled': 334800.0, 'pantothenic_observed': 669600.0,
'vitb6_modeled': 520800.0, 'vitb6_observed': 1041600.0,
'folate_modeled': 143220000.0, 'folate_observed': 286440000.0,
'vitb12_modeled': 744000.0, 'vitb12_observed': 1488000.0,
'vitk_modeled': 15252000.0, 'vitk_observed': 30504000.0},
{'crop': 'soybean', 'area (ha)': 40.0,
'production_observed': 120.0, 'production_modeled': 70.0,
'protein_modeled': 21021000.0, 'protein_observed': 36036000.0,
'lipid_modeled': 1274000.0, 'lipid_observed': 2184000.0,
'energy_modeled': 63063000.0, 'energy_observed': 108108000.0,
'ca_modeled': 163709000.0, 'ca_observed': 280644000.0,
'fe_modeled': 10000900.0, 'fe_observed': 17144400.0,
'mg_modeled': 178360000.0, 'mg_observed': 305760000.0,
'ph_modeled': 448448000.0, 'ph_observed': 768768000.0,
'k_modeled': 125489000.0, 'k_observed': 215124000.0,
'na_modeled': 1274000.0, 'na_observed': 2184000.0,
'zn_modeled': 3121300.0, 'zn_observed': 5350800.0,
'cu_modeled': 1019200.0, 'cu_observed': 1747200.0,
'fl_modeled': 1911000.0, 'fl_observed': 3276000.0,
'mn_modeled': 3312400.0, 'mn_observed': 5678400.0,
'se_modeled': 191100.0, 'se_observed': 327600.0,
'vita_modeled': 1911000.0, 'vita_observed': 3276000.0,
'betac_modeled': 10192000.0, 'betac_observed': 17472000.0,
'alphac_modeled': 637000.0, 'alphac_observed': 1092000.0,
'vite_modeled': 509600.0, 'vite_observed': 873600.0,
'crypto_modeled': 382200.0, 'crypto_observed': 655200.0,
'lycopene_modeled': 191100.0, 'lycopene_observed': 327600.0,
'lutein_modeled': 38857000.0, 'lutein_observed': 66612000.0,
'betat_modeled': 318500.0, 'betat_observed': 546000.0,
'gammat_modeled': 1465100.0, 'gammat_observed': 2511600.0,
'deltat_modeled': 764400.0, 'deltat_observed': 1310400.0,
'vitc_modeled': 1911000.0, 'vitc_observed': 3276000.0,
'thiamin_modeled': 267540.0, 'thiamin_observed': 458640.0,
'riboflavin_modeled': 522340.0, 'riboflavin_observed': 895440.0,
'niacin_modeled': 7771400.0, 'niacin_observed': 13322400.0,
'pantothenic_modeled': 586040.0, 'pantothenic_observed': 1004640.0,
'vitb6_modeled': 3439800.0, 'vitb6_observed': 5896800.0,
'folate_modeled': 194285000.0, 'folate_observed': 333060000.0,
'vitb12_modeled': 1911000.0, 'vitb12_observed': 3276000.0,
'vitk_modeled': 26754000.0, 'vitk_observed': 45864000.0}])
nutrient_df = create_nutrient_df()
pixel_area_ha = 10
workspace_dir = self.workspace_dir
output_dir = os.path.join(workspace_dir, "OUTPUT")
os.makedirs(output_dir, exist_ok=True)
landcover_raster_path = os.path.join(workspace_dir, "landcover.tif")
landcover_nodata = -1
make_simple_raster(landcover_raster_path,
numpy.array([[2, 1], [2, 3]], dtype=numpy.int16))
file_suffix = "v1"
target_table_path = os.path.join(workspace_dir, "output_table.csv")
crop_names = ["corn", "soybean"]
_create_crop_rasters(output_dir, crop_names, file_suffix)
tabulate_regression_results(
nutrient_df, crop_names, pixel_area_ha,
landcover_raster_path, landcover_nodata,
output_dir, file_suffix, target_table_path
)
# Read only the first two crops' data (skipping the total area row)
actual_result_table = pandas.read_csv(target_table_path, nrows=2,
header=0)
expected_result_table = _create_expected_results()
# Compare expected vs actual
pandas.testing.assert_frame_equal(actual_result_table,
expected_result_table)
def test_aggregate_regression_results_to_polygons(self):
"""Test `aggregate_regression_results_to_polygons`"""
from natcap.invest.crop_production_regression import \
aggregate_regression_results_to_polygons
def _create_expected_agg_table():
"""Create expected output results"""
# Define the new values manually
return pandas.DataFrame([
{"FID": 0, "corn_modeled": 10, "corn_observed": 40,
"soybean_modeled": 20, "soybean_observed": 50,
"protein_modeled": 9912000, "protein_observed": 30639000,
"lipid_modeled": 1108000, "lipid_observed": 3886000,
"energy_modeled": 62286000, "energy_observed": 222117000,
"ca_modeled": 49285000, "ca_observed": 126979000,
"fe_modeled": 4317500, "fe_observed": 12983900,
"mg_modeled": 77000000, "mg_observed": 231560000,
"ph_modeled": 193600000, "ph_observed": 582208000,
"k_modeled": 196465000, "k_observed": 732079000,
"na_modeled": 550000, "na_observed": 1654000,
"zn_modeled": 1347500, "zn_observed": 4052300,
"cu_modeled": 467900, "cu_observed": 1434800,
"fl_modeled": 1290000, "fl_observed": 4341000,
"mn_modeled": 1216100, "mn_observed": 3444800,
"se_modeled": 63900, "se_observed": 173700,
"vita_modeled": 825000, "vita_observed": 2481000,
"betac_modeled": 4400000, "betac_observed": 13232000,
"alphac_modeled": 395900, "alphac_observed": 1310600,
"vite_modeled": 220000, "vite_observed": 661600,
"crypto_modeled": 258000, "crypto_observed": 868200,
"lycopene_modeled": 88080, "lycopene_observed": 270420,
"lutein_modeled": 16961000, "lutein_observed": 51191000,
"betat_modeled": 137500, "betat_observed": 413500,
"gammat_modeled": 613900, "gammat_observed": 1827700,
"deltat_modeled": 395100, "deltat_observed": 1252800,
"vitc_modeled": 1178400, "vitc_observed": 3894600,
"thiamin_modeled": 113640, "thiamin_observed": 339900,
"riboflavin_modeled": 316640, "riboflavin_observed": 1042700,
"niacin_modeled": 2983000, "niacin_observed": 8601400,
"pantothenic_modeled": 251140, "pantothenic_observed": 753400,
"vitb6_modeled": 1113000, "vitb6_observed": 2977800,
"folate_modeled": 91315000, "folate_observed": 281995000,
"vitb12_modeled": 732000, "vitb12_observed": 2109000,
"vitk_modeled": 11457000, "vitk_observed": 34362000},
{"FID": 1, "corn_modeled": 40, "corn_observed": 80,
"soybean_modeled": 70, "soybean_observed": 120,
"protein_modeled": 36645000, "protein_observed": 67284000,
"lipid_modeled": 4250000, "lipid_observed": 8136000,
"energy_modeled": 240135000, "energy_observed": 462252000,
"ca_modeled": 173753000, "ca_observed": 300732000,
"fe_modeled": 15841300, "fe_observed": 28825200,
"mg_modeled": 282520000, "mg_observed": 514080000,
"ph_modeled": 710336000, "ph_observed": 1292544000,
"k_modeled": 767933000, "k_observed": 1500012000,
"na_modeled": 2018000, "na_observed": 3672000,
"zn_modeled": 4944100, "zn_observed": 8996400,
"cu_modeled": 1726000, "cu_observed": 3160800,
"fl_modeled": 4887000, "fl_observed": 9228000,
"mn_modeled": 4391200, "mn_observed": 7836000,
"se_modeled": 228300, "se_observed": 402000,
"vita_modeled": 3027000, "vita_observed": 5508000,
"betac_modeled": 16144000, "betac_observed": 29376000,
"alphac_modeled": 1492600, "alphac_observed": 2803200,
"vite_modeled": 807200, "vite_observed": 1468800,
"crypto_modeled": 977400, "crypto_observed": 1845600,
"lycopene_modeled": 325020, "lycopene_observed": 595440,
"lutein_modeled": 62293000, "lutein_observed": 113484000,
"betat_modeled": 504500, "betat_observed": 918000,
"gammat_modeled": 2246300, "gammat_observed": 4074000,
"deltat_modeled": 1471200, "deltat_observed": 2724000,
"vitc_modeled": 4440600, "vitc_observed": 8335200,
"thiamin_modeled": 416340, "thiamin_observed": 756240,
"riboflavin_modeled": 1191940, "riboflavin_observed": 2234640,
"niacin_modeled": 10821800, "niacin_observed": 19423200,
"pantothenic_modeled": 920840, "pantothenic_observed": 1674240,
"vitb6_modeled": 3960600, "vitb6_observed": 6938400,
"folate_modeled": 337505000, "folate_observed": 619500000,
"vitb12_modeled": 2655000, "vitb12_observed": 4764000,
"vitk_modeled": 42006000, "vitk_observed": 76368000}
], dtype=float)
workspace = self.workspace_dir
base_aggregate_vector_path = os.path.join(workspace,
"agg_vector.shp")
make_aggregate_vector(base_aggregate_vector_path)
target_aggregate_vector_path = os.path.join(workspace,
"agg_vector_prj.shp")
spatial_ref = osr.SpatialReference()
spatial_ref.ImportFromEPSG(26910)  # NAD83 / UTM zone 10N
landcover_raster_projection = spatial_ref.ExportToWkt()
crop_names = ['corn', 'soybean']
nutrient_df = create_nutrient_df()
output_dir = os.path.join(workspace, "OUTPUT")
os.makedirs(output_dir, exist_ok=True)
file_suffix = 'test'
pixel_area_ha = 10
_create_crop_rasters(output_dir, crop_names, file_suffix)
aggregate_regression_results_to_polygons(
base_aggregate_vector_path, target_aggregate_vector_path,
landcover_raster_projection, crop_names,
nutrient_df, pixel_area_ha, output_dir, file_suffix)
_AGGREGATE_TABLE_FILE_PATTERN = os.path.join(
'.', 'aggregate_results%s.csv')
aggregate_table_path = os.path.join(
output_dir, _AGGREGATE_TABLE_FILE_PATTERN % file_suffix)
actual_aggregate_table = pandas.read_csv(aggregate_table_path,
dtype=float)
print(actual_aggregate_table)
expected_aggregate_table = _create_expected_agg_table()
pandas.testing.assert_frame_equal(
actual_aggregate_table, expected_aggregate_table)
class CropValidationTests(unittest.TestCase):
"""Tests for the Crop Productions' MODEL_SPEC and validation."""