"""
This module deals with extracting the relevant data from the UGRID attributes.
It takes some inspiration from: https://github.com/xarray-contrib/cf-xarray
"""
import warnings
from collections import ChainMap
from itertools import chain
from typing import Dict, List, Tuple
import xarray as xr
class UgridDimensionError(Exception):
pass
class UgridCoordinateError(Exception):
pass
_DIM_NAMES = {
1: ("node_dimension", "edge_dimension"),
2: ("node_dimension", "face_dimension", "edge_dimension"),
}
_COORD_NAMES = {
1: ("node_coordinates", "edge_coordinates"),
2: ("node_coordinates", "face_coordinates", "edge_coordinates"),
}
_COORD_DIMS = {
"node_coordinates": "node_dimension",
"edge_coordinates": "edge_dimension",
"face_coordinates": "face_dimension",
}
_CONNECTIVITY_NAMES = {
1: ("edge_node_connectivity",),
2: (
"face_node_connectivity",
"edge_node_connectivity",
"face_edge_connectivity",
"face_face_connectivity",
"edge_face_connectivity",
"boundary_node_connectivity",
),
}
_CONNECTIVITY_DIMS = {
"face_node_connectivity": ("face_dimension", None),
"edge_node_connectivity": ("edge_dimension", 2),
"face_edge_connectivity": ("face_dimension", None),
"face_face_connectivity": ("face_dimension", None),
"edge_face_connectivity": ("edge_dimension", 2),
"boundary_node_connectivity": ("boundary_edge_dimension", 2),
}
X_STANDARD_NAMES = ("projection_x_coordinate", "longitude")
Y_STANDARD_NAMES = ("projection_y_coordinate", "latitude")
PROJECTED = True
LATLON = False
DEFAULT_ATTRS = {
"node_x": {
PROJECTED: {
"standard_name": "projection_x_coordinate",
},
LATLON: {
"standard_name": "longitude",
},
},
"node_y": {
PROJECTED: {
"standard_name": "projection_y_coordinate",
},
LATLON: {
"standard_name": "latitude",
},
},
"edge_x": {
PROJECTED: {
"standard_name": "projection_x_coordinate",
},
LATLON: {
"standard_name": "longitude",
},
},
"edge_y": {
PROJECTED: {
"standard_name": "projection_y_coordinate",
},
LATLON: {
"standard_name": "latitude",
},
},
"face_x": {
PROJECTED: {
"standard_name": "projection_x_coordinate",
},
LATLON: {
"standard_name": "longitude",
},
},
"face_y": {
PROJECTED: {
"standard_name": "projection_y_coordinate",
},
LATLON: {
"standard_name": "latitude",
},
},
"face_node_connectivity": {
"cf_role": "face_node_connectivity",
"start_index": 0,
"_FillValue": -1,
},
"edge_node_connectivity": {
"cf_role": "edge_node_connectivity",
"start_index": 0,
"_FillValue": -1,
},
"face_edge_connectivity": {
"cf_role": "face_edge_connectivity",
"start_index": 0,
"_FillValue": -1,
},
"face_face_connectivity": {
"cf_role": "face_face_connectivity",
"start_index": 0,
"_FillValue": -1,
},
"edge_face_connectivity": {
"cf_role": "edge_face_connectivity",
"start_index": 0,
"_FillValue": -1,
},
"boundary_node_connectivity": {
"cf_role": "boundary_node_connectivity",
"start_index": 0,
"_FillValue": -1,
},
}
def default_topology_attrs(name: str, topology_dimension: int):
if topology_dimension == 1:
return {
"cf_role": "mesh_topology",
"long_name": "Topology data of 1D network",
"topology_dimension": 1,
"node_dimension": f"{name}_nNodes",
"edge_dimension": f"{name}_nEdges",
"edge_node_connectivity": f"{name}_edge_nodes",
"node_coordinates": f"{name}_node_x {name}_node_y",
"edge_coordinates": f"{name}_edge_x {name}_edge_y",
}
elif topology_dimension == 2:
return {
"cf_role": "mesh_topology",
"long_name": "Topology data of 2D mesh",
"topology_dimension": 2,
"node_dimension": f"{name}_nNodes",
"edge_dimension": f"{name}_nEdges",
"face_dimension": f"{name}_nFaces",
"max_face_nodes_dimension": f"{name}_nMax_face_nodes",
"boundary_edge_dimension": f"{name}_nBoundary_edges",
"edge_node_connectivity": f"{name}_edge_nodes",
"face_node_connectivity": f"{name}_face_nodes",
"face_edge_connectivity": f"{name}_face_edges",
"edge_face_connectivity": f"{name}_edge_faces",
"boundary_node_connectivity": f"{name}_boundary_nodes",
"face_face_connectivity": f"{name}_face_faces",
"node_coordinates": f"{name}_node_x {name}_node_y",
"edge_coordinates": f"{name}_edge_x {name}_edge_y",
"face_coordinates": f"{name}_face_x {name}_face_y",
}
else:
raise ValueError(
f"topology_dimensions should be 1 or 2, received {topology_dimension}"
)
def _get_topology(ds: xr.Dataset) -> List[str]:
return [k for k in ds.data_vars if ds[k].attrs.get("cf_role") == "mesh_topology"]
def _infer_xy_coords(
ds: xr.Dataset, candidates: List[str]
) -> Tuple[List[str], List[str]]:
# TODO: add argument for latitude / longitude?
x = []
y = []
for candidate in candidates:
stdname = ds[candidate].attrs.get("standard_name")
if stdname in X_STANDARD_NAMES:
x.append(candidate)
elif stdname in Y_STANDARD_NAMES:
y.append(candidate)
if not x and not y:
first = candidates[0]
second = candidates[1]
warnings.warn(
f"No standard_name of {X_STANDARD_NAMES + Y_STANDARD_NAMES} in {candidates}.\n"
f"Using {first} and {second} as projected x and y coordinates."
)
x.append(first)
y.append(second)
elif not x:
raise UgridCoordinateError(
"No standard_name of {X_STANDARD_NAMES} in {candidates}"
)
elif not y:
raise UgridCoordinateError(
"No standard_name of {Y_STANDARD_NAMES} in {candidates}"
)
return x, y
def _get_coordinates(
ds: xr.Dataset, topologies: List[str]
) -> Dict[str, Dict[str, Tuple[List[str], List[str]]]]:
topology_dict = {}
for topology in topologies:
attrs = ds[topology].attrs
topodim = attrs["topology_dimension"]
vardict = {}
for name in _COORD_NAMES[topodim]:
if name in attrs:
candidates = [c for c in attrs[name].split(" ") if c in ds]
if len(candidates) == 0:
warnings.warn(
f"the following variables are specified for UGRID {name}: "
f'"{attrs[name]}", but they are not present in the dataset'
)
continue
if len(candidates) < 2:
raise UgridCoordinateError(
f"{topology}: at least two values required for UGRID {name},"
f' while only "{attrs[name]}" are specified.'
)
vardict[name] = _infer_xy_coords(ds, candidates)
topology_dict[topology] = vardict
return topology_dict
def _infer_dims(
ds: xr.Dataset,
connectivities: Dict[str, str],
coordinates: Dict[str, Dict[str, Tuple[List[str]]]],
vardict: Dict[str, str],
) -> Dict[str, str]:
"""Infer dimensions based on connectivity and coordinates."""
inferred = {}
for role, varname in connectivities.items():
expected_dims = _CONNECTIVITY_DIMS[role]
var_dims = ds[varname].dims
for key, dim in zip(expected_dims, var_dims):
if isinstance(key, str):
prev_dim = inferred.get(key)
# Not specified: default order can be used to infer dimensions.
if prev_dim is None:
inferred[key] = dim
else:
if prev_dim not in var_dims:
raise UgridDimensionError(
f"{key}: {prev_dim} not in {role}: {varname}"
f" with dimensions: {var_dims}"
)
elif isinstance(key, int):
dim_size = ds.sizes[dim]
if not dim_size == key:
raise UgridDimensionError(
f"Expected size {key} for dimension {dim} in variable "
f"{varname} with role {role}, found instead: "
f"{dim_size}"
)
# If key is None, we don't do any checking.
for role, varnames in coordinates.items():
key = _COORD_DIMS[role]
for varname in chain.from_iterable(varnames):
var_dims = ds[varname].dims
if len(var_dims) != 1:
continue
var_dim = var_dims[0]
prev_dim = vardict.get(key) or inferred.get(key)
if prev_dim is None:
inferred[key] = var_dim
else:
if prev_dim != var_dim:
raise UgridDimensionError(
f"Conflicting names for {key}: {prev_dim} versus {var_dim}"
)
return inferred
def _get_dimensions(
ds: xr.Dataset,
topologies: List[str],
connectivity: Dict[str, Dict[str, str]],
coordinates: Dict[str, Dict[str, Tuple[List[str]]]],
) -> Dict[str, Dict[str, str]]:
"""
Get the dimensions from the topology attributes and infer them from
connectivity arrays or coordinates.
"""
topology_dict = {}
for topology in topologies:
attrs = ds[topology].attrs
topodim = attrs["topology_dimension"]
# dimensions are optionally required: only if the dimension order is
# nonstandard in any of the connectivity variables.
vardict = {k: attrs[k] for k in _DIM_NAMES[topodim] if k in attrs}
inferred = _infer_dims(
ds, connectivity[topology], coordinates[topology], vardict
)
topology_dict[topology] = {**vardict, **inferred}
return topology_dict
def _get_connectivity(
ds: xr.Dataset, topologies: List[str]
) -> Dict[str, Dict[str, str]]:
topology_dict = {}
for topology in topologies:
attrs = ds[topology].attrs
topodim = attrs["topology_dimension"]
topology_dict[topology] = {
k: attrs[k]
for k in _CONNECTIVITY_NAMES[topodim]
if (k in attrs) and (attrs[k] in ds)
}
return topology_dict
[docs]
@xr.register_dataset_accessor("ugrid_roles")
class UgridRolesAccessor:
"""
Xarray Dataset "accessor" to retrieve the names of UGRID variables.
Examples
--------
To get a list of the UGRID dummy variables in the dataset:
>>> dataset.ugrid_roles.topology
To get the names of the connectivity variables in the dataset:
>>> dataset.ugrid_roles.connectivity
Names can also be accessed directly through the topology:
>>> dataset.ugrid_roles["mesh2d"]["node_dimension"]
"""
[docs]
def __init__(self, ds: xr.Dataset):
self._ds = ds
def __getitem__(self, key: str):
if key not in self.topology:
raise KeyError(key)
return ChainMap(
self.dimensions[key], self.coordinates[key], self.connectivity[key]
)
@property
def topology(self) -> List[str]:
"""
Get the names of the topology dummy variables, marked by a CF-role of
``mesh_topology``.
Returns
-------
topology: List[str]
"""
return _get_topology(self._ds)
@property
def coordinates(self) -> Dict[str, Dict[str, Tuple[List[str], List[str]]]]:
"""
Get the names of the coordinate variables from the topology attributes.
Returns a dictionary with the coordinates for the UGRID coordinates:
* node coordinates
* edge coordinates
* face coordinates
Multiple coordinates may be defined. The coordinates are grouped by
their role (x or y).
Returns
-------
coordinates: dict[str, dict[str, Tuple[List[str]]]]
"""
return _get_coordinates(self._ds, self.topology)
@property
def dimensions(self) -> Dict[str, Dict[str, str]]:
"""
Get the dimension names from the topology attributes and infer them
from connectivity arrays or coordinates.
Returns a dictionary with the UGRID dimensions per topology:
* node dimension
* edge dimension
* face dimension
Returns
-------
dimensions: dict[str, dict[str, str]]
"""
return _get_dimensions(
self._ds, self.topology, self.connectivity, self.coordinates
)
@property
def connectivity(self) -> Dict[str, Dict[str, str]]:
"""
Get the names of the variables containing the UGRID connectivity data.
* face_node_connectivity
* edge_node_connectivity
* face_edge_connectivity
* edge_face_connectivity
Returns
-------
connectivity: Dict[str, Dict[str, str]]
"""
return _get_connectivity(self._ds, self.topology)
def __repr__(self):
dimensions = self.dimensions
coordinates = self.coordinates
connectivity = self.connectivity
def make_text_section(subtitle, entries, vardict):
tab = " "
rows = [f"{tab}{subtitle}"]
for role in entries:
if role in vardict:
rows += [f"{tab}{tab}{role}: {vardict[role]}"]
else:
rows += [f"{tab}{tab}{role}: n/a"]
rows.append("")
return rows
rows = []
for topology in self.topology:
topodim = self._ds[topology].attrs["topology_dimension"]
rows += [f"UGRID {topodim}D Topology {topology}:"]
rows += make_text_section(
"Dimensions:", _DIM_NAMES[topodim], dimensions[topology]
)
rows += make_text_section(
"Connectivity:", _CONNECTIVITY_NAMES[topodim], connectivity[topology]
)
rows += make_text_section(
"Coordinates:", _COORD_NAMES[topodim], coordinates[topology]
)
return "\n".join(rows)