# Source code for ci_watson.artifactory_helpers

"""
Helpers for Artifactory or local big data handling.
"""
import copy
import functools
import json
import os
import re
import shutil
import sys
import time
from datetime import datetime
from difflib import unified_diff
from io import StringIO

# Astropy is optional: only the FITS comparison code paths require it.
try:
    from astropy.io import fits
    from astropy.io.fits import FITSDiff, HDUDiff
    from astropy.utils.introspection import minversion
    HAS_ASTROPY = True
except ImportError:
    HAS_ASTROPY = False

# ``FITSDiff(..., ignore_hdus=...)`` is only available in astropy>=3.1;
# compare_outputs() uses this flag to reject that option on older versions.
if HAS_ASTROPY and minversion('astropy', '3.1'):
    ASTROPY_LT_3_1 = False
else:
    ASTROPY_LT_3_1 = True

__all__ = ['BigdataError', 'check_url', 'get_bigdata_root', 'get_bigdata',
           'compare_outputs', 'generate_upload_params',
           'generate_upload_schema']

# Loose check that a string looks like a URL with a scheme (e.g. "https://x").
RE_URL = re.compile(r"\w+://\S+")

# Template for one entry of the JFrog Artifactory file-spec JSON;
# generate_upload_schema() deep-copies this and fills in pattern/target.
UPLOAD_SCHEMA = {"files": [
                    {"pattern": "",
                     "target": "",
                     "props": None,
                     "recursive": "false",
                     "flat": "true",
                     "regexp": "false",
                     "explode": "false",
                     "excludePatterns": []}]}

# Captured once at import time; used to build upload sub-directory names.
TODAYS_DATE = datetime.now().strftime("%Y-%m-%d")

# Download/retry tunables, overridable through environment variables
# (timeout and delay in seconds, chunk size in bytes).
TIMEOUT = int(os.environ.get("TEST_BIGDATA_TIMEOUT", 30))
CHUNK_SIZE = int(os.environ.get("TEST_BIGDATA_CHUNK_SIZE", 16384))
RETRY_MAX = int(os.environ.get("TEST_BIGDATA_RETRY_MAX", 3))
RETRY_DELAY = int(os.environ.get("TEST_BIGDATA_RETRY_DELAY", 5))

# Negative value disables timeout (i.e. hang forever)
if TIMEOUT < 0:
    TIMEOUT = None
# Timeout length cannot be zero
elif not TIMEOUT:
    TIMEOUT = 1

# Prevent chunks from being smaller than the usual physical block size
if CHUNK_SIZE < 512:
    CHUNK_SIZE = 512

# Prevent infinite retry loops
if RETRY_MAX < 0:
    RETRY_MAX = 0

# Prevent infinite retry wait
if RETRY_DELAY < 0:
    RETRY_DELAY = 0

class BigdataError(Exception):
    """Exception related to big data access."""
    # NOTE: the redundant ``pass`` after the docstring was removed;
    # the docstring alone is a valid class body.
def retry(retries=RETRY_MAX, delay=RETRY_DELAY, trap=(Exception,)):
    """Execute a function again on error.

    The wrapped function is attempted up to ``retries`` times with the
    given exceptions trapped, then called one final time unguarded so
    the caller sees the real error if it still fails.

    Parameters
    ----------
    retries : int
        Maximum number of trapped attempts before the final call.

    delay : int, float, None
        Maximum time to wait per attempt (seconds).

    trap : tuple of type Exception
        Type of exceptions to trap. Untrapped exceptions raise normally.
        Default: `Exception` (all exceptions)
    """
    def decorator(fn):
        # functools.wraps preserves the wrapped function's name/docstring
        # (the original wrapper did not, which hid the identity of the
        # decorated function from introspection and test reports).
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Renamed from ``retry`` to ``attempt``: the old local counter
            # shadowed the decorator function itself, which was confusing.
            attempt = 0
            while attempt < retries:
                try:
                    return fn(*args, **kwargs)
                except trap as e:
                    print("{}: {}: will try again in {} second(s) "
                          "[attempt: {} of {}]".format(
                              fn, e, delay, attempt + 1, retries),
                          file=sys.stderr)
                    attempt += 1
                    time.sleep(delay)
            # Final attempt: exceptions propagate to the caller.
            return fn(*args, **kwargs)
        return wrapper
    return decorator
