Mercurial > hgrepos > Python > libs > data-schema
view data_schema/__init__.py @ 47:e7f6fc454f84
Disable some false positive pylint warnings
| author | Franz Glasner <f.glasner@feldmann-mg.com> |
|---|---|
| date | Wed, 02 Aug 2023 17:54:16 +0200 |
| parents | 92ae1e882cef |
| children | a0b464f6ab1f |
line wrap: on
line source
# -*- coding: utf-8 -*- r""" Object schema validation support. Somewhat modelled after JSON schema. .. seealso:: https://json-schema.org/understanding-json-schema/index.html :Author: Franz Glasner <fzglas.hg@dom66.de> :Copyright: \(c) 2023 Franz Glasner :License: BSD 3-Clause "New" or "Revised" License. See :ref:`LICENSE.txt <license>` for details. :ID: @(#) $Header$ """ __version__ = "0.3.dev1" __revision__ = "|VCSRevision|" __date__ = "|VCSJustDate|" __all__ = ["SEVERITY", "ERRORS", "WARNINGS", "problem_message", "problem_severity", "ValidationProblem", "SchemaError", "validate", "log_problem_cause"] import ast import collections import copy import datetime import enum import pickle import re import sys import urllib.parse import rfc3986 try: from configmix.yaml import load as default_schema_loader except ImportError: default_schema_loader = None from .util import get_data_stream def NC_(ctx, msg): """Mimimum dummy translation support""" return msg @enum.unique class SEVERITY(enum.IntEnum): INFO = 20 WARNING = 30 ERROR = 40 def __reduce_ex__(self, protocol): return (getattr, (self.__class__, self._name_)) # pylint: disable=no-member @enum.unique class ERRORS(enum.Enum): E10000 = NC_("schema-msg", "dict expected") E10001 = NC_("schema-msg", "list expected") E10002 = NC_("schema-msg", "string expected") E10003 = NC_("schema-msg", "dict key must be a string") E10004 = NC_("schema-msg", "additional key encountered") E10005 = NC_("schema-msg", "required key(s) missing") E10006 = NC_("schema-msg", "min string length encountered") E10007 = NC_("schema-msg", "max string length exceeded") E10008 = NC_("schema-msg", "string value does not match the required RE pattern") E10009 = NC_("schema-msg", "string value does not validate") E10010 = NC_("schema-msg", "validation error") E10011 = NC_("schema-msg", "None/Null object expected") E10012 = NC_("schema-msg", "min list length encountered") E10013 = NC_("schema-msg", "max list length exceeded") E10014 = NC_("schema-msg", "tuple expected") E10015 = NC_("schema-msg", "min tuple length encountered") E10016 = NC_("schema-msg", "max tuple length exceeded") E10017 = NC_("schema-msg", "additional items in tuple not allowed") E10018 = NC_("schema-msg", "object is not empty") E10019 = NC_("schema-msg", "more than one match in `one-of' detected") E10020 = NC_("schema-msg", "int expected") E10021 = NC_("schema-msg", "int value lower than min-value") E10022 = NC_("schema-msg", "int value greater than max-value") E10023 = NC_("schema-msg", "float expected") E10024 = NC_("schema-msg", "float value lower than min-value") E10025 = NC_("schema-msg", "float value greater than max-value") E10026 = NC_("schema-msg", "boolean value expected") E10027 = NC_("schema-msg", "boolean true expected") E10028 = NC_("schema-msg", "boolean false expected") E10029 = NC_("schema-msg", "`not' expected problems but got none") E10030 = NC_("schema-msg", "numeric type (int or float) expected") E10031 = NC_("schema-msg", "numeric value lower than min-value") E10032 = NC_("schema-msg", "numeric value greater than max-value") E10033 = NC_("schema-msg", "a plain scalar value expected") E10034 = NC_("schema-msg", "dict key does not match required schema") E10035 = NC_("schema-msg", "binary data expected") E10036 = NC_("schema-msg", "length of binary data lower than min-value") E10037 = NC_("schema-msg", "length of binary data exceeds max-value") E10038 = NC_("schema-msg", "a set is expected") E10039 = NC_("schema-msg", "length of set lower than min-length") E10040 = NC_("schema-msg", "length of set greater than max-length") E10041 = NC_("schema-msg", "timestamp expected") E10042 = NC_("schema-msg", "value of timestamp does not validate") E10043 = NC_("schema-msg", "enumerated string value expected but not found") E10044 = NC_("schema-msg", "referenced object doest not exist") E10045 = NC_("schema-msg", "key is not contained in referenced object") E10046 = NC_("schema-msg", "referenced object is not a container") E10047 = NC_("schema-msg", "binary data does not match the required RE pattern") E10048 = NC_("schema-msg", "enumerated integer value expected but not found") E10049 = NC_("schema-msg", "enumerated number value expected but not found") E10050 = NC_("schema-msg", "min dict length encountered") E10051 = NC_("schema-msg", "max dict length exceeded") E10052 = NC_("schema-msg", "index constraint violated") E10053 = NC_("schema-msg", "`one-of' failed") E10054 = NC_("schema-msg", "failing `one-of' item") E10055 = NC_("schema-msg", "`any-of' failed") E10056 = NC_("schema-msg", "failing `any-of' item") E10057 = NC_("schema-msg", "`all-of' failed") E10058 = NC_("schema-msg", "failing `all-of' item") E10059 = NC_("schema-msg", "forbidden key detected") def __reduce_ex__(self, protocol): return (getattr, (self.__class__, self._name_)) # pylint: disable=no-member @enum.unique class WARNINGS(enum.Enum): W80000 = NC_("schema-msg", "duplicate dict key") def __reduce_ex__(self, protocol): return (getattr, (self.__class__, self._name_)) # pylint: disable=no-member # Check some invariants at import time for e in ERRORS.__members__: assert e.startswith('E'), "ERROR code `{}' shall start with letter `E'".format(e) assert 10000 <= int(e[1:], 10) < 80000, "Invalid ERROR code number in `{}'".format(e) for w in WARNINGS.__members__: assert w.startswith('W'), "WARNING code `{}' must start with letter `W'".format(w) assert 80000 <= int(w[1:], 10), "Invalid WARNING code number in `{}'".format(w) TYPE_RE = type(re.compile(r"\A.+\Z")) SCHEMA_REF_KEY = "$ref" """Key name for schema references (like a symlink within a schema)""" SCHEMA_PATH_ROOT = "$root" """URI path to the root schema""" SCHEMA_PATH_SELF = "$self" """URI path to the current schema""" class _SENTINELType(object): @staticmethod def _get_single(module, name): return getattr(sys.modules[module], name) def __reduce_ex__(self, proto): """Make sure the _SENTINEL is ever only instantiated as singleton""" return (_SENTINELType._get_single, (self.__module__, "_SENTINEL")) _SENTINEL = _SENTINELType() def problem_message(pr): """ :raises KeyError: the code in `pr` does not refer to :class:`.ERRORS` or `.WARNINGS` """ if isinstance(pr, ValidationProblem): code = getattr(pr, "code", None) elif isinstance(pr, (ERRORS, WARNINGS)): code = pr else: if pr >= 80000: code = WARNINGS["W" + str(pr)] else: code = ERRORS["E" + str(pr)] return code.value def problem_severity(pr): """Get the default severity for error or warning code `pr` :raises TypeError: if `pr` is not in :class:`.ERRORS` or :class:`.WARNINGS` """ if pr in ERRORS: return SEVERITY.ERROR if pr in WARNINGS: return SEVERITY.WARNING raise TypeError("invalid error or warning code: %r" % (pr, )) class ValidationProblem(object): __slots__ = ("code", "severity", "hint", "context", "cause", "index") def __init__(self, code=None, severity=None, hint=None, context=None, cause=None, index=None): if code is None: raise TypeError("`code' must be given") # check validity if code not in ERRORS and code not in WARNINGS: raise ValueError( "unknown validation error code: {}".format(code)) self.code = code if severity is None: self.severity = problem_severity(code) else: if not isinstance(severity, SEVERITY): raise TypeError("invalid type for `severity'") self.severity = severity self.hint = hint self.context = context if cause: if not isinstance(cause, (list, tuple, set, frozenset)): cause = (cause, ) for c in cause: if not isinstance(c, ValidationProblem): raise SchemaError( "can only nest other `ValidationProblem' instances") self.cause = cause self.index = index def __eq__(self, other): if not isinstance(other, ValidationProblem): return NotImplemented return ((self.code == other.code) and (self.severity == other.severity) and (self.hint == other.hint) and (self.context == other.context) and (self.cause == other.cause) and (self.index == other.index)) def __ne__(self, other): # # While the default in Python3 is sensible implementing is recommended # when a built-in __eq__ is overwritten (Raymond Hettinger). # # Do not use not self == other because NotImplemented is not handled # properly in some early Python versions (including Py2). # equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __getstate__(self): return (1, self.code, self.severity, self.hint, self.context, self.cause, self.index) def __setstate__(self, state): ver = state[0] if ver == 1: _dummy, self.code, self.severity, self.hint, self.context, self.cause, self.index = state else: raise pickle.UnpicklingError( "Unsupported pickle version for ValidationProblem: %d" % (ver,)) def __repr__(self): try: msg = " (" + problem_message(self) + ")" except LookupError: msg = "" if self.context is not None: context_depth = self.context.depth else: context_depth = None if self.index is None: return "ValidationProblem(code={}{}, severity={}, hint={}, context=[depth={}]{})".format( self.code.name, msg, self.severity.name, self.hint, context_depth, self.context) else: return "ValidationProblem(code={}{}, severity={}, hint={}, context=[depth={}]{}, index={})".format( self.code.name, msg, self.severity.name, self.hint, context_depth, self.context, self.index) class SchemaError(Exception): """An error within the schema itself""" pass ValidationSettings = collections.namedtuple( "ValidationSettings", ["skip_keys", "break_on_keynames_problems", "data_stream_loader", "schema_loader"]) class _Schema(dict): __slots__ = ("parent", "is_sub_root", "_schema_cache") def __init__(self, parent, is_sub_root, *args, **kwds): super().__init__(*args, **kwds) if parent is None or isinstance(parent, _Schema): self.parent = parent else: raise TypeError( "`_Schema' or `None' expected for `parent` argument") if parent is None: self._schema_cache = {} if not is_sub_root: raise ValueError( "the root schmema must be a sub-root (aka `$self') also") self.is_sub_root = True else: self.is_sub_root = is_sub_root def __reduce_ex__(self, proto): return super().__reduce_ex__(proto) def __getstate__(self): return (1, self.parent, self.is_sub_root) def __setstate__(self, state): ver = state[0] if ver == 1: _dummy, self.parent, self.is_sub_root = state if self.parent is None: self._schema_cache = {} else: raise pickle.UnpicklingError( "Unsupported pickle version for _Schema: %d" % (ver,)) @property def ROOT(self): """Get the root schema""" r = self while r.parent is not None: r = r.parent return r @property def SELF(self): r = self while not r.is_sub_root: r = r.parent return r def copy(self): return _Schema(self.parent, self.is_sub_root, self) def get_child(self, name, default=None): return self.ensure_child_schema(self.get(name, default)) def ensure_child_schema(self, v): if v is None: return None elif isinstance(v, _Schema): return v elif isinstance(v, dict): return _Schema(self, False, v) else: return v def ensure_list_of_child_schemata(self, v): if isinstance(v, (list, tuple)): return [_Schema(self, False, i) for i in v] else: return v def __eq__(self, other): if not isinstance(other, _Schema): return NotImplemented return (self.parent is other.parent and bool(self.is_sub_root) == bool(other.is_sub_root) and dict(self) == dict(other)) def __ne__(self, other): # # While the default in Python3 is sensible implementing is recommended # when a built-in __eq__ is overwritten (Raymond Hettinger). # # Do not use not self == other because NotImplemented is not handled # properly in some early Python versions (including Py2). # equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __copy__(self): return _Schema(self.parent, self.is_sub_root, self) def __deepcopy__(self, memo): return _Schema(self.parent, self.is_sub_root, copy.deepcopy(dict(self), memo)) def __str__(self): return "<_Schema " + super().__str__() + ">" def __repr__(self): return "<_Schema " + super().__repr__() + ">" def get_cached_schema(self, key, load_if_needed=True, data_stream_loader=None, schema_loader=None): root = self.ROOT s = root._schema_cache.get(key, None) if s is None and load_if_needed: if schema_loader is None: raise SchemaError("no schema loader available") dsl = data_stream_loader or get_data_stream with dsl(key) as schemastream: # load schema a new `$self' (i.e. sub-root is True) s = _Schema(self, True, schema_loader(schemastream)) root._schema_cache[key] = s return s def add_cached_schema(self, key, schema): r = self.ROOT assert isinstance(schema, _Schema) r._schema_cache[key] = schema class Context(object): __slots__ = ("_parent", "_key", "_key_index", "_index", "root_object", "root_schema", "_current_object", "_settings") def __init__(self, parent, *, key=_SENTINEL, index=_SENTINEL, root_object=_SENTINEL, root_schema=_SENTINEL, current_object=_SENTINEL, settings=_SENTINEL, key_index=_SENTINEL): if parent is None: if key is not _SENTINEL: raise TypeError("the root context may not have a key") if index is not _SENTINEL: raise TypeError("the root context may not have an index") if settings is _SENTINEL: raise TypeError("the root context must have settings") self.root_object = root_object if current_object is _SENTINEL: current_object = root_object self.root_schema = root_schema else: if key is _SENTINEL and index is _SENTINEL: raise TypeError("one of `key` and `index` must be given in a non-root context") if root_object is not _SENTINEL: raise TypeError("non-root context may not have a root object") self.root_object = root_object if root_schema is not _SENTINEL: raise TypeError("non-root context may not have a root schema") self.root_schema = root_schema if key is not _SENTINEL and index is not _SENTINEL: raise ValueError("only one of `key` and `index` may be given in a context") if key_index is not _SENTINEL and key is _SENTINEL: raise ValueError("when having a `key_index` a `key` also must be given") self._parent = parent self._key = key self._key_index = key_index self._index = index self._current_object = current_object self._settings = settings def __getstate__(self): return (1, self._parent, self._key, self._key_index, self._index, self.root_object, self.root_schema, self._current_object, self._settings) def __setstate__(self, state): ver = state[0] if ver == 1: _dummy, self._parent, self._key, self._key_index, self._index, self.root_object, self.root_schema, self._current_object, self._settings = state else: raise pickle.UnpicklingError( "Unsupported pickle version for _Context: %d" % (ver,)) def __eq__(self, other): if not isinstance(other, Context): return NotImplemented return ((self._parent == other._parent) and (self._key == other._key) and (self._key_index == other._key_index) # XXX FIXME ??? # and (self.root_object == other.root_object) # and (self.root_schema == other.root_schema) # and (self._current_object == other._current_object) and (self._settings == other._settings) ) def __ne__(self, other): # # While the default in Python3 is sensible implementing is recommended # when a built-in __eq__ is overwritten (Raymond Hettinger). # # Do not use not self == other because NotImplemented is not handled # properly in some early Python versions (including Py2). # equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal @property def parent(self): return self._parent @property def safe_parent(self): if self.is_root: raise TypeError("the root context has no parent") return self.parent @property def root(self): """Get the root context""" ctx = self while not ctx.is_root: ctx = ctx.parent return ctx @property def is_root(self): return not bool(self.parent) @property def key(self): if self._key is _SENTINEL: raise AttributeError("no `key' in Context") return self._key @property def index(self): if self._index is _SENTINEL: raise AttributeError("no `index' in Context") return self._index @property def key_index(self): if self._key_index is _SENTINEL: raise AttributeError("no `key_index' in Context") return self._key_index @property def current_object(self): if self._current_object is _SENTINEL: raise AttributeError("no `current_object' in Context") return self._current_object @property def settings(self): s = self._settings return s if s is not _SENTINEL else self.parent.settings @property def depth(self): if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: return 0 n = 0 ctx = self while not ctx.is_root: n += 1 ctx = ctx.parent return n def __str__(self): if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: return "<ROOT>" chain = [] ctx = self while not ctx.is_root: if ctx._key is not _SENTINEL: chain.append(str(ctx.key)) elif ctx._index is not _SENTINEL: chain.append("INDEX:{}".format(ctx.index)) else: chain.append("") ctx = ctx.parent chain.reverse() return " / ".join(chain) def __repr__(self): return "<Context path=`{}'>".format(str(self)) def _get_one_of(d, *keys, default=None, strict=True): """Get the first found key and its value of `keys` from dict `d`. """ for k in keys: v = d.get(k, _SENTINEL) if v is not _SENTINEL: if strict: # # check that all no other key of `keys` besides of `k` is # in `d` # other_keys = set(keys) other_keys.remove(k) for k2 in other_keys: if k2 in d: raise SchemaError("ambiguous key from: {}".format( ", ".join(keys))) return k, v return None, default def validate(obj, schema, **kwds): """Validate object `obj` against the *specific* schema `schema`. Yields errors and warnings """ settings = { "skip_keys": None, "break_on_keynames_problems": True, "data_stream_loader": get_data_stream, "schema_loader": default_schema_loader } settings.update(kwds) if not isinstance(schema, _Schema): if not isinstance(schema, dict): raise SchemaError("Schema must be a dict-alike." " Got: {!r}".format(schema)) schema = _Schema(None, True, schema) context = Context(None, root_object=obj, root_schema=schema, settings=ValidationSettings(**settings)) yield from _validate(obj, schema, context, is_root=True) def _validate(obj, schema, context, is_root=False): """Validate object `obj` against the *specific* schema `schema`. Yields errors and warnings """ if not isinstance(schema, _Schema): raise SchemaError("Schema must be a `_Schema'." " Got: {!r}. Context: {!s}".format(schema, context)) # 1. Process "cond" or "match" schema = process_schema_conditionals(schema, context) # 2. Process "$ref" schema references schema = process_schema_references( schema, context, check_single_ref_key=not is_root) # 3. Real validation # check combinator shortcuts without "$type" indirection combinator, combinator_schema = _get_one_of( schema, "not", "all-of", "any-of", "one-of") if combinator is None: try: t = schema["$type"] except KeyError: raise SchemaError("Schema has no `$type' key: {!r}." " Context: {!s}".format(schema, context)) else: # # Construct a temporary schema with the proper indirection for # the check below # t = {"$type": {combinator: combinator_schema}} if combinator_schema is None: raise SchemaError("a combinator requires a child") if callable(t): yield from t(obj, schema, context) elif t is None: yield from validate_null(obj, schema, context) elif isinstance(t, dict): if len(t) != 1: raise SchemaError("type dict must be of length 1") # Check whether a shortcut is already seen above if combinator is None: combinator = list(t.keys())[0] combinator_schema = t[combinator] if combinator == "not": yield from validate_not( obj, schema.ensure_child_schema(combinator_schema), context) elif combinator == "all-of": yield from validate_all_of( obj, schema.ensure_list_of_child_schemata(combinator_schema), context) elif combinator == "any-of": yield from validate_any_of( obj, schema.ensure_list_of_child_schemata(combinator_schema), context) elif combinator == "one-of": yield from validate_one_of( obj, schema.ensure_list_of_child_schemata(combinator_schema), context) else: raise SchemaError("unknown combinator: {}".format(combinator)) elif isinstance(t, (list, tuple)): # a simple list is "any-of" yield from validate_any_of( obj, schema.ensure_list_of_child_schemata(t), context) elif t in ("dict", "map", "object"): yield from validate_dict(obj, schema, context) elif t in ("list", "array",): yield from validate_list(obj, schema, context) elif t in ("tuple", "record"): yield from validate_tuple(obj, schema, context) elif t in ("set", "frozenset"): yield from validate_set(obj, schema, context) elif t in ("string", "str"): yield from validate_str(obj, schema, context) elif t in ("deny", ): yield from validate_deny(obj, schema, context) elif t in ("accept", ): yield from validate_accept(obj, schema, context) elif t in ("none", "null", "nil"): yield from validate_null(obj, schema, context) elif t in ("empty", ): yield from validate_empty(obj, schema, context) elif t in ("integer", "int"): yield from validate_integer(obj, schema, context) elif t in ("float", "real", "double"): yield from validate_float(obj, schema, context) elif t in ("number", "num"): yield from validate_number(obj, schema, context) elif t in ("bool", "boolean"): yield from validate_bool(obj, schema, context) elif t in ("scalar", ): yield from validate_scalar(obj, schema, context) elif t in ("binary", ): yield from validate_binary(obj, schema, context) elif t in ("timestamp", "datetime"): yield from validate_timestamp(obj, schema, context) else: raise SchemaError("unknown type in schema: {}".format(t)) def _is_in_skip_keys(key, skip_keys): if not skip_keys: return False for sk in skip_keys: if isinstance(sk, str): if key == sk: return True else: if sk.search(key): return True return False def _is_null_allowed_for_object(obj, schema, context): if obj is None and schema.get("nullable", False): return True return False def _validate_index_constraint(obj, schema, context): # No evaluation of index constraints for the root context if context.is_root: return try: index_constraints = schema["index-constraint"] except KeyError: return # no constraints else: if not isinstance(index_constraints, (list, tuple, set, frozenset)): index_constraints = [index_constraints] if not index_constraints: return parent = context.safe_parent try: effective_index = context.index except AttributeError: try: effective_index = context.key_index except AttributeError: raise SchemaError("parent container has no usable index") for idx in index_constraints: if idx < 0: idx = len(parent.current_object) + idx if idx == effective_index: break else: yield ValidationProblem(code=ERRORS.E10052, context=context) def validate_dict(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, dict): yield ValidationProblem(code=ERRORS.E10000, hint="got: {}".format(type(obj).__name__), context=context) return yield from _validate_index_constraint(obj, schema, context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10050, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10051, hint=obj, context=context) schema_keys = schema.get("keys", {}) if schema else {} seen_keys = set() schema_keynames = schema.get_child("key-names", None) idx = -1 for key, item in obj.items(): idx += 1 if schema_keynames is None: if not isinstance(key, str): yield ValidationProblem(code=ERRORS.E10003, hint=repr(key), context=context) else: # validate the key against given schema new_context = Context(context, key=key, key_index=idx, current_object=key) key_probs = list(_validate(key, schema_keynames, new_context)) if key_probs: yield ValidationProblem( code=ERRORS.E10034, hint=key, context=context, cause=key_probs) if context.settings.break_on_keynames_problems: return if key in seen_keys: yield ValidationProblem(code=WARNINGS.W80000, hint=key, context=context) else: seen_keys.add(key) # XXX FIXME: context: new leaf context with new key for recursion if key in schema_keys: new_context = Context(context, key=key, key_index=idx, current_object=item) yield from _validate(item, schema.ensure_child_schema(schema_keys[key]), new_context) else: # check whether additional keys are allowed additional_keys = schema.get_child("additional-keys", False) if isinstance(additional_keys, bool): if not additional_keys: if not _is_in_skip_keys(key, context.settings.skip_keys): yield ValidationProblem(code=ERRORS.E10004, hint=str(key), context=context) else: if not _is_in_skip_keys(key, context.settings.skip_keys): # try this as the common schema for all the additional keys new_context = Context(context, key=key, key_index=idx, current_object=item) yield from _validate(item, additional_keys, new_context) # check whether all required keys are seen try: required_keys = set(schema.get("required", set())) except (TypeError, ValueError): raise SchemaError("`required` must be an iterable") if not required_keys <= seen_keys: hs = [str(i) for i in required_keys - seen_keys] yield ValidationProblem(code=ERRORS.E10005, hint=sorted(hs), context=context) # check whether no forbidden keys are seen try: forbidden_keys = set(schema.get("forbidden", set())) except (TypeError, ValueError): raise SchemaError("`forbidden` must be an iterable") if forbidden_keys & seen_keys: hs = [str(i) for i in forbidden_keys & seen_keys] yield ValidationProblem(code=ERRORS.E10059, hint=sorted(hs), context=context) def validate_list(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, (list, tuple)): yield ValidationProblem(code=ERRORS.E10001, hint="got: {}".format(type(obj).__name__), context=context) return yield from _validate_index_constraint(obj, schema, context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10012, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10013, hint=obj, context=context) try: schema_items = schema.ensure_child_schema(schema["items"]) except KeyError: schema_items = _Schema(schema, False, {"$type": validate_deny}) for idx, o in enumerate(obj): new_context = Context(parent=context, index=idx, current_object=o) yield from _validate(o, schema_items, new_context) def validate_set(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, (set, frozenset)): yield ValidationProblem(code=ERRORS.E10038, hint="got: {}".format(type(obj).__name__), context=context) return yield from _validate_index_constraint(obj, schema, context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10039, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10040, hint=obj, context=context) try: schema_items = schema.ensure_child_schema(schema["items"]) except KeyError: schema_items = _Schema(schema, False, {"$type": validate_deny}) for o in obj: new_context = Context(parent=context, key=o, current_object=o) yield from _validate(o, schema_items, new_context) def validate_tuple(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, (list, tuple)): yield ValidationProblem(code=ERRORS.E10014, hint="got: {}".format(type(obj).__name__), context=context) return yield from _validate_index_constraint(obj, schema, context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10015, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10016, hint=obj, context=context) schema_items = schema.get("items", []) if not isinstance(schema_items, (list, tuple)): raise SchemaError("tuple items require a list of schemata in items") for idx, o in enumerate(obj): # early exit at maxlen if maxlen is not None and idx >= maxlen: break new_context = Context(parent=context, index=idx, current_object=o) try: schema_index = schema.ensure_child_schema(schema_items[idx]) except IndexError: additional_items = schema.get_child("additional-items", False) if isinstance(additional_items, bool): if not additional_items: yield ValidationProblem(code=ERRORS.E10017, context=new_context) else: yield from _validate(o, additional_items, new_context) else: yield from _validate(o, schema_index, new_context) def validate_str(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, str): yield ValidationProblem(code=ERRORS.E10002, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) enumvalues = schema.get("enum", None) if enumvalues is not None: for ev in enumvalues: if ev == obj: break else: yield ValidationProblem(code=ERRORS.E10043, hint=obj, context=context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10006, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10007, hint=obj, context=context) pattern = schema.get("pattern", None) if pattern is not None: if isinstance(pattern, str): mo = re.search(pattern, obj) if not mo: yield ValidationProblem(code=ERRORS.E10008, context=context) elif isinstance(pattern, TYPE_RE): mo = pattern.search(obj) if not mo: yield ValidationProblem(code=ERRORS.E10008, context=context) elif callable(pattern): yield from pattern(obj, schema, context) else: raise SchemaError("unknown pattern type") is_contained = schema.get("is-contained-in-ref", None) if is_contained is not None: refobj = try_get_reference(is_contained, context, schema, default=_SENTINEL) if refobj is _SENTINEL: yield ValidationProblem(code=ERRORS.E10044, context=context) else: try: if obj not in refobj: yield ValidationProblem(code=ERRORS.E10045, context=context) except TypeError: yield ValidationProblem(code=ERRORS.E10046, context=context) def validate_binary(obj, schema, context): if not isinstance(obj, (bytes, bytearray)): yield ValidationProblem(code=ERRORS.E10035, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) minlen = schema.get("min-length", None) if minlen: if len(obj) < minlen: yield ValidationProblem(code=ERRORS.E10036, hint=obj, context=context) maxlen = schema.get("max-length", None) if maxlen is not None: if len(obj) > maxlen: yield ValidationProblem(code=ERRORS.E10037, hint=obj, context=context) pattern = schema.get("pattern", None) if pattern is not None: if isinstance(pattern, (str, bytes, bytearray)): if isinstance(pattern, str): if "'''" not in pattern: bytes_pattern = ast.literal_eval( "b'''" + pattern + "'''") elif '"""' not in pattern: bytes_pattern = ast.literal_eval( 'b"""' + pattern + '"""') else: raise SchemaError("incompatible bytes pattern") else: bytes_pattern = pattern mo = re.search(bytes_pattern, obj) if not mo: yield ValidationProblem(code=ERRORS.E10047, context=context) elif isinstance(pattern, TYPE_RE): mo = pattern.search(obj) if not mo: yield ValidationProblem(code=ERRORS.E10047, context=context) elif callable(pattern): yield from pattern(obj, schema, context) else: raise SchemaError("unknown pattern type") def validate_timestamp(obj, schema, context): if not isinstance(obj, datetime.datetime): yield ValidationProblem(code=ERRORS.E10041, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) value = schema.get("value", None) if value is not None: if callable(value): yield from value(obj, schema, context) else: raise SchemaError("unknown value validator (only a callable allowed)") def validate_integer(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, int): yield ValidationProblem(code=ERRORS.E10020, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) min_value = schema.get("min-value", None) if min_value is not None and obj < min_value: yield ValidationProblem(code=ERRORS.E10021, hint=obj, context=context) max_value = schema.get("max-value", None) if max_value is not None and obj > max_value: yield ValidationProblem(code=ERRORS.E10022, hint=obj, context=context) enumvalues = schema.get("enum", None) if enumvalues is not None: for ev in enumvalues: if ev == obj: break else: yield ValidationProblem(code=ERRORS.E10048, hint=obj, context=context) value = schema.get("value", None) if value is not None: if callable(value): yield from value(obj, schema, context) else: raise SchemaError("unknown value validator (only a callable allowed)") def validate_float(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, float): yield ValidationProblem(code=ERRORS.E10023, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) min_value = schema.get("min-value", None) if min_value is not None and obj < min_value: yield ValidationProblem(code=ERRORS.E10024, hint=obj, context=context) max_value = schema.get("max-value", None) if max_value is not None and obj > max_value: yield ValidationProblem(code=ERRORS.E10025, hint=obj, context=context) value = schema.get("value", None) if value is not None: if callable(value): yield from value(obj, schema, context) else: raise SchemaError("unknown value validator (only a callable allowed)") def validate_number(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, (int, float)): yield ValidationProblem(code=ERRORS.E10030, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) min_value = schema.get("min-value", None) if min_value is not None and isinstance(obj, float): min_value *= 1.0 if min_value is not None and obj < min_value: yield ValidationProblem(code=ERRORS.E10031, hint=obj, context=context) max_value = schema.get("max-value", None) if max_value is not None and isinstance(obj, float): max_value *= 1.0 if max_value is not None and obj > max_value: yield ValidationProblem(code=ERRORS.E10032, hint=obj, context=context) enumvalues = schema.get("enum", None) if enumvalues is not None: for ev in enumvalues: if ev == obj: break else: yield ValidationProblem(code=ERRORS.E10049, hint=obj, context=context) value = schema.get("value", None) if value is not None: if callable(value): yield from value(obj, schema, context) else: raise SchemaError("unknown value validator (only a callable allowed)") def validate_scalar(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return yield from _validate_index_constraint(obj, schema, context) if obj is None: yield ValidationProblem(code=ERRORS.E10033, hint=obj, context=context) if isinstance(obj, (dict, list, tuple, set, frozenset)): yield ValidationProblem(code=ERRORS.E10033, hint=obj, context=context) def validate_deny(obj, schema, context): yield from _validate_index_constraint(obj, schema, context) yield ValidationProblem(code=ERRORS.E10010, context=context) def validate_accept(obj, schema, context): yield from _validate_index_constraint(obj, schema, context) def validate_null(obj, schema, context): yield from _validate_index_constraint(obj, schema, context) if obj is not None: yield ValidationProblem(code=ERRORS.E10011, context=context) def validate_empty(obj, schema, context): yield from _validate_index_constraint(obj, schema, context) if obj is None: return if isinstance(obj, (dict, list, tuple, set, frozenset)) and not obj: return yield ValidationProblem(ERRORS.E10018, context=context) def validate_bool(obj, schema, context): if _is_null_allowed_for_object(obj, schema, context): return if not isinstance(obj, bool): yield ValidationProblem(code=ERRORS.E10026, hint=obj, context=context) else: yield from _validate_index_constraint(obj, schema, context) value = schema.get("value", None) if value is not None: if callable(value): yield from value(obj, schema, context) elif value and not obj: yield ValidationProblem(code=ERRORS.E10027, hint=obj, context=context) elif not value and obj: yield ValidationProblem(code=ERRORS.E10028, hint=obj, context=context) def validate_all_of(obj, schema, context): if not isinstance(schema, (list, tuple)): raise SchemaError("require a list of schematas for `all-of'") res = [] for idx, s in enumerate(schema): assert isinstance(s, _Schema) tr = list(_validate(obj, s, context)) if tr: res.append((idx, tr, )) if res: yield ValidationProblem( code=ERRORS.E10057, context=context, cause=[ ValidationProblem( code=ERRORS.E10058, context=context, cause=tr, index=idx) for (idx, tr) in res]) def validate_any_of(obj, schema, context): if not isinstance(schema, (list, tuple)): raise SchemaError("require a list of schematas for `any-of'") res = [] for s in schema: assert isinstance(s, _Schema) tr = list(_validate(obj, s, context)) if tr: res.append(tr) else: # Erfolg: gleich positiv zurueck ohne Meldungen return # Ansonsten: alle Fehlschlaege protokollieren if res: yield ValidationProblem( code=ERRORS.E10055, context=context, cause=[ ValidationProblem( code=ERRORS.E10056, context=context, cause=tr) for tr in res]) def validate_one_of(obj, schema, context): if not isinstance(schema, (list, tuple)): raise SchemaError("require a list of schematas for `one-of'") success_res = [] failed_res = [] for idx, s in enumerate(schema): assert isinstance(s, _Schema) tr = list(_validate(obj, s, context)) if tr: failed_res.append((idx, tr, )) else: success_res.append(idx) if len(success_res) == 1: return elif len(success_res) == 0: # Ansonsten: alle Fehlschlaege protokollieren if failed_res: yield ValidationProblem( code=ERRORS.E10053, context=context, cause=[ ValidationProblem( code=ERRORS.E10054, context=context, cause=tr, index=idx) for (idx, tr) in failed_res]) else: # Die Indizes der "zuvielen" in "hint" anzeigen yield ValidationProblem(code=ERRORS.E10019, hint=",".join([str(k) for k in success_res])) def validate_not(obj, schema, context): assert isinstance(schema, _Schema) res = list(_validate(obj, schema, context)) if not res: yield ValidationProblem(code=ERRORS.E10029, hint=obj, context=context, cause=res) def process_schema_references(schema, context, check_single_ref_key=True): """ .. note:: If a new dereferenced schema is found schema conditionals are evaluated also. So the resolved schema containing conditionals behaves according to the given conditions. """ try: ref = schema[SCHEMA_REF_KEY] except (KeyError, TypeError): return schema # if `$ref' is found it MUST be the only key if check_single_ref_key and len(schema) != 1: raise SchemaError("`{}' must be the single key if it exists") schema = try_get_reference(ref, context, schema) if not isinstance(schema, _Schema): raise SchemaError( "dereferenced schema is not a `_Schema': {}".format(ref)) schema = copy.deepcopy(schema) # process schema conditionals "cond" and "match" again schema = process_schema_conditionals(schema, context) return process_schema_references(schema, context, check_single_ref_key=True) def process_schema_conditionals(schema, context): """Lisp-like `cond` to provide schema modifications :param schema: the input schema :param context: the validation context with a valid `context.root.root_object` :returns: the processed schema: the schema itself if it is unchanged and a copy of the schema if has been changed """ what, conds = _get_one_of(schema, "cond", "match", default=None) if what is None or conds is None: return schema if not isinstance(conds, (list, tuple)): raise SchemaError("the conditions of a cond must be a sequence") if what == "cond": return _process_schema_conditionals_cond(schema, conds, context) elif what == "match": return _process_schema_conditionals_match(schema, conds, context) else: assert False, "unreachable" def _process_schema_conditionals_cond(schema, conds, context): for cond in conds: if not isinstance(cond, dict): raise SchemaError("a single condition must be a dict") if eval_condition(cond, context, schema): rep_type, rep_schema = _get_one_of( cond, "then", "then-replace", "then-merge") rep_schema = schema.ensure_child_schema(rep_schema) if rep_type in ("then", "then-replace"): do_merge = False elif rep_type == "then-merge": do_merge = True else: raise SchemaError("unknown then type: {}".format(rep_type)) break else: # # No condition was true: just remove the "cond" to get the # effective schema. # rep_schema = None do_merge = False new_schema = schema.copy() del new_schema["cond"] if rep_schema: rep_schema = process_schema_references(rep_schema, context) # this could insert a new nested "cond" or "match" again if do_merge: rep_schema = copy.deepcopy(rep_schema) new_schema = _merge(rep_schema, new_schema) else: new_schema.update(rep_schema) # Recursively apply "cond/match" evaluation to the resulting schema return process_schema_conditionals(new_schema, context) def _process_schema_conditionals_match(schema, conds, context): rep_schemata = [] for cond in conds: if not isinstance(cond, dict): raise SchemaError("a single condition must be a dict") if eval_condition(cond, context, schema): rep_type, rep_schema = _get_one_of( cond, "then", "then-replace", "then-merge") rep_schema = schema.ensure_child_schema(rep_schema) if rep_type in ("then", "then-replace"): rep_schemata.append((False, rep_schema)) elif rep_type == "then-merge": rep_schemata.append((True, rep_schema)) else: raise SchemaError("unknown then type: {}".format(rep_type)) new_schema = schema.copy() del new_schema["match"] for do_merge, rep_schema in rep_schemata: rep_schema = process_schema_references(rep_schema, context) # this could insert a new nested "cond" or "match" again if do_merge: rep_schema = copy.deepcopy(rep_schema) new_schema = _merge(rep_schema, new_schema) else: new_schema.update(rep_schema) # Recursively apply "cond/match" evaluation to the resulting schema return process_schema_conditionals(new_schema, context) def eval_condition(cond, context, schema): """Eval the condition in `cond` and return a tuple `(hit, predval)` """ pred, predval = _get_one_of( cond, "when-ref-true", "when-ref-false", "when-ref-exists", "when-ref-not-exists", "when", default=_SENTINEL) if pred == "when": # rekursive evaluation of `predval` as the real predicate return eval_pred(predval, context, schema) elif pred == "when-ref-true": refobj = try_get_reference(predval, context, schema, default=None) return bool(refobj) elif pred == "when-ref-false": refobj = try_get_reference(predval, context, schema, default=None) return not bool(refobj) elif pred == "when-ref-exists": refobj = try_get_reference(predval, context, schema, default=_SENTINEL) return refobj is not _SENTINEL elif pred == "when-ref-not-exists": refobj = try_get_reference(predval, context, schema, default=_SENTINEL) return refobj is _SENTINEL else: raise SchemaError("unknown condition type: {}".format(pred)) def eval_pred(pred, context, schema): if isinstance(pred, dict): combinator, combinator_val = _get_one_of( pred, "not", "all-of", "any-of", "one-of", default=None) if combinator: if combinator == "not": return not eval_pred(combinator_val, context, schema) elif combinator == "all-of": if not isinstance(combinator_val, (list, tuple)): raise SchemaError("`all-of' requires a list of childs") for cv in combinator_val: if not eval_pred(cv, context, schema): return False return True elif combinator == "any-of": if not isinstance(combinator_val, (list, tuple)): raise SchemaError("`any-of' requires a list of childs") for cv in combinator_val: if eval_pred(cv, context, schema): return True return False elif combinator == "one-of": if not isinstance(combinator_val, (list, tuple)): raise SchemaError("`one-of' requires a list of childs") num_true = 0 for cv in combinator_val: if eval_pred(cv, context, schema): num_true += 1 # shortcut if num_true > 1: return False if num_true == 1: return True else: return False else: raise SchemaError( "unknown logical operator: {}".format(combinator)) else: pred_key, pred_val = _get_one_of( pred, "ref-true", "ref-false", "ref-exists", "ref-not-exists", "equals", default=None) if pred_key == "ref-true": refobj = try_get_reference( pred_val, context, schema, default=None) return bool(refobj) elif pred_key == "ref-false": refobj = try_get_reference( pred_val, context, schema, default=None) return not bool(refobj) elif pred_key == "ref-exists": refobj = try_get_reference( pred_val, context, schema, default=_SENTINEL) return refobj is not _SENTINEL elif pred_key == "ref-not-exists": refobj = try_get_reference( pred_val, context, schema, default=_SENTINEL) return refobj is _SENTINEL elif pred_key == "equals": if not isinstance(pred_val, (list, tuple)): raise SchemaError("`equals' requires a list as childs") if len(pred_val) != 2: raise SchemaError("`equals' requires a list of len 2") op1 = eval_comparison_operator_operand( pred_val[0], context, schema) op2 = eval_comparison_operator_operand( pred_val[1], context, schema) return op1 == op2 else: raise SchemaError("unknown predicate: {}".format(pred)) elif isinstance(pred, list): # implicit all-of (aka AND) for cv in pred: if not eval_pred(cv, context, schema): return False return True else: return pred def eval_comparison_operator_operand(op, context, schema): if not isinstance(op, dict): raise SchemaError("an operand must be a dict") opkey, opval = _get_one_of(op, "ref", "val", "value") if opkey is None: raise SchemaError("no operant given in {!r}".format(op)) if opkey == "ref": return try_get_reference(opval, context, schema) elif opkey in ("val", "value"): return opval else: assert False def try_get_reference(ref, context, schema, default=None): """Get the object referenced in `ref` Use `context` as data/object context and `schema` as the current schema context. """ uri = rfc3986.URIReference.from_string(ref).normalize() if not uri.scheme: uri = uri.copy_with(scheme="object") if uri.scheme == "object": if ref.startswith("object#"): for attr in ("authority", "path", "query"): if getattr(uri, attr, None) is not None: raise SchemaError( "bogus {} in URI reference `{}'".format(attr, ref)) if uri.fragment is None: raise SchemaError("fragment required in reference") if not uri.fragment: return context.root.root_object elif uri.fragment == '.': return context.current_object parts = uri.fragment.split('.') # use '.' separator as in configmix if parts[0]: # absolute d = context.root.root_object else: # relative d = context.current_object parts = parts[1:] c = context # needed to determine relative object references relative_refs_allowed = True for part in [urllib.parse.unquote(p) for p in parts]: if part: relative_refs_allowed = False try: d = d[part] except (KeyError, IndexError, TypeError): return default else: if not relative_refs_allowed: raise SchemaError( "empty part in path to object reference not allowed") c = c.safe_parent d = c.current_object return d elif uri.scheme == "schema": if not uri.path or (uri.path == SCHEMA_PATH_SELF): s = schema.SELF elif uri.path == SCHEMA_PATH_ROOT: s = schema.ROOT else: s = schema.get_cached_schema( uri.path, load_if_needed=True, data_stream_loader=context.settings.data_stream_loader, schema_loader=context.settings.schema_loader) if uri.fragment is None: raise SchemaError("fragment required in reference") if not uri.fragment.startswith('/'): raise SchemaError("references to parts of a schema must be absolute (begin with `/')") if uri.fragment == '/': return s parts = uri.fragment.split('/') parent_for_subschema = s for part in [urllib.parse.unquote(p) for p in parts[1:]]: try: v = s[part] except (KeyError, IndexError, TypeError): return default else: if isinstance(v, _Schema): pass elif isinstance(v, dict): s = _Schema(parent_for_subschema, False, v) else: # need not try further return default return s else: raise SchemaError("Unknown schema reference scheme: {}".format(uri.scheme)) _DEL_VALUE = '{{::DEL::}}' """Sigil to mark keys to be deleted in the target when merging""" def _merge(user, default): """Logically merge the configuration in `user` into `default`. :param dict user: the new configuration that will be logically merged into `default` :param dict default: the base configuration where `user` is logically merged into :returns: `user` with the necessary amendments from `default`. If `user` is ``None`` then `default` is returned. .. note:: Implementation: The configuration in `user` is augmented/changed **inplace**. If a value in `user` is equal to :data:`._DEL_VALUE` (``{{::DEL::}}``) the corresponding key will be deleted from the merged output. From http://stackoverflow.com/questions/823196/yaml-merge-in-python """ if user is None: _filter_deletions(default) return default if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if k in user: if user[k] == _DEL_VALUE: # do not copy and delete the marker del user[k] else: user[k] = _merge_item(user[k], v) else: user[k] = v else: raise SchemaError("can only merge two dicts on top-level") _filter_deletions(user) return user def _merge_item(user, default): """Recursion helper for :func:`._merge` """ if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if k in user: if user[k] == _DEL_VALUE: # do not copy and delete the marker del user[k] else: user[k] = _merge_item(user[k], v) else: user[k] = v elif isinstance(user, (list, tuple)) and isinstance(default, (list, tuple)): for idx, v in enumerate(default): user.insert(idx, v) return user def _filter_deletions(d): """Recursively filter deletions in the dict `d`. Deletions have values that equal :data:`._DEL_VALUE`. """ if not isinstance(d, dict): return # use a copy of the items because we change `d` while iterating for k, v in list(d.items()): if v == _DEL_VALUE: del d[k] else: if isinstance(d[k], dict): _filter_deletions(d[k]) def _log_problem_cause_all(logger, loglevel, level, problems): if not problems: return for pr in problems: logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr) _log_problem_cause_all(logger, loglevel, level+1, pr.cause) def _build_problems_by_level_and_depth(by_level, by_depth, level, problems): for pr in problems: if not pr.cause: continue try: prl = by_level[level] except LookupError: prl = [] by_level[level] = prl prl.append(pr) depth = pr.context.depth try: prd = by_depth[depth] except LookupError: prd = [] by_depth[depth] = prd prd.append(pr) _build_problems_by_level_and_depth( by_level, by_depth, level+1, pr.cause) def _log_problem_cause(logger, loglevel, max_level, max_depth, level, problems): for pr in problems: # # Check whether we will start logging from this level downwards # all problems # if max_level is None or level == max_level: new_max_level = None # trigger logging else: new_max_level = max_level if max_depth is None or max_depth == pr.context.depth: new_max_depth = None # trigger logging else: new_max_depth = max_depth if new_max_level is None or new_max_depth is None: logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr) if pr.cause: _log_problem_cause( logger, loglevel, new_max_level, new_max_depth, level+1, pr.cause) def log_problem_cause(logger, loglevel, debug, level, problems): if not problems: return if debug: _log_problem_cause_all(logger, loglevel, level, problems) else: by_level = {} # to determine maximum problem nesting level by_depth = {} # to determine maximum context nexting level _build_problems_by_level_and_depth(by_level, by_depth, level, problems) max_level = max(by_level.keys()) max_depth = max(by_depth.keys()) _log_problem_cause( logger, loglevel, max_level, max_depth, level, problems)
