Mercurial > hgrepos > Python > libs > data-schema
diff data_schema/__init__.py @ 5:84dfd1a94926
Add the existing implementation.
All tests work.
The documentation as text file is included also.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 06 Jul 2023 23:41:41 +0200 |
| parents | |
| children | f4e1b6d6fe63 |
line wrap: on
line diff
# -*- coding: utf-8 -*-
r"""
Object schema validation support.

Somewhat modelled after JSON schema.

.. seealso:: https://json-schema.org/understanding-json-schema/index.html

:Author: Franz Glasner <fzglas.hg@dom66.de>
:Copyright: \(c) 2023 Franz Glasner
:License: BSD 3-Clause "New" or "Revised" License.
    See :ref:`LICENSE.txt <license>` for details.
:ID: @(#) $Header$

"""

__version__ = "0.1.dev1"

__revision__ = "|VCSRevision|"

__date__ = "|VCSJustDate|"

__all__ = ["ERROR", "WARNING", "INFO", "ERRORS", "WARNINGS",
           "level_name", "problem_message",
           "ValidationProblem", "SchemaError",
           "validate",
           "log_problem_cause"]


import ast
import collections
import copy
import datetime
import re
import urllib.parse

import rfc3986

import configmix.yaml

from .util import get_data_stream


def NC_(ctx, msg):
    """Minimum dummy translation support (gettext-style no-op marker)."""
    return msg


# Severity levels, modelled after the numeric levels of the stdlib
# `logging` module.
ERROR = 40
WARNING = 30
INFO = 20

_level_to_name = {
    ERROR: "ERROR",
    WARNING: "WARNING",
    INFO: "INFO",
}
_name_to_level = {name: level for (level, name) in _level_to_name.items()}

# Catalog of error codes -> human readable messages.
ERRORS = {
    10000: NC_("schema-msg", "dict expected"),
    10001: NC_("schema-msg", "list expected"),
    10002: NC_("schema-msg", "string expected"),
    10003: NC_("schema-msg", "dict key must be a string"),
    10004: NC_("schema-msg", "additional key encountered"),
    10005: NC_("schema-msg", "required key(s) missing"),
    10006: NC_("schema-msg", "min string length encountered"),
    10007: NC_("schema-msg", "max string length exceeded"),
    10008: NC_("schema-msg", "string value does not match the required RE pattern"),
    10009: NC_("schema-msg", "string value does not validate"),
    10010: NC_("schema-msg", "validation error"),
    10011: NC_("schema-msg", "None/Null object expected"),
    10012: NC_("schema-msg", "min list length encountered"),
    10013: NC_("schema-msg", "max list length exceeded"),
    10014: NC_("schema-msg", "tuple expected"),
    10015: NC_("schema-msg", "min tuple length encountered"),
    10016: NC_("schema-msg", "max tuple length exceeded"),
    10017: NC_("schema-msg", "additional items in tuple not allowed"),
    10018: NC_("schema-msg", "object is not empty"),
    10019: NC_("schema-msg", "more than one match in `one-of' detected"),
    10020: NC_("schema-msg", "int expected"),
    10021: NC_("schema-msg", "int value lower than minValue"),
    10022: NC_("schema-msg", "int value greater than maxValue"),
    10023: NC_("schema-msg", "float expected"),
    10024: NC_("schema-msg", "float value lower than minValue"),
    10025: NC_("schema-msg", "float value greater than maxValue"),
    10026: NC_("schema-msg", "boolean value expected"),
    10027: NC_("schema-msg", "boolean true expected"),
    10028: NC_("schema-msg", "boolean false expected"),
    10029: NC_("schema-msg", "`not' expected problems but got none"),
    10030: NC_("schema-msg", "numeric type (int or float) expected"),
    10031: NC_("schema-msg", "numeric value lower than minValue"),
    10032: NC_("schema-msg", "numeric value greater than maxValue"),
    10033: NC_("schema-msg", "a plain scalar value expected"),
    10034: NC_("schema-msg", "dict key does not match required schema"),
    10035: NC_("schema-msg", "binary data expected"),
    10036: NC_("schema-msg", "length of binary data lower than minValue"),
    10037: NC_("schema-msg", "length of binary data exceeds maxValue"),
    10038: NC_("schema-msg", "a set is expected"),
    10039: NC_("schema-msg", "length of set lower than minLength"),
    10040: NC_("schema-msg", "length of set greater than maxLength"),
    10041: NC_("schema-msg", "timestamp expected"),
    10042: NC_("schema-msg", "value of timestamp does not validate"),
    10043: NC_("schema-msg", "enumerated string value expected but not found"),
    # fixed typo: "doest not" -> "does not"
    10044: NC_("schema-msg", "referenced object does not exist"),
    10045: NC_("schema-msg", "key is not contained in referenced object"),
    10046: NC_("schema-msg", "referenced object is not a container"),
    10047: NC_("schema-msg", "binary data does not match the required RE pattern"),
    10048: NC_("schema-msg", "enumerated integer value expected but not found"),
    10049: NC_("schema-msg", "enumerated number value expected but not found"),
    10050: NC_("schema-msg", "min dict length encountered"),
    10051: NC_("schema-msg", "max dict length exceeded"),
    10052: NC_("schema-msg", "index constraint violated"),
    10053: NC_("schema-msg", "`one-of' failed"),
    10054: NC_("schema-msg", "failing `one-of' item"),
    10055: NC_("schema-msg", "`any-of' failed"),
    10056: NC_("schema-msg", "failing `any-of' item"),
    10057: NC_("schema-msg", "`all-of' failed"),
    10058: NC_("schema-msg", "failing `all-of' item"),
}

