#!/usr/bin/env python3
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR
# Purpose: Test compliance of GeoParquet file
# Author: Even Rouault <even.rouault at spatialys.com>
#
###############################################################################
# Copyright (c) 2023, Even Rouault <even.rouault at spatialys.com>
#
# SPDX-License-Identifier: MIT
###############################################################################
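"""Validate the metadata (and optionally the geometry data) of a GeoParquet file.

Typical invocation (see Usage() below), assuming an input file named
my_geo.parquet:

    python validate_geoparquet.py --check-data my_geo.parquet
"""
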
import json
import sys
from osgeo import gdal, ogr, osr
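
# Cache of GeoParquet JSON schemas already downloaded, keyed by schema URL,
# so that validating several files only fetches each schema once.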
geoparquet_schemas = {}
map_ogr_geom_type_to_geoparquet = {
ogr.wkbPoint: "Point",
ogr.wkbLineString: "LineString",
ogr.wkbPolygon: "Polygon",
ogr.wkbMultiPoint: "MultiPoint",
ogr.wkbMultiLineString: "MultiLineString",
ogr.wkbMultiPolygon: "MultiPolygon",
ogr.wkbGeometryCollection: "GeometryCollection",
ogr.wkbPoint25D: "Point",
ogr.wkbLineString25D: "LineString Z",
ogr.wkbPolygon25D: "Polygon Z",
ogr.wkbMultiPoint25D: "MultiPoint Z",
ogr.wkbMultiLineString25D: "MultiLineString Z",
ogr.wkbMultiPolygon25D: "MultiPolygon Z",
ogr.wkbGeometryCollection25D: "GeometryCollection Z",
}
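
# Cache of remote resources fetched while resolving "$ref" references of the
# JSON schema, keyed by URI.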
map_remote_resources = {}
class GeoParquetValidator:
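    """Validator checking that a Parquet file complies with the GeoParquet
    specification: presence and validity of the 'geo' file metadata item and,
    optionally, consistency of the geometry data with that metadata.
    """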
def __init__(self, filename, check_data=False, local_schema=None):
self.filename = filename
self.check_data = check_data
self.local_schema = local_schema
self.ds = None
self.errors = []
def check(self):
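        """Run the validation; error messages are accumulated in self.errors."""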
with gdal.ExceptionMgr(useExceptions=True), ogr.ExceptionMgr(
useExceptions=True
), osr.ExceptionMgr(useExceptions=True):
self._check()
self.ds = None
def _error(self, msg):
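        """Record an error message, capping the number of errors reported per file at 100."""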
if len(self.errors) == 100:
self.errors.append(
f"{self.filename}: too many errors. No longer emitting ones"
)
elif len(self.errors) < 100:
self.errors.append(f"{self.filename}: {msg}")
def _check_counterclockwise(self, g, row):
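        """Check ring winding order: exterior rings must be counterclockwise and
        interior rings clockwise (GeoParquet orientation = 'counterclockwise'),
        recursing into multi-part geometries.
        """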
gt = ogr.GT_Flatten(g.GetGeometryType())
if gt == ogr.wkbPolygon:
for idx in range(g.GetGeometryCount()):
ring = g.GetGeometryRef(idx)
if idx == 0 and ring.IsClockwise():
self._error(
f"Exterior ring of geometry at row {row} has invalid orientation"
)
elif idx > 0 and not ring.IsClockwise():
self._error(
f"Interior ring of geometry at row {row} has invalid orientation"
)
elif gt in (ogr.wkbMultiPolygon, ogr.wkbGeometryCollection):
for idx in range(g.GetGeometryCount()):
subgeom = g.GetGeometryRef(idx)
self._check_counterclockwise(subgeom, row)
def _validate(self, schema, instance):
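        """Validate ``instance`` against the JSON ``schema``, supporting both
        jsonschema < 4.18 and the referencing-based API of jsonschema >= 4.18.
        """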
import jsonschema
if sys.version_info >= (3, 8):
from importlib.metadata import version
jsonschema_version = version("jsonschema")
else:
from pkg_resources import get_distribution
jsonschema_version = get_distribution("jsonschema").version
def versiontuple(v):
return tuple(map(int, (v.split("."))))
        # jsonschema 4.18 deprecates automatic resolution of "$ref" for security
        # reasons. Use a custom retrieve method instead.
# Cf https://python-jsonschema.readthedocs.io/en/latest/referencing/#automatically-retrieving-resources-over-http
if versiontuple(jsonschema_version) >= (4, 18):
from referencing import Registry, Resource
def retrieve_remote_file(uri: str):
if not uri.startswith("http://") and not uri.startswith("https://"):
raise Exception(f"Cannot retrieve {uri}")
                import urllib.request
global map_remote_resources
if uri not in map_remote_resources:
response = urllib.request.urlopen(uri).read()
map_remote_resources[uri] = response
else:
response = map_remote_resources[uri]
return Resource.from_contents(json.loads(response))
registry = Registry(retrieve=retrieve_remote_file)
validator_cls = jsonschema.validators.validator_for(schema)
validator_cls(schema, registry=registry).validate(instance)
else:
# jsonschema < 4.18
jsonschema.validate(instance=instance, schema=schema)
def _check(self):
global geoparquet_schemas
if gdal.GetDriverByName("Parquet") is None:
return self._error("Parquet driver not available")
try:
import jsonschema
jsonschema.validate
except ImportError:
return self._error(
"jsonschema Python module not available. Try 'pip install jsonschema'"
)
try:
self.ds = gdal.OpenEx(self.filename, allowed_drivers=["Parquet"])
except Exception as e:
return self._error("is not a Parquet file: %s" % str(e))
lyr = self.ds.GetLayer(0)
geo = lyr.GetMetadataItem("geo", "_PARQUET_METADATA_")
if geo is None:
return self._error("'geo' file metadata item missing")
try:
geo_j = json.loads(geo)
except Exception as e:
return self._error(
"'geo' metadata item is not valid JSON. Value of 'geo' = '%s'. Exception = '%s' "
% (geo, str(e))
)
if "version" not in geo_j:
self._error(
"'geo' metadata item lacks a 'version' member. Value of 'geo' = '%s'"
% geo
)
if not self.local_schema:
return
version = None
else:
version = geo_j["version"]
if not isinstance(version, str):
return self._error(
"'geo[\"version\"]' is not a string. Value of 'geo' = '%s'" % geo
)
if self.local_schema:
            with open(self.local_schema, "rb") as f:
                schema_j = json.loads(f.read())
else:
schema_url = f"https://github.com/opengeospatial/geoparquet/releases/download/v{version}/schema.json"
if schema_url not in geoparquet_schemas:
                import urllib.request
try:
response = urllib.request.urlopen(schema_url).read()
except Exception as e:
return self._error(
f"Cannot download GeoParquet JSON schema from {schema_url}. Exception = {repr(e)}"
)
try:
geoparquet_schemas[schema_url] = json.loads(response)
except Exception as e:
return self._error(
f"Failed to read GeoParquet schema at {schema_url} as JSON. Schema content = '{response}'. Exception = {repr(e)}"
)
schema_j = geoparquet_schemas[schema_url]
try:
self._validate(schema_j, geo_j)
except Exception as e:
self._error(
"'geo' metadata item fails to validate its schema: %s'" % str(e)
)
if "primary_column" not in geo_j:
return self._error('geo["primary_column"] missing')
primary_column = geo_j["primary_column"]
if "columns" not in geo_j:
return self._error('geo["columns"] missing')
columns = geo_j["columns"]
if primary_column not in columns:
self._error(
                f'geo["primary_column"] = {primary_column} is not listed in geo["columns"]'
)
ogr_geom_field_names = [
lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName()
for i in range(lyr.GetLayerDefn().GetGeomFieldCount())
]
for column_name, column_def in columns.items():
if column_name not in ogr_geom_field_names:
self._error(
f'geo["columns"] lists a {column_name} column which is not found in the Parquet fields'
)
self._check_column_metadata(column_name, column_def)
if self.check_data:
self._check_data(lyr, columns)
def _check_column_metadata(self, column_name, column_def):
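        """Check the per-column 'crs', 'bbox' and 'geometry_types' metadata."""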
srs = None
if "crs" in column_def:
crs = column_def["crs"]
if crs and (
osr.GetPROJVersionMajor() * 100 + osr.GetPROJVersionMinor() >= 602
):
srs = osr.SpatialReference()
try:
if isinstance(crs, dict):
srs.SetFromUserInput(json.dumps(crs))
elif isinstance(crs, str):
srs.SetFromUserInput(crs)
except Exception as e:
self._error(f"crs {crs} cannot be parsed: %s" % str(e))
else:
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
if "bbox" in column_def:
bbox = column_def["bbox"]
if bbox:
if len(bbox) == 4:
minx_idx = 0
miny_idx = 1
maxx_idx = 2
maxy_idx = 3
elif len(bbox) == 6:
minx_idx = 0
miny_idx = 1
minz_idx = 2
maxx_idx = 3
maxy_idx = 4
maxz_idx = 5
if bbox[maxz_idx] < bbox[minz_idx]:
self._error(f"bbox[{maxz_idx}] < bbox[{minz_idx}]")
else:
assert False, "Unexpected len(bbox)"
if srs and srs.IsGeographic():
if abs(bbox[minx_idx]) > 180:
self._error(f"abs(bbox[{minx_idx}]) > 180")
if abs(bbox[miny_idx]) > 90:
self._error(f"abs(bbox[{miny_idx}]) > 90")
if abs(bbox[maxx_idx]) > 180:
self._error(f"abs(bbox[{maxx_idx}]) > 180")
if abs(bbox[maxy_idx]) > 90:
self._error(f"abs(bbox[{maxy_idx}]) > 90")
if srs and srs.IsProjected():
if bbox[maxx_idx] < bbox[minx_idx]:
self._error(f"bbox[{maxx_idx}] < bbox[{minx_idx}]")
if bbox[maxy_idx] < bbox[miny_idx]:
self._error(f"bbox[{maxy_idx}] < bbox[{miny_idx}]")
if "geometry_types" in column_def:
geometry_types = column_def["geometry_types"]
set_geometry_types = set()
for geometry_type in geometry_types:
                if geometry_type in set_geometry_types:
                    self._error(
                        f"{geometry_type} is declared several times in geometry_types[]"
                    )
                set_geometry_types.add(geometry_type)
def _check_data_with_high_level_ogr_api(self, lyr, columns):
"""Use for GeoArrow validation"""
if gdal.VersionInfo() < "3090000":
raise Exception("GDAL 3.9 or later required for GeoArrow data validation")
lyr.SetIgnoredFields(
[
lyr.GetLayerDefn().GetFieldDefn(i).GetName()
for i in range(lyr.GetLayerDefn().GetFieldCount())
]
)
list_of_set_geometry_types = []
orientations = []
encodings = []
assert len(columns) == lyr.GetLayerDefn().GetGeomFieldCount()
for i in range(lyr.GetLayerDefn().GetGeomFieldCount()):
column_def = columns[lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName()]
encodings.append(column_def["encoding"])
if "geometry_types" in column_def:
geometry_types = column_def["geometry_types"]
set_geometry_types = set(geometry_types)
else:
set_geometry_types = set()
list_of_set_geometry_types.append(set_geometry_types)
if "orientation" in column_def:
orientation = column_def["orientation"]
else:
orientation = None
orientations.append(orientation)
map_ogr_geom_type_to_geoarrow_encoding = {
ogr.wkbPoint: "point",
ogr.wkbLineString: "linestring",
ogr.wkbPolygon: "polygon",
ogr.wkbMultiPoint: "multipoint",
ogr.wkbMultiLineString: "multilinestring",
ogr.wkbMultiPolygon: "multipolygon",
}
for row, f in enumerate(lyr):
for i in range(lyr.GetLayerDefn().GetGeomFieldCount()):
g = f.GetGeomFieldRef(i)
if g:
ogr_geom_type = g.GetGeometryType()
if ogr_geom_type not in map_ogr_geom_type_to_geoparquet:
self._error(
f"Geometry at row {row} is of unexpected type for GeoParquet: %s"
% g.GetGeometryName()
)
elif list_of_set_geometry_types[i]:
geoparquet_geom_type = map_ogr_geom_type_to_geoparquet[
ogr_geom_type
]
if geoparquet_geom_type not in list_of_set_geometry_types[i]:
self._error(
f"Geometry at row {row} is of type {geoparquet_geom_type}, but not listed in geometry_types[]"
)
ogr_flat_geom_type = ogr.GT_Flatten(ogr_geom_type)
if ogr_flat_geom_type not in map_ogr_geom_type_to_geoarrow_encoding:
self._error(
f"Geometry at row {row} is of unexpected type for a GeoArrow encoding of GeoParquet: %s"
% g.GetGeometryName()
)
elif (
map_ogr_geom_type_to_geoarrow_encoding[ogr_flat_geom_type]
!= encodings[i]
):
self._error(
f"Geometry at row {row} is a %s but does not match the declared encoding of %s"
% (ogr.GeometryTypeToName(ogr_geom_type), encodings[i])
)
if orientations[i] == "counterclockwise":
self._check_counterclockwise(g, row)
def _check_data(self, lyr, columns):
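        """Check geometry content against the declared column metadata
        (encoding, geometry_types[], orientation).
        """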
# For non-WKB encoding, just use the high level OGR API
use_high_level_ogr_api = False
for _, column_def in columns.items():
if column_def["encoding"] != "WKB":
use_high_level_ogr_api = True
break
if use_high_level_ogr_api:
self._check_data_with_high_level_ogr_api(lyr, columns)
return
lyr.SetIgnoredFields(
[
lyr.GetLayerDefn().GetFieldDefn(i).GetName()
for i in range(lyr.GetLayerDefn().GetFieldCount())
]
)
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
row = 0
for batch in stream:
row_backup = row
rows_in_batch = 0
for column_name, column_def in columns.items():
if "geometry_types" in column_def:
geometry_types = column_def["geometry_types"]
set_geometry_types = set(geometry_types)
else:
set_geometry_types = set()
if "orientation" in column_def:
orientation = column_def["orientation"]
else:
orientation = None
row = row_backup
rows_in_batch = len(batch[column_name])
for geom in batch[column_name]:
g = None
try:
if geom is not None:
g = ogr.CreateGeometryFromWkb(geom)
except Exception as e:
self._error(
f"Invalid WKB geometry at row {row} for column {column_name}: %s"
% str(e)
)
if g:
ogr_geom_type = g.GetGeometryType()
if ogr_geom_type not in map_ogr_geom_type_to_geoparquet:
self._error(
f"Geometry at row {row} is of unexpected type for GeoParquet: %s"
% g.GetGeometryName()
)
elif set_geometry_types:
geoparquet_geom_type = map_ogr_geom_type_to_geoparquet[
ogr_geom_type
]
if geoparquet_geom_type not in set_geometry_types:
self._error(
f"Geometry at row {row} is of type {geoparquet_geom_type}, but not listed in geometry_types[]"
)
# ogr.Geometry.IsClockwise() added in GDAL 3.8
if (
hasattr(g, "IsClockwise")
and orientation == "counterclockwise"
):
self._check_counterclockwise(g, row)
row += 1
            row = row_backup + rows_in_batch  # advance to the first row of the next batch
def check(filename, check_data=False, local_schema=None):
"""Validate a file against GeoParquet specification
Parameters
----------
filename:
File to validate
check_data:
Set to True to check geometry content in addition to metadata.
local_schema:
Path to local schema (if not specified, it will be retrieved at https://github.com/opengeospatial/geoparquet)
Returns
-------
A list of error messages, or an empty list if no error
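
    Examples
    --------
    >>> errors = check("my_geo.parquet", check_data=True)  # doctest: +SKIP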
"""
checker = GeoParquetValidator(
filename, check_data=check_data, local_schema=local_schema
)
checker.check()
return checker.errors
def Usage():
print(
"Usage: validate_geoparquet.py [--check-data] [--schema FILENAME] my_geo.parquet"
)
print("")
print("--check-data: validate data in addition to metadata")
print(
"--schema: path to GeoParquet JSON schema. If not specified, retrieved from the network"
)
return 2
def main(argv=sys.argv):
filename = None
if len(argv) == 1:
return Usage()
check_data = False
local_schema = None
i = 1
while i < len(argv):
arg = argv[i]
if arg == "--check-data":
check_data = True
elif arg == "--schema":
local_schema = argv[i + 1]
i += 1
elif arg[0] == "-":
return Usage()
else:
filename = arg
i += 1
if filename is None:
return Usage()
errors = check(filename, check_data=check_data, local_schema=local_schema)
if errors:
for msg in errors:
print(msg)
return 1
return 0
if __name__ == "__main__":
gdal.UseExceptions()
sys.exit(main(sys.argv))