Source code for pointAdvection.utilities

#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (09/2024)
Download and management utilities for syncing time and auxiliary files

PYTHON DEPENDENCIES:
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml

UPDATE HISTORY:
    Updated 09/2024: add wrapper to importlib for optional dependencies
    Updated 08/2024: generalize hash function to use any available algorithm
    Updated 11/2023: updated ssl context to fix deprecation error
    Updated 05/2023: using pathlib to define and expand paths
        add basic variable typing to function inputs
        updated SSL context to fix some deprecation warnings
    Written 12/2022
"""
from __future__ import print_function, division, annotations

import sys
import re
import io
import ssl
import inspect
import hashlib
import logging
import pathlib
import warnings
import importlib
import posixpath
import subprocess
import lxml.etree
import calendar, time
import dateutil.parser
if sys.version_info[0] == 2:
    import urllib2
else:
    import urllib.request as urllib2

# PURPOSE: get absolute path within a package from a relative path
[docs] def get_data_path(relpath: list | str | pathlib.Path): """ Get the absolute path within a package from a relative path Parameters ---------- relpath: list, str or pathlib.Path relative path """ # current file path filename = inspect.getframeinfo(inspect.currentframe()).filename filepath = pathlib.Path(filename).absolute().parent if isinstance(relpath, list): # use *splat operator to extract from list return filepath.joinpath(*relpath) elif isinstance(relpath, (str, pathlib.Path)): return filepath.joinpath(relpath)
def import_dependency( name: str, extra: str = "", raise_exception: bool = False ): """ Import an optional dependency Adapted from ``pandas.compat._optional::import_optional_dependency`` Parameters ---------- name: str Module name extra: str, default "" Additional text to include in the ``ImportError`` message raise_exception: bool, default False Raise an ``ImportError`` if the module is not found Returns ------- module: obj Imported module """ # check if the module name is a string msg = f"Invalid module name: '{name}'; must be a string" assert isinstance(name, str), msg # default error if module cannot be imported err = f"Missing optional dependency '{name}'. {extra}" module = type('module', (), {}) # try to import the module try: module = importlib.import_module(name) except (ImportError, ModuleNotFoundError) as exc: if raise_exception: raise ImportError(err) from exc else: logging.debug(err) # return the module return module # PURPOSE: get the hash value of a file
[docs] def get_hash( local: str | io.IOBase | pathlib.Path, algorithm: str = 'md5' ): """ Get the hash value from a local file or ``BytesIO`` object Parameters ---------- local: obj, str or pathlib.Path BytesIO object or path to file algorithm: str, default 'md5' hashing algorithm for checksum validation """ # check if open file object or if local file exists if isinstance(local, io.IOBase): # generate checksum hash for a given type if algorithm in hashlib.algorithms_available: return hashlib.new(algorithm, local.getvalue()).hexdigest() else: raise ValueError(f'Invalid hashing algorithm: {algorithm}') elif isinstance(local, (str, pathlib.Path)): # generate checksum hash for local file local = pathlib.Path(local).expanduser() # if file currently doesn't exist, return empty string if not local.exists(): return '' # open the local_file in binary read mode with local.open(mode='rb') as local_buffer: # generate checksum hash for a given type if algorithm in hashlib.algorithms_available: return hashlib.new(algorithm, local_buffer.read()).hexdigest() else: raise ValueError(f'Invalid hashing algorithm: {algorithm}') else: return ''
# PURPOSE: get the git hash value
[docs] def get_git_revision_hash( refname: str = 'HEAD', short: bool = False ): """ Get the ``git`` hash value for a particular reference Parameters ---------- refname: str, default HEAD Symbolic reference name short: bool, default False Return the shorted hash value """ # get path to .git directory from current file path filename = inspect.getframeinfo(inspect.currentframe()).filename basepath = pathlib.Path(filename).absolute().parent.parent gitpath = basepath.joinpath('.git') # build command cmd = ['git', f'--git-dir={gitpath}', 'rev-parse'] cmd.append('--short') if short else None cmd.append(refname) # get output with warnings.catch_warnings(): return str(subprocess.check_output(cmd), encoding='utf8').strip()
# PURPOSE: get the current git status
[docs] def get_git_status(): """Get the status of a ``git`` repository as a boolean value """ # get path to .git directory from current file path filename = inspect.getframeinfo(inspect.currentframe()).filename basepath = pathlib.Path(filename).absolute().parent.parent gitpath = basepath.joinpath('.git') # build command cmd = ['git', f'--git-dir={gitpath}', 'status', '--porcelain'] with warnings.catch_warnings(): return bool(subprocess.check_output(cmd))
# PURPOSE: recursively split a url path
[docs] def url_split(s: str): """ Recursively split a url path into a list Parameters ---------- s: str url string """ head, tail = posixpath.split(s) if head in ('http:','https:','ftp:','s3:'): return s, elif head in ('', posixpath.sep): return tail, return url_split(head) + (tail,)
# PURPOSE: convert file lines to arguments def convert_arg_line_to_args(arg_line): """ Convert file lines to arguments Parameters ---------- arg_line: str line string containing a single argument and/or comments """ # remove commented lines and after argument comments for arg in re.sub(r'\#(.*?)$',r'',arg_line).split(): if not arg.strip(): continue yield arg # PURPOSE: returns the Unix timestamp value for a formatted date string
[docs] def get_unix_time( time_string: str, format: str = '%Y-%m-%d %H:%M:%S' ): """ Get the Unix timestamp value for a formatted date string Parameters ---------- time_string: str formatted time string to parse format: str, default '%Y-%m-%d %H:%M:%S' format for input time string """ try: parsed_time = time.strptime(time_string.rstrip(), format) except (TypeError, ValueError): pass else: return calendar.timegm(parsed_time) # try parsing with dateutil try: parsed_time = dateutil.parser.parse(time_string.rstrip()) except (TypeError, ValueError): return None else: return parsed_time.timestamp()
# PURPOSE: output a time string in isoformat
[docs] def isoformat(time_string: str): """ Reformat a date string to ISO formatting Parameters ---------- time_string: str formatted time string to parse """ # try parsing with dateutil try: parsed_time = dateutil.parser.parse(time_string.rstrip()) except (TypeError, ValueError): return None else: return parsed_time.isoformat()
[docs] def _create_default_ssl_context() -> ssl.SSLContext: """Creates the default SSL context """ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) _set_ssl_context_options(context) context.options |= ssl.OP_NO_COMPRESSION return context
[docs] def _create_ssl_context_no_verify() -> ssl.SSLContext: """Creates an SSL context for unverified connections """ context = _create_default_ssl_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context
[docs] def _set_ssl_context_options(context: ssl.SSLContext) -> None: """Sets the default options for the SSL context """ if sys.version_info >= (3, 10) or ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7): context.minimum_version = ssl.TLSVersion.TLSv1_2 else: context.options |= ssl.OP_NO_SSLv2 context.options |= ssl.OP_NO_SSLv3 context.options |= ssl.OP_NO_TLSv1 context.options |= ssl.OP_NO_TLSv1_1
# default ssl context _default_ssl_context = _create_ssl_context_no_verify() # PURPOSE: check internet connection
[docs] def check_connection(HOST: str, context=_default_ssl_context): """ Check internet connection with http host Parameters ---------- HOST: str remote http host context: obj, default ssl.SSLContext(ssl.PROTOCOL_TLS) SSL context for ``urllib`` opener object """ # attempt to connect to http host try: urllib2.urlopen(HOST, timeout=20, context=context) except urllib2.URLError as exc: raise RuntimeError('Check internet connection') from exc else: return True
# PURPOSE: list a directory on DLR geoservice https Server
[docs] def geoservice_list( HOST: str | list, timeout: int | None = None, context = _default_ssl_context, parser = lxml.etree.HTMLParser(), format: str = '%Y-%m-%d %H:%M:%S', pattern: str = '', sort: bool = False ): """ List a directory on DLR geoservice https Server Parameters ---------- HOST: str or list remote http host path timeout: int or NoneType, default None timeout in seconds for blocking operations context: obj, default ssl.SSLContext(ssl.PROTOCOL_TLS) SSL context for ``urllib`` opener object parser: obj, default lxml.etree.HTMLParser() HTML parser for ``lxml`` format: str, default '%Y-%m-%d %H:%M' format for input time string pattern: str, default '' regular expression pattern for reducing list sort: bool, default False sort output list Returns ------- colnames: list column names in a directory collastmod: list last modification times for items in the directory """ # verify inputs for remote http host if isinstance(HOST, str): HOST = url_split(HOST) # try listing from http try: # Create and submit request. request = urllib2.Request(posixpath.join(*HOST)) response = urllib2.urlopen(request, timeout=timeout, context=context) except (urllib2.HTTPError, urllib2.URLError): raise Exception('List error from {0}'.format(posixpath.join(*HOST))) else: # read and parse request for files (column names and modified times) tree = lxml.etree.parse(response, parser) colnames = tree.xpath('//tr/td[@class="link"]/a/@href') # get the Unix timestamp value for a modification time collastmod = [get_unix_time(i, format=format) for i in tree.xpath('//tr/td[@class="date"]/text()')] # reduce using regular expression pattern if pattern: i = [i for i,f in enumerate(colnames) if re.search(pattern, f)] # reduce list of column names and last modified times colnames = [colnames[indice] for indice in i] collastmod = [collastmod[indice] for indice in i] # sort the list if sort: i = [i for i,j in sorted(enumerate(colnames), key=lambda i: i[1])] # sort list of column names and last modified times colnames = [colnames[indice] for indice in i] collastmod = [collastmod[indice] for indice in i] # return the list of column names and last modified times return (colnames, collastmod)