#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2022 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of demcompare
# (see https://github.com/CNES/demcompare).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Mainly contains the StatsDataset class
contains the computed stats of a pair of DEMs
for the different classification layers
"""
# Standard imports
import collections
import copy
import csv
import json
import os
from typing import Dict, List, Tuple, Union
import numpy as np
import xarray as xr
from .dem_processing import DemProcessing
[docs]
class StatsDataset:
"""
StatsDataset class
The StatsDataset class contains a list of one
xr.dataset per classification layer
Each xr.Dataset contains :
:image: 2D (row, col) input image as xarray.DataArray,
:image_by_class: 3D (row, col; nb_classes)
xarray.DataArray containing
the image pixels belonging
to each class considering the valid pixels
:image_by_class_intersection: 3D (row, col; nb_classes)
xarray.DataArray containing
the image pixels belonging
to each class considering the intersection mode
:image_by_class_exclusion: 3D (row, col; nb_classes)
xarray.DataArray containing
the image pixels belonging
to each class considering the exclusion mode
:attributes:
- name : name of the classification_layer. str
- stats_by_class : dictionary containing
the stats per class considering the standard mode
- stats_by_class_intersection : dictionary containing
the stats per class considering the intersection mode
- stats_by_class_exclusion : dictionary containing
the stats per class considering the exclusion mode
"""
def __init__(self, image: np.ndarray, dem_processing_method: str = None):
# Dictionary with the different classification layers
# and the modes of each layer
self.classif_layers_and_modes: Dict = {}
# Image map
self.image: np.ndarray = image
# List of xr.Dataset for each classification layer
self.classif_layers_dataset: List[xr.Dataset] = []
if dem_processing_method is not None:
self.dem_processing: DemProcessing = DemProcessing(
dem_processing_method
)
[docs]
def add_classif_layer_and_mode_stats(
self, classif_name: str, input_stats: List[Dict], mode_name: str
):
"""
Add the stats of a classification layer and
a mode to the corresponding xarray dataset
:param classif_name: classification_layer name
:type classif_name: str
:param input_stats: input statistics
:type input_stats: List[str]
:mode_name: name of the mode (standard (no name),
intersection, exclusion)
:type mode_name: str
:return: None
"""
# If no xr.Dataset exists for the classification layer,
# create it, otherwise overload it !
if classif_name not in self.classif_layers_and_modes:
# Store the classification layer name on the
# classif_layers_and_modes dictionary
self.classif_layers_and_modes[classif_name] = {}
self.classif_layers_and_modes[classif_name]["modes"] = []
# Initialize the dataset
new_dataset = xr.Dataset(
{"image": (["row", "col"], self.image)},
coords={
"row": np.arange(self.image.shape[0]),
"col": np.arange(self.image.shape[1]),
},
)
# Add the name of the classification as an attribute
new_dataset.attrs["name"] = classif_name
# Add the created dataset to the classif_layers_dataset list
self.classif_layers_dataset.append(new_dataset)
# Image and stats indicator name
if mode_name == "standard":
image_indicator = "image_by_class"
stats_indicator = "stats_by_class"
else:
image_indicator = "image_by_class_" + mode_name
stats_indicator = "stats_by_class_" + mode_name
# Add the mode of the corresponding classification layer on the
# classif_layers_and_modes dictionary if doesn't exist in classif modes
if (
mode_name
not in self.classif_layers_and_modes[classif_name]["modes"]
):
self.classif_layers_and_modes[classif_name]["modes"].append(
mode_name
)
# Get the classification corresponding dataset idx
dataset_idx = list(self.classif_layers_and_modes.keys()).index(
classif_name
)
# Initialize the classification layer classes
classes = list(np.arange(len(input_stats)))
# Define coords, the third col is the indicator
# with the number of classes
coords_classification_layers = [
self.classif_layers_dataset[dataset_idx].coords["row"],
self.classif_layers_dataset[dataset_idx].coords["col"],
classes,
]
# Initialize the image data by class
# Each dataset has one xr.DataArray per mode indicating
# the image by class
image_maps = np.full(
(
self.image.shape[0],
self.image.shape[1],
len(classes),
),
np.nan,
dtype=np.float32,
)
# Initialize the stats_by_class + mode_name dictionary on
# the dataset attrs
if (
stats_indicator
not in self.classif_layers_dataset[dataset_idx].attrs
):
self.classif_layers_dataset[dataset_idx].attrs[stats_indicator] = {}
# Iterate to obtain the stats per class
# overwrite if mode/stat is already present
for class_idx, class_stats in enumerate(input_stats):
# Fill the alti diff of the corresponding class
# with the input dz_values
image_maps[:, :, class_idx] = class_stats["dz_values"]
# Make a copy of the class_stats dictionary to make
# temporal changes
tmp_class_stats = copy.deepcopy(class_stats)
# Pop the dz_values dictionary key to iterate over
# the rest of metrics
tmp_class_stats.pop("dz_values")
# Scalar metrics are stored in attrs
# of the dataset
# Initialize the stats_by_class + mode_name +
# class_idx dictionary
if (
class_idx
not in self.classif_layers_dataset[dataset_idx].attrs[
stats_indicator
]
):
self.classif_layers_dataset[dataset_idx].attrs[stats_indicator][
class_idx
] = {}
# Add each metric on the dictionary
for stat_name, stat_value in tmp_class_stats.items():
self.classif_layers_dataset[dataset_idx].attrs[stats_indicator][
class_idx
][stat_name] = stat_value
# Create and add the xr.DataArray to the dataset
# overload if already present (through dataset_idx above)
self.classif_layers_dataset[dataset_idx][image_indicator] = (
xr.DataArray(
data=image_maps,
coords=coords_classification_layers,
dims=["row", "col", "classes"],
)
)
[docs]
def save_as_csv_and_json(
self,
classif_name: str,
stats_dir: str,
):
"""
Saves the classification layer's results to csv and json
files on the stats_dir
:param classif_name: classification_layer name
:type classif_name: str
:param stats_dir: output stats directory
:type stats_dir: str
:return: None
"""
# Iterate over the classification modes
for _, mode_name_item in enumerate(
self.classif_layers_and_modes[classif_name]["modes"]
):
# Get the dataset idx of the corresponding classification layer
dataset_idx = list(self.classif_layers_and_modes.keys()).index(
classif_name
)
# Get the xr.Dataset
classif_dataset = self.classif_layers_dataset[dataset_idx]
# Indicator name of the image and stats by class map
if mode_name_item == "standard":
mode_name_item = ""
else:
mode_name_item = "_" + mode_name_item
stats_dict = classif_dataset.attrs[
"stats_by_class" + mode_name_item
]
scalar_metric_dict: collections.OrderedDict = (
collections.OrderedDict()
)
# Add each class stats on the results dict
for class_idx in list(stats_dict.keys()):
scalar_metric_dict[class_idx] = {}
scalar_metric_dict[class_idx]["Set Name"] = stats_dict[
class_idx
]["class_name"]
# Save scalar metrics
for metric_name, metric_stats in stats_dict[class_idx].items():
if isinstance(metric_stats, (float, int)):
scalar_metric_dict[class_idx][
metric_name
] = metric_stats
# Initialize the output json path
mode_output_json_files = os.path.join(
stats_dir, "stats_results" + mode_name_item + ".json"
)
# Save the results dictionary on a json file
with open(mode_output_json_files, "w", encoding="utf8") as outfile:
json.dump(scalar_metric_dict, outfile, indent=4)
# Save the results into a csv file
# - create filename
csv_filename = os.path.join(
os.path.splitext(mode_output_json_files)[0] + ".csv"
)
# - writes the results down as csv format
with open(csv_filename, "w", encoding="utf8") as csvfile:
fieldnames = list(scalar_metric_dict[0].keys())
writer = csv.DictWriter(
csvfile, fieldnames=fieldnames, quoting=csv.QUOTE_NONNUMERIC
)
writer.writeheader()
for set_item in scalar_metric_dict:
writer.writerow(scalar_metric_dict[set_item])
[docs]
def get_classification_layer_names(self):
"""
Returns the available classification layers
:return: None
"""
return list(self.classif_layers_and_modes.keys())
[docs]
def get_classification_layer_dataset(
self, classification_layer: str
) -> xr.Dataset:
"""
Returns the xr.Dataset corresponding to
the input classification layer name
:param classification_layer: classification_layer name
:type classification_layer: str
:return: stats dictionary
:rtype: xr.Dataset
"""
# Get the dataset index and return the corresponding dataset
idx = list(self.classif_layers_and_modes.keys()).index(
classification_layer # pylint:disable=consider-iterating-dictionary
)
return self.classif_layers_dataset[idx]
[docs]
def get_classification_layer_stats(self, classification_layer: str) -> Dict:
"""
Returns all the stats corresponding to
the input classification layer name
:param classification_layer: classification_layer name
:type classification_layer: str
:return: stats dictionary
:rtype: Dict
"""
# Get the dataset index and return the corresponding dataset
idx = list(self.classif_layers_and_modes.keys()).index(
classification_layer # pylint:disable=consider-iterating-dictionary
)
return self.classif_layers_dataset[idx].attrs
[docs]
def get_classification_layer_metrics(
self,
classification_layer: str,
) -> List[str]:
"""
Returns the metric names available on the
input classification layer and mode
:param classification_layer: classification_layer name
:type classification_layer: str
:return: available metric names
:rtype: List[str]
"""
# Get classification_layer dataset
dataset = self.get_classification_layer_dataset(classification_layer)
# Get available metric names
output_metric_names = copy.deepcopy(
list(dataset.attrs["stats_by_class"][0].keys())
)
# Delete class name as it is not a metric
output_metric_names.pop(output_metric_names.index("class_name"))
return output_metric_names
[docs]
def get_classification_layer_metric(
self,
classification_layer: str,
classif_class: int = None,
mode: str = "",
metric: str = None,
) -> Union[List, Tuple[np.ndarray, np.ndarray], np.ndarray, float]:
"""
Returns the metric corresponding to the
input classification layer and mode
:param classification_layer: classification_layer name
:type classification_layer: str
:param classif_class: classification_layer class
:type classif_class: int
:param mode: mode (standard (no name), intersection, exclusion)
:type mode: str
:param metric: metric
:type metric: str
:return: metric
:rtype: Union[List,Tuple[np.ndarray, np.ndarray], np.ndarray, float]
"""
# Get classification_layer dataset
dataset = self.get_classification_layer_dataset(classification_layer)
if mode in ("standard", ""):
# Standard mode
stats_indicator = "stats_by_class"
else:
stats_indicator = "stats_by_class_" + mode
# If the class was specified, return the corresponding metric
if isinstance(classif_class, int):
output_metric = dataset.attrs[stats_indicator][classif_class][
metric
]
# Otherwise, return a list with the metric for each class
else:
output_metric = []
# Iterate over the classes and add the metric
# result of each class
for _, metric_dict in dataset.attrs[stats_indicator].items():
output_metric.append(metric_dict[metric])
return output_metric