@retry()
def check_url(url, timeout=TIMEOUT):
    """Determine if URL can be resolved without error."""
    # Anything that does not even look like a URL fails fast,
    # before the optional ``requests`` dependency is touched.
    if not RE_URL.match(url):
        return False

    # Optional import: requests is not needed for local big data setup.
    import requests

    # requests.head does not work with Artifactory landing page,
    # so a full GET (following redirects) is issued instead.
    response = requests.get(url, allow_redirects=True, timeout=timeout)

    # Any 4xx/5xx status counts as failure.
    return response.status_code < 400
@retry()
def _download(url, dest, timeout=TIMEOUT, chunk_size=CHUNK_SIZE):
    """Simple HTTP/HTTPS downloader."""
    # Optional import: requests is not needed for local big data setup.
    import requests

    dest = os.path.abspath(dest)

    # Stream the response body to disk chunk by chunk so that large
    # files never have to fit in memory.
    with requests.get(url, stream=True, timeout=timeout) as response, \
            open(dest, 'w+b') as fout:
        for block in response.iter_content(chunk_size=chunk_size):
            fout.write(block)

    return dest
def get_bigdata_root(envkey='TEST_BIGDATA'):
    """
    Find and return the path to the nearest big datasets.

    Parameters
    ----------
    envkey : str
        Environment variable name. It must contain a string
        defining the root Artifactory URL or path to local
        big data storage.

    Returns
    -------
    str or `None`
        The configured root if it is an existing local path or a
        resolvable URL; `None` otherwise.

    Raises
    ------
    BigdataError
        The environment variable is not defined at all.
    """
    if envkey not in os.environ:
        raise BigdataError(
            'Environment variable {} is undefined'.format(envkey))

    path = os.environ[envkey]

    # Accept either a locally accessible directory or a reachable URL.
    return path if (os.path.exists(path) or check_url(path)) else None
def get_bigdata(*args, docopy=True, timeout=TIMEOUT, chunk_size=CHUNK_SIZE):
    """
    Acquire requested data from a managed resource
    to the current directory.

    Parameters
    ----------
    args : tuple of str
        Location of file relative to ``TEST_BIGDATA``.

    docopy : bool
        Switch to control whether or not to copy a file
        into the test output directory when running the test.
        If you wish to open the file directly from remote
        location or just to see path to source, set this to `False`.
        Default: `True`

    Returns
    -------
    dest : str
        Absolute path to local copy of data
        (i.e., ``/path/to/example.fits``).

    Raises
    ------
    BigdataError
        The requested data cannot be located locally or remotely.

    Examples
    --------
    >>> import os
    >>> print(os.getcwd())
    /path/to
    >>> from ci_watson.artifactory_helpers import get_bigdata
    >>> filename = get_bigdata('abc', '123', 'example.fits')
    >>> print(filename)
    /path/to/example.fits
    >>> get_bigdata('abc', '123', 'example.fits', docopy=False)
    /remote/root/abc/123/example.fits
    """
    src = os.path.join(get_bigdata_root(), *args)
    available_locally = os.path.exists(src)
    available_remotely = check_url(src)

    # No-op: just report where the source lives, without copying.
    if not docopy:
        if not (available_locally or available_remotely):
            raise BigdataError('Failed to find data: {}'.format(src))
        return os.path.abspath(src)

    dest = os.path.abspath(os.path.join(os.curdir, os.path.basename(src)))

    if available_locally:
        # Found src file on locally accessible directory
        if src == dest:  # pragma: no cover
            raise BigdataError('Source and destination paths are identical: '
                               '{}'.format(src))
        shutil.copy2(src, dest)
    elif available_remotely:
        _download(src, dest, timeout, chunk_size)
    else:
        raise BigdataError('Failed to retrieve data: {}'.format(src))

    return dest
