"""
*geometry.py*
"""
import json
from io import StringIO
from json import dumps
import daiquiri
import geopandas as gpd
import shapely
from shapely import Polygon, Point
import numpy as np
logger = daiquiri.getLogger(__name__)
[docs]class Geometry:
"""
The Geometry class manages spatial geometries in GeoJSON format and
provides utilities for transformation and spatial processing.
Currently, the ``geometry`` parameter only supports GeoJSON ``Point`` and
``Polygon`` types, with plans to support additional types, including
``GeometryCollection``, in the future.
"""
def __init__(self, geometry: dict):
"""
Initializes a Geometry object with the given GeoJSON geometry.
:param geometry: A dictionary representing a GeoJSON geometry.
"""
self._data = geometry
@property
def data(self) -> dict:
"""
Retrieves the stored geometry data.
:return: A dictionary representing the GeoJSON geometry.
"""
return self._data
@data.setter
def data(self, geometry: dict):
"""
Updates the geometry data.
:param geometry: A dictionary representing a new GeoJSON geometry.
"""
self._data = geometry
[docs] def is_supported(self) -> bool:
"""
Checks if the stored geometry is supported by the resolver.
A valid geometry must be a GeoJSON object with a top-level ``type`` of
either "Point" or "Polygon".
:return: ``True`` if the geometry is supported, otherwise ``False``.
"""
logger.debug(f"Checking if geometry type '{self.geometry_type()}' is supported")
if self.geometry_type() in ["Point", "Polygon"]:
return True
logger.warning(f"Unsupported geometry type: {self.geometry_type()}")
return False
[docs] def to_esri(self) -> dict:
"""
Converts the GeoJSON geometry to an Esri-compatible format.
:return: A dictionary representing the Esri-formatted geometry.
"""
logger.debug(
f"Converting geometry of type '{self.geometry_type()}' to Esri format"
)
if self.geometry_type() == "Point":
x, y, *z = self.data["coordinates"]
geometry = {
"x": x,
"y": y,
"z": z[0] if z else None,
"spatialReference": {"wkid": 4326},
}
esri_geometry_type = "esriGeometryPoint"
logger.debug("Successfully converted Point geometry to Esri format")
return {"geometry": geometry, "geometryType": esri_geometry_type}
if self.geometry_type() == "Polygon":
geometry = {
"rings": self.data["coordinates"],
"spatialReference": {"wkid": 4326},
}
esri_geometry_type = "esriGeometryPolygon"
logger.debug("Successfully converted Polygon geometry to Esri format")
return {"geometry": geometry, "geometryType": esri_geometry_type}
raise ValueError("Invalid geometry type")
[docs] def geometry_type(self) -> str:
"""
Retrieves the type of the stored geometry (e.g., "Point" or "Polygon").
:return: A string representing the geometry type.
"""
return self.data.get("type")
[docs] def point_to_polygon(self, buffer=None) -> dict:
"""
Converts a ``Point`` geometry into a ``Polygon`` by buffering it.
:param buffer: The buffer distance used to create the polygon
(optional).
:return: A dictionary representing the buffered polygon in GeoJSON
format.
"""
if self.geometry_type() != "Point" or buffer is None:
logger.warning(
f"Skipping point-to-polygon conversion. Geometry type "
f"'{self.geometry_type()}' is not a Point, or no buffer "
f"provided."
)
return self.data
logger.debug(f"Converting Point to Polygon with buffer {buffer} km")
try:
point = gpd.GeoSeries.from_file(StringIO(dumps(self.data)))
point = point.to_crs(32634) # A CRS in units of meters
expanded_point = point.geometry.buffer(buffer * 1000) # buffer to meters
expanded_point = expanded_point.to_crs(4326) # Convert back to EPSG:4326
bounds = expanded_point.bounds
polygon = {
"type": "Polygon",
"coordinates": [
[
[bounds.minx[0], bounds.miny[0]],
[bounds.maxx[0], bounds.miny[0]],
[bounds.maxx[0], bounds.maxy[0]],
[bounds.minx[0], bounds.maxy[0]],
[bounds.minx[0], bounds.miny[0]],
]
],
}
logger.debug(
f"Successfully converted Point to Polygon with buffer {buffer} km"
)
return polygon
except Exception as e:
logger.error(f"Failed to convert Point to Polygon: {e}", exc_info=True)
return self.data
[docs] def polygon_to_points(self, grid_size) -> list[dict]:
"""
Converts a ``Polygon`` geometry into a set of representative points
using grid-based sampling.
:param grid_size: The size of the grid cells used for sampling.
:return: A list of dictionaries representing sampled points in GeoJSON
format.
"""
if self.geometry_type() != "Polygon":
logger.warning(
f"Skipping polygon-to-points conversion. Geometry type "
f"'{self.geometry_type()}' is not a Polygon."
)
return self.data
try:
# Get points from within the polygon
polygon = gpd.GeoSeries.from_file(StringIO(dumps(self.data)))
representative_points = polygon.apply(
grid_sample_polygon, args=(grid_size,)
)
points = []
for item in representative_points.items():
geojson = json.loads(gpd.GeoSeries(item[1]).to_json())
result = geojson["features"][0]["geometry"]
points.append(result)
logger.debug(
f"Extracted {len(points)} representative points from the polygon"
)
# Get points from the vertices of the polygon
coords = list(polygon[0].exterior.coords)
polygon_vertices = gpd.GeoSeries([Point(x, y) for x, y in coords])
polygon_vertices = polygon_vertices.drop_duplicates()
for item in polygon_vertices.items():
geojson = json.loads(gpd.GeoSeries(item[1]).to_json())
result = geojson["features"][0]["geometry"]
points.append(result)
logger.debug(f"Successfully converted Polygon to {len(points)} points")
return points
except Exception as e:
logger.error(f"Failed to convert Polygon to points: {e}", exc_info=True)
return self.data
def grid_sample_polygon(polygon: shapely.Polygon, grid_size: float) -> gpd.GeoSeries:
"""
Generates a set of representative points within a polygon using grid-based
sampling.
:param polygon: A Shapely Polygon object.
:param grid_size: The size of the grid cells in the same units as the
polygon's coordinates.
:return: A GeoSeries of Shapely Point objects representing the sample
points.
"""
logger.debug(f"Starting grid sampling for polygon with grid size {grid_size}")
try:
min_x, min_y, max_x, max_y = polygon.bounds
logger.debug(
f"Polygon bounds: min_x={min_x}, min_y={min_y}, "
f"max_x={max_x}, max_y={max_y}"
)
cols = np.arange(min_x, max_x + grid_size, grid_size)
rows = np.arange(min_y, max_y + grid_size, grid_size)
grid_cells = []
for x in cols:
for y in rows:
grid_cell = Polygon(
[
(x, y),
(x + grid_size, y),
(x + grid_size, y + grid_size),
(x, y + grid_size),
]
)
grid_cells.append(grid_cell)
logger.debug(f"Generated {len(grid_cells)} grid cells")
grid_gdf = gpd.GeoDataFrame(geometry=grid_cells)
intersecting_cells = grid_gdf[grid_gdf.intersects(polygon)]
logger.debug(f"{len(intersecting_cells)} grid cells intersect with the polygon")
sample_points = intersecting_cells.centroid
sample_points = sample_points[sample_points.within(polygon)]
logger.debug(f"Generated {len(sample_points)} sample points within the polygon")
return sample_points
except Exception as e:
logger.error(f"Failed to generate sample points: {e}", exc_info=True)
return gpd.GeoSeries()