# Extract variables and dates from SPI & SPEI filenames
spi_spei_nc_files <- fs::dir_ls(out_dir_final, type="file",glob="*.nc")
# Assuming format: "variable_(infos)_YYYYMM.(geo.infos).nc"
file_meta <- tibble::tibble(filepath = as.character(spi_spei_nc_files)) |>
dplyr::mutate(
filename = fs::path_file(filepath), # Next line: drop the tail infos with "." sep
filename_cleaned = stringr::str_split_i(fs::path_ext_remove(filename), "\\.", i=1)
) |>
tidyr::separate(filename_cleaned, sep = "_", # Split at the underscore
into = c("variable",NA,NA,NA,NA,NA,"time_str")) |> #NAs: distrib,spat_ext,reanalysis,output_stream,ref_per
dplyr::mutate(year = as.integer(stringr::str_sub(time_str, 1, 4)),
month = as.integer(stringr::str_sub(time_str, 5, 6))
) |> data.table::as.data.table()Standard Element 2
Data processing script
drought indices, SGI, drought prediction
1 Input data description
- GW data
- daily measurements of GW level in monitoring pipes
- point-source time series, approx. 2-week intervals
- Time series have a variable duration and temporal coverage. They mostly cover the years 1975-2024, with only little gaps
- From Syke
- Pipe metadata has coordinates, CRS is ETRS-TM35FIN
- SPI & SPEI drought indices
- monthly time series
- rasterized, global but dl’d data cropped to Finland
- from ECMWF
- Time series have a temporal coverage of years 1960-2024.
- CRS is WGS84
1.1 Outputs:
Object
pipe_sgi_long: Constructed time series of Standardized Groundwater index for all monitoring pipes. 380+ point-source timeseries. Variables: time, station ID, pipe ID, SGI value. SGI calculated with monthly groupings, Gaussian kernel. Time series have a variable duration and temporal coverage. However, they mostly cover the years 1975-2024, with only little gaps The time series do not have the pipe metadata attached. (coordinates, elevation, GW area ID) Instad, it is contained within the source data, and joined as needed.Object
station_spi_spei_ts: SPI & SPEI index values read for the center points of all stations. The source data is mentioned in the README. 53 point-source timeseries. Variables: station ID, year+month, year, month, drought index type, drought index value. Time series have a temporal coverage of years 1960-2024. The time series do not have the station metadata attached. (coordinates) Instad, it is contained within the source data, and joined as needed.
Both analysis ready datasets have a monthly temporal resolution. Both datasets are contained within Finland, and cover the most of Finland These objects are saved into analysis-ready/ folder.
1.2 Modelling framework:
The modelling is done entirely using the freely available R-language. The reproducibility of this workflow is ensured with the renv package. Libraries should be handled automatically, and all required are listed in renv.lock. See ?renv::restore() for help if needed.
1.3 Data processing steps:
SPI & SPEI index values are read from the .nc files. (Point-like time series for all stations) Erroneous (< -1000) SPI & SPEI index values are removed. No other apparent outliers.
The original GW data is aggregated into montly time series for all pipes. From these, SGI timeseries are calculated for all pipes. Stations also get their aggregated monthly SGI series (median of station’s pipes)
SGI series are then correlated with SPI & SPEI time series. The concurrency is also estimated by calculating the RMSE. This same analysis is also performed on the level of time series for individual pipes for comparison.
The analysis of correlation was intended get more sophisticated. Issues with the datasets postponed this past this WaterDigitalization project schedule.
1.4 How to use this notebook:
This notebook can be run with Quarto installed. The easiest way to achieve this is to use the Positron IDE. The primary way is to use the reproducibility wrapper (run_reproducibility.R), which runs these interdependent notebooks in the correct order.
2 SPI & SPEI data processing
List the files, and interpret the contents based on the file name.
Transform GW station coordinates for drought index subsetting.
# Get a spatial object of station coordinates
station_coords <- sf::st_as_sf(stations[,.(station_id=tunnus,lon,lat)],
coords=c("lon","lat"), crs="EPSG:3067") |> # Original crs ETRS-TM35FIN
sf::st_transform("OGC:CRS84") # to WGS 84 (CRS84 which is [lon,lat], same as dl'd SPI/SPEI files)
# TODO: Ensure that coordinates are correct, in teh approx. center of the pipes!Extract all variants of drought index time series for all GW stations from the .nc files.
# A function to process a single variable efficiently
extract_timeseries <- function(var_name, files_df, pts_sf) {
var_files <- files_df[variable == var_name][order(time_str)] # Files for only this var, sort chronologically
raster_stack <- terra::rast(var_files$filepath) # Create SpatRaster stack. terra handles virtually, saving mem
# Ensure CRS of supplied points and read NetCDF files match
if (! (sf::st_crs(pts_sf) == sf::st_crs(raster_stack))) { # Comparison of `crs` objs is sophisticated
message("Converting points' CRS (",sf::st_crs(pts_sf)$input,") to file's CRS (",sf::st_crs(raster_stack)$input,")")
pts_sf <- sf::st_transform(pts_sf, sf::st_crs(raster_stack)) # Transform the points to match files
}
# Convert sf points to terra's SpatVector format
pts_vect <- terra::vect(pts_sf)
# Ensure that SpatRaster layers are chronological (may be default beavior, but JIC)
raster_stack <- raster_stack[[order(terra::time(raster_stack))]]
# Extract data
extracted <- terra::extract(raster_stack, pts_vect, fun=identity, bind=TRUE, exact=TRUE, ID=FALSE)
cat("Point timeseries extracted... ")
# Combine with site IDs and tidy into a long format
res_long <- tidyr::pivot_longer(
data = as.data.frame(extracted), # df rows=points, cols = time layers (var,var.1,var.2 ...)
cols = -station_id,
names_to="layer_name", values_to="value"
) |>
# Map the chronological time back onto the extracted layers
dplyr::group_by(station_id) |>
dplyr::mutate(variable = var_name,
time_str=var_files$time_str, year=var_files$year, month=var_files$month
) |> dplyr::ungroup() |>
dplyr::select(station_id, time_str,year,month, variable, value)
return(res_long)
}
# Check if a file for SPI & SPEI timeseries for stations already found
station_spi_spei_path <- fs::path(fs::dir_create("analysis_ready"), "station_spi_spei_ts.csv")
if (fs::file_exists(station_spi_spei_path)) {
station_spi_spei_ts <- data.table::fread(station_spi_spei_path, keepLeadingZeros=TRUE, yaml=TRUE)
} else {
# Extract over each variable (SPI1, SPI3, ..., SPEI36, SPEI48) and bind into one df
station_spi_spei_ts <- purrr::map_dfr(unique(file_meta$variable),
~extract_timeseries(.x, file_meta, station_coords)
) |> as.data.table()
# Save for quick access
data.table::fwrite(station_spi_spei_ts, station_spi_spei_path, yaml=TRUE)
}
rm(station_spi_spei_path)
# Add time column for easier manipulation (not interpreted from csv, faster)
station_spi_spei_ts[, time := lubridate::make_date(year, month, 1)]
# Rename drought index column
data.table::setnames(station_spi_spei_ts, "variable", "dr_ind")Remove the prominent outlier values from the drought index time series.
# Eliminate clear error values of SPI & SPEI timeseries
station_spi_spei_ts <- station_spi_spei_ts[value > -1000]
hist(station_spi_spei_ts$value)# What stations have the largest amount of extreme values?
station_spi_spei_ts[,sum(abs(value) > 4), by=list(station_id,dr_ind)][order(V1)] station_id dr_ind V1
<char> <char> <int>
1: 0101 SPEI12 0
2: 0102 SPEI12 0
3: 0103 SPEI12 0
4: 0104 SPEI12 0
5: 0105 SPEI12 0
---
2074: 0209 SPI48 178
2075: 0810 SPEI48 189
2076: 0313 SPEI36 197
2077: 0313 SPEI48 227
2078: 0302 SPEI36 241
Test the workflow for transforming GW levels into SGI for an individual pipe.
# Aggregate GW levels to yearmon timeseries
pipedata <- G[paikka_id == 10550, .(g=mean(g)), by=yearmon(pvm)]
pipedata[, yearmon := lubridate::ym(floor(yearmon)*100 + round(12 * (yearmon %% 1)) + 1)] # Yearmon to date
pipexts <- xts::as.xts(pipedata, order.by=pipedata$yearmon) # Turn into xts
# This uses sliding window aggregation!!! Not wanted
# SEI::aggregate_xts(pipexts, agg_period=2, agg_scale="months", agg_fun="mean", timescale="days", na_thres = 100)
# Reference period to calculate SGI
pipexts_ref <- pipexts["1980-01-01::2020-12-31"]
# Calculate SGI
insertSource("fix_SEI_std_index.R", package = "SEI", functions = "std_index") # Fix SEI::std_index... PR waitingModified functions inserted through trace(): std_index
pipesgi <- SEI::std_index(pipexts, x_ref=pipexts_ref,
dist="kde", index_type="normal",
gr_new = factor(xts::.indexmon(pipexts)), gr_ref = factor(xts::.indexmon(pipexts_ref)),
return_fit = TRUE
)Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
Warning in ks.test.default(pit, "punif"): ties should not be present for the
one-sample Kolmogorov-Smirnov test
xts::tclass(pipesgi$si) <- "POSIXct"
colnames(pipesgi$si) <- "SGI"
SEI::plot_sei(pipesgi$si)Transform all GW levels of all pipes into SGI time series for comparing with the drought indices.
# Aggregate GW levels to yearmon timeseries for all pipes
G_ym <- data.table::copy(G[, .(g=mean(g)), by=list(paikka_id, yearmon=data.table::yearmon(pvm))])
G_ym[, time := lubridate::ym(floor(yearmon)*100 + round(12 * (yearmon %% 1)) + 1)] # Yearmon to date
# Function wrapper to calculate SGI
dt_std_ind <- function(dt, dt_ref) {
stopifnot("time" %in% names(dt), "yearmon" %in% names(dt_ref))
dt <- xts::as.xts(dt[order(time), .(time, g)])
dt_ref <- xts::as.xts(dt_ref[order(time),.(time, g)])
SEI::std_index(dt, x_ref = dt_ref,
dist="kde", index_type="normal", # Gaussian kernel density estimation
gr_new = factor(xts::.indexmon(dt)), # Index is evaluated separately for every cal month
gr_ref = factor(xts::.indexmon(dt_ref))
)
}
ref_period <- c("1980-01-01","2020-12-31") # Reference period for SGI calculations
# Check if a file for calculated SGI already found
pipe_sgi_path <- fs::path(fs::dir_create("analysis_ready"), "pipe_sgi_long.csv")
if (fs::file_exists(pipe_sgi_path)) {
pipe_sgi_long <- data.table::fread(pipe_sgi_path, keepLeadingZeros=T, yaml=T, colClasses=list(Date="time"))
} else {
message("Calculating SGI timeseries for pipes...")
# Calculate SGI for each pipe into elems of a list. Format the list into a long dt.
pipe_sgi_xts <- vector("list", uniqueN(G_ym$paikka_id))
i <- 1L
for (p_id in unique(G_ym$paikka_id)) {
pipe_sgi_xts[[i]] <- dt_std_ind(G_ym[paikka_id == p_id], G_ym[paikka_id == p_id & yearmon %between% ref_period])
names(pipe_sgi_xts[[i]]) <- paste0("p",p_id) # Name cols with paikka_ids
cat(i,"... ")
i <- i +1
}; rm(i)
all_p_sgi_wide <- Reduce(\(...) xts::merge.xts(..., all=TRUE, join="outer"), pipe_sgi_xts)
pipe_sgi_long <- data.table::melt(as.data.table(all_p_sgi_wide), id.vars="index", variable.name="paikka_id", value.name="sgi") # index refers to the xts time col
pipe_sgi_long[, paikka_id := as.integer(stringr::str_sub(paikka_id,2L,-1L))] # Convert paikka_id to num from being colnames
# Join station ids
pipe_sgi_long <- pipe_sgi_long[pipes,.(time=index, paikka_id, station_id=asema_tunnus, sgi), on="paikka_id", nomatch=NULL]
# Save for quick access
data.table::fwrite(pipe_sgi_long, pipe_sgi_path, yaml=T)
}colClasses dictated by user input and those read from YAML header are in conflict (specifically, for column(s) [[time]]); the proceeding assumes the user input was an intentional override and will ignore the type(s) implied by the YAML header; please exclude the column(s) from colClasses if this was unintentional.
rm(pipe_sgi_path)
# Test-visualize SGI ts
SEI::plot_sei(xts::as.xts(pipe_sgi_long[paikka_id == 9524,.(time=as.POSIXlt(time),sgi)]))Warning in as.data.table.list(jval, .named = NULL): POSIXlt column type
detected and converted to POSIXct. We do not recommend use of POSIXlt at all
because it uses 40 bytes to store one date.
Aggregate SGIs of pipes also to station-level time series.
# Calculate station SGI series by taking a median of all pipes of the station
station_sgi_long <- pipe_sgi_long[,.(sgi = median(sgi)), by=list(station_id, time)]Temporally join the drought index and SGI time series.
# (wide approach)
# Values of SPI & SPEI in wide format for all stations, 1965-2025
# spi_spei_wide <- data.table::dcast(station_spi_spei_ts, station_id + time ~ dr_ind, value.var="value")
# (long approach)
# Merge SGI ts of pipes with station ts of SPI & SPEI
pipe_sgi_w_dr_inds <- merge(pipe_sgi_long, station_spi_spei_ts,
by=c("station_id","time"), allow.cartesian = T)
station_sgi_w_dr_inds <- merge(station_sgi_long, station_spi_spei_ts,
by=c("station_id","time"), allow.cartesian = T)
# Add separate cols for the index type and the accumulation period. Sort ind factors
pipe_sgi_w_dr_inds <- pipe_sgi_w_dr_inds |> tidyr::extract(dr_ind,
into = c("ind_type", "ind_accum"), remove=F,
regex = "([A-Z]+)([0-9]+)", convert=T) |> # number col numeric
dplyr::mutate(dr_ind = factor(dr_ind, levels=gtools::mixedsort(unique(dr_ind)),ordered=T),
ind_accum = factor(ind_accum, levels=gtools::mixedsort(unique(ind_accum)),ordered=T)) |>
as.data.table()
station_sgi_w_dr_inds <- station_sgi_w_dr_inds |> tidyr::extract(dr_ind,
into = c("ind_type", "ind_accum"), remove=F,
regex = "([A-Z]+)([0-9]+)", convert=T) |> # number col numeric
dplyr::mutate(dr_ind = factor(dr_ind, levels=gtools::mixedsort(unique(dr_ind)), ordered=T),
ind_accum = factor(ind_accum, levels=gtools::mixedsort(unique(ind_accum)), ordered=T)) |>
as.data.table()message("Finished")Finished