#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2022 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of demcompare
# (see https://github.com/CNES/demcompare).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains main functions to manipulate DEM raster images.
It represents the primary API to manipulate DEM as xarray dataset in demcompare.
Dataset and associated internal functions are described in dataset_tools.py
"""
# pylint:disable=too-many-lines
# Standard imports
import copy
import logging
import os
from enum import Enum
from typing import Dict, Tuple, Union
import matplotlib.pyplot as mpl_pyplot
# Third party imports
import numpy as np
import rasterio
import xarray as xr
from astropy import units as u
from numpy.fft import fft2, ifft2, ifftshift
from rasterio import Affine
from scipy.ndimage import convolve
from .dataset_tools import (
compute_offset_adapting_factor,
create_dataset,
reproject_dataset,
)
from .img_tools import (
calc_spatial_freq_2d,
convert_pix_to_coord,
crop_rasterio_source_with_roi,
neighbour_interpol,
)
[docs]
DEFAULT_NODATA = -32768
[docs]
def load_dem(
path: str,
nodata: float = None,
band: int = 1,
geoid_georef: bool = False,
geoid_path: Union[str, None] = None,
zunit: str = "m",
input_roi: Union[bool, dict, Tuple] = False,
classification_layers: Dict = None,
) -> xr.Dataset:
"""
Reads the input DEM path and parameters and generates
the DEM object as xr.Dataset to be handled in demcompare functions.
A DEM can be any raster file opened by rasterio.
:param path: path to dem (readable by rasterio)
:type path: str
:param nodata: forcing dem no data value
(None by default and if set inside metadata)
:type nodata: float or None
:param band: band to be read in DEM. Default: 1
:type band: int
:param geoid_georef: is dem's georef is geoid
:type geoid_georef: bool
:param geoid_path: optional path to local geoid, default is EGM96
:type geoid_path: str or None
:param zunit: dem z unit
:type zunit: str
:param input_roi: False if dem are to be fully loaded,
other options are a dict roi or Tuple
:type input_roi: bool, dict or Tuple
:param classification_layers: input classification layers
:type classification_layers: Dict or None
:return: dem xr.DataSet containing : (see dataset_tools for details)
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
# Open dem with rasterio
# but source_rasterio is closed and tests bug
src_dem = rasterio.open(path)
source_rasterio = {}
source_rasterio["source_dem"] = src_dem
# Get rasterio transform :
# affine transformation matrix that maps pixel locations
# in (row, col) coordinates to (x, y) spatial positions
dem_geotransform = src_dem.transform
# Get rasterio BoundingBox(left, bottom, right, top)
bounds_dem = src_dem.bounds
if input_roi is not False:
# Use ROI
if isinstance(input_roi, dict):
if (
"left" in input_roi
and "bottom" in input_roi
and "right" in input_roi
and "top" in input_roi
):
# coordinates
bounds_dem = rasterio.coords.BoundingBox(
input_roi["left"],
input_roi["bottom"],
input_roi["right"],
input_roi["top"],
)
elif (
"x" in input_roi
and "y" in input_roi
and "w" in input_roi
and "h" in input_roi
):
# coordinates
window_dem = rasterio.windows.Window(
input_roi["x"],
input_roi["y"],
input_roi["w"],
input_roi["h"],
)
left, bottom, right, top = rasterio.windows.bounds(
window_dem, dem_geotransform
)
bounds_dem = rasterio.coords.BoundingBox(
left, bottom, right, top
)
else:
raise TypeError("Not the right conventions for ROI")
# Get dem raster image from band image
dem_image = src_dem.read(band)
# Test nodata in DEM
if np.all(dem_image == nodata):
raise ValueError(
f"All values in {source_rasterio['source_dem'].name} are NODATA"
)
classif_layers: Union[Dict, None] = None
if classification_layers:
classif_layers = {}
classif_layers["names"] = []
classif_layers["map_arrays"] = np.full(
(
dem_image.shape[0],
dem_image.shape[1],
len(classification_layers.keys()),
),
np.nan,
dtype=np.float32,
)
# Open the clasification layers with rasterio
# and add the map_array to the layer dict
for idx, [name, layer] in enumerate(classification_layers.items()):
classif_rasterio_source = rasterio.open(layer["map_path"])
map_array = classif_rasterio_source.read(band)
if map_array.shape != dem_image.shape:
raise ValueError(
f"Input classification layer {name} "
"does not have the same size"
" as its reference dem."
)
classif_layers["map_arrays"][:, :, idx] = map_array
classif_layers["names"].append(name)
source_rasterio[name] = classif_rasterio_source
# create dataset
dem_dataset = create_dem(
dem_image,
dem_geotransform,
src_dem.crs,
path,
bounds_dem,
classification_layer_masks=classif_layers,
nodata=nodata,
geoid_georef=geoid_georef,
geoid_path=geoid_path,
zunit=zunit,
source_rasterio=source_rasterio,
)
return dem_dataset
[docs]
def copy_dem(dem: xr.Dataset) -> xr.Dataset:
"""
Returns a copy of the input dem.
:param dem: input dem to copy, xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem: xr.Dataset
:return dem_copy: copy of the input dem, xr.DataSet
containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
# If present, the source_rasterio has to be temporarily set
# out of the input dem
if "source_rasterio" in dem.attrs:
source_rasterio = dem.attrs["source_rasterio"]
dem.attrs["source_rasterio"] = None
dem_copy = copy.deepcopy(dem)
dem.attrs["source_rasterio"] = source_rasterio
else:
dem_copy = copy.deepcopy(dem)
return dem_copy
[docs]
def save_dem(
dataset: xr.Dataset,
filename: str,
nodata: float = DEFAULT_NODATA,
) -> xr.Dataset:
"""
Writes a Dataset in a tiff file.
If new_array is set, new_array is used as data.
Returns written dataset.
:param dataset: xarray.DataSet containing the variables :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dataset: xr.Dataset
:param filename: output filename
:type filename: str
:param nodata: value of nodata to use
:type nodata: float
:return: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
# update from dataset
previous_profile = {}
previous_profile["crs"] = dataset.attrs["crs"]
previous_profile["transform"] = Affine.from_gdal(
dataset["georef_transform"].data[0],
dataset["georef_transform"].data[1],
dataset["georef_transform"].data[2],
dataset["georef_transform"].data[3],
dataset["georef_transform"].data[4],
dataset["georef_transform"].data[5],
)
data = dataset["image"].data
if len(dataset["image"].shape) == 2:
row, col = data.shape
with rasterio.open(
filename,
mode="w+",
driver="GTiff",
width=col,
height=row,
count=1,
dtype=data.dtype,
crs=previous_profile["crs"],
transform=previous_profile["transform"],
) as source_ds:
source_ds.nodata = nodata
source_ds.write(data, 1)
else:
row, col, depth = data.shape
with rasterio.open(
filename,
mode="w+",
driver="GTiff",
width=col,
height=row,
count=depth,
dtype=data.dtype,
crs=previous_profile["crs"],
transform=previous_profile["transform"],
) as source_ds:
for dsp in range(1, depth + 1):
source_ds.write(data[:, :, dsp - 1], dsp)
dataset.attrs["input_img"] = filename
return dataset
[docs]
def translate_dem(
dataset: xr.Dataset,
x_offset: Union[float, int, np.ndarray],
y_offset: Union[float, int, np.ndarray],
) -> xr.Dataset:
"""
Applies pixellic offset to the input dataset's
georeference origin by modifying its transform.
:param dataset: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dataset: xr.Dataset
:param x_offset: x offset
:type x_offset: Union[float, int, ndarray]
:param y_offset: y offset
:type y_offset: Union[float, int, ndarray]
:return: translated xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
dataset_translated = copy_dem(dataset)
# Project the pixellic offset to input dataset's coordinates
x_offset, y_offset = convert_pix_to_coord(
dataset["georef_transform"].data, y_offset, x_offset
)
# To add an offset, the [0] and [3] positions
# of the transform have to be modified
dataset_translated["georef_transform"].data[0] = x_offset
dataset_translated["georef_transform"].data[3] = y_offset
if dataset.attrs["bounds"]:
# Apply offset to bounds
# Apply the bounds difference between its left/bottom
# coordinate and the georef_transform origin
# (which is zero if no ROI was specified)
# to the new georef_transform origin
bounds_x_offset = float(x_offset) + (
dataset.attrs["bounds"].left - dataset["georef_transform"].data[0]
)
bounds_y_offset = float(y_offset) + (
dataset.attrs["bounds"].top - dataset["georef_transform"].data[3]
)
dataset_translated.attrs["bounds"] = rasterio.coords.BoundingBox(
float(bounds_x_offset),
float(bounds_y_offset)
- dataset.attrs["bounds"].top
+ dataset.attrs["bounds"].bottom,
float(bounds_x_offset)
+ dataset.attrs["bounds"].right
- dataset.attrs["bounds"].left,
float(bounds_y_offset),
)
return dataset_translated
[docs]
def create_dem( # pylint: disable=too-many-arguments, too-many-branches
data: np.ndarray,
transform: Union[np.ndarray, rasterio.Affine] = None,
img_crs: Union[rasterio.crs.CRS, None] = None,
input_img: Union[str, None] = None,
bounds: rasterio.coords.BoundingBox = None,
classification_layer_masks: Union[Dict, xr.DataArray] = None,
nodata: float = None,
geoid_georef: bool = False,
geoid_path: Union[str, None] = None,
zunit: str = "m",
source_rasterio: Dict[str, rasterio.DatasetReader] = None,
) -> xr.Dataset:
"""
Creates dem from input array and transform.
The demcompare DEM is an xarray Dataset containing:
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:param data: image data
:type data: np.ndarray
:param transform: rasterio georeferencing transformation matrix
:type transform: np.ndarray or rasterio.Affine
:img_crs: image CRS georef
:type img_crs: Rasterio.crs.CRS
:param input_img: image path
:type input_img: str
:param bounds: dem bounds
:type bounds: rasterio.coords.BoundingBox or None
:param classification_layer_masks: classification layers
:type classification_layer_masks: Dict, xr.DataArray or None
:param nodata: nodata value in the image
:type nodata: float or None
:param geoid_georef: if dem's georef is geoid
:type geoid_georef: bool
:param geoid_path: optional path to local geoid, default is EGM96
:type geoid_path: str or None
:param zunit: unit
:type zunit: str
:param source_rasterio: rasterio dataset reader object
:type source_rasterio: Dict[str,rasterio.DatasetReader] or None
:return: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
# If source_rasterio (DatasetReader is given), set img_ds variable
img_ds = None
if source_rasterio:
img_ds = source_rasterio["source_dem"]
# If no nodata value was given
# If img_ds exists and has nodatavals,
# Otherwise the
# DEFAULT_NODATA value is used
if nodata is None:
nodata = DEFAULT_NODATA
if img_ds:
meta_nodata = img_ds.nodatavals[0]
if meta_nodata is not None:
nodata = meta_nodata
# If input data has three dimensions, flatten
if len(data.shape) == 3:
# to dim 2
dim_single = np.where(data.shape) == 1
if dim_single == 0:
data = data[0, :, :]
if dim_single == 2:
data = data[:, :, 0]
# Convert nodata values to nan
data = data.astype(np.float32)
data[data == nodata] = np.nan
# Convert altimetric units to meter
data = ((data * u.Unit(zunit)).to(u.meter)).value
new_zunit = u.meter
# If no transform was given, add random transform and resolution
if transform is None:
xres = 1
yres = 1
transform = np.array([0, xres, 0, 0, 0, yres])
# If input transform is affine, convert it to gdal and np.array
if isinstance(transform, Affine):
# to_gdal just switches the input transform parameter's order to:
# (c, a, b, f, d, e)
transform = np.array(transform.to_gdal())
# Obtain image's georef
# The image's crs is used as georef.
if not img_crs:
if img_ds:
img_crs = img_ds.crs
# get plani unit
else:
# If no dataset image was given,
# the default "WGS84" georef is considered.
img_crs = rasterio.crs.CRS.from_epsg(4326)
if img_crs.is_geographic:
plani_unit = u.deg
else:
plani_unit = u.m
# If the georef is geoid, set default the geoid_path if not set
if geoid_georef:
if geoid_path is None:
# this returns the fully resolved path to the
# python installed module
module_path = os.path.dirname(__file__)
# Geoid relative Path as installed in setup.py
geoid_path = "geoid/egm96_15.gtx"
# Create full geoid path
geoid_path = os.path.join(module_path, geoid_path)
else:
geoid_path = None
# Create dataset
dataset = create_dataset(
data,
transform,
img_crs,
input_img,
bounds,
nodata,
geoid_path,
plani_unit,
new_zunit,
source_rasterio,
classification_layer_masks,
)
return dataset
[docs]
class SamplingSourceParameter(Enum):
"""
Enum type definition for sampling_source parameter
value are sec or ref to choose in reproject_dems
"""
[docs]
def reproject_dems(
sec: xr.Dataset,
ref: xr.Dataset,
initial_shift_x: Union[int, float] = 0,
initial_shift_y: Union[int, float] = 0,
sampling_source: str = SamplingSourceParameter.SEC.value,
) -> Tuple[xr.Dataset, xr.Dataset, Tuple[float, float]]:
"""
Reprojects both DEMs to common grid, common bounds and common georef origin.
The common grid, bounds, georef origin are defined
by the sampling_source parameter.
It defines which is the sampling of the output DEMs.
- If sampling_source is "ref":
ref is cropped to the common bounds
sec is cropped to the common bounds and resampled
- If sampling_source is "sec":
ref is cropped to the common bounds and resampled
sec is cropped to the common bounds
:param sec: dem to align xr.DataSet containing :
- im : 2D (row, col) xarray.DataArray float32
- trans: 1D (trans_len) xarray.DataArray
:type sec: xr.Dataset
:param ref: ref xr.DataSet containing :
- im : 2D (row, col) xarray.DataArray float32
- trans: 1D (trans_len) xarray.DataArray
:type ref: xr.Dataset
:param initial_shift_x: optional initial shift x
:type initial_shift_x: Union[int, float]
:param initial_shift_y: optional initial shift y
:type initial_shift_y: Union[int, float]
:param sampling_source: 'ref' or 'sec', the sampling
value of the output dems, by defaut "sec"
:type sampling_source: str
:return: reproj_cropped_sec xr.DataSet,
reproj_cropped_ref xr.DataSet, adapting_factor.
The xr.Datasets containing :
- im : 2D (row, col) xarray.DataArray float32
- trans: 1D (trans_len) xarray.DataArray
:rtype: xr.Dataset, xr.Dataset, Tuple[float, float]
"""
if sampling_source == SamplingSourceParameter.REF.value:
interp = sec
static = ref
# Compute offset adapting factor for later adapting the
# offset to the original dem resolution
adapting_factor = compute_offset_adapting_factor(sec, ref)
else: # sampling_source == SamplingSourceParameter.SEC.value:
interp = ref
static = sec
# If sampling value is sec, adapting
# factor is 1 (no adaptation needed)
adapting_factor = (1.0, 1.0)
# Reproject if input image is inversed top bottom or left right
# to have the same consistent (left, bottom, right, top) reference
# than interp projected bounds (orientation bug otherwise)
transformed_static_bounds = rasterio.warp.transform_bounds(
static.attrs["crs"],
static.attrs["crs"],
static.attrs["bounds"][0],
static.attrs["bounds"][1],
static.attrs["bounds"][2],
static.attrs["bounds"][3],
)
# Project bounds to static_crs
transformed_interp_bounds = rasterio.warp.transform_bounds(
interp.attrs["crs"],
static.attrs["crs"],
interp.attrs["bounds"][0],
interp.attrs["bounds"][1],
interp.attrs["bounds"][2],
interp.attrs["bounds"][3],
)
# Obtain intersection roi
if rasterio.coords.disjoint_bounds(
transformed_static_bounds, transformed_interp_bounds
):
raise NameError("ERROR: ROIs do not intersect")
intersection_roi = rasterio.coords.BoundingBox(
max(transformed_static_bounds[0], transformed_interp_bounds[0]),
max(transformed_static_bounds[1], transformed_interp_bounds[1]),
min(transformed_static_bounds[2], transformed_interp_bounds[2]),
min(transformed_static_bounds[3], transformed_interp_bounds[3]),
)
# Crop static dem
# rasterio.mask.mask function needs the src read by rasterio
src_static = static.attrs["source_rasterio"]["source_dem"]
(
new_cropped_static,
new_cropped_static_transform,
) = crop_rasterio_source_with_roi(src_static, intersection_roi)
# Crop static classification layers
# rasterio.mask.mask function needs the src read by rasterio
if "indicator" in static.coords:
cropped_static_classif = np.full(
(
new_cropped_static.shape[1],
new_cropped_static.shape[2],
len(static["classification_layer_masks"].coords["indicator"]),
),
np.nan,
dtype=np.float32,
)
for idx, indicator in enumerate(
static["classification_layer_masks"].coords["indicator"].data
):
src_classif = static.attrs["source_rasterio"][indicator]
(
new_cropped_classif,
_,
) = crop_rasterio_source_with_roi(src_classif, intersection_roi)
cropped_static_classif[:, :, idx] = new_cropped_classif
# Remove confidence_measure dataArray from the dataset to update it
indicator = list(static.coords["indicator"].data)
static = static.drop_dims("indicator")
coords_classification_layers = {
"row": np.arange(new_cropped_static.shape[1]),
"col": np.arange(new_cropped_static.shape[2]),
"indicator": indicator,
}
static["classification_layer_masks"] = xr.DataArray(
data=cropped_static_classif,
coords=coords_classification_layers,
dims=["row", "col", "indicator"],
)
# Create cropped static dem
reproj_cropped_static = create_dem(
new_cropped_static,
new_cropped_static_transform,
img_crs=static.attrs["crs"],
nodata=static.attrs["nodata"],
geoid_path=static.attrs["geoid_path"],
zunit=static.attrs["zunit"],
input_img=static.attrs["input_img"],
bounds=intersection_roi,
classification_layer_masks=(
static.classification_layer_masks
if "indicator" in static.coords
else None
),
)
# Full_interp represent a dem with the full interp image
full_interp = create_dem(
interp["image"].data,
interp.georef_transform,
img_crs=interp.attrs["crs"],
nodata=interp.attrs["nodata"],
geoid_path=interp.attrs["geoid_path"],
zunit=interp.attrs["zunit"],
input_img=interp.attrs["input_img"],
bounds=intersection_roi,
classification_layer_masks=(
interp.classification_layer_masks
if "indicator" in interp.coords
else None
),
)
# Translate sec according to the initial shift
if initial_shift_x != 0 or initial_shift_y != 0:
if sampling_source == SamplingSourceParameter.REF.value:
# If sampling_source is ref, adapt the initial shift to the new
# resolution
x_factor, y_factor = adapting_factor
full_interp = translate_dem(
full_interp,
initial_shift_x * x_factor,
initial_shift_y * y_factor,
)
else:
reproj_cropped_static = translate_dem(
reproj_cropped_static, initial_shift_x, initial_shift_y
)
# Interp DEM is reprojected into the static DEM's georef-grid
# Crop and resample are done in the interp DEM
reproj_cropped_interp = reproject_dataset(
full_interp, reproj_cropped_static, interp="bilinear"
)
# Update dataset input_img with interp old value
reproj_cropped_interp.attrs["input_img"] = full_interp.attrs["input_img"]
# Define reprojected ref and sec according to the sampling value
if sampling_source == SamplingSourceParameter.REF.value:
reproj_cropped_ref = reproj_cropped_static
reproj_cropped_sec = reproj_cropped_interp
else: # sampling_source == SamplingSourceParameter.SEC.value:
reproj_cropped_ref = reproj_cropped_interp
reproj_cropped_sec = reproj_cropped_static
return reproj_cropped_sec, reproj_cropped_ref, adapting_factor
[docs]
def accumulates_class_layers( # pylint:disable=too-many-branches
ref: xr.Dataset, sec: xr.Dataset, diff_ref_sec: xr.Dataset
) -> xr.Dataset:
"""
Accumulates the classification layers of each dem
:param ref: ref dem
:type ref: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:param sec: sec dem
:type ref: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:param diff_ref_sec: difference (i.e angular, in altitude, ...)
dem between ref and sec
:type diff_ref_sec: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:return: the difference dem between ref and sec,
with the classification layers
:rtype: xr.Dataset containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
"""
support_list = []
# Initialize classif_layers_datarray
classif_layers_datarray = None
if "classification_layer_masks" in ref:
# If classification layers in ref,
# Add the layer on the dataarray
classif_layers_datarray = copy.deepcopy(
ref["classification_layer_masks"]
)
for _ in classif_layers_datarray.coords["indicator"].data:
support_list.append("ref")
# Add the fusion information on the diff dataset
if "fusion_layers" in ref.attrs:
diff_ref_sec.attrs["fusion_layers"] = ref.attrs["fusion_layers"]
# If slope is present, add the dataarray
if "ref_slope" in ref:
ref_slope = copy.deepcopy(ref["ref_slope"])
diff_ref_sec["ref_slope"] = ref_slope
if "classification_layer_masks" in sec:
# If classification layers in sec,
# Add the layer on the dataarray
if isinstance(classif_layers_datarray, xr.DataArray):
# If the dataarray already existed,
# update it
nb_row = classif_layers_datarray.shape[0]
nb_col = classif_layers_datarray.shape[1]
nb_indicator = (
classif_layers_datarray.shape[2]
+ sec["classification_layer_masks"].shape[2]
)
# Add a new indicator to the DataArray
updated_data = np.full(
(nb_row, nb_col, nb_indicator), np.nan, dtype=np.float32
)
# Ref classification layers
updated_data[
:, :, : -sec["classification_layer_masks"].shape[2]
] = classif_layers_datarray.data
# Sec classification layers
updated_data[:, :, classif_layers_datarray.shape[2] :] = sec[
"classification_layer_masks"
].data
indicator = np.copy(classif_layers_datarray.coords["indicator"])
for new_indicator in sec["classification_layer_masks"].coords[
"indicator"
]:
indicator = np.append(indicator, new_indicator)
expanded_coords = [
classif_layers_datarray.coords["row"],
classif_layers_datarray.coords["col"],
indicator,
]
classif_layers_datarray = xr.DataArray(
data=updated_data,
coords=expanded_coords,
dims=["row", "col", "indicator"],
)
else:
classif_layers_datarray = copy.deepcopy(
sec["classification_layer_masks"]
)
for _ in sec["classification_layer_masks"].coords["indicator"].data:
support_list.append("sec")
# If slope is present, add the dataarray
# When computing the slope for a single dem, the indicator
# is always ref_slope by default, so we adapt it to sec_slope
if "ref_slope" in sec:
sec_slope = copy.deepcopy(sec["ref_slope"])
diff_ref_sec["sec_slope"] = sec_slope
# Add the fusion information on the diff dataset
if "fusion_layers" in sec.attrs:
if "fusion_layers" in diff_ref_sec:
diff_ref_sec.attrs["fusion_layers"].append(
sec.attrs["fusion_layers"]
)
else:
diff_ref_sec.attrs["fusion_layers"] = sec.attrs["fusion_layers"]
# Add the dataarray on the diff dataset
diff_ref_sec["classification_layer_masks"] = classif_layers_datarray
# Add the support_list as an attribute
diff_ref_sec.attrs["support_list"] = support_list
return diff_ref_sec
[docs]
def compute_dem_slope(
dataset: xr.Dataset,
degree: bool = False,
add_attribute: bool = True,
unit_change: bool = True,
) -> xr.Dataset:
"""
Computes DEM's slope
Slope is presented here :
http://pro.arcgis.com/ \
fr/pro-app/tool-reference/spatial-analyst/how-aspect-works.htm
TODO: modify code such as to remove double return at the end
:param dataset: dataset
:type dataset: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:param degree: True if is in degree
:type degree: bool
:param add_attribute: if True, add attribute to the input dataset
if False, return the slope
:type add_attribute: bool
:param unit_change: if True change units of the slope
if False, no units change
:type unit_change: bool
:return: slope
:rtype: np.ndarray
"""
def _get_orthodromic_distance(
lon1: Union[float, np.ndarray],
lat1: Union[float, np.ndarray],
lon2: Union[float, np.ndarray],
lat2: Union[float, np.ndarray],
):
"""
Get Orthodromic distance from two (lat,lon) coordinates
:param lon1: longitude 1
:type lon1: Union[float, np.ndarray]
:param lat1: latitude 1
:type lat1: Union[float, np.ndarray]
:param lon2: longitude 2
:type lon2: Union[float, np.ndarray]
:param lat2: latitude 2
:type lat2: Union[float, np.ndarray]
:return: orthodromic distance
"""
# WGS-84 equatorial radius in km
radius_equator = 6378137.0
return radius_equator * np.arccos(
np.cos(lat1 * np.pi / 180)
* np.cos(lat2 * np.pi / 180)
* np.cos((lon2 - lon1) * np.pi / 180)
+ np.sin(lat1 * np.pi / 180) * np.sin(lat2 * np.pi / 180)
)
crs = dataset.attrs["crs"]
if not crs.is_projected:
# Our dem is not projected, we can't simply use the pixel resolution
# we need to compute resolution between each point
# Create grid of image's size
ny, nx = dataset["image"].data.shape
xp = np.arange(nx)
yp = np.arange(ny)
xp, yp = np.meshgrid(xp, yp)
# Convert all pixels on grid to lat lon
lon, lat = convert_pix_to_coord(
dataset["georef_transform"].data, yp, xp
)
lonr = np.roll(lon, 1, 1)
latl = np.roll(lat, 1, 0)
# Get distance between all pixels
distx = _get_orthodromic_distance(lon, lat, lonr, lat)
disty = _get_orthodromic_distance(lon, lat, lon, latl)
# deal with singularities at edges
distx[:, 0] = distx[:, 1]
disty[0] = disty[1]
else:
# if resolution is define, all pixels consider this distance
distx = np.abs(dataset.attrs["xres"])
disty = np.abs(dataset.attrs["yres"])
# Convolution kernel
conv_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
conv_y = conv_x.transpose()
# Now we do the convolutions :
gx = convolve(dataset["image"].data, conv_x, mode="reflect")
gy = convolve(dataset["image"].data, conv_y, mode="reflect")
# And eventually we do compute tan(slope) and aspect
tan_slope = np.sqrt((gx / distx) ** 2 + (gy / disty) ** 2) / 8
slope = np.arctan(tan_slope)
# Just simple unit change as required
if unit_change:
if degree is False:
slope *= 100
else:
slope = (slope * 180) / np.pi
# Add slope as a DataArray
# Slope
coords_slope = [dataset.coords["row"], dataset.coords["col"]]
# Add computed slope in the 3D classification layer format
data = np.full((slope.shape[0], slope.shape[1]), np.nan, dtype=np.float32)
data[:, :] = slope
# Create the dataarray
# In case there is a single dem, we name the datarray
# as ref_slope. If there are two arrays, the sec slope
# will be renamed by the accumulates_class_layers function
# to sec_slope
# TODO: remove double output # pylint:disable=fixme
if add_attribute:
dataset["ref_slope"] = xr.DataArray(
data=data,
coords=coords_slope,
dims=["row", "col"],
)
return dataset
return slope
[docs]
def compute_and_save_image_plots(
dem: xr.Dataset,
plot_path: str = None,
fig_title: str = None,
colorbar_title: str = None,
cmap: str = "terrain",
vmin_plot: float = None,
vmax_plot: float = None,
):
"""
Compute and optionnally save plots from a dem using pyplot img_show.
see https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.imshow.html
:param dem: dem object to compute and save image plots
:type dem: str
:param plot_path: path to save the plots if present
:type plot_path: str
:param fig_title: optional plot figure title
:type fig_title: str
:param title_colorbar: optional dem path to save the original tif file
:type title_colorbar: str
:param cmap: registered colormap name used to map scalar data to colors.
:type cmap: str
:param vmin_plot: minimum value of the z axis when plotting
:type vmin_plot: float
:param vmax_plot: maximum value of the z axis when plotting
:type vmax_plot: float
"""
# Create and save plot using the dem_plot function
# Compute mean of dem image data
mu = np.nanmean(dem["image"].data)
# Compute std also
sigma = np.nanstd(dem["image"].data)
# Plot with matplotlib.pyplot
fig, fig_ax = mpl_pyplot.subplots(figsize=(7.0, 8.0))
# add fig title if present
if fig_title:
fig_ax.set_title(fig_title, fontsize="large")
#
if vmin_plot is None:
vmin = mu - sigma
else:
vmin = vmin_plot
if vmax_plot is None:
vmax = mu + sigma
else:
vmax = vmax_plot
im1 = fig_ax.imshow(
dem["image"].data,
cmap=cmap,
vmin=vmin,
vmax=vmax,
interpolation="none",
aspect="equal",
)
fig.colorbar(im1, label=colorbar_title, ax=fig_ax)
if vmax_plot is None and vmin_plot is None:
fig.text(
0.15,
0.15,
f"Values rescaled between"
f"\n[mean-std, mean+std]=[{mu - sigma:.2f}, {mu + sigma:.2f}]",
fontsize="medium",
)
# Save plot
if plot_path:
mpl_pyplot.savefig(plot_path, dpi=100, bbox_inches="tight")
mpl_pyplot.close()
[docs]
def verify_fusion_layers(dem: xr.Dataset, classif_cfg: Dict, support: str):
"""
Verifies that the input configuration and input dem contain the
input necessary layers for the fusion classification.
:param dem: name to save the plots
:type dem: str
:param classif_cfg: classification layers configuration
:type classif_cfg: Dict
:param support: fusion support, ref or sec
:type support: str
:return: None
"""
classif_names = list(classif_cfg.keys())
fusion_layers = []
for _, clayer in classif_cfg.items():
if clayer["type"] == "fusion":
if support in clayer:
fusion_layers.append(clayer)
for fusion_layer in fusion_layers:
for layer_to_fusion in fusion_layer[support]:
if layer_to_fusion not in classif_names:
logging.error(
"Input layer to be fused %s not defined"
" in the classification layers configuration.",
layer_to_fusion,
)
raise ValueError
layer_type = classif_cfg[layer_to_fusion]["type"]
# If the layer type is slope, verify that the slope
# has been computed on the dem
if layer_type == "slope":
if "ref_slope" not in dem:
logging.error(
"Input layer to be fused is type slope, "
" but slope has not been computed on the input dem."
)
raise ValueError
# If the layer type is segmentation, verify that the layer map
# has been loaded on the dem
else:
if "classification_layer_masks" in dem:
dem_classif_names = list(
dem["classification_layer_masks"]
.coords["indicator"]
.data
)
if layer_to_fusion not in dem_classif_names:
logging.error(
"Input layer to be fused %s, "
" is not defined on the input dem.",
layer_to_fusion,
)
raise ValueError
else:
logging.error(
"Input layer to be fused %s, "
" is not defined on the input dem.",
layer_to_fusion,
)
raise ValueError
[docs]
def compute_curvature_filtering(
dem: xr.Dataset,
filter_intensity: float = 0.9,
replication: bool = True,
) -> xr.Dataset:
"""
Return the curvature of the input dem.
First, compute the FFT of the input dem: F(y) = FFT(DEM).
Then, apply a filter y^filter_intensity
with s=0.9: F(y) = F(y)* y^filter_intensity.
Finally, apply the inverse FFT: IFFT(F(y)).
We keep the real part (imaginary part = digital noise).
:param dem: dem xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem: xr.Dataset
:param filter_intensity: parameter of the DEM's FFT filter intensity.
Should be close to 1.
Default = 0.9.
:type filter_intensity: float
:param replication: if true, the image is replicated
by x4 in order to improve resolution.
Default = True.
:type replication: bool
:return: curvature xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
no_data_location = np.logical_or(
dem["image"].data == dem.attrs["nodata"],
np.isnan(dem["image"].data),
)
# no data pixel interpolation
data_all = neighbour_interpol(dem["image"].data, no_data_location)
high, wide = dem["image"].data.shape
if replication:
data_all = np.hstack([data_all, np.flip(data_all, axis=1)])
data_all = np.vstack([data_all, np.flip(data_all, axis=0)])
f_y, f_x = calc_spatial_freq_2d(2 * high, 2 * wide, edge=np.pi)
else:
f_y, f_x = calc_spatial_freq_2d(high, wide, edge=np.pi)
# spatial frequency (module)
spatial_freq = (f_x**2 + f_y**2) ** (filter_intensity / 2)
spatial_freq = ifftshift(spatial_freq)
image = fft2(data_all)
image_filtered = image * spatial_freq
image_filtered = ifft2(image_filtered)
if replication:
image_filtered = image_filtered[:high, :wide]
image_filtered[no_data_location] = dem.attrs["nodata"]
return create_dem(
image_filtered.real,
transform=dem.georef_transform.data,
nodata=dem.attrs["nodata"],
img_crs=dem.crs,
bounds=dem.bounds,
classification_layer_masks=(
dem["classification_layer_masks"]
if hasattr(dem, "classification_layer_masks")
else None
),
)