#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2022 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of demcompare
# (see https://github.com/CNES/demcompare).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint:disable=too-few-public-methods
"""
Mainly contains different DEM processing classes
"""
# pylint:disable=too-many-lines
import logging
from typing import Dict
import numpy as np
# Third party imports
import xarray as xr
from demcompare.dem_tools import (
accumulates_class_layers,
compute_curvature_filtering,
compute_dem_slope,
create_dem,
)
from demcompare.img_tools import compute_surface_normal, remove_nan_and_flatten
from .dem_processing import DemProcessing
from .dem_processing_template import DemProcessingTemplate
@DemProcessing.register("alti-diff")
[docs]
class AltiDiff(DemProcessingTemplate):
"""
Altitude difference between two DEMs
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "alti-diff"
self.fig_title = "[REF - SEC] difference"
self.colorbar_title = "Elevation difference (m)"
self.cmap = "bwr"
[docs]
def compute_dems_diff(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute altitude difference dem_1 - dem_2 and
return it as an xr.Dataset with the dem_2
georeferencing and attributes.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: difference xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
diff_raster = dem_1["image"].data - dem_2["image"].data
diff_dem = create_dem(
diff_raster,
transform=dem_2.georef_transform.data,
nodata=dem_1.attrs["nodata"],
img_crs=dem_2.crs,
bounds=dem_2.bounds,
)
return diff_dem
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute the difference between dem_1 and dem_2.
Add classification layers to the difference.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: difference xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
diff = self.compute_dems_diff(dem_1, dem_2)
diff = accumulates_class_layers(dem_1, dem_2, diff)
return diff
@DemProcessing.register("alti-diff-slope-norm")
[docs]
class AltiDiffSlopeNorm(DemProcessingTemplate):
"""
Altitude difference between two DEMs normalized by the slope
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "alti-diff-slope-norm"
self.fig_title = "[REF - SEC] difference normalized by the slope"
self.colorbar_title = "Elevation difference normalized by the slope"
self.cmap = "bwr"
[docs]
def compute_dems_diff_slope_norm(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute altitude difference dem_1 - dem_2,
normalized by the slope of the DEM and
return it as an xr.Dataset with the dem_2
georeferencing and attributes.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: difference normalized by the slope xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
diff_raster = dem_1["image"].data - dem_2["image"].data
diff_raster = self.dh_compute_normalization_factor(diff_raster, dem_2)
diff_dem = create_dem(
diff_raster,
transform=dem_2.georef_transform.data,
nodata=dem_1.attrs["nodata"],
img_crs=dem_2.crs,
bounds=dem_2.bounds,
)
return diff_dem
[docs]
def dh_compute_normalization_factor(
self, diff: np.ndarray, dem: xr.Dataset, nbins: int = 100
) -> np.ndarray:
"""
Compute the normalization factor for several (nbins) slope classes.
First: compute the tangent of the slope at each pixel.
Then: compute the angle of the slope at each pixel.
Then: classification of the pixels by the angle of the slope value.
Then: compute the std of the error for each of the pixel classes
Then: perform linear regression: a,b=regLin(tan(angle),std)
Finally: Error normalization for each slope class:
dh = dh/(1+b/a*tan(angle))
:param diff: difference between the ref and sec DEMs
:type diff: np.ndarray
:param dem: dem xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem: xr.Dataset
:param nbins: number of bins of the histogram
:type nbins: int
:return: altitude difference normalized by the slope
:rtype: np.ndarray
"""
alpha = compute_dem_slope(dem, add_attribute=False, unit_change=False)
tan_alpha = np.tan(alpha)
no_nan = (~np.isnan(tan_alpha)) & (~np.isnan(diff))
_, bin_alpha = np.histogram(alpha[no_nan], bins=nbins)
# exclude extreme slope values before performing linear regression
# in this case change [0, 1] -> [0.1, 0.9]
v_min, v_max = np.quantile(alpha[no_nan], [0, 1])
alpha_reg_lin = bin_alpha[
(bin_alpha <= v_max) & (bin_alpha >= v_min)
] # slope classes used for linear regression
std_alpha_reg_lin = []
alpha_reg_lin_for_fit = []
for n in range(alpha_reg_lin.size - 1):
mask = (
(alpha > alpha_reg_lin[n]) & (alpha <= alpha_reg_lin[n + 1])
) & no_nan
if diff[mask].size > 1:
std_alpha_reg_lin.append(
np.std(diff[mask])
) # standard deviation of error for slope class
alpha_reg_lin_for_fit.append(alpha_reg_lin[n])
if len(std_alpha_reg_lin) <= 1:
logging.error("Not enough pints to fit!")
raise ValueError
a, b = np.polyfit(np.tan(alpha_reg_lin_for_fit), std_alpha_reg_lin, 1)
# calculation of normalization factor
f_norm = np.ones(diff.shape) * np.nan
for n in range(len(bin_alpha) - 1):
mask = (alpha >= bin_alpha[n]) & (alpha <= bin_alpha[n + 1])
y_alpha, x_alpha = np.where(mask)
f_norm[y_alpha, x_alpha] = (
1 + b / a * tan_alpha[y_alpha, x_alpha]
) ** (-1)
# application of normalization factor to DEM elevation errors
# bias subtraction before applying the factor.
mu = np.mean(remove_nan_and_flatten(diff))
dh_norm = (np.copy(diff) - mu) * f_norm
return dh_norm
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute the difference between dem_1 and dem_2 normalized by the slope.
Add classification layers to the difference.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: difference normalized by the slope xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
diff = self.compute_dems_diff_slope_norm(dem_1, dem_2)
diff = accumulates_class_layers(dem_1, dem_2, diff)
return diff
@DemProcessing.register("angular-diff")
[docs]
class AngularDiff(DemProcessingTemplate):
"""
Angular difference between two DEMs
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "angular-diff"
self.fig_title = "[REF vs SEC] angular difference"
self.colorbar_title = "Angular difference"
self.cmap = "Reds"
[docs]
def compute_dems_angular_diff(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute angular difference dem_1 - dem_2 and
return it as an xr.Dataset with the dem_2
georeferencing and attributes.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: angular difference xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
normal_dem_1 = compute_surface_normal(
dem_1["image"].data,
dem_1.georef_transform.data[1],
dem_1.georef_transform.data[5],
)
normal_dem_2 = compute_surface_normal(
dem_2["image"].data,
dem_2.georef_transform.data[1],
dem_2.georef_transform.data[5],
)
diff_raster = self.compute_angular_similarity(
normal_dem_1, normal_dem_2
)
diff_dem = create_dem(
diff_raster,
transform=dem_2.georef_transform.data,
nodata=dem_1.attrs["nodata"],
img_crs=dem_2.crs,
bounds=dem_2.bounds,
)
return diff_dem
[docs]
def compute_angular_similarity(
self, n_a: np.ndarray, n_b: np.ndarray
) -> np.ndarray:
"""
Compute the angular difference theta (radians) between two vector maps.
:param n_a: surface normal vector to first DEM
:type n_a: np.ndarray
:param n_b: surface normal vector to second DEM
:type n_ab: np.ndarray
:return: angular difference between the two vectors
:rtype: np.ndarray
"""
n_a_b = (
n_a[0] * n_b[0] + n_a[1] * n_b[1] + n_a[2] * n_b[2]
) # Scalar product between the 2 vectors
n_a_b[n_a_b > 1] = 1
n_a_b[n_a_b < -1] = -1
dh_norm = np.arccos(np.abs(n_a_b))
return dh_norm
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Compute the angular difference between dem_1 and dem_2.
Add classification layers to the difference.
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: angular difference xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
diff = self.compute_dems_angular_diff(dem_1, dem_2)
diff = accumulates_class_layers(dem_1, dem_2, diff)
return diff
@DemProcessing.register("ref")
[docs]
class Ref(DemProcessingTemplate):
"""
REF DEM
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "ref"
self.fig_title = "REF dem"
self.colorbar_title = "Elevation (m)"
self.cmap = "terrain"
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset = None,
) -> xr.Dataset:
"""
Return dem_1
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
return dem_1
@DemProcessing.register("sec")
[docs]
class Sec(DemProcessingTemplate):
"""
SEC DEM
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "sec"
self.fig_title = "SEC dem"
self.colorbar_title = "Elevation (m)"
self.cmap = "terrain"
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Return dem_1
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
if hasattr(dem_1, "classification_layer_masks"):
dem_2["classification_layer_masks"] = dem_1[
"classification_layer_masks"
]
return dem_2
@DemProcessing.register("ref-curvature")
[docs]
class RefCurvature(DemProcessingTemplate):
"""
Curvature of the REF DEM
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "ref-curvature"
self.fig_title = "REF dem curvature"
self.colorbar_title = "Curvature"
self.cmap = "bwr"
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset = None,
) -> xr.Dataset:
"""
Return the curvature of dem_1
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: curvature xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
return compute_curvature_filtering(dem_1)
@DemProcessing.register("sec-curvature")
[docs]
class SecCurvature(DemProcessingTemplate):
"""
Curvature of the SEC DEM
"""
def __init__(self, parameters: Dict = None):
"""
Initialization the DEM processing object
:return: None
"""
super().__init__()
self.type = "sec-curvature"
self.fig_title = "SEC dem curvature"
self.colorbar_title = "Curvature"
self.cmap = "bwr"
[docs]
def process_dem(
self,
dem_1: xr.Dataset,
dem_2: xr.Dataset,
) -> xr.Dataset:
"""
Return the curvature of dem_1
:param dem_1: dem_1 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_1: xr.Dataset
:param dem_2: dem_2 xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:type dem_2: xr.Dataset
:return: curvature xr.DataSet containing :
- image : 2D (row, col) xr.DataArray float32
- georef_transform: 1D (trans_len) xr.DataArray
- classification_layer_masks : 3D (row, col, indicator)
xr.DataArray
:rtype: xr.Dataset
"""
if hasattr(dem_1, "classification_layer_masks"):
dem_2["classification_layer_masks"] = dem_1[
"classification_layer_masks"
]
return compute_curvature_filtering(dem_2)