#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2022 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of demcompare
# (see https://github.com/CNES/demcompare).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to Demcompare's DEM dataset
"""
import copy
# Standard imports
import logging
import os
from typing import Dict, Tuple, Union
# Third party imports
import numpy as np
import pyproj
import rasterio
import rasterio.crs
import rasterio.mask
import rasterio.warp
import rasterio.windows
import xarray as xr
from astropy import units as u
from rasterio import Affine
from rasterio.warp import Resampling, reproject
from scipy import interpolate
# Demcompare imports
from .img_tools import convert_pix_to_coord
[docs]
def create_dataset( # pylint: disable=too-many-arguments, too-many-branches
data: np.ndarray,
transform: Union[np.ndarray, rasterio.Affine] = None,
img_crs: Union[rasterio.crs.CRS, None] = None,
input_img: Union[str, None] = None,
bounds: rasterio.coords.BoundingBox = None,
nodata: float = None,
geoid_path: Union[str, None] = None,
plani_unit: u = None,
zunit: str = "m",
source_rasterio: Dict[str, rasterio.DatasetReader] = None,
classification_layer_masks: Union[Dict, xr.DataArray] = None,
) -> xr.Dataset:
"""
Creates dataset from input array and transform,
and return the corresponding xarray.DataSet.
The demcompare dataset is an xarray Dataset containing:
:image: 2D (row, col) image as xarray.DataArray,
:georef_transform: 1D (trans_len) xarray.DataArray with the parameters:
- c: x-coordinate of the upper left pixel,
- a: pixel size in the x-direction in map units/pixel,
- b: rotation about x-axis,
- f: y-coordinate of the upper left pixel,
- d: rotation about y-axis,
- e: pixel size in the y-direction in map units, negative
:classification_layer_masks: 3D (row, col, indicator) xarray.DataArray:
It contains the maps of all classification layers,
being the indicator a list with each
classification_layer name.
:attributes:
- nodata : image nodata value. float
- input_img : image input path. str or None
- crs : image crs. rasterio.crs.CRS
- xres : x resolution (value of transform[1]). float
- yres : y resolution (value of transform[5]). float
- plani_unit : georefence's planimetric unit. astropy.units
- zunit : input image z unit value. astropy.units
- bounds : image bounds. rasterio.coords.BoundingBox
- geoid_path : geoid path. str or None
- source_rasterio : rasterio's DatasetReader object or None.
:param data: image data
:type data: np.ndarray
:param transform: rasterio georeferencing transformation matrix
:type transform: np.ndarray or rasterio.Affine
:param input_img: image path
:type input_img: str
:param bounds: dem bounds
:type bounds: rasterio.coords.BoundingBox or None
:param nodata: nodata value in the image
:type nodata: float or None
:param geoid_path: optional path to local geoid, default is EGM96
:type geoid_path: str or None
:param zunit: unit
:type zunit: str
:param source_rasterio: rasterio dataset reader object
:type source_rasterio: Dict[str,rasterio.DatasetReader] or None
:param classification_layer_masks: classification layers
:type classification_layer_masks: Dict, xr.DataArray or None
:return: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type data: xr.Dataset
"""
dataset = xr.Dataset(
{"image": (["row", "col"], data.astype(np.float32))},
coords={
"row": np.arange(data.shape[0]),
"col": np.arange(data.shape[1]),
},
)
# Add classification layers
if isinstance(classification_layer_masks, dict):
# Define coords, the third col is the indicator
# with the classification layer name
coords_classification_layers = [
dataset.coords["row"],
dataset.coords["col"],
classification_layer_masks["names"],
]
# Create the dataarray
dataset["classification_layer_masks"] = xr.DataArray(
data=classification_layer_masks["map_arrays"],
coords=coords_classification_layers,
dims=["row", "col", "indicator"],
)
elif isinstance(classification_layer_masks, xr.DataArray):
dataset["classification_layer_masks"] = classification_layer_masks
# Add transform to dataset
trans_len = np.arange(0, len(transform))
dataset.coords["trans_len"] = trans_len
dataset["georef_transform"] = xr.DataArray(
data=transform, dims=["trans_len"]
)
# Add image attributes to the image dataset
dataset.attrs = {
"nodata": nodata,
"input_img": input_img,
"crs": img_crs,
"xres": transform[1],
"yres": transform[5],
"plani_unit": plani_unit,
"zunit": zunit,
"bounds": bounds,
"source_rasterio": source_rasterio,
}
# If the georef is geoid, add geoid offset to the data
if geoid_path:
# transform to ellipsoid
geoid_offset = _get_geoid_offset(dataset, geoid_path)
dataset["image"].data += geoid_offset
dataset.attrs["geoid_path"] = geoid_path
else:
dataset.attrs["geoid_path"] = None
return dataset
[docs]
def reproject_dataset(
dataset: xr.Dataset, from_dataset: xr.Dataset, interp: str = "bilinear"
) -> xr.Dataset:
"""
Reproject dataset on the from_dataset's georeference origin and grid,
and return the corresponding xarray.DataSet.
If no interp is given, default "bilinear" resampling is considered.
Another available resampling is "nearest".
:param dataset: Dataset to reproject xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dataset: xr.Dataset
:param from_dataset: Dataset to get projection from
xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type from_dataset: xr.Dataset
:param interp: interpolation method
:type interp: str
:return: reprojected xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
# Define reprojected dataset
reprojected_dataset = copy.copy(from_dataset)
if "indicator" in reprojected_dataset.coords:
reprojected_dataset = reprojected_dataset.drop_dims("indicator")
interpolation_method = Resampling.bilinear
if interp == "bilinear":
interpolation_method = Resampling.bilinear
elif interp == "nearest":
interpolation_method = Resampling.nearest
else:
logging.warning(
"Interpolation method not available, use default 'bilinear'"
)
# Get source and destination transforms
src_transform = Affine.from_gdal(
dataset["georef_transform"].data[0],
dataset["georef_transform"].data[1],
dataset["georef_transform"].data[2],
dataset["georef_transform"].data[3],
dataset["georef_transform"].data[4],
dataset["georef_transform"].data[5],
)
dst_transform = Affine.from_gdal(
from_dataset["georef_transform"].data[0],
from_dataset["georef_transform"].data[1],
from_dataset["georef_transform"].data[2],
from_dataset["georef_transform"].data[3],
from_dataset["georef_transform"].data[4],
from_dataset["georef_transform"].data[5],
)
# Get source array
source_array = dataset["image"].data
# Define dest_array with the output size and fill with nodata
dest_array = np.zeros_like(from_dataset["image"].data)
dest_array[:, :] = from_dataset.nodata
# Obtain datasets CRSs
src_crs = rasterio.crs.CRS.from_dict(dataset.attrs["crs"])
dst_crs = rasterio.crs.CRS.from_dict(from_dataset.attrs["crs"])
# Reproject with rasterio
reproject(
source=source_array,
destination=dest_array,
src_transform=src_transform,
src_crs=src_crs,
dst_transform=dst_transform,
dst_crs=dst_crs,
resampling=interpolation_method,
src_nodata=dataset.attrs["nodata"],
dst_nodata=from_dataset.attrs["nodata"],
)
# Convert output dataset's remaining nodata values to nan
dest_array[dest_array == dataset.attrs["nodata"]] = np.nan
# Charge reprojected_dataset's data and nodata values
reprojected_dataset["image"].data = dest_array
reprojected_dataset.attrs["nodata"] = dataset.attrs["nodata"]
if "indicator" in dataset.coords:
indicator = (
dataset["classification_layer_masks"].coords["indicator"].data
)
classification_layer_masks = np.full(
(
reprojected_dataset["image"].shape[0],
reprojected_dataset["image"].shape[1],
len(indicator),
),
np.nan,
dtype=np.float32,
)
for idx in np.arange(len(indicator)):
# Define dest_array with the output size and fill with nodata
dest_array_classif = np.zeros_like(from_dataset["image"].data)
dest_array_classif[:, :] = dataset.nodata
# Get source array
source_array_classif = dataset["classification_layer_masks"][
:, :, idx
].data
# Reproject with rasterio
reproject(
source=source_array_classif,
destination=dest_array_classif,
src_transform=src_transform,
src_crs=src_crs,
dst_transform=dst_transform,
dst_crs=dst_crs,
resampling=interpolation_method,
src_nodata=dataset.attrs["nodata"],
dst_nodata=from_dataset.attrs["nodata"],
)
classification_layer_masks[
:, :, idx
].data = dest_array_classif # type: ignore
# Define coords, the third col is the indicator
# with the classification layer name
coords_classification_layers = [
reprojected_dataset.coords["row"],
reprojected_dataset.coords["col"],
indicator,
]
# Create the dataarray
reprojected_dataset["classification_layer_masks"] = xr.DataArray(
data=classification_layer_masks,
coords=coords_classification_layers,
dims=["row", "col", "indicator"],
)
return reprojected_dataset
[docs]
def compute_offset_adapting_factor(
sec: xr.Dataset, ref: xr.Dataset
) -> Tuple[float, float]:
"""
Compute the factor to adapt the coregistration offsets
to the dem resolution
The name is too generic to know the usage quickly. Is the function
in dem_tools or here ?
:param sec: sec
:type sec: xr.Dataset
:param ref: ref
:type ref: xr.Dataset
:return: x and y factors
:rtype: Tuple[float, float]
"""
# Obtain the original dem size
orig_ysize, orig_xsize = sec["image"].shape
# Reproject the dem to the ref without doing any crop
reproj_sec = reproject_dataset(sec, ref, interp="bilinear")
# Obtain the full reprojected dem size
reproj_ysize, reproj_xsize = reproj_sec["image"].shape
# Return the x and y factors to adapt the computed offsets
return orig_xsize / reproj_xsize, orig_ysize / reproj_ysize
[docs]
def _get_geoid_offset(
dataset: xr.Dataset, geoid_path: Union[str, None]
) -> np.ndarray:
"""
Computes the geoid offset of the input DEM. If no geoid_path is
given, the default geoid/egm96_15.gtx if used.
:param dataset: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dataset: xr.Dataset
:param geoid_path: optional absolut geoid_path, if None egm96 is used
:type geoid_path: str or None
:return: offset as array
:rtype: np.ndarray
"""
# If no geoid path has been given, use the default geoid egm96
# installed in setup.py
if geoid_path is None:
# this returns the fully resolved path to the python installed module
module_path = os.path.dirname(__file__)
# Geoid relative Path as installed in setup.py
geoid_path = "geoid/egm96_15.gtx"
# Create full geoid path
geoid_path = os.path.join(module_path, geoid_path)
# Obtain dataset's grid
ny, nx = dataset["image"].data.shape
xp = np.arange(nx)
yp = np.arange(ny)
xp, yp = np.meshgrid(xp, yp)
# Project the dataset grid into lat/lon coordinates
lonlat = list(
convert_pix_to_coord(dataset["georef_transform"].data, yp, xp)
)
# If the georef's units are meters (if is_projected),
# convert them to degrees
src_crs = rasterio.crs.CRS.from_dict(dataset.attrs["crs"])
if src_crs.is_projected:
# convert to global coordinates
proj = pyproj.Proj(src_crs)
lonlat = list(proj(lonlat[0], lonlat[1], inverse=True))
# transform to list (2xN)
lon_1d = np.reshape(
lonlat[0],
(dataset["image"].data.shape[0] * dataset["image"].data.shape[1]),
)
lat_1d = np.reshape(
lonlat[1],
(dataset["image"].data.shape[0] * dataset["image"].data.shape[1]),
)
coords = np.zeros((lon_1d.size, 2))
coords[:, 0] = lon_1d
coords[:, 1] = lat_1d
# Interpolate geoid on the dataset coordinates
# If the dataset coordinates are outside of the geoid scope,
# an error will be raised.
try:
# Get geoid values
interp_geoid = _interpolate_geoid(
geoid_path, coords, interpol_method="linear"
)
except ValueError:
logging.error(
"Input DSM %s coordinates outside of the %s geoid scope.",
dataset.attrs["input_img"],
geoid_path,
)
raise
# transform to array of shape dataset['im'].data.shape
arr_offset = np.reshape(interp_geoid, dataset["image"].data.shape)
return arr_offset
[docs]
def _interpolate_geoid(
geoid_filename: str, coords: np.ndarray, interpol_method: str = "linear"
) -> np.ndarray:
"""
Bilinear interpolation of the given geoid to the input coordinates.
If no interpol_method is given, a "linear" interpolation is considered.
If the input coordinates are outside of the geoid scope,
an exception is raised.
:param geoid_filename: coord geoid_filename
:type geoid_filename: str
:param coords: coords matrix 2xN [lon,lat]
:type coords: np.ndarray
:param interpol_method: interpolation type
:type interpol_method: str
:return: interpolated position [lon,lat,estimate geoid]
:rtype: 3D np.array
"""
dataset = rasterio.open(geoid_filename)
transform = dataset.transform
# Get transform's step
step_x = transform[0]
step_y = -transform[4]
# coin BG
[ori_x, ori_y] = transform * (
0.5,
dataset.height - 0.5,
) # positions au centre pixel
# Compute last x and y geoid positions
last_x = ori_x + step_x * dataset.width
last_y = ori_y + step_y * dataset.height
# Get all geoid values
geoid_values = dataset.read(1)[::-1, :].transpose()
# Get all geoid positions
x = np.arange(ori_x, last_x, step_x)
y = np.arange(ori_y, last_y, step_y)
geoid_grid_coordinates = (x, y)
# Interpolate geoid on the input coordinates
interp_geoid = interpolate.interpn(
geoid_grid_coordinates,
geoid_values,
coords,
method=interpol_method,
bounds_error=True,
fill_value=None,
)
return interp_geoid