def compare_outputs(outputs, raise_error=True, ignore_keywords=[],
                    ignore_hdus=[], ignore_fields=[], rtol=0.0, atol=0.0,
                    input_path=[], docopy=True, results_root=None,
                    verbose=True):
    """
    Compare output with "truth" using appropriate diff routine; namely:

    * ``fitsdiff`` for FITS file comparisons.
    * ``unified_diff`` for ASCII products.

    Only after all elements of ``outputs`` have been processed will the
    method report any success or failure, with failure of any one
    comparison *not* preventing the rest of the comparisons to be
    performed.

    Parameters
    ----------
    outputs : list of tuple or dict
        This list defines what outputs from running the test will be
        compared. Three distinct types of values as list elements
        are supported:

        * 2-tuple : ``(test output filename, truth filename)``
        * 3-tuple : ``(test output filename, truth filename, HDU names)``
        * dict : ``{'files': (output, truth), 'pars': {key: val}}``

        If filename contains extension such as ``[hdrtab]``,
        it will be interpreted as specifying comparison of just that HDU.

    raise_error : bool
        Raise ``AssertionError`` if difference is found.

    ignore_keywords : list of str
        List of FITS header keywords to be ignored by
        ``FITSDiff`` and ``HDUDiff``.

    ignore_hdus : list of str
        List of FITS HDU names to ignore by ``FITSDiff``.
        This is only available for ``astropy>=3.1``.

    ignore_fields : list of str
        List FITS table column names to be ignored by
        ``FITSDiff`` and ``HDUDiff``.

    rtol, atol : float
        Relative and absolute tolerance to be used by
        ``FITSDiff`` and ``HDUDiff``.

    input_path : list or tuple
        A series of sub-directory names under :func:`get_bigdata_root`
        that leads to the path of the 'truth' files to be compared
        against. If not provided, it assumes that 'truth' is in the
        working directory. For example, with :func:`get_bigdata_root`
        pointing to ``/grp/test_data``, a file at::

            /grp/test_data/pipeline/dev/ins/test_1/test_a.py

        would require ``input_path`` of::

            ["pipeline", "dev", "ins", "test_1"]

    docopy : bool
        If `True`, 'truth' will be copied to output directory before
        comparison is done.

    results_root : str or `None`
        If not `None`, for every failed comparison, the test output is
        automatically renamed to the given 'truth' in the output
        directory and :func:`generate_upload_schema` will be called to
        generate a JSON scheme for Artifactory upload. If you do not
        need this functionality, use ``results_root=None``.

    verbose : bool
        Print extra info to screen.

    Returns
    -------
    creature_report : str
        Report from FITS or ASCII comparator.
        This is part of error message if ``raise_error=True``.

    Examples
    --------
    There are multiple use cases for this method, specifically
    related to how ``outputs`` are defined upon calling this method.
    The specification of the ``outputs`` can be any combination of the
    following patterns:

    1. 2-tuple inputs::

           outputs = [('file1.fits', 'file1_truth.fits')]

       This definition indicates that ``file1.fits`` should be compared
       as a whole with ``file1_truth.fits``.

    2. 2-tuple inputs with extensions::

           outputs = [('file1.fits[hdrtab]', 'file1_truth.fits[hdrtab]')]

       This definition indicates that only the HDRTAB extension from
       ``file1.fits`` will be compared to the HDRTAB extension from
       ``file1_truth.fits``.

    3. 3-tuple inputs::

           outputs = [('file1.fits', 'file1_truth.fits',
                       ['primary', 'sci'])]

       This definition indicates that only the PRIMARY and SCI
       extensions should be compared between the two files. This
       creates a temporary ``HDUList`` object comprising only the given
       extensions for comparison.

    4. Dictionary of inputs and parameters::

           outputs = [{'files': ('file1.fits', 'file1_truth.fits'),
                       'pars': {'ignore_keywords': ['ROOTNAME']}}]

       This definition indicates that ROOTNAME will be ignored during
       the comparison between the files specified in ``'files'``. Any
       input parameter for ``FITSDiff`` or ``HDUDiff`` can be specified
       as part of the ``'pars'`` dictionary. In addition, the input
       files listed in ``'files'`` can also include an extension
       specification, such as ``[hdrtab]``, to limit the comparison to
       just that extension.

    This example from an actual test definition demonstrates how
    multiple input defintions can be used at the same time::

        outputs = [
            ('jw99999_nircam_f140m-maskbar_psfstack.fits',
             'jw99999_nircam_f140m-maskbar_psfstack_ref.fits'
             ),
            ('jw9999947001_02102_00002_nrcb3_a3001_crfints.fits',
             'jw9999947001_02102_00002_nrcb3_a3001_crfints_ref.fits'
             ),
            {'files': ('jw99999_nircam_f140m-maskbar_i2d.fits',
                       'jw99999_nircam_f140m-maskbar_i2d_ref.fits'),
             'pars': {'ignore_hdus': ['HDRTAB']}},
            {'files': ('jw99999_nircam_f140m-maskbar_i2d.fits',
                       'jw99999_nircam_f140m-maskbar_i2d_ref.fits',
                       ['primary', 'sci', 'dq']),
             'pars': {'rtol': 0.000001}
             },
            {'files': ('jw99999_nircam_f140m-maskbar_i2d.fits[hdrtab]',
                       'jw99999_nircam_f140m-maskbar_i2d_ref.fits[hdrtab]'),
             'pars': {'ignore_keywords': ['NAXIS1', 'TFORM*'],
                      'ignore_fields': ['COL1', 'COL2']}
             }]

    .. note:: Each ``outputs`` entry in the list gets interpreted and
              processed separately.

    """
    # ``ignore_hdus`` requires FITSDiff support added in astropy 3.1.
    if ASTROPY_LT_3_1:
        if len(ignore_hdus) > 0:  # pragma: no cover
            raise ValueError('ignore_hdus cannot be used for astropy<3.1')
        default_kwargs = {'rtol': rtol, 'atol': atol,
                          'ignore_keywords': ignore_keywords,
                          'ignore_fields': ignore_fields}
    else:
        default_kwargs = {'rtol': rtol, 'atol': atol,
                          'ignore_keywords': ignore_keywords,
                          'ignore_fields': ignore_fields,
                          'ignore_hdus': ignore_hdus}

    all_okay = True
    creature_report = ''
    # To track outputs for Artifactory JSON schema
    updated_outputs = []

    for entry in outputs:
        # Each entry gets its own copy so per-entry 'pars' cannot leak
        # into the comparison of subsequent entries.
        diff_kwargs = copy.deepcopy(default_kwargs)
        extn_list = None
        num_entries = len(entry)

        # Normalize the three supported entry forms (dict, 2-tuple,
        # 3-tuple) into actual/desired/extn_list/diff_kwargs.
        if isinstance(entry, dict):
            entry_files = entry['files']
            actual = entry_files[0]
            desired = entry_files[1]
            if len(entry_files) > 2:
                extn_list = entry_files[2]
            diff_kwargs.update(entry.get('pars', {}))
        elif num_entries == 2:
            actual, desired = entry
        elif num_entries == 3:
            actual, desired, extn_list = entry
        else:
            all_okay = False
            creature_report += '\nERROR: Cannot handle entry {}\n'.format(
                entry)
            continue

        # TODO: Use regex?
        # Split "name.fits[ext]" into name and extension; a per-file
        # extension cannot be combined with an explicit extn_list.
        if actual.endswith(']'):
            if extn_list is not None:
                all_okay = False
                creature_report += (
                    '\nERROR: Ambiguous extension requirements '
                    'for {} ({})\n'.format(actual, extn_list))
                continue
            actual_name, actual_extn = actual.split('[')
            actual_extn = actual_extn.replace(']', '')
        else:
            actual_name = actual
            actual_extn = None

        if desired.endswith(']'):
            if extn_list is not None:
                all_okay = False
                creature_report += (
                    '\nERROR: Ambiguous extension requirements '
                    'for {} ({})\n'.format(desired, extn_list))
                continue
            desired_name, desired_extn = desired.split('[')
            desired_extn = desired_extn.replace(']', '')
        else:
            desired_name = desired
            desired_extn = None

        # Get "truth" image
        try:
            desired = get_bigdata(*input_path, desired_name, docopy=docopy)
        except BigdataError:
            all_okay = False
            creature_report += '\nERROR: Cannot find {} in {}\n'.format(
                desired_name, input_path)
            continue

        # Re-attach the extension to the resolved local truth path.
        if desired_extn is not None:
            desired_name = desired
            desired = "{}[{}]".format(desired, desired_extn)

        if verbose:
            print("\nComparing:\n {} \nto\n {}".format(actual, desired))

        if actual.endswith('.fits') and desired.endswith('.fits'):
            # Build HDULists for comparison based on user-specified extensions
            if extn_list is not None:
                with fits.open(actual) as f_act:
                    with fits.open(desired) as f_des:
                        actual_hdu = fits.HDUList(
                            [f_act[extn] for extn in extn_list])
                        desired_hdu = fits.HDUList(
                            [f_des[extn] for extn in extn_list])
                        fdiff = FITSDiff(actual_hdu, desired_hdu,
                                         **diff_kwargs)
                        creature_report += '\na: {}\nb: {}\n'.format(
                            actual, desired)  # diff report only gives hash
            # Working with FITS files...
            else:
                fdiff = FITSDiff(actual, desired, **diff_kwargs)

            creature_report += fdiff.report()

            if not fdiff.identical:
                all_okay = False
                # Only keep track of failed results which need to
                # be used to replace the truth files (if OK).
                updated_outputs.append((actual, desired))

        elif actual_extn is not None or desired_extn is not None:
            if 'ignore_hdus' in diff_kwargs:  # pragma: no cover
                diff_kwargs.pop('ignore_hdus')  # Not applicable

            # Specific element of FITS file specified
            with fits.open(actual_name) as f_act:
                with fits.open(desired_name) as f_des:
                    actual_hdu = f_act[actual_extn]
                    desired_hdu = f_des[desired_extn]
                    fdiff = HDUDiff(actual_hdu, desired_hdu, **diff_kwargs)
                    creature_report += '\na: {}\nb: {}\n'.format(
                        actual, desired)
                    creature_report += fdiff.report()

            if not fdiff.identical:
                all_okay = False
                # Only keep track of failed results which need to
                # be used to replace the truth files (if OK).
                updated_outputs.append((actual_name, desired_name))

        else:
            # ASCII-based diff
            with open(actual) as afile:
                actual_lines = afile.readlines()
            with open(desired) as dfile:
                desired_lines = dfile.readlines()

            udiff = unified_diff(actual_lines, desired_lines,
                                 fromfile=actual, tofile=desired)

            udiffIO = StringIO()
            udiffIO.writelines(udiff)
            udiff_report = udiffIO.getvalue()
            udiffIO.close()

            if len(udiff_report) == 0:
                creature_report += ('\na: {}\nb: {}\nNo differences '
                                    'found.\n'.format(actual, desired))
            else:
                all_okay = False
                creature_report += udiff_report
                # Only keep track of failed results which need to
                # be used to replace the truth files (if OK).
                updated_outputs.append((actual, desired))

    if not all_okay and results_root is not None:  # pragma: no cover
        schema_pattern, tree, testname = generate_upload_params(
            results_root, updated_outputs, verbose=verbose)
        generate_upload_schema(schema_pattern, tree, testname)

    if not all_okay and raise_error:
        raise AssertionError(os.linesep + creature_report)

    return creature_report