# Catalog of warning codes -> human readable messages.
WARNINGS = {
    80000: NC_("schema-msg", "duplicate dict key"),
}

# The code spaces must not overlap: `problem_message` and severity
# auto-detection rely on a code belonging to exactly one catalog.
if not set(ERRORS.keys()).isdisjoint(set(WARNINGS.keys())):
    raise ValueError("ERRORS and WARNINGS must be disjoint")


# Type of a compiled regular expression (``re.Pattern`` on modern Pythons).
TYPE_RE = type(re.compile(r"\A.+\Z"))

_SENTINEL = object()

SCHEMA_REF_KEY = "$ref"
"""Key name for schema references (like a symlink within a schema)"""

SCHEMA_PATH_ROOT = "$root"
"""URI path to the root schema"""

SCHEMA_PATH_SELF = "$self"
"""URI path to the current schema"""


def level_name(level):
    """Return the symbolic name for a numeric severity `level`."""
    name = _level_to_name.get(level)
    if name is None:
        name = "Level {}".format(level)
    return name


def problem_message(pr):
    """Return the catalog message for `pr` (a problem instance or a code).

    :raises KeyError: if the code is in neither catalog.
    """
    if isinstance(pr, ValidationProblem):
        code = getattr(pr, "code", None)
    else:
        code = pr
    msg = ERRORS.get(code, None)
    if msg is None:
        msg = WARNINGS[code]
    return msg


class ValidationProblem(object):
    """A single validation finding (error or warning).

    :param code: a code registered in ``ERRORS`` or ``WARNINGS`` (mandatory)
    :param severity: explicit severity; auto-derived from the catalogs
        when ``None``
    :param hint: free-form extra information (often the offending value)
    :param context: the :class:`Context` the problem occurred in
    :param cause: nested ``ValidationProblem`` instance(s) that caused this
        one (e.g. the failing branches of a combinator)
    :param index: index of the failing alternative for combinator problems
    """

    __slots__ = ("code", "severity", "hint", "context", "cause", "index")

    def __init__(self,
                 code=None,
                 severity=None,
                 hint=None,
                 context=None,
                 cause=None,
                 index=None):
        if code is not None:
            # check validity
            if code not in ERRORS and code not in WARNINGS:
                raise ValueError(
                    "unknown validation error code: {}".format(code))
            self.code = code
            if severity is None:
                # autodetermine from the catalog the code belongs to
                if code in ERRORS:
                    self.severity = ERROR
                elif code in WARNINGS:
                    self.severity = WARNING
                else:
                    assert False
            else:
                self.severity = severity
        else:
            raise TypeError("`code' must be given")
        self.hint = hint
        self.context = context
        if cause:
            if not isinstance(cause, (list, tuple, set, frozenset)):
                cause = (cause, )
            for c in cause:
                if not isinstance(c, ValidationProblem):
                    raise SchemaError(
                        "can only nest other `ValidationProblem' instances")
            self.cause = cause
        else:
            # Fix: always assign the slot.  Previously `cause` stayed
            # unassigned for problems without a cause, so reading
            # ``problem.cause`` raised AttributeError.
            self.cause = None
        self.index = index

    def __repr__(self):
        try:
            msg = " (" + problem_message(self) + ")"
        except LookupError:
            msg = ""
        if self.index is None:
            return "ValidationProblem(code={!r}{}, severity={!r}, hint={}, context=[depth={}]{})".format(
                self.code, msg, self.severity, self.hint, self.context.depth, self.context)
        else:
            return "ValidationProblem(code={!r}{}, severity={!r}, hint={}, context=[depth={}]{}, index={})".format(
                self.code, msg, self.severity, self.hint, self.context.depth, self.context, self.index)


class SchemaError(Exception):
    """An error within the schema itself"""
    pass


# Immutable bag of user-tunable validation settings.
ValidationSettings = collections.namedtuple(
    "ValidationSettings",
    ["skip_keys", "break_on_keynames_problems"])
