# standard modules
import io
from datetime import datetime
from joblib import Parallel, delayed
from shapely.wkb import loads as wkb_loads
from dataclasses import dataclass
from typing import Literal
from tqdm import tqdm
# plotting modules
import matplotlib
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib import pyplot as plt
matplotlib.use("Agg")
# saferoad modules
from .plotter import Plotter
from .database import DataBase
from .pipeline import Pipeline
from .utils import extract_dates, time_vector, Timer
@dataclass
class PsData:
    """Class for storing point source data information.

    :param filepath: Path to the point source data file.
    :type filepath: str
    :param latitude: Field name of the latitude field in the data file.
    :type latitude: str
    :param longitude: Field name of the longitude field in the data file.
    :type longitude: str
    :param unit: Unit of measurement for the coordinates. Options are "m", "cm", or "mm". Default is "m".
    :type unit: Literal["m", "cm", "mm"]
    :param crs_code: Coordinate Reference System code. Default is "EPSG:4326".
    :type crs_code: str
    :param name: Name identifier for the point source data. Default is "pspoint".
    :type name: str
    :raises TypeError: If a string field is not a ``str``.
    :raises ValueError: If a required field is empty or ``unit`` is invalid.
    """

    filepath: str
    latitude: str
    longitude: str
    unit: Literal["m", "cm", "mm"] = "m"
    crs_code: str = "EPSG:4326"
    name: str = "pspoint"

    def __post_init__(self):
        # Validate with explicit exceptions instead of `assert`: assert
        # statements are silently stripped when Python runs with -O, which
        # would disable all input validation.
        for field_name in ("filepath", "latitude", "longitude", "crs_code", "name"):
            value = getattr(self, field_name)
            if not isinstance(value, str):
                raise TypeError(f"{field_name} must be a string.")
            if not value:
                raise ValueError(f"{field_name} must be provided.")
        if self.unit not in ("m", "cm", "mm"):
            raise ValueError("Unit must be one of 'm', 'cm', or 'mm'.")
@dataclass
class Road:
    """Class for storing road data information.

    :param filepath: Path to the road data file.
    :type filepath: str
    :param crs_code: Coordinate Reference System code. Default is "EPSG:4326".
    :type crs_code: str
    :param name: Name identifier for the road data. Default is "road".
    :type name: str
    :raises TypeError: If a string field is not a ``str``.
    :raises ValueError: If a required field is empty.
    """

    filepath: str
    crs_code: str = "EPSG:4326"
    name: str = "road"

    def __post_init__(self):
        # Explicit raises instead of `assert` (asserts are stripped under -O).
        # Also validates crs_code, which the docstring documents but the
        # previous implementation never checked (PsData does check it).
        for field_name in ("filepath", "crs_code", "name"):
            value = getattr(self, field_name)
            if not isinstance(value, str):
                raise TypeError(f"{field_name} must be a string.")
            if not value:
                raise ValueError(f"{field_name} must be provided.")
class SafeRoad:
    """
    A class to perform road deformation analysis using MT-InSAR data.

    :param road: An instance of the Road class containing road data information.
    :type road: Road
    :param ps_data: An instance of the PsData class containing MT-InSAR data information.
    :type ps_data: PsData
    :param computational_crs: A string representing the EPSG code for the computational CRS.
    :type computational_crs: str
    :raises TypeError: If any argument is not of the expected type.
    """

    def __init__(self, road: Road, ps_data: PsData, computational_crs: str):
        # Explicit raises instead of `assert`: asserts are silently removed
        # when Python runs with the -O flag, which would skip validation.
        if not isinstance(road, Road):
            raise TypeError("road must be an instance of Road class")
        if not isinstance(ps_data, PsData):
            raise TypeError("ps_data must be an instance of PsData class")
        if not isinstance(computational_crs, str):
            raise TypeError(
                "computational_crs must be a string representing the EPSG code"
            )
        self.ps_data = ps_data
        self.road_data = road
        # Fresh in-process database; pipeline queries populate its tables.
        self.database = DataBase()
        self.computational_crs = computational_crs
