#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
advection.py
Written by Tyler Sutterley and Ben Smith (09/2024)
Routines for advecting ice parcels using velocity estimates
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
scipy: Scientific Tools for Python
https://docs.scipy.org/doc/
netCDF4: Python interface to the netCDF C library
https://unidata.github.io/netcdf4-python/netCDF4/index.html
gdal: Pythonic interface to the Geospatial Data Abstraction Library (GDAL)
https://pypi.python.org/pypi/GDAL/
matplotlib: Python 2D plotting library
http://matplotlib.org/
https://github.com/matplotlib/matplotlib
pointCollection: Utilities for organizing and manipulating point data
https://github.com/SmithB/pointCollection
UPDATE HISTORY:
Updated 09/2024: use wrapper to importlib for optional dependencies
Updated 05/2024: make subscriptable and allow item assignment
Updated 04/2024: added base level attribute for time units
Updated 05/2023: add fill gaps function and xy0 interpolator
add option to advect parcels to set the number of steps directly
using pathlib to define and expand paths
Updated 03/2023: added function for extracting from a dictionary
verify input times are float64 arrays
set default interpolator to linear regular grid
add case for using regular grid interpolation with 2d velocities
Updated 10/2022: added option to plot divergence of velocity field
added streaklines based on a velocity field
Updated 08/2022: verify datatype of imported velocity fields
add interpolation and plot routines for unstructured meshes
place some imports within try/except statements
Updated 06/2022: added velocity and streamline plot routine
using numpy nan_to_num function to convert NaN values
Updated 05/2022: verify that input spatial coordinates are doubles
Updated 04/2022: updated docstrings to numpy documentation format
Updated 02/2022: converted to a python class using pointCollection
added spline and regular grid interpolators
Updated 02/2018: added catch for points outside image extents
added adaptive Runge-Kutta-Fehlberg method
Written 01/2018
"""
from __future__ import annotations
import os
import io
import re
import copy
import time
import logging
import pathlib
import numpy as np
import scipy.interpolate
import scipy.spatial
import pointAdvection.utilities
# attempt imports
plt = pointAdvection.utilities.import_dependency('matplotlib.pyplot')
mtri = pointAdvection.utilities.import_dependency('matplotlib.mtri')
pc = pointAdvection.utilities.import_dependency('pointCollection')
timescale = pointAdvection.utilities.import_dependency('timescale')
[docs]
class advection():
"""
Data class for advecting ice parcels using velocity estimates
Attributes
----------
x: np.ndarray
x-coordinates
y: np.ndarray
y-coordinates
t: np.ndarray
time coordinates
x0: np.ndarray or NoneType, default None
Final x-coordinate after advection
y0: np.ndarray or NoneType, default None
Final y-coordinate after advection
t0: np.ndarray or float, default 0.0
Ending time for advection
velocity: obj
``pointCollection`` object of velocity fields
Can be type ``grid`` or ``mesh``
vel_mean: obj
``pointCollection`` object of mean velocity field
streak: dict
path traversed during advection (set kwarg 'streak' to true
in translate methods to enable)
filename: str
input filename of velocity file
integrator: str
Advection function
- ``'euler'``
- ``'RK4'``
- ``'RKF45'``
method: str, default 'linear'
Interpolation method for velocities
- ``'bilinear'``: quick bilinear interpolation
- ``'spline'``: scipy bivariate spline interpolation
- ``'linear'``, ``'nearest'``: scipy regular grid interpolations
- ``'linearND'``, ``'nearestND'``: scipy unstructured N-dimensional interpolations
interpolant: obj
Interpolation function for velocity fields
time_units: str, default 'years'
Temporal units for advection
fill_value: float or NoneType, default np.nan
invalid value for output data
"""
np.seterr(invalid='ignore')
def __init__(self, **kwargs):
# set default keyword arguments
kwargs.setdefault('x', None)
kwargs.setdefault('y', None)
kwargs.setdefault('t', None)
kwargs.setdefault('t0', 0.0)
kwargs.setdefault('integrator', 'RK4')
kwargs.setdefault('method', 'linear')
kwargs.setdefault('time_units', 'years')
kwargs.setdefault('fill_value', np.nan)
# set default class attributes
self.x=np.atleast_1d(kwargs['x']).astype('f8')
self.y=np.atleast_1d(kwargs['y']).astype('f8')
self.t=np.array(kwargs['t'], dtype='f8')
self.x0=None
self.y0=None
self.t0=np.array(kwargs['t0'], dtype='f8')
self.velocity=None
self.streak={'x':[], 'y':[],'t':[]}
self.filename=None
self.integrator=copy.copy(kwargs['integrator'])
self.method=copy.copy(kwargs['method'])
self.interpolant={}
self.time_units=copy.copy(kwargs['time_units'])
self.fill_value=kwargs['fill_value']
self.vel_mean=None
def __update_streak__(self, t, **kwargs):
"""
Update the streakline during flow tracing.
"""
if kwargs['streak']:
self.streak['x'] += [self.x0.copy()]
self.streak['y'] += [self.y0.copy()]
self.streak['t'] += [t.copy()]
[docs]
def case_insensitive_filename(self,
filename: str | pathlib.Path | io.BytesIO
):
"""
Searches a directory for a filename without case dependence
Parameters
----------
filename: str, pathlib.Path, or io.BytesIO
input filename
"""
# check if filename is open file object
if isinstance(filename, io.IOBase):
self.filename = copy.copy(filename)
else:
# tilde-expand input filename
self.filename = pathlib.Path(filename).expanduser().absolute()
# check if file presently exists with input case
if not self.filename.exists():
# search for filename without case dependence
f = [f.name for f in self.filename.parent.iterdir() if
re.match(self.filename.name, f.name, re.I)]
if not f:
errmsg = f'{filename} not found in file system'
raise FileNotFoundError(errmsg)
self.filename = self.filename.with_name(f.pop())
# convert filename to string
self.filename = str(self.filename)
# print filename
logging.debug(self.filename)
return self
# PURPOSE: read geotiff velocity file and extract x and y velocities
[docs]
def from_geotiff(self,
filename: str | pathlib.Path | io.BytesIO,
**kwargs
):
"""
Read geotiff velocity file and extract x and y velocities
Parameters
----------
filename: str or pathlib.Path
geotiff velocity file
bounds: list or NoneType, default None
boundaries to read: ``[[xmin, xmax], [ymin, ymax]]``
If not specified, can read around input points
buffer: float or NoneType, default 5e4
Buffer around input points for extracting velocity fields
scale: float, default None
scaling factor for converting velocities to common units
"""
# set default keyword arguments
kwargs.setdefault('bounds', None)
kwargs.setdefault('buffer', 5e4)
kwargs.setdefault('scale', 1.0)
# check input arguments
assert isinstance(filename, (str, pathlib.Path)), \
'filename must be a string or file object'
assert isinstance(kwargs['bounds'], (list, np.ndarray, type(None))), \
'bounds must be an iterable or None'
assert isinstance(kwargs['buffer'], (float, type(None))), \
'buffer must be a float or None'
assert isinstance(kwargs['scale'], (int, float)), \
'scale must be an int or float'
# find input geotiff velocity file
self.case_insensitive_filename(filename)
# x and y limits (buffered maximum and minimum)
if (kwargs['bounds'] is None) and (kwargs['buffer'] is not None):
bounds = self.buffered_bounds(kwargs['buffer'])
else:
bounds = kwargs['bounds']
# set scale for converting velocities to common units
scale = np.float64(kwargs['scale'])
# read input velocity file from geotiff
UV = pc.grid.data().from_geotif(self.filename,
bands=[1,2], bounds=bounds)
# return the input velocity field as new point collection object
# use scale to convert to common units
self.velocity = pc.grid.data().from_dict(dict(x=UV.x, y=UV.y,
U=UV.z[:,:,0]*scale, V=UV.z[:,:,1]*scale))
# copy projection from geotiff to output pc object
self.velocity.projection = copy.copy(UV.projection)
# set the spacing and dimensions of grid
dx = self.velocity.x[1] - self.velocity.x[0]
dy = self.velocity.y[1] - self.velocity.y[0]
setattr(self.velocity, 'spacing', (dx, dy))
setattr(self.velocity, 'type', 'grid')
# attempt to set dimensions if not current attribute
try:
setattr(self.velocity, 'ndim', self.velocity.U.ndim)
except AttributeError as exc:
pass
return self
# PURPOSE: read netCDF4 velocity file and extract x and y velocities
[docs]
def from_nc(self,
filename: str | io.IOBase,
field_mapping: dict = dict(U='VX', V='VY'),
group: str | None = None,
bounds: list | np.ndarray | None = None,
buffer: float | None = 5e4,
scale: float = 1.0/31557600.0, **kwargs
):
"""
Read netCDF4 velocity file and extract x and y velocities
Parameters
----------
filename: str
netCDF4 velocity file
field_mapping: dict, default {'U':'VX', 'V':'VY'}
mapping between netCDF4 and output field variables
group: str or NoneType, default None
netCDF4 group to extract variables
bounds: list or NoneType, default None
boundaries to read: ``[[xmin, xmax], [ymin, ymax]]``
If not specified, can read around input points
buffer: float or NoneType, default 5e4
Buffer around input points for extracting velocity fields
scale: float, default None
scaling factor for converting velocities to common units
"""
if self.x is None or not np.any(np.isfinite(self.x)):
raise(ValueError('need to assign x and y before reading velocities'))
# set default keyword arguments
kwargs.setdefault('field_mapping', {'U':'VX', 'V':'VY'})
kwargs.setdefault('group', None)
kwargs.setdefault('bounds', None)
kwargs.setdefault('buffer', 5e4)
kwargs.setdefault('scale', 1.0)
# check input arguments
assert isinstance(filename, (str, pathlib.Path)), \
'filename must be a string or file object'
assert isinstance(kwargs['field_mapping'], dict), \
'field_mapping must be a dictionary'
assert isinstance(kwargs['bounds'], (list, np.ndarray, type(None))), \
'bounds must be an iterable or None'
assert isinstance(kwargs['buffer'], (float, type(None))), \
'buffer must be a float or None'
assert isinstance(kwargs['scale'], (int, float)), \
'scale must be an int or float'
# find input netCDF4 velocity file
self.case_insensitive_filename(filename)
# x and y limits (buffered maximum and minimum)
if (kwargs['bounds'] is None) and (kwargs['buffer'] is not None):
bounds = self.buffered_bounds(kwargs['buffer'])
else:
bounds = kwargs['bounds']
# set scale for converting velocities to common units
scale = np.float64(kwargs['scale'])
# read input velocity file from netCDF4
self.velocity = pc.grid.data().from_nc(self.filename,
field_mapping=field_mapping, group=group,
bounds=bounds, **kwargs)
# attempt to read the error terms
try:
temp = pc.grid.data().from_nc(self.filename,
fields=['ERRX','ERRY'], group=group,
bounds=bounds)
self.velocity.assign(eU=temp.ERRX, eV=temp.ERRY)
except Exception:
pass
# attempt to set dimensions if not current attribute
try:
setattr(self.velocity, 'ndim', self.velocity.U.ndim)
except AttributeError:
pass
# swap orientation of axes
if (self.velocity.t_axis == 0) and (self.velocity.ndim == 3):
self.velocity.U = np.transpose(self.velocity.U, axes=(1,2,0))
self.velocity.V = np.transpose(self.velocity.V, axes=(1,2,0))
# check if velocity has error components
if hasattr(self.velocity, 'eU'):
self.velocity.eU = np.transpose(self.velocity.eU, axes=(1,2,0))
if hasattr(self.velocity, 'eV'):
self.velocity.eV = np.transpose(self.velocity.eV, axes=(1,2,0))
# update time dimension axis
self.velocity.t_axis = 2
# create mask for invalid velocity points
mask = ((self.velocity.U.data == self.velocity.fill_value) & \
(self.velocity.V.data == self.velocity.fill_value))
# check if any grid values are nan
mask |= np.isnan(self.velocity.U.data) | np.isnan(self.velocity.V.data)
# use scale to convert to common units
self.velocity.U = scale*np.array(self.velocity.U, dtype=float)
self.velocity.V = scale*np.array(self.velocity.V, dtype=float)
if hasattr(self.velocity, 'eU'):
self.velocity.eU = scale*np.array(self.velocity.eU, dtype=float)
if hasattr(self.velocity, 'eV'):
self.velocity.eV = scale*np.array(self.velocity.eV, dtype=float)
# update fill values in velocity grids
self.velocity.U[mask] = self.fill_value
self.velocity.V[mask] = self.fill_value
if hasattr(self.velocity, 'eU'):
self.velocity.eU[mask] = self.fill_value
if hasattr(self.velocity, 'eV'):
self.velocity.eV[mask] = self.fill_value
# set the spacing of grid
dx = self.velocity.x[1] - self.velocity.x[0]
dy = self.velocity.y[1] - self.velocity.y[0]
setattr(self.velocity, 'spacing', (dx, dy))
setattr(self.velocity, 'type', 'grid')
return self
# PURPOSE: create an advection object from an input dictionary
[docs]
def from_dict(self,
D_dict: dict,
**kwargs
):
"""
Create an advection object from an input dictionary
Parameters
----------
D_dict: dict
Dictionary of advection grid variables
bounds: list or NoneType, default None
boundaries to read: ``[[xmin, xmax], [ymin, ymax]]``
If not specified, can read around input points
buffer: float or NoneType, default None
Buffer around input points for extracting velocity fields
scale: float, default None
scaling factor for converting velocities to common units
"""
# set default keyword arguments
kwargs.setdefault('bounds', None)
kwargs.setdefault('buffer', None)
kwargs.setdefault('scale', 1.0)
kwargs.setdefault('t_axis', 0)
# check input arguments
assert isinstance(kwargs['bounds'], (list, np.ndarray, type(None))), \
'bounds must be an iterable or None'
assert isinstance(kwargs['buffer'], (float, type(None))), \
'buffer must be a float or None'
assert isinstance(kwargs['scale'], (int, float)), \
'scale must be an int or float'
assert isinstance(kwargs['t_axis'], int), \
't_axis must be a int'
# read input velocity file from netCDF4
self.velocity = pc.grid.data(t_axis=kwargs['t_axis']).from_dict(D_dict)
# x and y limits (buffered maximum and minimum)
if (kwargs['bounds'] is None) and (kwargs['buffer'] is not None):
bounds = self.buffered_bounds(kwargs['buffer'])
else:
bounds = kwargs['bounds']
# set scale for converting velocities to common units
scale = np.float64(kwargs['scale'])
# crop grid data to bounds
if bounds is not None:
self.velocity.crop(bounds[0], bounds[1])
# attempt to set dimensions if not current attribute
try:
setattr(self.velocity, 'ndim', self.velocity.U.ndim)
except AttributeError as exc:
pass
# swap orientation of axes
if (self.velocity.t_axis == 0) and (self.velocity.ndim == 3):
self.velocity.U = np.transpose(self.velocity.U, axes=(1,2,0))
self.velocity.V = np.transpose(self.velocity.V, axes=(1,2,0))
# check if velocity has error components
if hasattr(self.velocity, 'eU'):
self.velocity.eU = np.transpose(self.velocity.eU, axes=(1,2,0))
if hasattr(self.velocity, 'eV'):
self.velocity.eV = np.transpose(self.velocity.eV, axes=(1,2,0))
# update time dimension axis
self.velocity.t_axis = 2
# create mask for invalid velocity points
mask = ((self.velocity.U.data == self.velocity.fill_value) & \
(self.velocity.V.data == self.velocity.fill_value))
# check if any grid values are nan
mask |= np.isnan(self.velocity.U.data) | np.isnan(self.velocity.V.data)
# use scale to convert to common units
self.velocity.U = scale*np.array(self.velocity.U, dtype=float)
self.velocity.V = scale*np.array(self.velocity.V, dtype=float)
if hasattr(self.velocity, 'eU'):
self.velocity.eU = scale*np.array(self.velocity.eU, dtype=float)
if hasattr(self.velocity, 'eV'):
self.velocity.eV = scale*np.array(self.velocity.eV, dtype=float)
# update fill values in velocity grids
self.velocity.U[mask] = self.fill_value
self.velocity.V[mask] = self.fill_value
if hasattr(self.velocity, 'eU'):
self.velocity.eU[mask] = self.fill_value
if hasattr(self.velocity, 'eV'):
self.velocity.eV[mask] = self.fill_value
# set the spacing of grid
dx = self.velocity.x[1] - self.velocity.x[0]
dy = self.velocity.y[1] - self.velocity.y[0]
setattr(self.velocity, 'spacing', (dx, dy))
setattr(self.velocity, 'type', 'grid')
return self
# PURPOSE: build a data object from a list of other data objects
[docs]
def from_list(self,
D_list: list,
**kwargs
):
"""
Build a data object from a list of other data objects
Parameters
----------
D_list: list
``pointCollection`` grid objects
sort: bool, default False
sort the list of data objects before merging
bounds: list or NoneType, default None
boundaries to read: ``[[xmin, xmax], [ymin, ymax]]``
If not specified, can read around input points
buffer: float or NoneType, default None
Buffer around input points for extracting velocity fields
scale: float or NoneType, default None
scaling factor for converting velocities common units
"""
# set default keyword arguments
kwargs.setdefault('sort', False)
kwargs.setdefault('bounds', None)
kwargs.setdefault('buffer', None)
kwargs.setdefault('scale', 1.0)
# check input arguments
assert isinstance(kwargs['sort'], bool), \
'sort must be a boolean'
assert isinstance(kwargs['bounds'], (list, np.ndarray, type(None))), \
'bounds must be an iterable or None'
assert isinstance(kwargs['buffer'], (float, type(None))), \
'buffer must be a float or None'
assert isinstance(kwargs['scale'], (int, float)), \
'scale must be an int or float'
# x and y limits (buffered maximum and minimum)
if (kwargs['bounds'] is None) and (kwargs['buffer'] is not None):
bounds = self.buffered_bounds(kwargs['buffer'])
else:
bounds = kwargs['bounds']
# set scale for converting velocities to common units
scale = np.float64(kwargs['scale'])
# read input velocity data from grid objects
v_list=[]
for Di in D_list:
if isinstance(Di, self.__class__):
v_list += [Di.velocity]
# if we are assembling from a set of advection objects, assume that scaling is already done
scale=1.
else:
v_list += [Di]
self.velocity = pc.grid.data().from_list(v_list, sort=kwargs['sort'])
# crop grid data to bounds
if bounds is not None:
self.velocity.crop(bounds[0], bounds[1])
# attempt to set dimensions if not current attribute
try:
setattr(self.velocity, 'ndim', self.velocity.U.ndim)
except AttributeError:
pass
# swap orientation of axes
if (self.velocity.t_axis == 0) and (self.velocity.ndim == 3):
self.velocity.U = np.transpose(self.velocity.U, axes=(1,2,0))
self.velocity.V = np.transpose(self.velocity.V, axes=(1,2,0))
# check if velocity has error components
if hasattr(self.velocity, 'eU'):
self.velocity.eU = np.transpose(self.velocity.eU, axes=(1,2,0))
if hasattr(self.velocity, 'eV'):
self.velocity.eV = np.transpose(self.velocity.eV, axes=(1,2,0))
# update time dimension axis
self.velocity.t_axis = 2
# use scale to convert to common units
self.velocity.U *= scale
self.velocity.V *= scale
if hasattr(self.velocity, 'eU'):
self.velocity.eU = scale*np.array(self.velocity.eU, dtype=float)
if hasattr(self.velocity, 'eV'):
self.velocity.eV = scale*np.array(self.velocity.eV, dtype=float)
# update fill values in velocity grids
mask = np.isnan(self.velocity.U) | np.isnan(self.velocity.V)
self.velocity.U[mask] = self.fill_value
self.velocity.V[mask] = self.fill_value
if hasattr(self.velocity, 'eU'):
self.velocity.eU[mask] = self.fill_value
if hasattr(self.velocity, 'eV'):
self.velocity.eV[mask] = self.fill_value
# set the spacing of grid
dx = self.velocity.x[1] - self.velocity.x[0]
dy = self.velocity.y[1] - self.velocity.y[0]
setattr(self.velocity, 'spacing', (dx, dy))
setattr(self.velocity, 'type', 'grid')
return self
# PURPOSE: calculate the mean velocity field
[docs]
def calc_vel_mean(self, velocity=None, w_smooth=None):
"""
Calculate a mean velocity field
Parameters
----------
velocity: pc.grid.data, optional
If specified, the mean velocity will be calculated from this,
otherwise self.velocity will be used
w_smooth : float, optional
If specified, gaps in the velocity field will be filed with a Gaussian
smoothing of this width. The default is None.
Returns
-------
pointCollection.grid.data.
error-weighted mean velocity.
"""
# calculate an error-weighted average of the velocities
v = self.velocity.copy()
wU = 1/v.eU**2/np.nansum(1/v.eU**2, axis=2)[:,:,None]
wV = 1/v.eV**2/np.nansum(1/v.eV**2, axis=2)[:,:,None]
vel_mean=pc.grid.data().from_dict({'x':np.array(v.x),
'y':np.array(v.y),
'U':np.nansum(wU*v.U, axis=2),
'V':np.nansum(wV*v.V, axis=2),
'eU':np.sqrt(np.nansum(wU**2*v.eU**2, axis=2)),
'eV':np.sqrt(np.nansum(wV**2*v.eV**2, axis=2))})
bad = np.sum(np.isfinite(wU), axis=2)==0
for field in ['U','V']:
getattr(vel_mean, field)[bad]=np.nan
# fill gaps in vel_mean
if w_smooth is not None:
vel_mean.fill_smoothed(fields=['U','V', 'eU', 'eV'], w_smooth=w_smooth)
return vel_mean
# PURPOSE: fill in holes in velocity maps
[docs]
def fill_velocity_gaps(self, velocity=None, annual=True, w_smooth=None, vel_mean=None):
"""
Fill in gaps in the velocity field.
Parameters
----------
velocity : pointCollection.grid.data, optional
if specified, mean velocity field will be calculated from
this field, otherwise, self.velocity will be used
annual : Boolean, optional
If True, will attempt to fill velocity gaps with the average
of the previous and next year's value.
w_smooth : float, optional
If specified, gaps in the velocity field will be filed with a Gaussian
smoothing of this width. The default is None.
vel_mean : pointCollection.grid.data, optional
If specified, this will be used as the mean velocity estimate,
otherwise will be calculated from the velocity field
Returns
-------
None.
"""
# attempt to fill in gaps in each velocity field with the average of
# the velocity from one year prior and that from one year later.
# if one or the other of these is missing, use valid values from the
# slice that is present
v=self.velocity.copy()
v_filled = self.velocity.copy()
if annual:
# attempt to fill data gaps with values one year before and after
delta_year = timescale.time._to_sec['years']/self._to_sec
delta_tol = delta_year/8
for ii in range(v.U.shape[2]-1):
this_U = v.U[:,:,ii].copy()
this_V = v.V[:,:,ii].copy()
this_eU = v.eU[:,:,ii].copy()
this_eV = v.eV[:,:,ii].copy()
u_temp = np.zeros_like(this_U)
v_temp = np.zeros_like(this_U)
eu2_temp = np.zeros_like(this_U)
ev2_temp = np.zeros_like(this_U)
w_temp = np.zeros_like(this_U)
if annual:
for dt in [-delta_tol, delta_tol]:
# try to calculate the average of the previous year and the
# next year
other_year = np.argmin(np.abs(v.time[ii]+delta_year-v.time))
if np.abs((v.time[other_year]-v.time[ii]).astype(float)) > delta_tol:
continue
good = np.isfinite(v.U[:,:,other_year])
u_temp[good] += v.U[:,:,other_year][good]
v_temp[good] += v.V[:,:,other_year][good]
eu2_temp[good] += v.eU[:,:,other_year][good]**2
ev2_temp[good] += v.eV[:,:,other_year][good]**2
w_temp[good] += 1
else:
for jj in [ii-1, ii+1]:
# try to build the average of the previous time slice and the
# next time slice
if jj > 0 and jj < v.U.shape[2]:
good = np.isfinite(v.U[:,:,jj])
u_temp[good] += v.U[:,:,jj][good]
v_temp[good] += v.V[:,:,jj][good]
eu2_temp[good] += v.eU[:,:,jj][good]**2
ev2_temp[good] += v.eV[:,:,jj][good]**2
w_temp[good] += 1
u_temp[w_temp > 0] /= w_temp[w_temp>0]
v_temp[w_temp > 0] /= w_temp[w_temp>0]
eu2_temp[w_temp > 0] /= w_temp[w_temp>0]
ev2_temp[w_temp > 0] /= w_temp[w_temp>0]
to_replace = ((~np.isfinite(this_U)) & (w_temp>0)).ravel()
if np.any(to_replace):
this_U.ravel()[to_replace] = u_temp.ravel()[to_replace]
this_V.ravel()[to_replace] = v_temp.ravel()[to_replace]
this_eU.ravel()[to_replace] = np.sqrt(eu2_temp.ravel()[to_replace])
this_eV.ravel()[to_replace] = np.sqrt(ev2_temp.ravel()[to_replace])
v_filled.U[:,:,ii]=this_U
v_filled.V[:,:,ii]=this_V
v_filled.eU[:,:,ii]=this_eU
v_filled.eV[:,:,ii]=this_eV
if vel_mean is None:
vel_mean=self.calc_vel_mean(velocity=v_filled, w_smooth=w_smooth)
# fill in the remaining gaps using the mean velocity field
for ii in range(v.U.shape[2]):
this_U=v_filled.U[:,:,ii].copy()
this_V=v_filled.V[:,:,ii].copy()
to_replace = (~np.isfinite(this_U)).ravel()
this_eU=v_filled.eU[:,:,ii].copy()
this_eV=v_filled.eV[:,:,ii].copy()
if np.any(to_replace):
this_U.ravel()[to_replace] = vel_mean.U.ravel()[to_replace]
this_V.ravel()[to_replace] = vel_mean.V.ravel()[to_replace]
this_eU.ravel()[to_replace] = vel_mean.eU.ravel()[to_replace]
this_eV.ravel()[to_replace] = vel_mean.eV.ravel()[to_replace]
else:
continue
v_filled.U[:,:,ii]=this_U
v_filled.V[:,:,ii]=this_V
v_filled.eU[:,:,ii]=this_eU
v_filled.eV[:,:,ii]=this_eV
self.velocity.V = v_filled.V
self.velocity.U = v_filled.U
self.velocity.eV = v_filled.eV
self.velocity.eU = v_filled.eU
#PURPOSE: make interpolation objects to allow fast interpolation final displacements
[docs]
def xy1_interpolator(self, bounds=None, t_range=None, t_step=None, blocksize=100,
advection_time_step=0.25, report_progress=False):
'''
Make interpolation objects for the final position of parcels.
Parameters
----------
bounds : iterable of iterables, optional
x and y bounds of the region to be interpolated. If None,
the bounds of the velocity data are used. The default is None.
t_range : iterable of floats, optional
time bounds of the region to be interpolated, in seconds relative
to the epoch. If None, the time bounds of the velocity
data is used. The default is None.
t_step : float, optional
time step, in time_units. If None, the time values in the
velocity data are used.. The default is None.
advection_time_step : float, optional
reference time step for the advection object to use. The default is 0.25 (years)
blocksize: int, optional
grid will be divided into blocks of this size, or run all at once if None
Returns
-------
grid_obj : pc.grid.data
grid object with fields x1 and y1 containing positions at time t for objects starting at (x, y) at the epoch.
'''
if t_range is None:
t_range = [np.nanmin(self.velocity.time), np.nanmax(self.velocity.time)]
if bounds is None:
bounds=self.velocity.bounds() + [t_range]
else:
if len(bounds)==2:
bounds += tuple([t_range])
if t_step is None:
# use the times on the velocity object
ti = self.velocity.t
else:
ti = np.arange(bounds[2][0], bounds[2][1]+t_step, t_step)
# define grids of working coordinates
dx=self.velocity.x[1]-self.velocity.x[0]
x0, y0 = [np.arange(bb[0]-dx, bb[1]+dx, dx) for bb in bounds[0:2]]
print([x0.shape, y0.shape])
xg, yg=np.meshgrid(x0, y0)
epoch=self.t0
# output grids:
xf, yf = [np.zeros((y0.size, x0.size, ti.size)) for jj in ['x','y']]
if blocksize is not None:
x_blocksize=blocksize
y_blocksize=blocksize
else:
x_blocksize=x0.size
y_blocksize=y0.size
last_t=time.time()
for row0 in np.arange(0, y0.size, y_blocksize):
rows=slice(row0, np.minimum(row0+y_blocksize, y0.size))
for col0 in np.arange(0, x0.size, x_blocksize):
cols=slice(col0, np.minimum(col0+x_blocksize, x0.size))
xg_sub=xg[rows, cols]
yg_sub=yg[rows, cols]
# advect points starting at the epoch, towards the beginning and
# end of the time series
for these in [np.flatnonzero(ti >= epoch), np.flatnonzero(ti < epoch)[::-1]]:
# initialize at the epoch
self.y, self.x, self.t = [jj.ravel() for jj in [yg_sub, xg_sub, np.zeros_like(xg_sub)+epoch]]
for this_ind in these:
self.t0=ti[this_ind]
try:
self.translate_parcel(step=advection_time_step)
except ValueError:
# ValueError is thrown when all points are outside the velocity map
self.x0 *= np.nan
self.y0 *= np.nan
xf[rows, cols, this_ind]=self.x0.reshape(xg_sub.shape)
yf[rows, cols, this_ind]=self.y0.reshape(xg_sub.shape)
# update the initial coordinates for the next translation
self.x=self.x0.copy()
self.y=self.y0.copy()
self.t=np.zeros_like(xg[rows, cols]).ravel()+ti[this_ind]
if report_progress:
t_elapsed=time.time()-last_t
last_t=time.time()
print(f'row0={row0} out of {y0.size}, col0={col0} out of {x0.size}, dt={t_elapsed}')
# reset the epoch
self.t0 = epoch
grid_obj=pc.grid.data().from_dict({'x':x0,'y':y0, 't':ti, 'x1':xf,'y1':yf})
return grid_obj
#PURPOSE: interpolation objects to allow fast interpolation initial positions
[docs]
def xy0_interpolator(self, bounds=None, t_range=None, t_step=None, advection_time_step=10, blocksize=100):
"""
Build interpolation objects from initial to final positions
Parameters
----------
bounds : iterable, optional
bounds of the interpolation domain.
Can be specified as two iterables
(x, y) or three (x, y, t). The default is None.
t_range : iterable, optional
time range for the interpolation, in velocity time units (seconds relative
to epoch). If not specified, the time range of self.velocity
will be used. The default is None.
t_step : float, optional
The time step for the interpolation, in time_units. If None,
the time values in the velocity object will be used. The default is None.
advection_time_step : float, optional
The time step to use in parcel tracking, in days. The default is 5.
Returns
-------
grid_obj : pc.grid.data
grid object with fields x0 and y0 containing positions at the epoch for parcels starting at x, y, t
"""
# default time range is the maximum range of the velocity field
if t_range is None:
t_range = [np.nanmin(self.velocity.time), np.nanmax(self.velocity.time)]
# default bounds are the bounds of the velocity field
# plus the time range
if bounds is None:
bounds = self.velocity.bounds() + [t_range]
elif (bounds is not None) and (len(bounds) == 2):
bounds += [t_range]
# use the time from the velocity field
# or calculate the time using the the specified time step
if t_step is None:
# use the times on the velocity object
ti = self.velocity.t
else:
ti = np.arange(bounds[2][0], bounds[2][1]+t_step, t_step)
dx=self.velocity.x[1]-self.velocity.x[0]
x0, y0 = [np.arange(bb[0]-dx, bb[1]+dx, dx) for bb in bounds[0:2]]
yg, xg, tg = np.meshgrid(y0, x0, ti, indexing='ij')
x_init, y_init = [np.zeros_like(ii) for ii in [xg, yg]]
shp=xg.shape
if blocksize is not None:
x_blocksize=blocksize
y_blocksize=blocksize
else:
x_blocksize=x0.size
y_blocksize=y0.size
for t_ind in range(len(ti)):
for row0 in np.arange(0, y0.size, y_blocksize):
rows=slice(row0, np.minimum(row0+y_blocksize, y0.size))
for col0 in np.arange(0, x0.size, x_blocksize):
cols=slice(col0, np.minimum(col0+x_blocksize, x0.size))
xg_sub=xg[rows, cols, t_ind]
yg_sub=yg[rows, cols, t_ind]
tg_sub=tg[rows, cols, t_ind]
self.x, self.y, self.t = [item.ravel() for item in (xg_sub, yg_sub, tg_sub)]
try:
self.translate_parcel(step=advection_time_step)
except ValueError:
# ValueError is thrown when all points are outside the velocity map
self.x0 *= np.nan
self.y0 *= np.nan
x_init[rows, cols, t_ind] = self.x0.reshape(xg_sub.shape)
y_init[rows, cols, t_ind] = self.y0.reshape(xg_sub.shape)
grid_obj = pc.grid.data().from_dict({'x':x0, 'y':y0,'t':ti,'x0':x_init,'y0':y_init})
return grid_obj
# PURPOSE: translate a parcel between two times using an advection function
[docs]
def translate_parcel(self, **kwargs):
"""
Translates a parcel between two times using an advection function
Parameters
----------
integrator: str
Advection function
- ``'euler'``
- ``'RK4'``
- ``'RKF45'``
method: str
Interpolation method for velocities
- ``'bilinear'``: quick bilinear interpolation
- ``'spline'``: scipy bivariate spline interpolation
- ``'linear'``, ``'nearest'``: scipy regular grid interpolations
- ``'linearND'``, ``'nearestND'``: scipy unstructured N-dimensional interpolations
step: int or float, default 1
Temporal step size for advection
N: int or NoneType, default None
Number of integration steps
Default is determined based on the temporal step size
t0: float, default 0.0
Ending time for advection
"""
# set default keyword arguments
kwargs.setdefault('integrator', self.integrator)
kwargs.setdefault('method', self.method)
kwargs.setdefault('step', 1)
kwargs.setdefault('N', None)
kwargs.setdefault('t0', self.t0)
kwargs.setdefault('streak', False)
# check that there are points within the velocity file
if not self.inside_polygon(self.x,self.y).any():
raise ValueError('No points within ice velocity image')
# update advection class attributes
if (kwargs['integrator'] != self.integrator):
self.integrator = copy.copy(kwargs['integrator'])
if (kwargs['method'] != self.method):
self.method = copy.copy(kwargs['method'])
if (kwargs['t0'] != self.t0):
self.t0 = np.copy(kwargs['t0'])
# advect the parcel every step
# (using closest number of iterations)
step = np.float64(kwargs['step'])
# set or calculate the number of steps to advect the dataset
if kwargs['N'] is not None:
n_steps = np.copy(kwargs['N'])
elif (np.min(self.t0) < np.min(self.t)):
# maximum number of steps to advect backwards in time
n_steps = np.abs(np.max(self.t) - np.min(self.t0))/step
elif (np.max(self.t0) > np.max(self.t)):
# maximum number of steps to advect forward in time
n_steps = np.abs(np.max(self.t0) - np.min(self.t))/step
elif (np.ndim(self.t0) == 0) or (np.ndim(self.t) == 0):
# maximum number of steps between the two datasets
n_steps = np.max(np.abs(self.t0 - self.t))/step
else:
# average number of steps between the two datasets
n_steps = np.abs(np.mean(self.t0) - np.mean(self.t))/step
# check input advection functions
kwargs.update({'N': np.int64(n_steps)})
logging.debug(f'Advecting {n_steps} steps')
if (self.integrator == 'euler'):
# euler: Explicit Euler method
return self.euler(**kwargs)
elif (self.integrator == 'RK4'):
# RK4: Fourth-order Runge-Kutta method
return self.RK4(**kwargs)
elif (self.integrator == 'RKF45'):
# RKF45: adaptive Runge-Kutta-Fehlberg 4(5) method
return self.RKF45(**kwargs)
else:
raise ValueError('Invalid advection function')
# PURPOSE: Advects parcels using an Explicit Euler integration
[docs]
def euler(self, **kwargs):
"""
Advects parcels using an Explicit Euler integration
Parameters
----------
N: int, default 1
Number of integration steps
"""
# set default keyword options
kwargs.setdefault('N', 1)
# translate parcel from time 1 to time 2 at time step
dt = (self.t0 - self.t)/np.float64(kwargs['N'])
self.x0 = np.copy(self.x)
self.y0 = np.copy(self.y)
# keep track of time for 3-dimensional interpolations
t = np.copy(self.t)
self.__update_streak__(t, **kwargs)
for i in range(kwargs['N']):
u1, v1 = self.interpolate(x=self.x0, y=self.y0, t=t)
self.x0 += u1*dt
self.y0 += v1*dt
# add to time
t += dt
self.__update_streak__(t, **kwargs)
# return the translated coordinates
return self
# PURPOSE: Advects parcels using a fourth-order Runge-Kutta integration
[docs]
def RK4(self, **kwargs):
"""
Advects parcels using a fourth-order Runge-Kutta integration
Parameters
----------
N: int, default 1
Number of integration steps
"""
# set default keyword options
kwargs.setdefault('N', 1)
# translate parcel from time 1 to time 2 at time step
dt = (self.t0 - self.t)/np.float64(kwargs['N'])
self.x0 = np.copy(self.x)
self.y0 = np.copy(self.y)
self.__update_streak__(self.t, **kwargs)
# keep track of time for 3-dimensional interpolations
t = np.copy(self.t)
for i in range(kwargs['N']):
u1, v1 = self.interpolate(x=self.x0, y=self.y0, t=t)
x2, y2 = (self.x0 + 0.5*u1*dt, self.y0 + 0.5*v1*dt)
u2, v2 = self.interpolate(x=x2, y=y2, t=t)
x3, y3 = (self.x0 + 0.5*u2*dt, self.y0 + 0.5*v2*dt)
u3, v3 = self.interpolate(x=x3, y=y3, t=t)
x4, y4 = (self.x0 + u3*dt, self.y0 + v3*dt)
u4, v4 = self.interpolate(x=x4, y=y4, t=t)
self.x0 += dt*(u1 + 2.0*u2 + 2.0*u3 + u4)/6.0
self.y0 += dt*(v1 + 2.0*v2 + 2.0*v3 + v4)/6.0
# add to time
t += dt
self.__update_streak__(t, **kwargs)
# return the translated coordinates
return self
# PURPOSE: Advects parcels using a Runge-Kutta-Fehlberg integration
[docs]
def RKF45(self, **kwargs):
"""
Advects parcels using a Runge-Kutta-Fehlberg 4(5) integration
Parameters
----------
N: int, default 1
Number of integration steps
"""
# set default keyword options
kwargs.setdefault('N', 1)
# coefficients in Butcher tableau for Runge-Kutta-Fehlberg 4(5) method
b4 = [25.0/216.0, 0.0, 1408.0/2565.0, 2197.0/4104.0, -1.0/5.0, 0.0]
b5 = [16.0/135.0, 0.0, 6656.0/12825.0, 28561.0/56430.0, -9.0/50.0, 2.0/55.0]
# using an adaptive step size:
# iterate solution until the difference is less than the tolerance
# difference between the 4th and 5th order solutions
sigma = np.inf
# tolerance for solutions
tolerance = 5e-2
# multiply scale by factors of 2 until iteration reaches tolerance level
scale = 1
self.x0 = np.copy(self.x)
self.y0 = np.copy(self.y)
self.__update_streak__(self.t, **kwargs)
# while the difference (sigma) is greater than the tolerance
while (sigma > tolerance) or np.isnan(sigma):
# translate parcel from time 1 to time 2 at time step
dt = (self.t0 - self.t)/np.float64(scale*kwargs['N'])
X4OA, Y4OA = (np.copy(self.x), np.copy(self.y))
X5OA, Y5OA = (np.copy(self.x), np.copy(self.y))
# keep track of time for 3-dimensional interpolations
t = np.copy(self.t)
for i in range(scale*kwargs['N']):
# calculate fourth order accurate solutions
u4, v4 = self.RFK45_velocities(X4OA, Y4OA, dt, t=t)
X4OA += dt*np.dot(b4, u4)
Y4OA += dt*np.dot(b4, v4)
# calculate fifth order accurate solutions
u5, v5 = self.RFK45_velocities(X5OA, Y5OA, dt, t=t)
X5OA += dt*np.dot(b5, u5)
Y5OA += dt*np.dot(b5, v5)
# add to time
t += dt
# calculate difference between 4th and 5th order accurate solutions
i, = np.nonzero(np.isfinite(X4OA) & np.isfinite(Y4OA))
num = np.count_nonzero(np.isfinite(X4OA) & np.isfinite(Y4OA))
sigma = np.sqrt(np.sum((X5OA[i]-X4OA[i])**2 + (Y5OA[i]-Y4OA[i])**2)/num)
# if sigma is less than the tolerance: save xi and yi coordinates
# else: multiply scale by factors of 2 and re-run iteration
if (sigma <= tolerance) or np.isnan(sigma):
self.x0,self.y0 = (np.copy(X4OA), np.copy(Y4OA))
self.__update_streak__(t, **kwargs)
else:
scale *= 2
# return the translated coordinates
return self
# PURPOSE: calculates X and Y velocities for Runge-Kutta-Fehlberg 4(5) method
[docs]
def RFK45_velocities(self,
xi: np.ndarray,
yi: np.ndarray,
dt: np.ndarray,
**kwargs
):
"""
Calculates X and Y velocities for Runge-Kutta-Fehlberg 4(5) method
Parameters
----------
xi: np.ndarray
x-coordinates
yi: np.ndarray
y-coordinates
dt: np.ndarray
integration time step size
t: np.ndarray or NoneType, default None
time coordinates
"""
kwargs.setdefault('t', None)
# Butcher tableau for Runge-Kutta-Fehlberg 4(5) method
A = np.array([[1.0/4.0, 0.0, 0.0, 0.0, 0.0],
[3.0/32.0, 9.0/32.0, 0.0, 0.0, 0.0],
[1932.0/2197.0, -7200.0/2197.0, 7296.0/2197.0, 0.0, 0.0],
[439.0/216.0, -8.0, 3680.0/513.0, -845.0/4104.0, 0.0],
[-8.0/27.0, 2.0, -3544.0/2565.0, 1859.0/4104.0, -11.0/40.0]])
# calculate velocities and parameters for iteration
u1, v1 = self.interpolate(x=xi, y=yi, t=kwargs['t'])
x2 = xi + A[0,0]*u1*dt
y2 = yi + A[0,0]*v1*dt
u2, v2 = self.interpolate(x=x2, y=y2, t=kwargs['t'])
x3 = xi + (A[1,0]*u1 + A[1,1]*u2)*dt
y3 = yi + (A[1,0]*v1 + A[1,1]*v2)*dt
u3, v3 = self.interpolate(x=x3, y=y3, t=kwargs['t'])
x4 = xi + (A[2,0]*u1 + A[2,1]*u2 + A[2,2]*u3)*dt
y4 = yi + (A[2,0]*v1 + A[2,1]*v2 + A[2,2]*v3)*dt
u4, v4 = self.interpolate(x=x4, y=y4, t=kwargs['t'])
x5 = xi + (A[3,0]*u1 + A[3,1]*u2 + A[3,2]*u3 + A[3,3]*u4)*dt
y5 = yi + (A[3,0]*v1 + A[3,1]*v2 + A[3,2]*v3 + A[3,3]*v4)*dt
u5, v5 = self.interpolate(x=x5, y=y5, t=kwargs['t'])
x6 = xi + (A[4,0]*u1 + A[4,1]*u2 + A[4,2]*u3 + A[4,3]*u4 + A[4,4]*u5)*dt
y6 = yi + (A[4,0]*v1 + A[4,1]*v2 + A[4,2]*v3 + A[4,3]*v4 + A[4,4]*v5)*dt
u6, v6 = self.interpolate(x=x6, y=y6, t=kwargs['t'])
return (np.array([u1,u2,u3,u4,u5,u6]), np.array([v1,v2,v3,v4,v5,v6]))
[docs]
def buffered_bounds(self, buffer=5e4):
"""
Calculates the bounding box including a buffer distance
"""
xmin = np.floor(self.x.min()) - buffer
xmax = np.ceil(self.x.max()) + buffer
ymin = np.floor(self.y.min()) - buffer
ymax = np.ceil(self.y.max()) + buffer
return [[xmin,xmax], [ymin,ymax]]
@property
def distance(self):
"""
Calculates displacement between the start and end coordinates
Returns
-------
dist: np.ndarray
Eulerian distance between start and end points
"""
try:
dist = np.sqrt((self.x0 - self.x)**2 + (self.y0 - self.y)**2)
except Exception as exc:
return None
else:
return dist
# PURPOSE: check a specified 2D point is inside a specified 2D polygon
[docs]
def inside_polygon(self,
x: np.ndarray,
y: np.ndarray,
threshold: float = 0.01
):
"""
Indicates whether a specified 2D point is inside a specified 2D polygon
Parameters
----------
x: np.ndarray
x-coordinates to query
y: np.ndarray
y-coordinates to query
threshold: float, default 0.01
Minimum angle for checking if inside polygon
Returns
-------
flag: bool
Flag specifying if point is within polygon
- ``True`` for points within polygon,
- ``False`` for points outside polygon
"""
# create numpy arrays for 2D points
x = np.atleast_1d(x)
y = np.atleast_1d(y)
nn = len(x)
# get points for polygon
if (self.velocity.type == 'grid'):
# create polygon with the extents of the image
xmin,xmax,ymin,ymax = self.velocity.extent
xpts = np.array([xmin, xmax, xmax, xmin, xmin])
ypts = np.array([ymin, ymin, ymax, ymax, ymin])
elif (self.velocity.type == 'mesh'):
# create polygon with convex hull of points
points = np.c_[self.velocity.x, self.velocity.y]
hull = scipy.spatial.ConvexHull(points)
xpts = points[hull.vertices, 0]
ypts = points[hull.vertices, 1]
# check dimensions of polygon points
if (xpts.ndim != 1):
raise ValueError('X coordinates of polygon not a vector.')
if (ypts.ndim != 1):
raise ValueError('Y coordinates of polygon not a vector.')
if (len(xpts) != len(ypts)):
raise ValueError('Incompatible vector dimensions.')
# maximum possible number of vertices in polygon
N = len(xpts)
# Close the polygon if not already closed
if not np.isclose(xpts[-1],xpts[0]) and not np.isclose(ypts[-1],ypts[0]):
xpts = np.append(xpts,xpts[0])
ypts = np.append(ypts,ypts[0])
else:
# remove 1 from number of vertices
N -= 1
# Calculate dot and cross products of points to neighboring polygon points
i = np.arange(N)
X1 = np.dot(xpts[i][:,np.newaxis],np.ones((1,nn))) - \
np.dot(np.ones((N,1)),x[np.newaxis,:])
Y1 = np.dot(ypts[i][:,np.newaxis],np.ones((1,nn))) - \
np.dot(np.ones((N,1)),y[np.newaxis,:])
X2 = np.dot(xpts[i+1][:,np.newaxis],np.ones((1,nn))) - \
np.dot(np.ones((N,1)),x[np.newaxis,:])
Y2 = np.dot(ypts[i+1][:,np.newaxis],np.ones((1,nn))) - \
np.dot(np.ones((N,1)),y[np.newaxis,:])
# Dot-product
dp = X1*X2 + Y1*Y2
# Cross-product
cp = X1*Y2 - Y1*X2
# Calculate tangent of the angle between the two nearest adjacent points
theta = np.arctan2(cp,dp)
# If point is outside polygon then summation over all possible
# angles will equal a small number (e.g. 0.01)
flag = np.where(np.abs(np.sum(theta,axis=0)) > threshold, True, False)
# Make a scalar value if there was only one input value
if (nn == 1):
return flag[0]
else:
return flag
# PURPOSE: wrapper function for calling interpolation functions
[docs]
def interpolate(self, **kwargs):
"""
Wrapper function for calling interpolation functions
Returns
-------
U: np.ndarray
Velocity in x-direction
V: np.ndarray
Velocity in y-direction
"""
if (self.method == 'bilinear'):
return self.bilinear_interpolation(**kwargs)
elif (self.method == 'spline'):
return self.spline_interpolation(**kwargs)
elif (self.method in ('nearest','linear')):
return self.regular_grid_interpolation(**kwargs)
elif (self.method in ('nearestND','linearND')):
return self.unstructured_interpolation(**kwargs)
else:
raise ValueError('Invalid interpolation function')
# PURPOSE: use bilinear interpolation of velocities to coordinates
[docs]
def bilinear_interpolation(self, **kwargs):
"""
Bilinearly interpolate U and V velocities to coordinates
Parameters
----------
x: np.ndarray or NoneType, default None
x-coordinates
y: np.ndarray or NoneType, default None
y-coordinates
fill_value: float, default np.nan
Invalid value
Returns
-------
U: np.ndarray
Velocity in x-direction
V: np.ndarray
Velocity in y-direction
"""
# set default keyword options
kwargs.setdefault('x', None)
kwargs.setdefault('y', None)
kwargs.setdefault('fill_value', self.fill_value)
# output bilinear interpolated data
U = np.full_like(kwargs['x'], np.nan)
V = np.full_like(kwargs['x'], np.nan)
# only run interpolation if coordinates are finite
# and within the extents of the input velocity map
v, = np.nonzero(np.isfinite(kwargs['x']) & np.isfinite(kwargs['y']) &
self.inside_polygon(kwargs['x'],kwargs['y']))
# check that there are indice values (0 is falsy)
if not np.any(v) and not np.any(v == 0):
return (U, V)
# calculating indices for original grid
xmin,xmax,ymin,ymax = self.velocity.extent
ii = np.floor((kwargs['x'][v]-xmin)/self.velocity.spacing[0]).astype('i')
jj = np.floor((kwargs['y'][v]-ymin)/self.velocity.spacing[1]).astype('i')
# weight arrays
Wa = ((kwargs['x'][v]-self.velocity.x[ii])*(kwargs['y'][v]-self.velocity.y[jj]))
Wb = ((self.velocity.x[ii+1]-kwargs['x'][v])*(kwargs['y'][v]-self.velocity.y[jj]))
Wc = ((kwargs['x'][v]-self.velocity.x[ii])*(self.velocity.y[jj+1]-kwargs['y'][v]))
Wd = ((self.velocity.x[ii+1]-kwargs['x'][v])*(self.velocity.y[jj+1]-kwargs['y'][v]))
W = ((self.velocity.x[ii+1]-self.velocity.x[ii])*(self.velocity.y[jj+1]-self.velocity.y[jj]))
# data at indices
Ua,Va = (self.velocity.U[jj,ii], self.velocity.V[jj,ii])
Ub,Vb = (self.velocity.U[jj+1,ii], self.velocity.V[jj+1,ii])
Uc,Vc = (self.velocity.U[jj,ii+1], self.velocity.V[jj,ii+1])
Ud,Vd = (self.velocity.U[jj+1,ii+1],self.velocity.V[jj+1,ii+1])
# calculate bilinear interpolated data
U[v] = (Ua*Wa + Ub*Wb + Uc*Wc + Ud*Wd)/W
V[v] = (Va*Wa + Vb*Wb + Vc*Wc + Vd*Wd)/W
# replace invalid values with fill value
U = np.nan_to_num(U, nan=kwargs['fill_value'])
V = np.nan_to_num(V, nan=kwargs['fill_value'])
return (U, V)
# PURPOSE: use biharmonic splines to interpolate velocities to coordinates
[docs]
def spline_interpolation(self, **kwargs):
"""
Interpolate U and V velocities to coordinates using biharmonic splines
Parameters
----------
x: np.ndarray or NoneType, default None
x-coordinates
y: np.ndarray or NoneType, default None
y-coordinates
kx: int, default 1
degrees of the bivariate spline in x-direction
ky: int, default 1
degrees of the bivariate spline in y-direction
fill_value: float, default np.nan
Invalid value
Returns
-------
U: np.ndarray
Velocity in x-direction
V: np.ndarray
Velocity in y-direction
"""
# set default keyword options
kwargs.setdefault('x', None)
kwargs.setdefault('y', None)
kwargs.setdefault('kx', 1)
kwargs.setdefault('ky', 1)
kwargs.setdefault('fill_value', self.fill_value)
# output interpolated data
U = np.full_like(kwargs['x'], np.nan)
V = np.full_like(kwargs['x'], np.nan)
# only run interpolation if coordinates are finite
# and within the extents of the input velocity map
v, = np.nonzero(np.isfinite(kwargs['x']) & np.isfinite(kwargs['y']) &
self.inside_polygon(kwargs['x'],kwargs['y']))
# check that there are indice values (0 is falsy)
if not np.any(v) and not np.any(v == 0):
return (U, V)
# create mask for invalid values
indy,indx = np.nonzero((self.velocity.U == self.fill_value) |
(self.velocity.V == self.fill_value) |
(np.isnan(self.velocity.U)) | (np.isnan(self.velocity.V)))
self.velocity.U[indy,indx] = 0.0
self.velocity.V[indy,indx] = 0.0
mask = np.zeros((self.velocity.shape))
mask[indy,indx] = 1.0
# build spline interpolants for input grid
if not self.interpolant:
# use scipy bivariate splines to interpolate values
self.interpolant['U'] = scipy.interpolate.RectBivariateSpline(
self.velocity.x, self.velocity.y, self.velocity.U.T,
kx=kwargs['kx'], ky=kwargs['ky'])
self.interpolant['V'] = scipy.interpolate.RectBivariateSpline(
self.velocity.x, self.velocity.y, self.velocity.V.T,
kx=kwargs['kx'], ky=kwargs['ky'])
self.interpolant['mask'] = scipy.interpolate.RectBivariateSpline(
self.velocity.x, self.velocity.y, mask.T,
kx=kwargs['kx'], ky=kwargs['ky'])
# create mask for invalid values
invalid = np.ceil(self.interpolant['mask'].ev(
kwargs['x'][v],kwargs['y'][v])).astype(bool)
masked_values = np.where(invalid, np.nan, 0.0)
# calculate interpolated data
U[v] = self.interpolant['U'].ev(
kwargs['x'][v],kwargs['y'][v]) + masked_values
V[v] = self.interpolant['V'].ev(
kwargs['x'][v],kwargs['y'][v]) + masked_values
# replace invalid values with fill value
U = np.nan_to_num(U, nan=kwargs['fill_value'])
V = np.nan_to_num(V, nan=kwargs['fill_value'])
return (U, V)
# PURPOSE: use regular grid interpolation of velocities to coordinates
[docs]
def regular_grid_interpolation(self, **kwargs):
"""
Interpolate U and V velocities to coordinates using
regular grid interpolation
Can use time-variable ``U`` and ``V`` velocities
Parameters
----------
x: np.ndarray or NoneType, default None
x-coordinates
y: np.ndarray or NoneType, default None
y-coordinates
t: np.ndarray or NoneType, default None
time coordinates
method: str
Method of regular grid interpolation
- ``'nearest'``
- ``'linear'``
fill_value: float, default np.nan
Invalid value
Returns
-------
U: np.ndarray
Velocity in x-direction
V: np.ndarray
Velocity in y-direction
"""
# set default keyword options
kwargs.setdefault('x', None)
kwargs.setdefault('y', None)
kwargs.setdefault('t', None)
kwargs.setdefault('method', self.method)
kwargs.setdefault('fill_value', self.fill_value)
# output interpolated data
U = np.full_like(kwargs['x'], np.nan)
V = np.full_like(kwargs['x'], np.nan)
# only run interpolation if coordinates are finite
# and within the extents of the input velocity map
v, = np.nonzero(np.isfinite(kwargs['x']) & np.isfinite(kwargs['y']) &
self.inside_polygon(kwargs['x'],kwargs['y']))
# check that there are indice values (0 is falsy)
if not np.any(v) and not np.any(v == 0):
return (U, V)
# build regular grid interpolants for input grid
if not self.interpolant and (self.velocity.ndim == 3):
# use scipy regular grid to interpolate values for a given method
# will extrapolate velocities forward in time if outside range
self.interpolant['U'] = scipy.interpolate.RegularGridInterpolator(
(self.velocity.y, self.velocity.x, self.velocity.time),
self.velocity.U, method=kwargs['method'], bounds_error=False,
fill_value=None)
self.interpolant['V'] = scipy.interpolate.RegularGridInterpolator(
(self.velocity.y, self.velocity.x, self.velocity.time),
self.velocity.V, method=kwargs['method'], bounds_error=False,
fill_value=None)
elif not self.interpolant and (self.velocity.ndim == 2):
# use scipy regular grid to interpolate values for a given method
self.interpolant['U'] = scipy.interpolate.RegularGridInterpolator(
(self.velocity.y, self.velocity.x), self.velocity.U,
method=kwargs['method'], bounds_error=False, fill_value=None)
self.interpolant['V'] = scipy.interpolate.RegularGridInterpolator(
(self.velocity.y, self.velocity.x), self.velocity.V,
method=kwargs['method'], bounds_error=False, fill_value=None)
# calculate interpolated data
if (self.velocity.ndim == 3):
U[v] = self.interpolant['U'].__call__(
np.c_[kwargs['y'][v],kwargs['x'][v],kwargs['t'][v]])
V[v] = self.interpolant['V'].__call__(
np.c_[kwargs['y'][v],kwargs['x'][v],kwargs['t'][v]])
elif (self.velocity.ndim == 2):
U[v] = self.interpolant['U'].__call__(
np.c_[kwargs['y'][v],kwargs['x'][v]])
V[v] = self.interpolant['V'].__call__(
np.c_[kwargs['y'][v],kwargs['x'][v]])
# replace invalid values with fill value
U = np.nan_to_num(U, nan=kwargs['fill_value'])
V = np.nan_to_num(V, nan=kwargs['fill_value'])
return (U, V)
# PURPOSE: use unstructured interpolation of velocities to coordinates
[docs]
def unstructured_interpolation(self, **kwargs):
"""
Interpolate unstructured U and V velocities to coordinates using
N-dimensional interpolation functions
Parameters
----------
x: np.ndarray or NoneType, default None
x-coordinates
y: np.ndarray or NoneType, default None
y-coordinates
t: np.ndarray or NoneType, default None
time coordinates
method: str
Method of unstructured interpolation
- ``'nearestND'``
- ``'linearND'``
rescale: bool, default False
Rescale points to unit cube before performing interpolation
tree_options: dict or NoneType, default None
Options passed to underlying KDTree for ``nearestND``
fill_value: float, default np.nan
Invalid value
Returns
-------
U: np.ndarray
Velocity in x-direction
V: np.ndarray
Velocity in y-direction
"""
# set default keyword options
kwargs.setdefault('x', None)
kwargs.setdefault('y', None)
kwargs.setdefault('t', None)
kwargs.setdefault('method', None)
kwargs.setdefault('rescale', False)
kwargs.setdefault('tree_options', None)
kwargs.setdefault('fill_value', self.fill_value)
# output interpolated data
U = np.full_like(kwargs['x'], np.nan)
V = np.full_like(kwargs['x'], np.nan)
# only run interpolation if coordinates are finite
# and within the extents of the input velocity mesh
valid = np.ones_like(kwargs['x'], dtype=bool)
valid &= np.isfinite(kwargs['x'])
valid &= np.isfinite(kwargs['y'])
valid &= self.inside_polygon(kwargs['x'], kwargs['y'])
# check that there are indice values
if not np.any(valid):
return (U, V)
# build delaunay triangulations for input mesh coordinates
if not hasattr(self.velocity, 'mesh'):
# attempt to build delaunay triangulation
_, self.velocity.mesh = self.find_valid_triangulation(
self.velocity.x, self.velocity.y)
# reduce to points within the convex hull of the triangulation
valid &= self.inside_simplex(kwargs['x'], kwargs['y'])
v, = np.nonzero(valid)
# interpolator and keyword arguments for selected method
if (kwargs['method'] == 'nearestND'):
Interpolator = scipy.interpolate.NearestNDInterpolator
kwds = dict(rescale=kwargs['rescale'],
tree_options=kwargs['tree_options'])
elif (kwargs['method'] == 'linearND'):
Interpolator = scipy.interpolate.LinearNDInterpolator
kwds = dict(rescale=kwargs['rescale'], fill_value=np.nan)
else:
raise ValueError('Invalid interpolation function')
# build interpolants for input velocity meshes
if not self.interpolant:
self.interpolant['U'] = Interpolator(
self.velocity.mesh, self.velocity.U, **kwds)
self.interpolant['V'] = Interpolator(
self.velocity.mesh, self.velocity.V, **kwds)
if not self.interpolant and (self.velocity.ndim > 1):
# build interpolants for time-variable velocities
nt = len(self.velocity.time)
# create 1-d interpolant through time
self.interpolant['time'] = scipy.interpolate.interp1d(
self.velocity.time, np.arange(nt), kind='linear')
# evaluate at points
if (self.velocity.ndim == 1):
# evaluate using invariant velocities
U[v] = self.interpolant['U'].__call__(
np.c_[kwargs['x'][v], kwargs['y'][v]])
V[v] = self.interpolant['V'].__call__(
np.c_[kwargs['x'][v], kwargs['y'][v]])
else:
# evaluate using time-variable velocities
UT = self.interpolant['U'].__call__(
np.c_[kwargs['x'][v], kwargs['y'][v]])
VT = self.interpolant['V'].__call__(
np.c_[kwargs['x'][v], kwargs['y'][v]])
# linearly interpolate in time
times = np.ones_like(kwargs['x'])*kwargs['t']
indices = self.interpolant['time'].__call__(times[v]).astype(int)
# clip indices to valid range for temporal interpolation
indices = np.clip(indices, 0, nt-1)
for tstep in np.unique(indices):
# find valid points
vi, = np.nonzero((indices == tstep) & valid)
# check that there are indice values (0 is falsy)
if not np.any(vi) and not np.any(vi == 0):
continue
# calculate weights for linearly interpolating in time
weight = (times[vi] - self.velocity.time[tstep]) / \
(self.velocity.time[tstep+1] - self.velocity.time[tstep])
# linearly interpolate in time
U[vi] = (1.0 - weight)*UT[vi,tstep] + weight*UT[vi,tstep+1]
V[vi] = (1.0 - weight)*VT[vi,tstep] + weight*VT[vi,tstep+1]
# replace invalid values with fill value
U = np.nan_to_num(U, nan=kwargs['fill_value'])
V = np.nan_to_num(V, nan=kwargs['fill_value'])
return (U, V)
# PURPOSE: find a valid Delaunay triangulation for coordinates x0 and y0
# http://www.qhull.org/html/qhull.htm#options
# Attempt 1: standard qhull options Qt Qbb Qc Qz
# Attempt 2: rescale and center the inputs with option QbB
# Attempt 3: joggle the inputs to find a triangulation with option QJ
# if no passing triangulations: exit with empty list
[docs]
def find_valid_triangulation(self,
x: np.ndarray,
y: np.ndarray
):
"""
Attempt to find a valid Delaunay triangulation for coordinates
- Attempt 1: ``Qt Qbb Qc Qz``
- Attempt 2: ``Qt Qc QbB``
- Attempt 3: ``QJ QbB``
Parameters
----------
x: np.ndarray
x-coordinates for mesh
y: np.ndarray
y-coordinates for mesh
"""
# Attempt 1: try with standard options Qt Qbb Qc Qz
# Qt: triangulated output, all facets will be simplicial
# Qbb: scale last coordinate to [0,m] for Delaunay triangulations
# Qc: keep coplanar points with nearest facet
# Qz: add point-at-infinity to Delaunay triangulation
# Attempt 2 in case of qhull error from Attempt 1 try Qt Qc QbB
# Qt: triangulated output, all facets will be simplicial
# Qc: keep coplanar points with nearest facet
# QbB: scale input to unit cube centered at the origin
# Attempt 3 in case of qhull error from Attempt 2 try QJ QbB
# QJ: joggle input instead of merging facets
# QbB: scale input to unit cube centered at the origin
# try each set of qhull_options
for i,qhull_option in enumerate(['Qt Qbb Qc Qz','Qt Qc QbB','QJ QbB']):
try:
triangle = scipy.spatial.Delaunay(np.c_[x, y],
qhull_options=qhull_option)
except scipy.spatial.qhull.QhullError:
pass
else:
return (i+1, triangle)
# raise exception if still finding errors
raise scipy.spatial.qhull.QhullError
# PURPOSE: check a specified 2D point is inside the convex hull of a mesh
[docs]
def inside_simplex(self,
x: np.ndarray,
y: np.ndarray
):
"""
Indicates whether a specified 2D point is inside the convex hull of a mesh
Parameters
----------
x: np.ndarray
x-coordinates to query
y: np.ndarray
y-coordinates to query
Returns
-------
flag: bool
Flag specifying if point is within convex hull
- ``True`` for points within convex hull
- ``False`` for points outside convex hull
"""
# only run if velocity mesh has a find simplex attribute
if hasattr(self.velocity.mesh, 'find_simplex'):
return (self.velocity.mesh.find_simplex(np.c_[x, y]) >= 0)
else:
raise AttributeError("Convex hull cannot be found for mesh")
# PURPOSE: calculates the maximum angle within a triangle given the
# coordinates of the triangles vertices A(x,y), B(x,y), C(x,y)
[docs]
def triangle_maximum_angle(self,
Ax: np.ndarray,
Ay: np.ndarray,
Bx: np.ndarray,
By: np.ndarray,
Cx: np.ndarray,
Cy: np.ndarray
):
"""
Calculates the maximum angles within triangles with
vertices A, B and C
Parameters
----------
Ax: np.ndarray
x-coordinates of A vertices
Ay: np.ndarray
y-coordinates of A vertices
Bx: np.ndarray
x-coordinates of B vertices
By: np.ndarray
y-coordinates of B vertices
Cx: np.ndarray
x-coordinates of C vertices
Cy: np.ndarray
y-coordinates of C vertices
"""
# calculate sides of triangle (opposite interior angle at vertex)
a = np.atleast_1d(np.sqrt((Cx - Bx)**2 + (Cy - By)**2))
b = np.atleast_1d(np.sqrt((Cx - Ax)**2 + (Cy - Ay)**2))
c = np.atleast_1d(np.sqrt((Ax - Bx)**2 + (Ay - By)**2))
# calculate interior angles and convert to degrees
alpha = np.arccos((b**2 + c**2 - a**2)/(2.0*b*c))*180.0/np.pi
beta = np.arccos((a**2 + c**2 - b**2)/(2.0*a*c))*180.0/np.pi
gamma = np.arccos((a**2 + b**2 - c**2)/(2.0*a*b))*180.0/np.pi
# return the largest angle within the triangle
return np.max(np.c_[alpha, beta, gamma], axis=1)
[docs]
def imshow(self, band=None, ax=None, imtype='speed', xy_scale=1.0, **kwargs):
"""
Create plot of velocity magnitude or divergence
Parameters
----------
band: int or NoneType, default None
band of velocity grid to show
ax: obj or NoneType, default None
matplotlib figure axis
imtype: str, default 'speed'
image type to plot
- ``'speed'``: velocity magnitude
- ``'divergence'``: flow divergence
xy_scale: float, default 1.0
Scaling factor for converting horizontal coordinates
**kwargs: dict
Keyword arguments for ``plt.imshow``
Returns
-------
im: obj
matplotlib ``AxesImage`` object
"""
kwargs['extent'] = np.array(self.velocity.extent)*xy_scale
kwargs['origin'] = 'lower'
if ax is None:
ax = plt.gca()
if band is None:
U = getattr(self.velocity, 'U')
V = getattr(self.velocity, 'V')
elif (band is not None):
U = getattr(self.velocity, 'U')[:,:,band]
V = getattr(self.velocity, 'V')[:,:,band]
# calculate ice speed or flow divergence
if (imtype == 'speed'):
# calculate speed
zz = np.sqrt(U**2 + V**2)
elif (imtype == 'divergence'):
# calculate divergence
dU = np.gradient(U, self.velocity.x, axis=1)
dV = np.gradient(V, self.velocity.y, axis=0)
zz = dU + dV
# create image plot of velocity magnitude or divergence
im = ax.imshow(zz, **kwargs)
# return the image
return im
[docs]
def triplot(self, ax=None, **kwargs):
"""
Create plot of unstructured triangular mesh
Parameters
----------
ax: obj or NoneType, default None
matplotlib figure axis
**kwargs: dict
Keyword arguments for ``plt.triplot``
Returns
-------
im: obj
matplotlib ``AxesImage`` object
"""
if ax is None:
ax = plt.gca()
# get delaunay triangulation
if not hasattr(self.velocity, 'mesh'):
# attempt to build delaunay triangulation
_, self.velocity.mesh = self.find_valid_triangulation(
self.velocity.x, self.velocity.y)
# build matplotlib triangulation object
triangle = mtri.Triangulation(self.velocity.x, self.velocity.y,
self.velocity.mesh.vertices)
# create triangle plot of velocity magnitude
tri = ax.triplot(triangle, **kwargs)
# return the triangle plot
return tri
[docs]
def tricontourf(self, band=None, ax=None, **kwargs):
"""
Create plot of velocity magnitude for unstructured meshes
Parameters
----------
band: int or NoneType, default None
band of velocity mesh to show
ax: obj or NoneType, default None
matplotlib figure axis
**kwargs: dict
Keyword arguments for ``plt.tricontourf``
Returns
-------
im: obj
matplotlib ``AxesImage`` object
"""
if ax is None:
ax = plt.gca()
if band is None:
U = getattr(self.velocity, 'U')
V = getattr(self.velocity, 'V')
elif (band is not None):
U = getattr(self.velocity, 'U')[:,band]
V = getattr(self.velocity, 'V')[:,band]
# calculate speed
zz = np.sqrt(U**2 + V**2)
# get delaunay triangulation
if not hasattr(self.velocity, 'mesh'):
# attempt to build delaunay triangulation
_, self.velocity.mesh = self.find_valid_triangulation(
self.velocity.x, self.velocity.y)
# build matplotlib triangulation object
triangle = mtri.Triangulation(self.velocity.x, self.velocity.y,
self.velocity.mesh.vertices)
# create triangle plot of velocity magnitude
tri = ax.tricontourf(triangle, zz, **kwargs)
# return the triangle plot
return tri
[docs]
def streamplot(self, band=None, ax=None, xy_scale=1.0, density=[0.5, 0.5], color='0.3', **kwargs):
"""
Create stream plot of velocity vectors
Parameters
----------
band: int or NoneType, default None
band of velocity grid to show
ax: obj or NoneType, default None
matplotlib figure axis
xy_scale: float, default 1.0
Scaling factor for converting horizontal coordinates
density: float, default [0.5, 0.5]
Closeness of streamlines
color: str, default '0.3'
Streamline color
**kwargs: dict
Keyword arguments for ``plt.streamplot``
Returns
-------
sp: obj
matplotlib ``StreamplotSet`` object
"""
if ax is None:
ax = plt.gca()
if band is None:
U = getattr(self.velocity, 'U')
V = getattr(self.velocity, 'V')
elif (band is not None):
U = getattr(self.velocity, 'U')[:,:,band]
V = getattr(self.velocity, 'V')[:,:,band]
# add stream plot of velocity vectors
gridx,gridy = np.meshgrid(self.velocity.x*xy_scale, self.velocity.y*xy_scale)
sp = ax.streamplot(gridx, gridy, U, V, density=density, color=color, **kwargs)
return sp
[docs]
def copy(self):
"""
Returns a copy of the object
"""
return copy.deepcopy(self)
@property
def _to_sec(self):
"""
Converts time to seconds
"""
return timescale.time._to_sec[self.time_units]
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)