SELF(self): + r = self + while not r.is_sub_root: + r = r.parent + return r + + def copy(self): + return _Schema(self.parent, self.is_sub_root, self) + + def get_child(self, name, default=None): + return self.ensure_child_schema(self.get(name, default)) + + def ensure_child_schema(self, v): + if v is None: + return None + elif isinstance(v, _Schema): + return v + elif isinstance(v, dict): + return _Schema(self, False, v) + else: + return v + + def ensure_list_of_child_schemata(self, v): + if isinstance(v, (list, tuple)): + return [_Schema(self, False, i) for i in v] + else: + return v + + def __eq__(self, other): + if not isinstance(other, _Schema): + return NotImplemented + return (self.parent is other.parent + and bool(self.is_sub_root) == bool(other.is_sub_root) + and dict(self) == dict(other)) + + def __ne__(self, other): + # + # While the default in Python3 is sensible implementing is recommended + # when a built-in __eq__ is overwritten (Raymond Hettinger). + # + # Do not use not self == other because NotImplemented is not handled + # properly in some early Python versions (including Py2). + # + equal = self.__eq__(other) + return NotImplemented if equal is NotImplemented else not equal + + def __copy__(self): + return _Schema(self.parent, self.is_sub_root, self) + + def __deepcopy__(self, memo): + return _Schema(self.parent, + self.is_sub_root, + copy.deepcopy(dict(self), memo)) + + def __str__(self): + return "<_Schema " + super().__str__() + ">" + + def __repr__(self): + return "<_Schema " + super().__repr__() + ">" + + def get_cached_schema(self, key, load_if_needed=True): + root = self.ROOT + s = root._schema_cache.get(key, None) + if s is None and load_if_needed: + with get_data_stream(key) as schemastream: + # load schema a new `$self' (i.e. 
sub-root is True) + s = _Schema(self, True, configmix.yaml.load(schemastream)) + root._schema_cache[key] = s + return s + + def add_cached_schema(self, key, schema): + r = self.ROOT + assert isinstance(schema, _Schema) + r._schema_cache[key] = schema + + +class Context(object): + + __slots__ = ("_parent", "_key", "_key_index", + "_index", + "root_object", "root_schema", + "_current_object", + "_settings") + + def __init__(self, parent, *, key=_SENTINEL, index=_SENTINEL, + root_object=_SENTINEL, root_schema=_SENTINEL, + current_object=_SENTINEL, + settings=_SENTINEL, + key_index=_SENTINEL): + if parent is None: + if key is not _SENTINEL: + raise TypeError("the root context may not have a key") + if index is not _SENTINEL: + raise TypeError("the root context may not have an index") + if settings is _SENTINEL: + raise TypeError("the root context must have settings") + self.root_object = root_object + if current_object is _SENTINEL: + current_object = root_object + self.root_schema = root_schema + else: + if key is _SENTINEL and index is _SENTINEL: + raise TypeError("one of `key` and `index` must be given in a non-root context") + if root_object is not _SENTINEL: + raise TypeError("non-root context may not have a root object") + if root_schema is not _SENTINEL: + raise TypeError("non-root context may not have a root schema") + if key is not _SENTINEL and index is not _SENTINEL: + raise ValueError("only one of `key` and `index` may be given in a context") + if key_index is not _SENTINEL and key is _SENTINEL: + raise ValueError("when having a `key_index` a `key` also must be given") + self._parent = parent + self._key = key + self._key_index = key_index + self._index = index + self._current_object = current_object + self._settings = settings + + @property + def parent(self): + return self._parent + + @property + def safe_parent(self): + if self.is_root: + raise TypeError("the root context has no parent") + return self.parent + + @property + def root(self): + """Get the 
root context""" + ctx = self + while not ctx.is_root: + ctx = ctx.parent + return ctx + + @property + def is_root(self): + return not bool(self.parent) + + @property + def key(self): + if self._key is _SENTINEL: + raise AttributeError("no `key' in Context") + return self._key + + @property + def index(self): + if self._index is _SENTINEL: + raise AttributeError("no `index' in Context") + return self._index + + @property + def key_index(self): + if self._key_index is _SENTINEL: + raise AttributeError("no `key_index' in Context") + return self._key_index + + @property + def current_object(self): + if self._current_object is _SENTINEL: + raise AttributeError("no `current_object' in Context") + return self._current_object + + @property + def settings(self): + s = self._settings + return s if s is not _SENTINEL else self.parent.settings + + @property + def depth(self): + if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: + return 0 + n = 0 + ctx = self + while not ctx.is_root: + n += 1 + ctx = ctx.parent + return n + + def __str__(self): + if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: + return "<ROOT>" + chain = [] + ctx = self + while not ctx.is_root: + if ctx._key is not _SENTINEL: + chain.append(str(ctx.key)) + elif ctx._index is not _SENTINEL: + chain.append("INDEX:{}".format(ctx.index)) + else: + chain.append("") + ctx = ctx.parent + chain.reverse() + return " / ".join(chain) + + def __repr__(self): + return "<Context path=`{}'>".format(str(self)) + + +def _get_one_of(d, *keys, default=None, strict=True): + """Get the first found key and its value of `keys` from dict `d`. 
+ + """ + for k in keys: + v = d.get(k, _SENTINEL) + if v is not _SENTINEL: + if strict: + # + # check that all no other key of `keys` besides of `k` is + # in `d` + # + other_keys = set(keys) + other_keys.remove(k) + for k2 in other_keys: + if k2 in d: + raise SchemaError("ambiguous key from: {}".format( + ", ".join(keys))) + return k, v + return None, default + + +def validate(obj, schema, **kwds): + """Validate object `obj` against the *specific* schema `schema`. + + Yields errors and warnings + + """ + settings = { + "skip_keys": None, + "break_on_keynames_problems": True, + } + settings.update(kwds) + if not isinstance(schema, _Schema): + if not isinstance(schema, dict): + raise SchemaError("Schema must be a dict-alike." + " Got: {!r}".format(schema)) + schema = _Schema(None, True, schema) + context = Context(None, root_object=obj, root_schema=schema, + settings=ValidationSettings(**settings)) + yield from _validate(obj, schema, context, is_root=True) + + +def _validate(obj, schema, context, is_root=False): + """Validate object `obj` against the *specific* schema `schema`. + + Yields errors and warnings + + """ + if not isinstance(schema, _Schema): + raise SchemaError("Schema must be a `_Schema'." + " Got: {!r}. Context: {!s}".format(schema, context)) + # 1. Process "cond" or "match" + schema = process_schema_conditionals(schema, context) + # 2. Process "$ref" schema references + schema = process_schema_references( + schema, context, check_single_ref_key=not is_root) + + # 3. Real validation + + # check combinator shortcuts without "type" indirection + combinator, combinator_schema = _get_one_of( + schema, "not", "all-of", "any-of", "one-of") + if combinator is None: + try: + t = schema["type"] + except KeyError: + raise SchemaError("Schema has no `type' key: {!r}." 
+ " Context: {!s}".format(schema, context)) + else: + # + # Construct a temporary schema with the proper indirection for + # the check below + # + t = {"type": {combinator: combinator_schema}} + if combinator_schema is None: + raise SchemaError("a combinator requires a child") + if callable(t): + yield from t(obj, schema, context) + elif t is None: + yield from validate_null(obj, schema, context) + elif isinstance(t, dict): + if len(t) != 1: + raise SchemaError("type dict must be of length 1") + # Check whether a shortcut is already seen above + if combinator is None: + combinator = list(t.keys())[0] + combinator_schema = t[combinator] + if combinator == "not": + yield from validate_not( + obj, schema.ensure_child_schema(combinator_schema), context) + elif combinator == "all-of": + yield from validate_allOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + elif combinator == "any-of": + yield from validate_anyOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + elif combinator == "one-of": + yield from validate_oneOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + else: + raise SchemaError("unknown combinator: {}".format(combinator)) + elif isinstance(t, (list, tuple)): + # a simple list is "any-of" + yield from validate_anyOf( + obj, schema.ensure_list_of_child_schemata(t), context) + elif t in ("dict", "map", "object"): + yield from validate_dict(obj, schema, context) + elif t in ("list", "array",): + yield from validate_list(obj, schema, context) + elif t in ("tuple", "record"): + yield from validate_tuple(obj, schema, context) + elif t in ("set", "frozenset"): + yield from validate_set(obj, schema, context) + elif t in ("string", "str"): + yield from validate_str(obj, schema, context) + elif t in ("deny", ): + yield from validate_deny(obj, schema, context) + elif t in ("accept", ): + yield from validate_accept(obj, schema, context) + elif t in ("none", "null", "nil"): + 
yield from validate_null(obj, schema, context) + elif t in ("empty", ): + yield from validate_empty(obj, schema, context) + elif t in ("integer", "int"): + yield from validate_integer(obj, schema, context) + elif t in ("float", "real", "double"): + yield from validate_float(obj, schema, context) + elif t in ("number", "num"): + yield from validate_number(obj, schema, context) + elif t in ("bool", "boolean"): + yield from validate_bool(obj, schema, context) + elif t in ("scalar", ): + yield from validate_scalar(obj, schema, context) + elif t in ("binary", ): + yield from validate_binary(obj, schema, context) + elif t in ("timestamp", "datetime"): + yield from validate_timestamp(obj, schema, context) + else: + raise SchemaError("unknown type in schema: {}".format(t)) + + +def _is_in_skip_keys(key, skip_keys): + if not skip_keys: + return False + for sk in skip_keys: + if isinstance(sk, str): + if key == sk: + return True + else: + if sk.search(key): + return True + return False + + +def _is_null_allowed_for_object(obj, schema, context): + if obj is None and schema.get("nullable", False): + return True + return False + + +def _validate_index_constraint(obj, schema, context): + # No evaluation of index constraints for the root context + if context.is_root: + return + try: + index_constraints = schema["index-constraint"] + except KeyError: + return # no constraints + else: + if not isinstance(index_constraints, (list, tuple, set, frozenset)): + index_constraints = [index_constraints] + if not index_constraints: + return + parent = context.safe_parent + try: + effective_index = context.index + except AttributeError: + try: + effective_index = context.key_index + except AttributeError: + raise SchemaError("parent container has no usable index") + for idx in index_constraints: + if idx < 0: + idx = len(parent.current_object) + idx + if idx == effective_index: + break + else: + yield ValidationProblem(code=10052, context=context) + + +def validate_dict(obj, schema, 
def validate_dict(obj, schema, context):
    """Validate a dict: length bounds, key names, per-key schemata,
    additional keys and required keys."""
    if _is_null_allowed_for_object(obj, schema, context):
        return
    if not isinstance(obj, dict):
        yield ValidationProblem(code=10000, hint="got: {}".format(type(obj).__name__), context=context)
        return
    yield from _validate_index_constraint(obj, schema, context)
    minlen = schema.get("minLength", None)
    if minlen:
        if len(obj) < minlen:
            yield ValidationProblem(code=10050, hint=obj, context=context)
    maxlen = schema.get("maxLength", None)
    if maxlen is not None:
        if len(obj) > maxlen:
            yield ValidationProblem(code=10051, hint=obj, context=context)
    key_schemas = schema.get("keys", {}) if schema else {}
    seen_keys = set()
    keyname_schema = schema.get_child("keyNames", None)
    for pos, (key, item) in enumerate(obj.items()):
        if keyname_schema is None:
            # default contract: keys must be plain strings
            if not isinstance(key, str):
                yield ValidationProblem(code=10003, hint=repr(key), context=context)
        else:
            # validate the key itself against the given schema
            key_ctx = Context(context, key=key, key_index=pos, current_object=key)
            key_probs = list(_validate(key, keyname_schema, key_ctx))
            if key_probs:
                yield ValidationProblem(
                    code=10034, hint=key, context=context, cause=key_probs)
                if context.settings.break_on_keynames_problems:
                    return
        if key in seen_keys:
            yield ValidationProblem(code=80000, hint=key, context=context)
        else:
            seen_keys.add(key)
        # XXX FIXME: context: new leaf context with new key for recursion
        if key in key_schemas:
            child_ctx = Context(context, key=key, key_index=pos, current_object=item)
            yield from _validate(item, schema.ensure_child_schema(key_schemas[key]), child_ctx)
        else:
            # check whether additional keys are allowed
            additional = schema.get_child("additionalKeys", False)
            if isinstance(additional, bool):
                if not additional:
                    if not _is_in_skip_keys(key, context.settings.skip_keys):
                        yield ValidationProblem(code=10004, hint=str(key), context=context)
            else:
                if not _is_in_skip_keys(key, context.settings.skip_keys):
                    # common schema for all additional keys
                    child_ctx = Context(context, key=key, key_index=pos, current_object=item)
                    yield from _validate(item, additional, child_ctx)
    # check whether all required keys are seen
    try:
        required = set(schema.get("required", set()))
    except (TypeError, ValueError):
        raise SchemaError("`required` must be an iterable")
    if not required <= seen_keys:
        missing = [str(k) for k in required - seen_keys]
        yield ValidationProblem(code=10005, hint=sorted(missing), context=context)


