"""
created matt_dumont
on: 11/30/24
"""
import tempfile
from copy import deepcopy
import numpy as np
import pandas as pd
from pathlib import Path
import shutil
from komanawa.nz_depth_to_water.density_grid import DensityGrid
import netCDF4 as nc


def _get_nc_path(_skiptest=False) -> Path:
    datapath = Path(__file__).parent.joinpath('data', 'nz_depth_to_water.nc')
    if not _skiptest:
        assert datapath.exists(), f'{datapath} does not exist. should not get here'
    return datapath


def get_nc_dataset(to_path=None, rewrite=False) -> Path:
    """
    Get the netcdf dataset for the depth to water data for New Zealand.

    :param to_path: str, optional, the path to save the dataset to. If None, the dataset is copied to a temp directory and that path is returned. This behaviour prevents overwriting the original data.
    :param rewrite: bool, if True then overwrite any existing file at to_path.
    :return: Path, the path to the copied dataset.
    """
    path = _get_nc_path()
    if to_path is None:
        to_path = Path(tempfile.mkdtemp()).joinpath(path.name)
    to_path = Path(to_path)
    if to_path.exists() and not rewrite:
        return to_path
    to_path.unlink(missing_ok=True)
    shutil.copy(path, to_path)
    return to_path
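

# A minimal usage sketch (hypothetical helper, not part of the package API): copy the
# packaged dataset to a scratch directory before editing, so the repo copy is never touched.
def _example_copy_dataset():
    working_copy = Path(tempfile.mkdtemp()).joinpath('nz_depth_to_water_copy.nc')  # illustrative path
    dataset_path = get_nc_dataset(to_path=working_copy, rewrite=True)
    with nc.Dataset(dataset_path, 'r') as ds:
        print(sorted(ds.variables.keys()))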

_meta_keys = (
    'site_name', 'rl_source', 'source', 'nztm_x', 'nztm_y', 'reading_count', 'mean_gwl', 'median_gwl',
    'std_gwl', 'max_gwl', 'min_gwl', 'well_depth', 'bottom_bottomscreen', 'dist_mp_to_ground_level',
    'rl_elevation', 'top_topscreen', 'mean_dtw', 'median_dtw', 'std_dtw', 'max_dtw', 'min_dtw',
    'start_date', 'end_date')

_wl_keys = (
    'wl_site_name', 'wl_date', 'wl_gw_elevation', 'wl_depth_to_water', 'wl_depth_to_water_cor',
    'wl_water_elev_flag', 'wl_dtw_flag')


def _make_metadata_table_from_nc(savepath):
    """Write a restructured-text summary of the variable attributes in the packaged netcdf file."""
    outstr = []
    t0 = 'Water Level Data column meanings/metadata'
    t2 = 'Metadata column meanings/metadata'
    for t, keys in zip((t0, t2), (_wl_keys, _meta_keys)):
        outstr.append(t)
        outstr.append('=' * len(t))
        outstr.append('')
        outstr.append('')
        with nc.Dataset(_get_nc_path(), 'r') as ds:
            for k in keys:
                outstr.append(f'* {k}')
                for attr in ds[k].ncattrs():
                    if attr == 'flag_values':
                        # expand the flag values/meanings pair into a nested list
                        outstr.append(f'    * {attr}:')
                        flag_meanings = ds[k].flag_meanings.split(' ')
                        flag_values = ds[k].flag_values
                        assert len(flag_meanings) == len(
                            flag_values), f'{k} flag values and meanings are not the same length'
                        for i, v in enumerate(flag_values):
                            outstr.append(f'        * {v}: {flag_meanings[i]}')
                    elif attr == 'flag_meanings':
                        pass  # handled alongside flag_values
                    else:
                        outstr.append(f'    * {attr}: {ds[k].getncattr(attr)}')
                outstr.append('')
        outstr.append('')
        outstr.append('')
    outstr = '\n'.join(outstr)
    with open(savepath, 'w') as f:
        f.write(outstr)


def get_metdata_keys() -> list:
    """
    Get the metadata keys for the depth to water data for New Zealand.

    :return: list, the metadata keys.
    """
    return list(_meta_keys)


def get_water_level_keys() -> list:
    """
    Get the water level keys for the depth to water data for New Zealand.

    :return: list, the water level keys.
    """
    return list(_wl_keys)
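

# A small sketch (hypothetical helper): the two key getters list the columns of the
# DataFrames returned by get_nz_depth_to_water below; note that 'site_name' ends up as
# the metadata index rather than a column.
def _example_list_columns():
    print('metadata columns:', get_metdata_keys())
    print('water level columns:', get_water_level_keys())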


def nz_depth_to_water_dump() -> str:
    """
    Get the metadata string for the depth to water data for New Zealand; equivalent to running ncdump on the dataset.

    :return: str, the metadata string.
    """
    out_str = []
    with nc.Dataset(get_nc_dataset(), 'r') as ds:
        out_str.append(str(ds))
        for d in ds.dimensions.keys():
            out_str.append(str(ds.dimensions[d]))
        for key in ds.variables.keys():
            out_str.append(str(ds[key]))
    return '\n\n'.join(out_str)
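

# A minimal sketch (hypothetical helper, illustrative filename): save the ncdump-style
# summary to disk so the dataset structure can be reviewed without netCDF tooling.
def _example_save_dump():
    with open(Path(tempfile.mkdtemp()).joinpath('nz_depth_to_water_dump.txt'), 'w') as f:
        f.write(nz_depth_to_water_dump())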

acceptable_sources = (
    'auk', 'bop', 'gdc', 'hbrc', 'hrc', 'mdc', 'nrc', 'ncc', 'orc', 'src', 'trc', 'tdc', 'wrc', 'gwrc',
    'wcrc', 'nzgd', 'tcc', 'ecan')


def get_nz_depth_to_water(source=None, convert_wl_dtw_flag=False, wl_water_elev_flag=False, ncdataset_path=None) -> (
        pd.DataFrame, pd.DataFrame):
    """
    Get the depth to water data for New Zealand.

    acceptable sources = (None, 'auk', 'bop', 'gdc', 'hbrc', 'hrc', 'mdc', 'nrc', 'ncc', 'orc', 'src', 'trc', 'tdc', 'wrc', 'gwrc', 'wcrc', 'nzgd', 'tcc', 'ecan')

    :param source: None (get all data), str (get data from a specific source)
    :param convert_wl_dtw_flag: bool, if True then convert the wl_dtw_flag from an integer code to a string.
    :param wl_water_elev_flag: bool, if True then convert the wl_water_elev_flag from an integer code to a string.
    :param ncdataset_path: str, optional, the path to the netcdf dataset to use. If None, uses the default path stored in the repo. This provides a mechanism to access a higher precision dataset, which is available on request (it is too large to host in the github repo).
    :return: metadata: pd.DataFrame, water_level_data: pd.DataFrame
    """
    if ncdataset_path is None:
        ncdataset_path = _get_nc_path()
    with nc.Dataset(ncdataset_path, 'r') as ds:
        if source is not None:
            assert source in acceptable_sources, f'{source} not in {acceptable_sources}'
            # map the human readable source code to its integer flag value
            source_mapper = {v: k for k, v in zip(ds['source'].flag_values, ds['source'].flag_meanings.split(' '))}
            meta_index = np.where(ds.variables['source'][:] == source_mapper[source])[0]
            assert len(meta_index) == len(set(meta_index)), 'duplicate meta index'
            reading_index = np.where(np.isin(ds['wl_site_name'][:], meta_index))[0]
            assert len(reading_index) == len(set(reading_index)), 'duplicate reading index'
        else:
            meta_index = np.arange(ds.dimensions['site'].size)
            reading_index = np.arange(ds.dimensions['reading'].size)
        outmetadata = pd.DataFrame(index=meta_index, columns=_meta_keys)
        out_water_level_data = pd.DataFrame(index=reading_index, columns=_wl_keys)
        for keyset, outdf, use_index in zip((_meta_keys, _wl_keys), (outmetadata, out_water_level_data),
                                            (meta_index, reading_index)):
            for k in keyset:
                if k == 'wl_site_name':
                    # readings store an integer index into the site_name variable
                    all_sites = np.array(ds['site_name'][:])
                    outdf[k] = all_sites[np.array(ds['wl_site_name'][use_index]).astype(int)]
                elif hasattr(ds[k], 'flag_values'):
                    skip_possible_convert = (
                            (k == 'wl_dtw_flag' and not convert_wl_dtw_flag)
                            or (k == 'wl_water_elev_flag' and not wl_water_elev_flag))
                    if skip_possible_convert:
                        td = np.array(ds[k][use_index])
                    else:
                        mapper = {fv: fm for fv, fm in zip(ds[k].flag_values, ds[k].flag_meanings.split(' '))}
                        td = [mapper[v] for v in ds[k][use_index]]
                    outdf[k] = td
                elif 'days since' in ds[k].units:
                    # convert day offsets to timestamps, keeping missing values as NaT
                    temp = np.array(ds[k][use_index]).astype(float)
                    temp[np.isclose(temp, ds[k].missing_value)] = np.nan
                    idx = np.isfinite(temp)
                    out = np.full(temp.shape, pd.NaT)
                    out[idx] = pd.to_datetime(ds[k].origin) + pd.to_timedelta(deepcopy(temp[idx]), unit='D')
                    outdf[k] = out
                else:
                    ds[k].set_auto_scale(False)
                    outdf[k] = np.array(ds[k][use_index])
                    if k in ['nztm_x', 'nztm_y', 'reading_count']:
                        outdf[k] = outdf[k].astype(int)
                    elif k not in ['site_name']:
                        outdf.loc[np.isclose(outdf[k], ds[k].missing_value), k] = np.nan
                        # unpack manually (auto scaling is off): value * scale_factor + add_offset
                        outdf[k] = outdf[k] * ds[k].scale_factor + ds[k].add_offset
    outmetadata['site_name'] = outmetadata['site_name'].astype(str)
    outmetadata = outmetadata.set_index('site_name', drop=True)
    return outmetadata, out_water_level_data
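

# A minimal usage sketch (hypothetical helper): pull a single-source subset and summarise
# it; 'ecan' is one of the codes in acceptable_sources and is used here only for illustration.
def _example_single_source():
    meta, wld = get_nz_depth_to_water(source='ecan', convert_wl_dtw_flag=True)
    print(f'{len(meta)} sites and {len(wld)} water level readings')
    print(wld.groupby('wl_dtw_flag').size())  # reading counts per (string) quality flag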


def export_dtw_to_csv(outdir, source=None, convert_wl_dtw_flag=False, wl_water_elev_flag=False):
    """
    Export the depth to water data to csv files.

    :param outdir: str, the directory to save the csv files to.
    :param source: None (get all data), str (get data from a specific source); see get_nz_depth_to_water for acceptable sources.
    :param convert_wl_dtw_flag: bool, if True then convert the wl_dtw_flag to a string; see get_nz_depth_to_water.
    :param wl_water_elev_flag: bool, if True then convert the wl_water_elev_flag to a string; see get_nz_depth_to_water.
    :return: None
    """
    print(f'Preparing to export data to csvs in {outdir}')
    metadata, water_level_data = get_nz_depth_to_water(source, convert_wl_dtw_flag=convert_wl_dtw_flag,
                                                       wl_water_elev_flag=wl_water_elev_flag)
    outdir = Path(outdir)
    outdir.mkdir(exist_ok=True)
    water_level_data.to_csv(outdir.joinpath('water_level_data.csv'))
    metadata.to_csv(outdir.joinpath('metadata.csv'))
    print(f'Exported data to {outdir}')
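

# A minimal sketch (hypothetical helper, illustrative directory): export the full dataset
# with both flag columns converted to human readable strings.
def _example_export_all():
    export_dtw_to_csv(Path(tempfile.mkdtemp()).joinpath('dtw_export'),
                      convert_wl_dtw_flag=True, wl_water_elev_flag=True)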


def get_npoint_in_radius(distlim) -> (np.ndarray, np.ndarray, np.ndarray):
    """
    Get the number of points within [1000 | 5000 | 10000 | 20000] m of each point in the model grid.

    :param distlim: int, the distance limit in meters.
    :return: ndatapoints, mx, my (np.ndarray, np.ndarray, np.ndarray) gridded output
    """
    dg = DensityGrid()
    assert distlim in dg.distlims, f'{distlim} not in {dg.distlims}'
    nans = dg.get_nan_layer()
    mx, my = dg.get_xy()
    data = np.load(dg.data_path.parent.joinpath(f'npoins_nearest_{distlim}.npy'))
    data[nans] = np.nan
    return data, mx, my


def get_distance_to_nearest(npoints) -> (np.ndarray, np.ndarray, np.ndarray):
    """
    Get the distance to the nearest [1 | 10] points for each point in the model grid.

    :param npoints: int, the number of points to consider.
    :return: distance (m), mx, my (np.ndarray, np.ndarray, np.ndarray) gridded output
    """
    dg = DensityGrid()
    assert npoints in dg.npoints, f'{npoints} not in {dg.npoints}'
    nans = dg.get_nan_layer()
    mx, my = dg.get_xy()
    data = np.load(dg.data_path.parent.joinpath(f'distance_m_to_nearest_{npoints}_data.npy'))
    data[nans] = np.nan
    return data, mx, my
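

# A minimal sketch (hypothetical helper): the two density products share the same grid,
# so they can be summarised side by side; distlim=10000 and npoints=10 are illustrative
# values drawn from the docstrings above.
def _example_density_summary():
    n_in_radius, mx, my = get_npoint_in_radius(distlim=10000)
    dist_to_10, _, _ = get_distance_to_nearest(npoints=10)
    print(f'median points within 10 km: {np.nanmedian(n_in_radius):.0f}')
    print(f'median distance to the 10th nearest point: {np.nanmedian(dist_to_10):.0f} m')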


def copy_geotifs(outdir):
    """
    Copy the geotifs of distance to nearest [1 | 10] points and number of points within [1 | 5 | 10 | 20] km to the outdir.

    :param outdir: directory to copy the geotifs to.
    :return: None
    """
    outdir = Path(outdir)
    outdir.mkdir(exist_ok=True)
    datadir = Path(__file__).parent.joinpath('data')
    tifs = datadir.glob('*.tif')
    for tif in tifs:
        shutil.copy(tif, outdir.joinpath(tif.name))
    print(f'Copied geotifs to {outdir}')
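

# A minimal sketch (hypothetical helper): stage the geotifs alongside a csv export so the
# gridded and tabular products can be shared as one directory.
def _example_stage_all_outputs():
    outdir = Path(tempfile.mkdtemp())
    copy_geotifs(outdir)
    export_dtw_to_csv(outdir)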


if __name__ == '__main__':
    meta, wld = get_nz_depth_to_water('gwrc')