From 71c834543ad13b54841f4ff1d15f64d7b39985d7 Mon Sep 17 00:00:00 2001 From: Cresson Remi <remi.cresson@irstea.fr> Date: Wed, 20 Dec 2023 15:43:36 +0100 Subject: [PATCH] Update file extents.py --- simplestac/extents.py | 216 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 simplestac/extents.py diff --git a/simplestac/extents.py b/simplestac/extents.py new file mode 100644 index 0000000..1979f67 --- /dev/null +++ b/simplestac/extents.py @@ -0,0 +1,216 @@ +""" +Module to deal with STAC Extents. +""" +import pystac +from dataclasses import dataclass +from datetime import datetime +from typing import Union + +Coords = list[Union[int, float]] + + +@dataclass +class SmartBbox: + """ + Small class to work with a single 2D bounding box. + """ + coords: Coords = None # [xmin, ymin, xmax, ymax] + + def touches(self, other: "SmartBbox") -> bool: + """ + Overlap test. + + Args: + other: other bounding box + + Returns: + True if the other bounding box touches, else False. + + """ + xmin, ymin, xmax, ymax = self.coords + o_xmin, o_ymin, o_xmax, o_ymax = other.coords + + if xmax < o_xmin or o_xmax < xmin or ymax < o_ymin or o_ymax < ymin: + return False + return True + + def update(self, other: "SmartBbox"): + """ + Update the coordinates of the Bbox. Modifies itself inplace. + + Args: + other: other bounding box + + """ + if not self.coords: + self.coords = other.coords + else: + self.coords = [ + min(self.coords[0], other.coords[0]), + min(self.coords[1], other.coords[1]), + max(self.coords[2], other.coords[2]), + max(self.coords[3], other.coords[3]) + ] + + +def clusterize_bboxes(bboxes: list[Coords]) -> list[Coords]: + """ + Computes a list of bounding boxes regrouping all overlapping ones. + + Args: + bboxes: 2D bounding boxes (list of int of float) + + Returns: + list of 2D bounding boxes (list of int of float) + + """ + # Regroup bboxes into clusters of bboxes + smart_bboxes = [SmartBbox(bbox) for bbox in bboxes] + clusters = bboxes_to_bboxes_clusters(smart_bboxes) + + # Compute clusters extents + clusters_unions = [SmartBbox() for _ in clusters] + for i, cluster in enumerate(clusters): + for smart_bbox in cluster: + clusters_unions[i].update(smart_bbox) + + return [smart_bbox.coords.copy() for smart_bbox in clusters_unions] + + +def bboxes_to_bboxes_clusters(smart_bboxes: list[SmartBbox]) -> list[ + list[SmartBbox]]: + """ + Transform a list of bounding boxes into a nested list of clustered ones. + + Args: + smart_bboxes: a list of `SmartBbox` instances + + Returns: + a list of `SmartBbox` instances list + + """ + clusters_labels = compute_smart_bboxes_clusters(smart_bboxes) + clusters_bboxes = [[] for _ in range(max(clusters_labels) + 1)] + for smart_bbox, labels in zip(smart_bboxes, clusters_labels): + clusters_bboxes[labels].append(smart_bbox) + return clusters_bboxes + + +def compute_smart_bboxes_clusters(smart_bboxes: list[SmartBbox]) -> list[int]: + """ + Compute the extent of a cluster of `SmartBbox` instances. + + Args: + smart_bboxes: a list of `SmartBbox` instances + + Returns: + a vector of same size as `smart_bboxes` with the group numbers (int) + + """ + labels = len(smart_bboxes) * [None] + group = 0 + + def dfs(index: int): + """ + Deep first search with o(n) complexity. + + Args: + index: vertex index. + + """ + labels[index] = group + cur_item = smart_bboxes[index] + for i, (item, label) in enumerate(zip(smart_bboxes, labels)): + if i != index and cur_item.touches(item) and labels[i] is None: + dfs(i) + + while any(label is None for label in labels): + next_unmarked = next( + i for i, label in enumerate(labels) + if label is None + ) + dfs(next_unmarked) + group += 1 + return labels + + +class AutoSpatialExtent(pystac.SpatialExtent): + """ + Custom extension of pystac.SpatialExtent that automatically compute bboxes. + """ + + def __init__(self, *args, **kwargs): + """ + Initializer. Clusterize boxes after the original initializer. + + Args: + *args: args + **kwargs: keyword args + + """ + super().__init__(*args, **kwargs) + self.clusterize_bboxes() + + def update(self, other: pystac.SpatialExtent | Coords): + """ + Updates itself with a new spatial extent or bounding box. Modifies + inplace `self.bboxes`. + + Args: + other: spatial extent or bbox coordinates + + """ + is_spat_ext = isinstance(other, pystac.SpatialExtent) + self.bboxes += other.bboxes if is_spat_ext else other + self.clusterize_bboxes() + + def clusterize_bboxes(self): + """ + Regroup the bounding boxes that overlap. Modifies inplace `self.bboxes`. + + """ + self.bboxes = clusterize_bboxes(self.bboxes) + + +class AutoTemporalExtent(pystac.TemporalExtent): + """ + Custom extension of pystac.TemporalExtent that automatically updates itself + with another date or temporal extent provided. + """ + + def __init__(self, *args, **kwargs): + """ + Initializer. Regroup all intervals into a single one. + + Args: + *args: args + **kwargs: keyword args + + """ + super().__init__(*args, **kwargs) + self.make_single_interval() + + def update(self, other: Union[pystac.TemporalExtent, datetime]): + """ + Updates itself with a new temporal extent of date. Modifies inplace + `self.intervals`. + + Args: + other: temporal extent or datetime + + """ + is_temp_ext = isinstance(other, pystac.TemporalExtent) + intervals = other.intervals if is_temp_ext else [[other, other]] + self.intervals += intervals + self.make_single_interval() + + def make_single_interval(self): + all_dates = [] + for interval in self.intervals: + if isinstance(interval, (list, tuple)): + all_dates += [i for i in interval] + elif isinstance(interval, datetime): + all_dates.append(interval) + else: + TypeError(f"Unsupported date/range of: {interval}") + self.intervals = [[min(all_dates), max(all_dates)]] -- GitLab