def validate_list(obj, schema, context):
    """Validate a list/tuple: length bounds plus one item schema for all
    elements (missing ``items`` denies any element)."""
    if _is_null_allowed_for_object(obj, schema, context):
        return
    if not isinstance(obj, (list, tuple)):
        yield ValidationProblem(code=10001, hint="got: {}".format(type(obj).__name__), context=context)
        return
    yield from _validate_index_constraint(obj, schema, context)
    minlen = schema.get("minLength", None)
    if minlen:
        if len(obj) < minlen:
            yield ValidationProblem(code=10012, hint=obj, context=context)
    maxlen = schema.get("maxLength", None)
    if maxlen is not None:
        if len(obj) > maxlen:
            yield ValidationProblem(code=10013, hint=obj, context=context)
    try:
        item_schema = schema.ensure_child_schema(schema["items"])
    except KeyError:
        # without an "items" schema every element is rejected
        item_schema = _Schema(schema, False, {"type": validate_deny})
    for pos, element in enumerate(obj):
        child_ctx = Context(parent=context, index=pos, current_object=element)
        yield from _validate(element, item_schema, child_ctx)
not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10040, hint=obj, context=context) + try: + schema_items = schema.ensure_child_schema(schema["items"]) + except KeyError: + schema_items = _Schema(schema, False, {"type": validate_deny}) + for o in obj: + new_context = Context(parent=context, key=o, current_object=o) + yield from _validate(o, schema_items, new_context) + + +def validate_tuple(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (list, tuple)): + yield ValidationProblem(code=10014, hint="got: {}".format(type(obj).__name__), context=context) + return + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10015, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10016, hint=obj, context=context) + schema_items = schema.get("items", []) + if not isinstance(schema_items, (list, tuple)): + raise SchemaError("tuple items require a list of schemata in items") + for idx, o in enumerate(obj): + # early exit at maxlen + if maxlen is not None and idx >= maxlen: + break + new_context = Context(parent=context, index=idx, current_object=o) + try: + schema_index = schema.ensure_child_schema(schema_items[idx]) + except IndexError: + additional_items = schema.get_child("additionalItems", False) + if isinstance(additional_items, bool): + if not additional_items: + yield ValidationProblem(code=10017, context=new_context) + else: + yield from _validate(o, additional_items, new_context) + else: + yield from _validate(o, schema_index, new_context) + + +def validate_str(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, str): + yield ValidationProblem(code=10002, hint=obj, context=context) + else: + yield from 
_validate_index_constraint(obj, schema, context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10043, hint=obj, context=context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10006, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10007, hint=obj, context=context) + pattern = schema.get("pattern", None) + if pattern is not None: + if isinstance(pattern, str): + mo = re.search(pattern, obj) + if not mo: + yield ValidationProblem(code=10008, context=context) + elif isinstance(pattern, TYPE_RE): + mo = pattern.search(obj) + if not mo: + yield ValidationProblem(code=10008, context=context) + elif callable(pattern): + yield from pattern(obj, schema, context) + else: + raise SchemaError("unknown pattern type") + is_contained = schema.get("is-contained-in-ref", None) + if is_contained is not None: + refobj = try_get_reference(is_contained, + context, + schema, + default=_SENTINEL) + if refobj is _SENTINEL: + yield ValidationProblem(code=10044, context=context) + else: + try: + if obj not in refobj: + yield ValidationProblem(code=10045, context=context) + except TypeError: + yield ValidationProblem(code=10046, context=context) + + +def validate_binary(obj, schema, context): + if not isinstance(obj, (bytes, bytearray)): + yield ValidationProblem(code=10035, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10036, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10037, hint=obj, context=context) + pattern = schema.get("pattern", None) + if pattern is not 
None: + if isinstance(pattern, (str, bytes, bytearray)): + if isinstance(pattern, str): + if "'''" not in pattern: + bytes_pattern = ast.literal_eval( + "b'''" + pattern + "'''") + elif '"""' not in pattern: + bytes_pattern = ast.literal_eval( + 'b"""' + pattern + '"""') + else: + raise SchemaError("incompatible bytes pattern") + else: + bytes_pattern = pattern + mo = re.search(bytes_pattern, obj) + if not mo: + yield ValidationProblem(code=10047, context=context) + elif isinstance(pattern, TYPE_RE): + mo = pattern.search(obj) + if not mo: + yield ValidationProblem(code=10047, context=context) + elif callable(pattern): + yield from pattern(obj, schema, context) + else: + raise SchemaError("unknown pattern type") + + +def validate_timestamp(obj, schema, context): + if not isinstance(obj, datetime.datetime): + yield ValidationProblem(code=10041, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_integer(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, int): + yield ValidationProblem(code=10020, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10021, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10022, hint=obj, context=context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10048, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if 
callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_float(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, float): + yield ValidationProblem(code=10023, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10024, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10025, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_number(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (int, float)): + yield ValidationProblem(code=10030, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and isinstance(obj, float): + minValue *= 1.0 + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10031, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and isinstance(obj, float): + maxValue *= 1.0 + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10032, hint=obj, context=context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10049, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise 
SchemaError("unknown value validator (only a callable allowed)") + + +def validate_scalar(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + yield from _validate_index_constraint(obj, schema, context) + if obj is None: + yield ValidationProblem(code=10033, hint=obj, context=context) + if isinstance(obj, (dict, list, tuple, set, frozenset)): + yield ValidationProblem(code=10033, hint=obj, context=context) + + +def validate_deny(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + yield ValidationProblem(code=10010, context=context) + + +def validate_accept(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + + +def validate_null(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + if obj is not None: + yield ValidationProblem(code=10011, context=context) + + +def validate_empty(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + if obj is None: + return + if isinstance(obj, (dict, list, tuple, set, frozenset)) and not obj: + return + yield ValidationProblem(10018, context=context) + + +def validate_bool(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, bool): + yield ValidationProblem(code=10026, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + elif value and not obj: + yield ValidationProblem(code=10027, hint=obj, context=context) + elif not value and obj: + yield ValidationProblem(code=10028, hint=obj, context=context) + + +def validate_allOf(obj, schema, context): + if not isinstance(schema, (list, tuple)): + raise SchemaError("require a list of schematas for `all-of'") + res = [] + for idx, s in enumerate(schema): + assert isinstance(s, _Schema) + tr = 
list(_validate(obj, s, context)) + if tr: + res.append((idx, tr, )) + if res: + yield ValidationProblem( + code=10057, + context=context, + cause=[ + ValidationProblem( + code=10058, + context=context, + cause=tr, + index=idx) for (idx, tr) in res]) + + +def validate_anyOf(obj, schema, context): + if not isinstance(schema, (list, tuple)): + raise SchemaError("require a list of schematas for `any-of'") + res = [] + for s in schema: + assert isinstance(s, _Schema) + tr = list(_validate(obj, s, context)) + if tr: + res.append(tr) + else: + # Erfolg: gleich positiv zurueck ohne Meldungen + return + # Ansonsten: alle Fehlschlaege protokollieren + if res: + yield ValidationProblem( + code=10055, + context=context, + cause=[ + ValidationProblem( + code=10056, + context=context, + cause=tr) for tr in res]) + + +def validate_oneOf(obj, schema, context): + if not isinstance(schema, (list, tuple)): + raise SchemaError("require a list of schematas for `one-of'") + success_res = [] + failed_res = [] + for idx, s in enumerate(schema): + assert isinstance(s, _Schema) + tr = list(_validate(obj, s, context)) + if tr: + failed_res.append((idx, tr, )) + else: + success_res.append(idx) + if len(success_res) == 1: + return + elif len(success_res) == 0: + # Ansonsten: alle Fehlschlaege protokollieren + if failed_res: + yield ValidationProblem( + code=10053, + context=context, + cause=[ + ValidationProblem( + code=10054, + context=context, + cause=tr, + index=idx) for (idx, tr) in failed_res]) + else: + # Die Indizes der "zuvielen" in "hint" anzeigen + yield ValidationProblem(code=10019, hint=",".join([str(k) for k in success_res])) + + +def validate_not(obj, schema, context): + assert isinstance(schema, _Schema) + res = list(_validate(obj, schema, context)) + if not res: + yield ValidationProblem(code=10029, hint=obj, context=context, + cause=res) + + +def process_schema_references(schema, context, check_single_ref_key=True): + try: + ref = schema[SCHEMA_REF_KEY] + except (KeyError, 
TypeError): + return schema + # if `$ref' is found it MUST be the only key + if check_single_ref_key and len(schema) != 1: + raise SchemaError("`{}' must be the single key if it exists") + schema = try_get_reference(ref, context, schema) + if not isinstance(schema, _Schema): + raise SchemaError( + "dereferenced schema is not a `_Schema': {}".format(ref)) + schema = copy.deepcopy(schema) + return process_schema_references(schema, context, check_single_ref_key=True) + + +def process_schema_conditionals(schema, context): + """Lisp-like `cond` to provide schema modifications + + :param schema: the input schema + :param context: the validation context with a valid + `context.root.root_object` + :returns: the processed schema: the schema itself if it is unchanged and + a copy of the schema if has been changed + + """ + what, conds = _get_one_of(schema, "cond", "match", default=None) + if what is None or conds is None: + return schema + if not isinstance(conds, (list, tuple)): + raise SchemaError("the conditions of a cond must be a sequence") + if what == "cond": + return _process_schema_conditionals_cond(schema, conds, context) + elif what == "match": + return _process_schema_conditionals_match(schema, conds, context) + else: + assert False, "unreachable" + + +def _process_schema_conditionals_cond(schema, conds, context): + for cond in conds: + if not isinstance(cond, dict): + raise SchemaError("a single condition must be a dict") + if eval_condition(cond, context, schema): + rep_type, rep_schema = _get_one_of( + cond, "then", "then-replace", "then-merge") + rep_schema = schema.ensure_child_schema(rep_schema) + if rep_type in ("then", "then-replace"): + do_merge = False + elif rep_type == "then-merge": + do_merge = True + else: + raise SchemaError("unknown then type: {}".format(rep_type)) + break + else: + # + # No condition was true: just remove the "cond" to get the + # effective schema. 
+ # + rep_schema = None + do_merge = False + + new_schema = schema.copy() + del new_schema["cond"] + if rep_schema: + rep_schema = process_schema_references(rep_schema, context) + # this could insert a new nested "cond" or "match" again + if do_merge: + rep_schema = copy.deepcopy(rep_schema) + new_schema = _merge(rep_schema, new_schema) + else: + new_schema.update(rep_schema) + # Recursively apply "cond/match" evaluation to the resulting schema + return process_schema_conditionals(new_schema, context) + + +def _process_schema_conditionals_match(schema, conds, context): + rep_schemata = [] + for cond in conds: + if not isinstance(cond, dict): + raise SchemaError("a single condition must be a dict") + if eval_condition(cond, context, schema): + rep_type, rep_schema = _get_one_of( + cond, "then", "then-replace", "then-merge") + rep_schema = schema.ensure_child_schema(rep_schema) + if rep_type in ("then", "then-replace"): + rep_schemata.append((False, rep_schema)) + elif rep_type == "then-merge": + rep_schemata.append((True, rep_schema)) + else: + raise SchemaError("unknown then type: {}".format(rep_type)) + + new_schema = schema.copy() + del new_schema["match"] + for do_merge, rep_schema in rep_schemata: + rep_schema = process_schema_references(rep_schema, context) + # this could insert a new nested "cond" or "match" again + if do_merge: + rep_schema = copy.deepcopy(rep_schema) + new_schema = _merge(rep_schema, new_schema) + else: + new_schema.update(rep_schema) + # Recursively apply "cond/match" evaluation to the resulting schema + return process_schema_conditionals(new_schema, context) + + +def eval_condition(cond, context, schema): + """Eval the condition in `cond` and return a tuple `(hit, predval)` + + """ + pred, predval = _get_one_of( + cond, + "when-ref-true", "when-ref-exists", "when", + default=_SENTINEL) + + if pred == "when": + # rekursive evaluation of `predval` as the real predicate + return eval_pred(predval, context, schema) + elif pred == 
"when-ref-true": + refobj = try_get_reference(predval, context, schema, default=None) + return bool(refobj) + elif pred == "when-ref-exists": + refobj = try_get_reference(predval, context, schema, default=_SENTINEL) + return refobj is not _SENTINEL + else: + raise SchemaError("unknown condition type: {}".format(pred)) + + +def eval_pred(pred, context, schema): + if isinstance(pred, dict): + combinator, combinator_val = _get_one_of( + pred, + "not", "all-of", "any-of", "one-of", + default=None) + if combinator: + if combinator == "not": + return not eval_pred(combinator_val, context, schema) + elif combinator == "all-of": + if not isinstance(combinator_val, (list, tuple)): + raise SchemaError("`all-of' requires a list of childs") + for cv in combinator_val: + if not eval_pred(cv, context, schema): + return False + return True + elif combinator == "any-of": + if not isinstance(combinator_val, (list, tuple)): + raise SchemaError("`any-of' requires a list of childs") + for cv in combinator_val: + if eval_pred(cv, context, schema): + return True + return False + elif combinator == "one-of": + if not isinstance(combinator_val, (list, tuple)): + raise SchemaError("`one-of' requires a list of childs") + num_true = 0 + for cv in combinator_val: + if eval_pred(cv, context, schema): + num_true += 1 + # shortcut + if num_true > 1: + return False + if num_true == 1: + return True + else: + return False + else: + raise SchemaError( + "unknown logical operator: {}".format(combinator)) + else: + pred_key, pred_val = _get_one_of( + pred, + "ref-true", "ref-exists", "equals", + default=None) + if pred_key == "ref-true": + refobj = try_get_reference( + pred_val, context, schema, default=None) + return bool(refobj) + elif pred_key == "ref-exists": + refobj = try_get_reference( + pred_val, context, schema, default=_SENTINEL) + return refobj is not _SENTINEL + elif pred_key == "equals": + if not isinstance(pred_val, (list, tuple)): + raise SchemaError("`equals' requires a list as 
childs") + if len(pred_val) != 2: + raise SchemaError("`equals' requires a list of len 2") + op1 = eval_comparison_operator_operand( + pred_val[0], context, schema) + op2 = eval_comparison_operator_operand( + pred_val[1], context, schema) + return op1 == op2 + else: + raise SchemaError("unknown predicate: {}".format(pred)) + elif isinstance(pred, list): + # implicit all-of (aka AND) + for cv in pred: + if not eval_pred(cv, context, schema): + return False + return True + else: + return pred + + +def eval_comparison_operator_operand(op, context, schema): + if not isinstance(op, dict): + raise SchemaError("an operand must be a dict") + opkey, opval = _get_one_of(op, "ref", "val", "value") + if opkey is None: + raise SchemaError("no operant given in {!r}".format(op)) + if opkey == "ref": + return try_get_reference(opval, context, schema) + elif opkey in ("val", "value"): + return opval + else: + assert False + + +def try_get_reference(ref, context, schema, default=None): + """Get the object referenced in `ref` + + Use `context` as data/object context and `schema` as the current schema + context. + + """ + uri = rfc3986.URIReference.from_string(ref).normalize() + if not uri.scheme: + uri = uri.copy_with(scheme="object") + if uri.scheme == "object": + if ref.startswith("object#"): + for attr in ("authority", "path", "query"): + if getattr(uri, attr, None) is not None: + raise SchemaError( + "bogus {} in URI reference `{}'".format(attr, ref)) + if uri.fragment is None: + raise SchemaError("fragment required in reference") + if not uri.fragment: + return context.root.root_object + elif uri.fragment == '.': + return context.current_object + parts = uri.fragment.split('.') # use '.' 
separator as in configmix + if parts[0]: + # absolute + d = context.root.root_object + else: + # relative + d = context.current_object + parts = parts[1:] + c = context # needed to determine relative object references + relative_refs_allowed = True + for part in [urllib.parse.unquote(p) for p in parts]: + if part: + relative_refs_allowed = False + try: + d = d[part] + except (KeyError, IndexError, TypeError): + return default + else: + if not relative_refs_allowed: + raise SchemaError( + "empty part in path to object reference not allowed") + c = c.safe_parent + d = c.current_object + return d + elif uri.scheme == "schema": + if not uri.path or (uri.path == SCHEMA_PATH_SELF): + s = schema.SELF + elif uri.path == SCHEMA_PATH_ROOT: + s = schema.ROOT + else: + s = schema.get_cached_schema(uri.path, load_if_needed=True) + if uri.fragment is None: + raise SchemaError("fragment required in reference") + + if not uri.fragment.startswith('/'): + raise SchemaError("references to parts of a schema must be absolute (begin with `/')") + if uri.fragment == '/': + return s + parts = uri.fragment.split('/') + parent_for_subschema = s + for part in [urllib.parse.unquote(p) for p in parts[1:]]: + try: + v = s[part] + except (KeyError, IndexError, TypeError): + return default + else: + if isinstance(v, _Schema): + pass + elif isinstance(v, dict): + s = _Schema(parent_for_subschema, False, v) + else: + # need not try further + return default + return s + else: + raise SchemaError("Unknown schema reference scheme: {}".format(uri.scheme)) + + +_DEL_VALUE = '{{::DEL::}}' +"""Sigil to mark keys to be deleted in the target when merging""" + + +def _merge(user, default): + """Logically merge the configuration in `user` into `default`. + + :param dict user: + the new configuration that will be logically merged + into `default` + :param dict default: + the base configuration where `user` is logically merged into + :returns: `user` with the necessary amendments from `default`. 
+ If `user` is ``None`` then `default` is returned. + + .. note:: Implementation: The configuration in `user` is + augmented/changed **inplace**. + + If a value in `user` is equal to :data:`._DEL_VALUE` + (``{{::DEL::}}``) the corresponding key will be deleted from the + merged output. + + From http://stackoverflow.com/questions/823196/yaml-merge-in-python + + """ + if user is None: + _filter_deletions(default) + return default + if isinstance(user, dict) and isinstance(default, dict): + for k, v in default.items(): + if k in user: + if user[k] == _DEL_VALUE: + # do not copy and delete the marker + del user[k] + else: + user[k] = _merge_item(user[k], v) + else: + user[k] = v + else: + raise SchemaError("can only merge two dicts on top-level") + _filter_deletions(user) + return user + + +def _merge_item(user, default): + """Recursion helper for :func:`._merge` + + """ + if isinstance(user, dict) and isinstance(default, dict): + for k, v in default.items(): + if k in user: + if user[k] == _DEL_VALUE: + # do not copy and delete the marker + del user[k] + else: + user[k] = _merge_item(user[k], v) + else: + user[k] = v + elif isinstance(user, (list, tuple)) and isinstance(default, (list, tuple)): + for idx, v in enumerate(default): + user.insert(idx, v) + return user + + +def _filter_deletions(d): + """Recursively filter deletions in the dict `d`. + + Deletions have values that equal :data:`._DEL_VALUE`. 
+ + """ + if not isinstance(d, dict): + return + # use a copy of the items because we change `d` while iterating + for k, v in list(d.items()): + if v == _DEL_VALUE: + del d[k] + else: + if isinstance(d[k], dict): + _filter_deletions(d[k]) + + +def _log_problem_cause_all(logger, loglevel, level, problems): + if not problems: + return + for pr in problems: + logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr) + _log_problem_cause_all(logger, loglevel, level+1, pr.cause) + + +def _build_problems_by_level_and_depth(by_level, by_depth, level, problems): + for pr in problems: + if not pr.cause: + continue + try: + prl = by_level[level] + except LookupError: + prl= [] + by_level[level] = prl + prl.append(pr) + + depth = pr.context.depth + try: + prd = by_depth[depth] + except LookupError: + prd= [] + by_depth[depth] = prd + prd.append(pr) + _build_problems_by_level_and_depth( + by_level, by_depth, level+1, pr.cause) + + +def _log_problem_cause(logger, loglevel, max_level, max_depth, level, problems): + for pr in problems: + # + # Check whether we will start logging from this level downwards + # all problems + # + if max_level is None or level == max_level: + new_max_level = None # trigger logging + else: + new_max_level = max_level + if max_depth is None or max_depth == pr.context.depth: + new_max_depth = None # trigger logging + else: + new_max_depth = max_depth + if new_max_level is None or new_max_depth is None: + logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr) + if pr.cause: + _log_problem_cause( + logger, loglevel, + new_max_level, new_max_depth, + level+1, pr.cause) + + +def log_problem_cause(logger, loglevel, debug, level, problems): + if not problems: + return + if debug: + _log_problem_cause_all(logger, loglevel, level, problems) + else: + by_level = {} # to determine maximum problem nesting level + by_depth = {} # to determine maximum context nexting level + _build_problems_by_level_and_depth(by_level, by_depth, level, problems) + + max_level = 
max(by_level.keys()) + max_depth = max(by_depth.keys()) + + _log_problem_cause( + logger, loglevel, max_level, max_depth, level, problems)
