Source code for punx.validate

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# -----------------------------------------------------------------------------
# :author:    Pete R. Jemian
# :email:     prjemian@gmail.com
# :copyright: (c) 2014-2023, Pete R. Jemian
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------


"""
validate files against the NeXus/HDF5 standard

PUBLIC

.. autosummary::

   ~Data_File_Validator

INTERNAL

.. autosummary::

   ~ValidationItem

"""

import collections
import h5py
import logging
import os
import pyRestTable

from . import FileNotFound, HDF5_Open_Error
from . import finding
from . import utils
from . import nxdl_manager


# separator between the parts of an HDF5 address (and of a NeXus class path)
SLASH = "/"
# custom logging level, midway between logging.DEBUG and logging.INFO
INFORMATIVE = int((logging.INFO + logging.DEBUG) / 2)
# classpath assigned to data file items that are not NeXus content
CLASSPATH_OF_NON_NEXUS_CONTENT = "non-NeXus content"
# regular expression: a lower case letter or underscore followed by
# any number of lower case letters, digits, or underscores
VALIDITEMNAME_STRICT_PATTERN = r"[a-z_][a-z0-9_]*"
# logger for this module, obtained from utils.setup_logger()
logger = utils.setup_logger(__name__)


class Data_File_Validator(object):
    """
    manage the validation of a NeXus HDF5 data file

    USAGE

    1. make a validator with a certain schema::

        validator = punx.validate.Data_File_Validator()  # default

       You may have downloaded additional NeXus Schema (NXDL file sets).
       If so, pick any of these by name as follows::

        validator = punx.validate.Data_File_Validator("v3.2")
        validator = punx.validate.Data_File_Validator("main")

    2. use to validate a file or files (findings are kept in
       ``validator.validations``)::

        validator.validate(hdf5_file_name)
        validator.print_report()
        validator.validate(another_file)

    3. close the HDF5 file when done with validation::

        validator.close()

    PUBLIC METHODS

    .. autosummary::

       ~close
       ~validate
       ~print_report

    INTERNAL METHODS

    .. autosummary::

       ~build_address_catalog
       ~_group_address_catalog_
       ~validate_item_name
    """

    def __init__(self, ref=None):
        """
        :param ref: name of the NXDL file set to use,
            passed to ``nxdl_manager.NXDL_Manager()`` (``None``: default)
        """
        self.h5 = None
        # defined here so print_report() does not fail before validate()
        self.fname = None
        self.__init_local__()
        self.manager = nxdl_manager.NXDL_Manager(ref)

    def __init_local__(self):
        """reset the per-file results (called for each new data file)"""
        self.validations = []  # list of Finding() instances
        # dictionary of all HDF5 address nodes in the data file
        self.addresses = collections.OrderedDict()
        self.classpaths = {}  # NeXus classpath -> list of ValidationItem
        self.regexp_cache = {}

    def close(self):
        """
        close the HDF5 file (if it is open)
        """
        if utils.isHdf5FileObject(self.h5):
            self.h5.close()
            self.h5 = None

    def record_finding(self, v_item, key, status, comment):
        """
        prepare the finding object and record it

        The finding is appended to ``self.validations`` and also stored
        as ``v_item.validations[key]`` (replacing any previous finding
        recorded on that item with the same key).

        :param v_item: ValidationItem the finding is about
        :param key: name of the test
        :param status: status object from the ``finding`` module
        :param comment: text to explain the finding
        :returns: the new ``finding.Finding`` instance
        """
        f = finding.Finding(v_item.h5_address, key, status, comment)
        self.validations.append(f)
        v_item.validations[key] = f
        return f

    def finding_score(self):
        """
        return a numerical score for the set of findings

        Findings with a status value of zero are ignored.

        :returns: tuple of (total, count, score)

        :total: sum of status values for the counted findings
        :count: number of findings with a non-zero status value
        :score: total / count -- average status per counted finding
            (``0`` when nothing was counted)
        """
        values = [
            f.status.value for f in self.validations if f.status.value != 0
        ]
        total = sum(values)
        count = len(values)
        if count == 0:
            return total, count, 0
        return total, count, float(total) / count

    def finding_summary(self, report_statuses=None):
        """
        Return a summary dictionary of the count of findings by status.

        Findings whose status is not in ``report_statuses`` are not counted.

        :param report_statuses: statuses to be summarized
            (default: ``finding.VALID_STATUS_LIST``)
        :returns: ordered dictionary of status -> number of findings

        Summary Statistics

        ======= ===== ===========================================================
        status  count description
        ======= ===== ===========================================================
        OK      10    meets NeXus specification
        NOTE    1     does not meet NeXus specification, but acceptable
        WARN    0     does not meet NeXus specification, not generally acceptable
        ERROR   0     violates NeXus specification
        TODO    3     validation not implemented yet
        UNUSED  2     optional NeXus item not used in data file
        COMMENT 0     comment from the punx source code
        --      --    --
        TOTAL   16    --
        ======= ===== ===========================================================
        """
        report_statuses = report_statuses or finding.VALID_STATUS_LIST
        summary = collections.OrderedDict(
            (status, 0) for status in report_statuses
        )
        for f in self.validations:
            # a caller may ask for only some statuses: skip the others
            # instead of failing with KeyError
            if f.status in summary:
                summary[f.status] += 1
        return summary

    def print_report(self, statuses=None):
        """
        Print a validation report.

        :param statuses: names of the statuses to show in the table of
            findings (default: keys of ``finding.VALID_STATUS_DICT``)
        """
        reported_statuses = statuses or list(finding.VALID_STATUS_DICT.keys())

        print(f"data file: {self.fname}")
        print(
            f"NeXus definitions: {self.manager.nxdl_file_set.ref}"
            f", dated {self.manager.nxdl_file_set.last_modified}"
            f", sha={self.manager.nxdl_file_set.sha}\n"
        )

        def sort_validations(f):
            value = f.h5_address
            value += " %3d" % -f.status.value  # sort from best to worst
            value += " " + f.status.description
            # keep attributes with group or dataset
            value = value.replace("@", " @")
            return value

        print("findings")
        t = pyRestTable.Table()
        for label in "address status test comments".split():
            t.addLabel(label)
        for f in sorted(self.validations, key=sort_validations):
            if str(f.status) in reported_statuses:
                t.addRow([f.h5_address, f.status, f.test_name, f.comment])
        print(str(t))

        summary = self.finding_summary()
        t = pyRestTable.Table()
        for label in "status count description (value)".split():
            t.addLabel(label)
        for s, c in summary.items():
            t.addRow([s.key, c, s.description, s.value])
        t.addRow(["", "--", "", ""])
        t.addRow(["TOTAL", sum(summary.values()), "", ""])
        print("\nsummary statistics")
        print(str(t))

        total, count, average = self.finding_score()
        print("<finding>=%f of %d items reviewed" % (average, count))

    def validate(self, fname):
        """
        start the validation process from the file root

        Any file left open by a previous call is closed first and all
        previous findings are discarded.

        :param fname: name of the HDF5 data file
        :raises FileNotFound: if ``fname`` does not exist
        :raises HDF5_Open_Error: if ``fname`` cannot be opened by h5py
        """
        from .validations import default_plot

        if not os.path.exists(fname):
            raise FileNotFound(fname)
        self.fname = fname

        if self.h5 is not None:
            self.close()  # left open from previous call to validate()
        try:
            self.h5 = h5py.File(fname, "r")
        except IOError as exc:
            # lazy %-formatting also copes with path-like file names
            logger.error("Could not open as HDF5: %s", fname)
            raise HDF5_Open_Error(fname) from exc

        self.__init_local__()
        self.build_address_catalog()

        # 1. check all objects in file (name is valid, ...)
        for v_list in self.classpaths.values():
            for v_item in v_list:
                self.validate_item_name(v_item)
                self.validate_attribute(v_item)

        # 2. check all base classes against defaults
        for v_item in self.addresses.values():
            h5_object = v_item.h5_object
            if utils.isHdf5Group(h5_object) or utils.isHdf5FileObject(h5_object):
                self.validate_group(v_item)

        # 3. check application definitions
        for k in ("/NXentry/definition", "/NXentry/NXsubentry/definition"):
            for v_item in self.classpaths.get(k, []):
                self.validate_application_definition(v_item.parent)

        # 4. check for default plot
        default_plot.verify(self)

    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

    def build_address_catalog(self):
        """
        find all HDF5 addresses and NeXus class paths in the data file
        """
        self._group_address_catalog_(None, self.h5)

    def _group_address_catalog_(self, parent, group):
        """
        catalog this group's address and all its contents

        Fills ``self.addresses`` and ``self.classpaths`` with a
        ValidationItem for the group, each of its attributes, and
        (recursively) each of its members.

        :param parent: ValidationItem of the parent group
            (``None`` for the file root)
        :param group: HDF5 group (or file) object to be cataloged
        """

        def addClasspath(v):
            self.classpaths.setdefault(v.classpath, []).append(v)
            logger.log(INFORMATIVE, "NeXus classpath: " + v.classpath)

        def get_subject(parent, o):
            v = ValidationItem(parent, o)
            self.addresses[v.h5_address] = v
            logger.log(INFORMATIVE, "HDF5 address: " + v.h5_address)
            addClasspath(v)

            for k, a in sorted(o.attrs.items()):
                av = ValidationItem(v, a, attribute_name=k)
                self.addresses[av.h5_address] = av
                addClasspath(av)

            return v

        # The group's own item is the parent of its members.  Looking it
        # up as self.classpaths[classpath][-1] is wrong for non-NeXus
        # groups: their attributes share the group's classpath, so the
        # last entry there is an attribute, not the group.
        parent = get_subject(parent, group)
        for item in group:
            member = group[item]
            if utils.isHdf5Group(member):
                self._group_address_catalog_(parent, member)
            else:
                get_subject(parent, member)

    def validate_item_name(self, v_item):
        """validate the name of ``v_item`` (see ``validations.item_name``)"""
        from .validations import item_name

        item_name.verify(self, v_item)

    def validate_attribute(self, v_item):
        """validate ``v_item`` with ``validations.attribute``"""
        from .validations import attribute

        attribute.verify(self, v_item)

    def validate_group(self, v_item):
        """
        validate the NeXus content of a HDF5 data file group

        :param v_item: ValidationItem of a HDF5 group or file root
        :raises ValueError: if the classpath of ``v_item`` is neither
            empty (file root), nor non-NeXus, nor starts with ``/NX``
        """
        from .validations import hdf5_group_items_in_base_class
        from .validations import base_class_items_in_hdf5_group

        key = "NeXus_group"
        if v_item.classpath == CLASSPATH_OF_NON_NEXUS_CONTENT:
            self.record_finding(v_item, key, finding.OK, "not a NeXus group")
            return

        if v_item.classpath.startswith("/NX"):
            nx_class = v_item.nx_class
        elif v_item.classpath == "":
            nx_class = "NXroot"  # handle as NXroot
        else:
            raise ValueError(f"unexpected: {v_item}")

        self.validate_NX_class_attribute(v_item, nx_class)

        base_class = self.manager.classes.get(nx_class)
        if base_class is None:
            c = "unknown NeXus base class: " + nx_class
            self.record_finding(v_item, "NeXus base class", finding.ERROR, c)
        else:
            hdf5_group_items_in_base_class.verify(self, v_item, base_class)
            base_class_items_in_hdf5_group.verify(self, v_item, base_class)

            # TODO: validate attributes - both HDF5-supplied & NXDL-specified
            # TODO: validate symbols - both HDF5-supplied & NXDL-specified
            # TODO: validate fields - both HDF5-supplied & NXDL-specified
            # TODO: validate links - both HDF5-supplied & NXDL-specified
            # NOTE(review): kept in this branch so the TODO finding does not
            # replace the ERROR finding stored under the same key -- confirm
            c = nx_class + ": more validations needed"
            self.record_finding(v_item, "NeXus base class", finding.TODO, c)

    def validate_application_definition(self, v_item):
        """
        validate group as a NeXus application definition
        """
        from .validations import application_definition

        application_definition.verify(self, v_item)

    def validate_NX_class_attribute(self, v_item, nx_class):
        """validate the NX_class of ``v_item`` (``validations.nx_class_attribute``)"""
        from .validations import nx_class_attribute

        nx_class_attribute.validate_NX_class_attribute(self, v_item, nx_class)

    def usedAsBaseClass(self, nx_class):
        """
        returns bool: is the nx_class a base class?

        NXDL specifications in the contributed definitions directory
        could be intended as either a base class or an
        application definition.  NeXus provides no easy identifier
        for this difference.  The most obvious distinction between
        them is the presence of the `definition` field
        in the `NXentry` group of an application definition.
        This field is not present in base classes.

        :param nx_class: name of the NXDL class, such as ``NXentry``
        :returns: ``False`` if the class is unknown or is an application
            definition, ``True`` otherwise
        """
        nxdl_def = self.manager.classes.get(nx_class, None)
        if nxdl_def is None:
            return False
        if nxdl_def.category == "applications":
            return False
        if nxdl_def.category == "base_classes":
            return True

        # now, need to work at it a bit
        # *Should* only be one NXentry group but that is not a rule.
        if (
            len(nxdl_def.fields) == 0
            and len(nxdl_def.links) == 0
            and len(nxdl_def.groups) == 1
        ):
            # maybe ...
            # dictionary views cannot be indexed: take the only value
            entry_group = next(iter(nxdl_def.groups.values()))
            # TODO: test entry_group.NX_class == "NXentry" but that attribute is not ready yet!
            # assume OK
            return "definition" not in entry_group.fields
        return True