def generate_upload_params(results_root, updated_outputs, verbose=True):
    """
    Generate pattern, target, and test name for
    :func:`generate_upload_schema`.

    This uses ``BUILD_TAG`` and ``BUILD_MATRIX_SUFFIX`` on Jenkins CI to
    create meaningful Artifactory target path. They are optional for
    local runs. Other attributes like user, time stamp, and test name
    are also automatically determined.

    In addition to renamed outputs, ``*.log`` is also inserted into the
    ``schema_pattern``.

    Parameters
    ----------
    results_root : str
        See :func:`compare_outputs` for more info.

    updated_outputs : list
        List containing tuples of ``(actual, desired)`` of failed
        test output comparison to be processed.

    verbose : bool
        Print extra info to screen.

    Returns
    -------
    schema_pattern, tree, testname
        Analogous to ``pattern``, ``target``, and ``testname`` that are
        passed into :func:`generate_upload_schema`, respectively.

    """
    import getpass

    # Create instructions for uploading results to artifactory for use
    # as new comparison/truth files
    testname = os.path.split(os.path.abspath(os.curdir))[1]

    # Meaningful test dir from build info.
    # TODO: Organize results by day test was run. Could replace with git-hash
    whoami = getpass.getuser() or 'nobody'
    user_tag = 'NOT_CI_{}'.format(whoami)
    build_tag = os.environ.get('BUILD_TAG', user_tag)
    build_matrix_suffix = os.environ.get('BUILD_MATRIX_SUFFIX', '0')
    subdir = '{}_{}_{}'.format(TODAYS_DATE, build_tag, build_matrix_suffix)
    # Trailing separator marks the target as a directory for Artifactory.
    tree = os.path.join(results_root, subdir, testname) + os.sep

    schema_pattern = []

    # Upload all log files
    schema_pattern.append('*.log')

    # Write out JSON file to enable retention of different results.
    # Also rename outputs as new truths.
    for test_result, truth in updated_outputs:
        new_truth = os.path.basename(truth)
        # The failed test output replaces the truth file on disk so it
        # can be uploaded as the new reference.
        shutil.move(test_result, new_truth)
        schema_pattern.append(os.path.abspath(new_truth))
        if verbose:
            print("Renamed {} as new 'truth' file: {}".format(
                os.path.abspath(test_result), os.path.abspath(new_truth)))

    return schema_pattern, tree, testname