@property
def _scaling_factor_(self):
# determine the scaling factor based on the unit
units = ["m", "cm", "mm"]
sf = [1000, 10, 1]
return sf[units.index(self.ps_data.unit)]
[docs]
def load_files(self):
"""
Load the road and MT-InSAR data files into the database.
"""
# setup the db if loading
with Timer("Setting up the database"):
self.database.setup()
# road_query
with Timer("Loading road data"):
self.database.run(
Pipeline.Processing.load_file(
self.road_data.filepath, self.road_data.name
)
)
# ps_query
with Timer("Loading ps data"):
self.database.run(
Pipeline.Processing.load_file(self.ps_data.filepath, self.ps_data.name)
)
[docs]
def preprocess(self, attribute: str = None):
"""
Preprocess the road and MT-InSAR data.
This includes transforming the data to the computational CRS, dissolving road features,
and building geometries for the MT-InSAR points.
1. Transform the road and MT-InSAR data to the computational CRS.
2. Dissolve road features based on the provided attribute if given, otherwise merge all features.
3. Build geometries for the MT-InSAR points from the latitude and longitude columns.
4. Transform the MT-InSAR data to the computational CRS.
:param attribute: An optional string representing the attribute to dissolve road features.
If None, all features will be merged. Default is None.
:type attribute: str, optional
"""
# Transform the road projection
with Timer("Transforming road data to computational CRS"):
self.database.run(
Pipeline.Processing.spatial_tranformation(
source_epsg=self.road_data.crs_code,
target_epsg=self.computational_crs,
table_name=self.road_data.name,
)
)
# Dissolve the road features based on the provided attribute if given if not merge everything
with Timer("Dissolving road features"):
self.database.run(
Pipeline.Processing.dissolve_features(
attribute=attribute, table_name=self.road_data.name
)
)
# build the geometry for the psoints from csv headers
with Timer("Building geometry for PS data"):
self.database.run(
Pipeline.Processing.build_geometry(
latitude=self.ps_data.latitude,
longitude=self.ps_data.longitude,
table_name=self.ps_data.name,
)
)
# Transform the ps data to the computational CRS
with Timer("Transforming PS data to computational CRS"):
self.database.run(
Pipeline.Processing.spatial_tranformation(
source_epsg=self.ps_data.crs_code,
target_epsg=self.computational_crs,
table_name=self.ps_data.name,
)
)
[docs]
def generate_rectangles(self, road_width: float, segment_length: float):
"""
Generate road segments and patches.
This involves:
1. Splitting the road into segments of equal length.
2. Generating rectangles along the road segments with a specified width.
2. Rotating the generated rectangles int the same direction along the road.
:param road_width: A float representing the width of the road segments in meters.
:param segment_length: A float representing the length of each road segment in meters.
"""
with Timer("Generating road segments and patches"):
self.database.run(
Pipeline.Processing.split_to_segments(segment_length=segment_length)
)
self.database.run(
Pipeline.Processing.generate_patches(
width=road_width, length=segment_length
)
)
    def run_analysis(self):
        """
        Run the deformation analysis on the MT-InSAR data for road network.

        This includes calculating cumulative displacement, average velocity,
        point density, local control points, and outliers. Every step writes
        its result back into the database via a pipeline query; the number of
        outliers found by each of the three detection strategies (threshold,
        average local displacement, median local displacement) is printed to
        stdout.

        Must be called after :meth:`preprocess` and
        :meth:`generate_rectangles`, since the queries rely on the projected
        PS table and the generated patches.
        """
        # Read the column names of the PS table; the date-named displacement
        # columns are extracted from this list below.
        # NOTE(review): this uses connection.sql() while every other query in
        # this method uses connection.execute() — presumably both work on this
        # DataBase wrapper (duckdb-style); confirm intentional.
        fields = [
            i[0]
            for i in self.database.connection.sql(
                Pipeline.Processing.get_column_names(self.ps_data.name)
            ).fetchall()
        ]
        # extract dates from the column names and generate time vector
        names, dates = extract_dates(column_names=fields)
        date_vector = time_vector(dates=dates)
        with Timer("Calculating cumulative displacement per point"):
            self.database.run(
                Pipeline.Processing.calc_cum_disp(
                    dates=names, table_name=self.ps_data.name
                )
            )
        # Build the spatial relation between points and patches: assigns each
        # point to a patch by creating a patch_id field on the pspoint table.
        with Timer("Relating points to patches"):
            self.database.run(
                Pipeline.Processing.relate_point_patches(self.ps_data.name)
            )
        # Calculate average velocity per point; needs patch_id in the pspoint
        # table for optimization purposes.
        with Timer("Calculating average velocity per point"):
            self.database.run(
                Pipeline.Processing.calc_avg_velocity(
                    time_vector=date_vector,
                    field_names=names,
                    table_name=self.ps_data.name,
                )
            )
        # calculate point count per patch and point density
        with Timer("Calculating point density per patch"):
            self.database.run(Pipeline.Processing.calc_point_density(self.ps_data.name))
        # calculate average cumulative displacement per patch
        with Timer("Calculating average cumulative displacement per patch"):
            self.database.run(Pipeline.Processing.calc_avg_cum_disp())
        # calculate average displacement time series per patch based on gcp
        with Timer(
            "Calculating average displacement time series per patch based on GCP"
        ):
            self.database.run(
                Pipeline.Processing.calc_avg_disp_ts_gcp(date_fields=names)
            )
        # calculate standard deviation of displacement time series per patch based on gcp
        with Timer(
            "Calculating standard deviation of displacement time series per patch based on GCP"
        ):
            self.database.run(
                Pipeline.Processing.calc_std_disp_ts_gcp(date_fields=names)
            )
        # calculate average velocity per patch based on gcp
        with Timer("Calculating average velocity per patch based on GCP"):
            self.database.run(
                Pipeline.Processing.calc_avg_velocity_gcp(table_name=self.ps_data.name)
            )
        # select the local control point (LCP) per patch
        with Timer("Calculating local control points per patch"):
            self.database.run(Pipeline.Processing.select_lcps(self.ps_data.name))
        # calculate the local control point avg velocity per patch
        with Timer("Calculating local control points average velocity per patch"):
            self.database.run(Pipeline.Processing.calc_lcp_velocity())
        # calculate the average and std of the displacement time series per patch based on lcp
        with Timer(
            "Calculating average and std of displacement time series per patch based on LCP"
        ):
            self.database.run(
                Pipeline.Processing.calc_avg_std_disp_ts_lcp(
                    date_fields=names, table_name=self.ps_data.name
                )
            )
        # calculate the median and std of the displacement time series per patch based on lcp
        with Timer(
            "Calculating median and std of displacement time series per patch based on LCP"
        ):
            self.database.run(
                Pipeline.Processing.calc_med_std_disp_ts_lcp(
                    date_fields=names, table_name=self.ps_data.name
                )
            )
        # calculate the average velocity per patch based on lcp
        with Timer("Calculating average velocity per patch based on LCP"):
            self.database.run(
                Pipeline.Processing.calc_avg_velocity_lcp(table_name=self.ps_data.name)
            )
        # calculate the average relative velocity per patch based on lcp
        with Timer("Calculating average relative velocity per patch based on LCP"):
            self.database.run(
                Pipeline.Processing.calc_avg_lcp_velocity(self.ps_data.name)
            )
        # Calculate the outliers per patch. Three detection methods run in
        # sequence, each appending to the shared `outliers` table; the per-
        # method counts are therefore computed as running differences of the
        # total row count.
        with Timer("Calculating outliers per patch"):
            self.database.run(
                Pipeline.Processing.calc_outliers(
                    self._scaling_factor_, self.ps_data.name
                )
            )
            # Method 1: threshold-based detection.
            threshold_count = self.database.connection.execute(
                "Select count(*) from outliers;"
            ).fetchone()[0]
            print("%s outliers detected by threshold method" % threshold_count)
            # Method 2: average local cumulative displacement.
            self.database.run(
                Pipeline.Processing.calc_outliers_avg_lcd(
                    self.ps_data.name, date_vector=date_vector, date_names=names
                )
            )
            ald_count = (
                self.database.connection.execute(
                    "Select count(*) from outliers;"
                ).fetchone()[0]
                - threshold_count
            )
            print("%s outliers detected by avg local disp method" % (ald_count))
            # Method 3: median local cumulative displacement.
            self.database.run(
                Pipeline.Processing.calc_outliers_med_lcd(
                    self.ps_data.name, date_vector=date_vector, date_names=names
                )
            )
            med_count = (
                self.database.connection.execute(
                    "select count(*) from outliers;"
                ).fetchone()[0]
                - ald_count
                - threshold_count
            )
            print("%s outliers detected by median local disp method" % (med_count))
        with Timer("Collecting and patching the LCP time series"):
            self.database.run(
                Pipeline.Processing.lcp_ts_patch(
                    date_fields=names, table_name=self.ps_data.name
                )
            )
        with Timer("Collecting outlier time series"):
            self.database.run(
                Pipeline.Processing.outlier_ts_patch(
                    date_fields=names, table_name=self.ps_data.name
                )
            )
        with Timer("collecting differential displacement time series"):
            self.database.run(Pipeline.Processing.outlier_rel_ts_patch())
    def generate_report(self, output_name: str):
        """
        Generate a PDF report containing maps and time series plots.

        Page 1 is an overview map pane (PS density, outliers, cumulative
        displacement, GCP and LCP velocities). Every following page is a
        time-series pane for one detected outlier; those pages are rendered
        in parallel worker processes as PNG bytes and then appended to the
        PDF one by one.

        :param output_name: A string representing the name of the output PDF file endswith `.pdf` extension.
        :type output_name: str
        """

        def render_ts_plots_workers(
            patch_uid: float,
            outliers: list[tuple],
            dates: list[datetime],
            outlier_center: tuple[float, float],  # row whose first element is the outlier's WKB geometry
            patch_center: tuple,
            pspoints: list[tuple],
            lcp_point: list[tuple],
            cum_disp: list[tuple],
            avg_cum_disp: list[tuple],
            std_cum_disp: list[tuple],
            outlier_rel_ts: list[tuple],
            avg_lcp_disp: list[tuple],
            std_lcp_disp: list[tuple],
            lcp_disp: list[tuple],
        ) -> bytes:
            """Render one outlier's time-series pane and return it as PNG bytes.

            Returning raw PNG bytes keeps matplotlib figures from having to
            cross the joblib process boundary.
            """
            # plotting
            tsplotter = Plotter.TimeSeriesPane()
            # Panel 0: overview basemap around the outlier and patch centers.
            # The geometry columns are WKB blobs, decoded with shapely.
            tsplotter.plot_basemap_outlier(wkb_loads(outlier_center[0]).xy)
            tsplotter.plot_all_outliers(outliers)
            tsplotter.plot_square(wkb_loads(outlier_center[0]).xy)
            tsplotter.plot_basemap_pspoint(wkb_loads(patch_center[0]).xy)
            # Panel 1: spatial distribution of PS points, LCP and outliers.
            tsplotter.plot_pspoint_dist(pspoints)
            tsplotter.plot_lcp(lcp_point)
            tsplotter.plot_outlier(outliers)
            # Panel 2: cumulative displacement time series (patch avg/std + point).
            tsplotter.plot_std_disp_graph(avg_cum_disp, std_cum_disp, dates)
            tsplotter.plot_avg_disp_graph(avg_cum_disp, dates)
            tsplotter.plot_cum_disp_graph(cum_disp, dates)
            # Panel 3: relative (LCP-referenced) displacement time series.
            tsplotter.plot_std_rel_disp_graph(avg_lcp_disp, std_lcp_disp, dates)
            tsplotter.plot_avg_rel_disp_graph(avg_lcp_disp, dates)
            tsplotter.plot_rel_disp_graph(outlier_rel_ts, dates)
            # Panel 4: LCP displacement time series.
            tsplotter.plot_std_lcp_disp_graph(avg_cum_disp, std_cum_disp, dates)
            tsplotter.plot_avg_lcp_disp_graph(avg_cum_disp, dates)
            tsplotter.plot_lcp_disp_graph(lcp_disp, dates)
            tsplotter.post_process()
            fig, _ = tsplotter.get_figure()
            # Serialize to PNG in memory and free the figure immediately.
            buf = io.BytesIO()
            fig.savefig(buf, format="png")
            plt.close(fig)
            return buf.getvalue()

        # export the results
        with Timer("Generating and saving the report"):
            with PdfPages(output_name) as pdf:
                # bbox of the patches
                bbox = self.database.connection.execute(
                    Pipeline.Visualisation.get_table_bbox(
                        "patches", self.computational_crs
                    )
                ).fetchall()
                # ps density and patch geometries
                ps_density = self.database.connection.execute(
                    Pipeline.Visualisation.get_ps_density(self.computational_crs)
                ).fetchall()
                # outliers and coordinates
                outliers = self.database.connection.execute(
                    Pipeline.Visualisation.get_outliers_map(
                        self.computational_crs, self._scaling_factor_
                    )
                ).fetchall()
                # cumulative displacement per point
                cum_disp = self.database.connection.execute(
                    Pipeline.Visualisation.get_cum_disp_map(
                        self.computational_crs, self._scaling_factor_
                    )
                ).fetchall()
                # gcp average velocity per point
                gcp_vel = self.database.connection.execute(
                    Pipeline.Visualisation.get_ps_vel_gcp(
                        self.computational_crs, self._scaling_factor_
                    )
                ).fetchall()
                # lcp average velocity per point
                lcp_vel = self.database.connection.execute(
                    Pipeline.Visualisation.get_ps_vel_lcp(
                        self.computational_crs, self._scaling_factor_
                    )
                ).fetchall()
                # plotting the first page -- maps
                print("Starting to render the map page")
                map_plotter = Plotter.MapPane()
                map_plotter.plot_basemap(bbox=bbox)
                map_plotter.plot_ps_density(ps_density)
                map_plotter.plot_outliers(outliers)
                map_plotter.plot_cum_disp(cum_disp)
                map_plotter.plot_gcp_velocity(gcp_vel)
                map_plotter.plot_lcp_velocity(lcp_vel)
                map_plotter.post_process()
                fig, _ = map_plotter.get_figure()
                # NOTE(review): the figure is closed *before* pdf.savefig(fig)
                # here, while the time-series pages below save first and close
                # after. Saving a closed figure works with the Agg backend,
                # but the save-then-close order would be safer — confirm intent.
                plt.close(fig)
                pdf.savefig(fig)
                print("Map page has been rendered")
                print("Intiating time series rendering")
                # collect the patch_ids which has outliers
                puids = [
                    i[0]
                    for i in self.database.connection.execute(
                        Pipeline.Visualisation.patch_uids()
                    ).fetchall()
                ]
                # Re-derive the acquisition dates from the PS table columns
                # for the time-series x-axis.
                _, dates = extract_dates(
                    [
                        i[0]
                        for i in self.database.connection.execute(
                            Pipeline.Processing.get_column_names(self.ps_data.name)
                        ).fetchall()
                    ]
                )
                outliers_map = self.database.connection.execute(
                    Pipeline.Visualisation.get_outliers_map(
                        self.computational_crs, self._scaling_factor_
                    )
                ).fetchall()
                # Prepare the data for parallel processing: per-patch queries
                # run once per patch, per-outlier queries once per outlier.
                # NOTE(review): `outliers` and `cum_disp` are reassigned below,
                # shadowing the map-level results fetched above (which are no
                # longer needed at this point).
                data_ready = []
                for uid in puids:
                    outliers = self.database.connection.execute(
                        Pipeline.Visualisation.get_outlier_points(
                            patch_id=uid, source_crs=self.computational_crs
                        )
                    ).fetchall()
                    patch_center = self.database.connection.execute(
                        Pipeline.Visualisation.get_patch_center(
                            patch_id=uid, source_crs=self.computational_crs
                        )
                    ).fetchone()
                    pspoints = self.database.connection.execute(
                        Pipeline.Visualisation.get_ps_points(
                            patch_id=uid,
                            source_crs=self.computational_crs,
                            table_name=self.ps_data.name,
                        )
                    ).fetchall()  # pspoints
                    lcp_point = self.database.connection.execute(
                        Pipeline.Visualisation.get_lcp_point(
                            patch_id=uid, source_crs=self.computational_crs
                        )
                    ).fetchone()  # lcp point
                    avg_cum_disp = self.database.connection.execute(
                        Pipeline.Visualisation.get_avg_cum_disp_graph(
                            patch_id=uid, scaling_factor=self._scaling_factor_
                        )
                    ).fetchall()  # avg_cum_disp
                    std_cum_disp = self.database.connection.execute(
                        Pipeline.Visualisation.get_std_cum_disp_graph(
                            patch_id=uid, scaling_factor=self._scaling_factor_
                        )
                    ).fetchall()  # std_cum_disp
                    avg_lcp_disp = self.database.connection.execute(
                        Pipeline.Visualisation.get_avg_disp_lcp_graph(
                            patch_id=uid, scaling_factor=self._scaling_factor_
                        )
                    ).fetchall()  # avg_lcp_disp
                    std_lcp_disp = self.database.connection.execute(
                        Pipeline.Visualisation.get_std_disp_lcp_graph(
                            patch_id=uid, scaling_factor=self._scaling_factor_
                        )
                    ).fetchall()  # std_lcp_disp
                    lcp_disp = self.database.connection.execute(
                        Pipeline.Visualisation.get_lcp_disp_graph(
                            patch_id=uid, scaling_factor=self._scaling_factor_
                        )
                    ).fetchall()  # lcp_disp
                    # One report page per outlier point in this patch.
                    for outlier in outliers:
                        o_id = outlier[0]
                        center = self.database.connection.execute(
                            Pipeline.Visualisation.get_center(
                                point_uid=o_id, source_crs=self.computational_crs
                            )
                        ).fetchone()
                        cum_disp = self.database.connection.execute(
                            Pipeline.Visualisation.get_cum_disp_graph(
                                outlier_id=o_id, scaling_factor=self._scaling_factor_
                            )
                        ).fetchall()  # cum_disp
                        outlier_rel_ts = self.database.connection.execute(
                            Pipeline.Visualisation.get_outlier_rel_ts_graph(
                                outlier_id=o_id, scaling_factor=self._scaling_factor_
                            )
                        ).fetchall()  # outlier_rel_ts
                        data_ready.append(
                            (
                                uid,
                                outliers_map,
                                dates,
                                center,
                                patch_center,
                                pspoints,
                                lcp_point,
                                cum_disp,
                                avg_cum_disp,
                                std_cum_disp,
                                outlier_rel_ts,
                                avg_lcp_disp,
                                std_lcp_disp,
                                lcp_disp,
                            )
                        )
                print(f"Total pages to render for outliers: {len(data_ready)}")
                # Render all pages in parallel; each worker returns PNG bytes.
                png_pages = Parallel(n_jobs=-1, backend="loky")(
                    delayed(render_ts_plots_workers)(
                        patch_uid,
                        outliers,
                        dates,
                        outlier_center,
                        patch_center,
                        pspoints,
                        lcp_point,
                        cum_disp,
                        avg_cum_disp,
                        std_cum_disp,
                        outlier_rel_ts,
                        avg_lcp_disp,
                        std_lcp_disp,
                        lcp_disp,
                    )
                    for (
                        patch_uid,
                        outliers,
                        dates,
                        outlier_center,
                        patch_center,
                        pspoints,
                        lcp_point,
                        cum_disp,
                        avg_cum_disp,
                        std_cum_disp,
                        outlier_rel_ts,
                        avg_lcp_disp,
                        std_lcp_disp,
                        lcp_disp,
                    ) in tqdm(data_ready)
                )
                print("All plots rendered, now saving to pdf")
                # Wrap each PNG in a bare figure so PdfPages can append it.
                for png in png_pages:
                    img = plt.imread(io.BytesIO(png), format="png")
                    fig, ax = plt.subplots(figsize=(12, 12), dpi=80)
                    ax.imshow(img)
                    ax.axis("off")
                    ax.set_xticks([])
                    ax.set_yticks([])
                    ax.set_frame_on(False)
                    pdf.savefig(fig)
                    plt.close(fig)
                print("All pages saved to pdf")