class ValidationItem(object):
    """
    HDF5 data file object for validation

    Describes one item of the data file: the file root, a group,
    a dataset, or an attribute of one of these.

    :param parent: ValidationItem of the owner (``None`` for the file root)
    :param obj: HDF5 object, or the value of an attribute
    :param attribute_name: name of the attribute
        (used when ``obj`` has no ``name``)
    """

    def __init__(self, parent, obj, attribute_name=None):
        assert isinstance(parent, (ValidationItem, type(None)))
        self.parent = parent
        self.validations = {}  # validation findings go here
        self.h5_object = obj

        if hasattr(obj, "name"):
            # file root, group, or dataset: HDF5 supplies the address
            self.h5_address = obj.name
            if obj.name == SLASH:
                self.name = SLASH
            else:
                # text after the last separator
                self.name = obj.name.rpartition(SLASH)[2]
            self.classpath = self.determine_NeXus_classpath()
        else:
            # attribute: address and classpath are built from the owner
            self.name = attribute_name
            if parent.classpath != CLASSPATH_OF_NON_NEXUS_CONTENT:
                self.h5_address = f"{parent.h5_address!s}@{self.name!s}"
                self.classpath = f"{parent.classpath!s}@{self.name!s}"
            else:
                self.h5_address = None
                self.classpath = CLASSPATH_OF_NON_NEXUS_CONTENT

        self.object_type = self.identify_object_type()

    def __str__(self, *args, **kwargs):
        """show name, type, and classpath (default text if any is not available)"""
        try:
            pairs = (
                ("name", self.name),
                ("type", self.object_type),
                ("classpath", self.classpath),
            )
            body = ", ".join(f"{label}={value!s}" for label, value in pairs)
            return f"ValidationItem({body})"
        except Exception:
            return object.__str__(self, *args, **kwargs)

    def identify_object_type(self, *args, **kwargs):
        """
        describe the kind of object held by this item

        :returns: text description for a HDF5 file, group, or dataset
            (``NeXus link`` if ``utils.isNeXusLink()`` says so),
            otherwise the Python type of the object
        """
        import h5py._hl

        # order is significant: the file root is tested before group
        recognized = (
            (h5py._hl.files.File, "HDF5 file root"),
            (h5py._hl.group.Group, "HDF5 group"),
            (h5py._hl.dataset.Dataset, "HDF5 dataset"),
        )
        for h5_class, description in recognized:
            if isinstance(self.h5_object, h5_class):
                if utils.isNeXusLink(self.h5_object):
                    return "NeXus link"
                return description
        return type(self.h5_object)

    def determine_NeXus_classpath(self):
        """
        determine the NeXus class path

        :see: http://download.nexusformat.org/sphinx/preface.html#class-path-specification

        EXAMPLE

        Given this NeXus data file structure::

            /
              entry: NXentry
                data: NXdata
                  @signal = data
                  data: NX_NUMBER

        For the "signal" attribute of this HDF5 address: ``/entry/data``,
        its NeXus class path is: ``/NXentry/NXdata@signal``

        The ``@signal`` attribute has the value of ``data`` which means
        that the local field named ``data`` is the plottable data.

        The HDF5 address of the plottable data is: ``/entry/data/data``,
        its NeXus class path is: ``/NXentry/NXdata/data``

        :returns: class path text: empty for the file root,
            ``CLASSPATH_OF_NON_NEXUS_CONTENT`` for anything that is
            (or is inside) a group without a NeXus ``NX_class`` attribute
        """
        if self.name == SLASH:
            return ""

        h5_obj = self.h5_object
        parent_path = str(self.parent.classpath)

        # everything below non-NeXus content is also non-NeXus
        if parent_path == CLASSPATH_OF_NON_NEXUS_CONTENT:
            logger.log(INFORMATIVE, "%s is not NeXus content", h5_obj.name)
            return CLASSPATH_OF_NON_NEXUS_CONTENT

        if parent_path.endswith(SLASH):
            return parent_path

        if not utils.isHdf5Group(h5_obj):
            # not a group: the item's own name is the last part
            return parent_path + SLASH + self.name

        nx_class = utils.decode_byte_string(h5_obj.attrs.get("NX_class"))
        if not (isinstance(nx_class, str) and nx_class.startswith("NX")):
            logger.log(
                INFORMATIVE,
                "HDF5 group is not NeXus: " + self.h5_address,
            )
            return CLASSPATH_OF_NON_NEXUS_CONTENT

        self.nx_class = nx_class  # only for groups
        logger.log(
            INFORMATIVE,
            "NeXus base class: " + nx_class,
        )
        return parent_path + SLASH + nx_class