def generate_upload_schema(pattern, target, testname, recursive=False):
    """
    Write out JSON file to upload Jenkins results from test to
    Artifactory storage area.

    This function relies on the JFROG JSON schema for uploading data
    into artifactory using the Jenkins plugin. Docs can be found at
    https://www.jfrog.com/confluence/display/RTF/Using+File+Specs

    Parameters
    ----------
    pattern : str or list of strings
        Specifies the local file system path to test results which
        should be uploaded to Artifactory. You can specify multiple
        artifacts by using wildcards or a regular expression as
        designated by the regexp property.

    target : str
        Specifies the target path in Artifactory in the following
        format::

            [repository_name]/[repository_path]

    testname : str
        Name of test that generate the results. This will be used to
        create the name of the JSON file to enable these results to be
        uploaded to Artifactory.

    recursive : bool, optional
        Specify whether or not to identify files listed in
        sub-directories for uploading. Default: `False`

    """
    jsonfile = "{}_results.json".format(testname)
    # The JFrog spec expects the lowercase strings "true"/"false".
    recursive = repr(recursive).lower()

    # Treat a single pattern string as a one-element list so both
    # input forms are handled by the same spec-building loop.
    patterns = [pattern] if isinstance(pattern, str) else list(pattern)

    # Populate schema for this test's data
    file_specs = []
    for pat in patterns:
        spec = copy.deepcopy(UPLOAD_SCHEMA["files"][0])
        spec.update({"pattern": pat,
                     "target": target,
                     "recursive": recursive})
        file_specs.append(spec)

    upload_schema = {"files": file_specs}

    # Write out JSON file with description of test results
    with open(jsonfile, 'w') as outfile:
        json.dump(upload_schema, outfile, indent=2)