#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (09/2024)
Download and management utilities for syncing time and auxiliary files
PYTHON DEPENDENCIES:
lxml: processing XML and HTML in Python
https://pypi.python.org/pypi/lxml
UPDATE HISTORY:
Updated 09/2024: add wrapper to importlib for optional dependencies
Updated 08/2024: generalize hash function to use any available algorithm
Updated 11/2023: updated ssl context to fix deprecation error
Updated 05/2023: using pathlib to define and expand paths
add basic variable typing to function inputs
updated SSL context to fix some deprecation warnings
Written 12/2022
"""
from __future__ import print_function, division, annotations
import sys
import re
import io
import ssl
import inspect
import hashlib
import logging
import pathlib
import warnings
import importlib
import posixpath
import subprocess
import lxml.etree
import calendar, time
import dateutil.parser
if sys.version_info[0] == 2:
import urllib2
else:
import urllib.request as urllib2
# PURPOSE: get absolute path within a package from a relative path
def get_data_path(relpath: list | str | pathlib.Path):
    """
    Resolve a relative path against the directory containing this file

    Parameters
    ----------
    relpath: list, str or pathlib.Path
        relative path, given either as a single path or as a list
        of path components

    Returns
    -------
    path: pathlib.Path or NoneType
        path joined onto the absolute directory of this file, or
        ``None`` if ``relpath`` is not one of the supported types
    """
    # absolute directory of the file in which this function is defined
    frame_info = inspect.getframeinfo(inspect.currentframe())
    package_dir = pathlib.Path(frame_info.filename).absolute().parent
    # normalize the input into a sequence of path components
    if isinstance(relpath, list):
        components = relpath
    elif isinstance(relpath, (str, pathlib.Path)):
        components = [relpath]
    else:
        # unsupported input types produce no result
        return None
    return package_dir.joinpath(*components)
def import_dependency(
        name: str,
        extra: str = "",
        raise_exception: bool = False
    ):
    """
    Attempt to import an optional dependency by name

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named
        ``module`` if the import failed and ``raise_exception``
        is not set

    Raises
    ------
    AssertionError
        if ``name`` is not a string
    ImportError
        if the module cannot be imported and ``raise_exception`` is set
    """
    # the module name has to be given as a string
    assert isinstance(name, str), \
        f"Invalid module name: '{name}'; must be a string"
    # message used if the module cannot be imported
    missing = f"Missing optional dependency '{name}'. {extra}"
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        # ModuleNotFoundError is a subclass of ImportError
        if raise_exception:
            raise ImportError(missing) from exc
        logging.debug(missing)
    # import failed quietly: hand back an empty stand-in class
    return type('module', (), {})
# PURPOSE: get the hash value of a file
def _update_hash_from_stream(checksum, stream, chunk_size: int = 65536):
    """Feed the remaining contents of a binary stream into a hash object
    """
    while True:
        chunk = stream.read(chunk_size)
        # an empty read marks the end of the stream
        if not chunk:
            break
        checksum.update(chunk)

def get_hash(
        local: str | io.IOBase | pathlib.Path,
        algorithm: str = 'md5'
    ):
    """
    Get the hash value from a local file or ``BytesIO`` object

    Parameters
    ----------
    local: obj, str or pathlib.Path
        BytesIO object, open binary stream or path to file
    algorithm: str, default 'md5'
        hashing algorithm for checksum validation

        Any name listed in ``hashlib.algorithms_available``

    Returns
    -------
    checksum: str
        hexadecimal digest of the contents

        Empty string if the path does not exist or if ``local``
        is not a supported type

    Raises
    ------
    ValueError
        if ``algorithm`` is not available in ``hashlib``
    """
    # check if open file object or if local file exists
    if isinstance(local, io.IOBase):
        # generate checksum hash for a given type
        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f'Invalid hashing algorithm: {algorithm}')
        checksum = hashlib.new(algorithm)
        if hasattr(local, 'getvalue'):
            # in-memory buffers: hash the complete contents
            # regardless of the current stream position
            checksum.update(local.getvalue())
        else:
            # other binary streams (such as open files) do not have
            # a getvalue method: read from the current position
            _update_hash_from_stream(checksum, local)
        return checksum.hexdigest()
    elif isinstance(local, (str, pathlib.Path)):
        # generate checksum hash for local file
        local = pathlib.Path(local).expanduser()
        # if file currently doesn't exist, return empty string
        if not local.exists():
            return ''
        # open the local_file in binary read mode
        with local.open(mode='rb') as local_buffer:
            # generate checksum hash for a given type
            if algorithm not in hashlib.algorithms_available:
                raise ValueError(f'Invalid hashing algorithm: {algorithm}')
            checksum = hashlib.new(algorithm)
            # read in chunks to avoid loading the full file into memory
            _update_hash_from_stream(checksum, local_buffer)
        return checksum.hexdigest()
    # unsupported input types
    return ''
# PURPOSE: get the git hash value
def get_git_revision_hash(
        refname: str = 'HEAD',
        short: bool = False
    ):
    """
    Look up the ``git`` hash that a reference points to

    Runs ``git rev-parse`` against the ``.git`` directory located one
    level above the directory containing this file

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shortened hash value

    Returns
    -------
    revision: str
        output of ``git rev-parse`` with surrounding whitespace removed
    """
    # locate the .git directory relative to this file
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # assemble the rev-parse command
    command = ['git', f'--git-dir={git_dir}', 'rev-parse']
    if short:
        command.append('--short')
    command.append(refname)
    # run the command and decode its output
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
    return str(output, encoding='utf8').strip()
# PURPOSE: get the current git status
def get_git_status():
    """Report whether ``git status --porcelain`` prints any output

    Returns
    -------
    status: bool
        ``True`` if the command produced output for the repository one
        level above the directory containing this file
    """
    # locate the .git directory relative to this file
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # assemble the status command
    command = ['git', f'--git-dir={git_dir}', 'status', '--porcelain']
    with warnings.catch_warnings():
        output = subprocess.check_output(command)
    # empty output converts to False
    return bool(output)
# PURPOSE: recursively split a url path
def url_split(s: str):
    """
    Split a url path into a tuple of its components

    Parameters
    ----------
    s: str
        url string

    Returns
    -------
    parts: tuple
        path components in order

        For urls with a ``http``, ``https``, ``ftp`` or ``s3`` scheme,
        the first component keeps the scheme together with the host
    """
    schemes = ('http:', 'https:', 'ftp:', 's3:')
    # components are collected from the end of the path backwards
    parts = []
    while True:
        head, tail = posixpath.split(s)
        if head in schemes:
            # keep the scheme and host together as a single component
            parts.append(s)
            break
        if not head.strip(posixpath.sep):
            # head is empty or consists only of separators: reached the
            # start of the path (also covers heads such as '//' which
            # posixpath.split cannot reduce any further)
            parts.append(tail)
            break
        parts.append(tail)
        s = head
    return tuple(reversed(parts))
# PURPOSE: convert file lines to arguments
def convert_arg_line_to_args(arg_line):
    """
    Generate the arguments contained in a single line of a file

    Parameters
    ----------
    arg_line: str
        line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        whitespace-delimited arguments found before any comment
    """
    # drop everything from the comment character to the end of the line
    uncommented = re.sub(r'\#(.*?)$',r'',arg_line)
    # splitting on whitespace never produces empty arguments
    yield from uncommented.split()
# PURPOSE: returns the Unix timestamp value for a formatted date string
def get_unix_time(
        time_string: str,
        format: str = '%Y-%m-%d %H:%M:%S'
    ):
    """
    Get the Unix timestamp value for a formatted date string

    Parameters
    ----------
    time_string: str
        formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        format for input time string

    Returns
    -------
    timestamp: int, float or NoneType
        seconds since the Unix epoch, or ``None`` if the string
        could not be parsed

        Strings without a UTC offset are interpreted as UTC
    """
    # local import: only needed to attach UTC to naive datetimes
    import datetime
    # try parsing with the provided format
    try:
        parsed_time = time.strptime(time_string.rstrip(), format)
    except (TypeError, ValueError):
        pass
    else:
        # interpret the parsed time as UTC
        return calendar.timegm(parsed_time)
    # try parsing with dateutil
    try:
        parsed_time = dateutil.parser.parse(time_string.rstrip())
    except (TypeError, ValueError, OverflowError):
        return None
    # datetime.timestamp() treats naive datetimes as local time, which
    # would disagree with the UTC interpretation of the strptime branch:
    # attach UTC when the string carried no offset of its own
    if parsed_time.utcoffset() is None:
        parsed_time = parsed_time.replace(tzinfo=datetime.timezone.utc)
    return parsed_time.timestamp()
# PURPOSE: create the default SSL context
def _create_default_ssl_context() -> ssl.SSLContext:
    """Build the client-side SSL context used as the base configuration

    Returns
    -------
    context: ssl.SSLContext
        ``PROTOCOL_TLS_CLIENT`` context with the default protocol
        options applied and compression disabled
    """
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # switch off compression
    ssl_context.options |= ssl.OP_NO_COMPRESSION
    # apply the default protocol version options
    _set_ssl_context_options(ssl_context)
    return ssl_context
# PURPOSE: create an SSL context without certificate verification
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """Build an SSL context that skips certificate verification

    Returns
    -------
    context: ssl.SSLContext
        default context with hostname checking disabled and the
        verification mode set to ``ssl.CERT_NONE``
    """
    unverified = _create_default_ssl_context()
    # hostname checking has to be disabled before the verification mode
    # can be lowered to CERT_NONE
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
# PURPOSE: set the default options for an SSL context
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
"""Sets the default options for the SSL context
"""
if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7):
context.minimum_version = ssl.TLSVersion.TLSv1_2
else:
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
# default ssl context: created once when the module is imported and
# used as the default ``context`` argument of the functions below
# NOTE: built with _create_ssl_context_no_verify, so hostname checking
# and certificate verification are disabled for this context
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: check internet connection
def check_connection(HOST: str, context=_default_ssl_context):
    """
    Check internet connection with http host

    Parameters
    ----------
    HOST: str
        remote http host
    context: obj, default _default_ssl_context
        SSL context for ``urllib`` opener object

    Returns
    -------
    connected: bool
        ``True`` if the host could be opened

    Raises
    ------
    RuntimeError
        if the host could not be reached
    """
    # attempt to connect to http host
    try:
        # the response is only needed to confirm the connection:
        # close it immediately so the socket is not left open
        with urllib2.urlopen(HOST, timeout=20, context=context):
            pass
    except urllib2.URLError as exc:
        raise RuntimeError('Check internet connection') from exc
    return True
# PURPOSE: list a directory on DLR geoservice https Server
def geoservice_list(
        HOST: str | list,
        timeout: int | None = None,
        context = _default_ssl_context,
        parser = lxml.etree.HTMLParser(),
        format: str = '%Y-%m-%d %H:%M:%S',
        pattern: str = '',
        sort: bool = False
    ):
    """
    List a directory on DLR geoservice https Server

    Parameters
    ----------
    HOST: str or list
        remote http host path
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default _default_ssl_context
        SSL context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``

        The default parser instance is created once when the function
        is defined and is shared between calls
    format: str, default '%Y-%m-%d %H:%M:%S'
        format for input time string
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory

    Raises
    ------
    Exception
        if the remote directory could not be opened
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # full url of the remote directory
    url = posixpath.join(*HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(url)
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        # keep the underlying error as the explicit cause
        raise Exception(f'List error from {url}') from exc
    # read and parse request for files (column names and modified times)
    # closing the response once the document has been parsed
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath('//tr/td[@class="link"]/a/@href')
    # get the Unix timestamp value for a modification time
    collastmod = [get_unix_time(text, format=format)
        for text in tree.xpath('//tr/td[@class="date"]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [k for k, name in enumerate(colnames)
            if re.search(pattern, name)]
        # reduce list of column names and last modified times
        colnames = [colnames[k] for k in keep]
        collastmod = [collastmod[k] for k in keep]
    # sort the list
    if sort:
        # indices which order the column names (stable for equal names)
        order = sorted(range(len(colnames)), key=colnames.__getitem__)
        # sort list of column names and last modified times
        colnames = [colnames[k] for k in order]
        collastmod = [collastmod[k] for k in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)