changeset 5:84dfd1a94926
Add the existing implementation.
All tests pass.
The documentation is also included as a text file.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 06 Jul 2023 23:41:41 +0200 |
| parents | d715f0c13c60 |
| children | fc18f1cb3309 |
| files | data_schema/__init__.py data_schema/util.py docs/schema.txt tests/_config.py tests/schemata/test1.schema.yml tests/test_schema.py tests/testschematalib/__init__.py tests/testschematalib/packagedata/__init__.py tests/testschematalib/packagedata/test2.schema.yml |
| diffstat | 9 files changed, 4700 insertions(+), 0 deletions(-) |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/data_schema/__init__.py	Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,1564 @@
+# -*- coding: utf-8 -*-
+r"""
+Object schema validation support.
+
+Somewhat modelled after JSON Schema.
+
+.. seealso:: https://json-schema.org/understanding-json-schema/index.html
+
+:Author: Franz Glasner <fzglas.hg@dom66.de>
+:Copyright: \(c) 2023 Franz Glasner
+:License: BSD 3-Clause "New" or "Revised" License.
+    See :ref:`LICENSE.txt <license>` for details.
+:ID: @(#) $Header$
+
+"""
+
+__version__ = "0.1.dev1"
+
+__revision__ = "|VCSRevision|"
+
+__date__ = "|VCSJustDate|"
+
+__all__ = ["ERROR", "WARNING", "INFO", "ERRORS", "WARNINGS",
+           "level_name", "problem_message",
+           "ValidationProblem", "SchemaError",
+           "validate",
+           "log_problem_cause"]
+
+
+import ast
+import collections
+import copy
+import datetime
+import re
+import urllib.parse
+
+import rfc3986
+
+import configmix.yaml
+
+from .util import get_data_stream
+
+
+def NC_(ctx, msg):
+    """Minimal dummy translation support"""
+    return msg
+
+
+ERROR = 40
+WARNING = 30
+INFO = 20
+
+_level_to_name = {
+    ERROR: "ERROR",
+    WARNING: "WARNING",
+    INFO: "INFO",
+}
+_name_to_level = {name: level for (level, name) in _level_to_name.items()}
+
+ERRORS = {
+    10000: NC_("schema-msg", "dict expected"),
+    10001: NC_("schema-msg", "list expected"),
+    10002: NC_("schema-msg", "string expected"),
+    10003: NC_("schema-msg", "dict key must be a string"),
+    10004: NC_("schema-msg", "additional key encountered"),
+    10005: NC_("schema-msg", "required key(s) missing"),
+    10006: NC_("schema-msg", "min string length not reached"),
+    10007: NC_("schema-msg", "max string length exceeded"),
+    10008: NC_("schema-msg", "string value does not match the required RE pattern"),
+    10009: NC_("schema-msg", "string value does not validate"),
+    10010: NC_("schema-msg", "validation error"),
+    10011: NC_("schema-msg", "None/Null object expected"),
+    10012: NC_("schema-msg", "min list length not reached"),
+    10013: NC_("schema-msg", "max list length exceeded"),
+    10014: NC_("schema-msg", "tuple expected"),
+    10015: NC_("schema-msg", "min tuple length not reached"),
+    10016: NC_("schema-msg", "max tuple length exceeded"),
+    10017: NC_("schema-msg", "additional items in tuple not allowed"),
+    10018: NC_("schema-msg", "object is not empty"),
+    10019: NC_("schema-msg", "more than one match in `one-of' detected"),
+    10020: NC_("schema-msg", "int expected"),
+    10021: NC_("schema-msg", "int value lower than minValue"),
+    10022: NC_("schema-msg", "int value greater than maxValue"),
+    10023: NC_("schema-msg", "float expected"),
+    10024: NC_("schema-msg", "float value lower than minValue"),
+    10025: NC_("schema-msg", "float value greater than maxValue"),
+    10026: NC_("schema-msg", "boolean value expected"),
+    10027: NC_("schema-msg", "boolean true expected"),
+    10028: NC_("schema-msg", "boolean false expected"),
+    10029: NC_("schema-msg", "`not' expected problems but got none"),
+    10030: NC_("schema-msg", "numeric type (int or float) expected"),
+    10031: NC_("schema-msg", "numeric value lower than minValue"),
+    10032: NC_("schema-msg", "numeric value greater than maxValue"),
+    10033: NC_("schema-msg", "a plain scalar value expected"),
+    10034: NC_("schema-msg", "dict key does not match required schema"),
+    10035: NC_("schema-msg", "binary data expected"),
+    10036: NC_("schema-msg", "length of binary data lower than minLength"),
+    10037: NC_("schema-msg", "length of binary data exceeds maxLength"),
+    10038: NC_("schema-msg", "a set is expected"),
10039: NC_("schema-msg", "length of set lower than minLength"), + 10040: NC_("schema-msg", "length of set greater than maxLength"), + 10041: NC_("schema-msg", "timestamp expected"), + 10042: NC_("schema-msg", "value of timestamp does not validate"), + 10043: NC_("schema-msg", "enumerated string value expected but not found"), + 10044: NC_("schema-msg", "referenced object doest not exist"), + 10045: NC_("schema-msg", "key is not contained in referenced object"), + 10046: NC_("schema-msg", "referenced object is not a container"), + 10047: NC_("schema-msg", "binary data does not match the required RE pattern"), + 10048: NC_("schema-msg", "enumerated integer value expected but not found"), + 10049: NC_("schema-msg", "enumerated number value expected but not found"), + 10050: NC_("schema-msg", "min dict length encountered"), + 10051: NC_("schema-msg", "max dict length exceeded"), + 10052: NC_("schema-msg", "index constraint violated"), + 10053: NC_("schema-msg", "`one-of' failed"), + 10054: NC_("schema-msg", "failing `one-of' item"), + 10055: NC_("schema-msg", "`any-of' failed"), + 10056: NC_("schema-msg", "failing `any-of' item"), + 10057: NC_("schema-msg", "`all-of' failed"), + 10058: NC_("schema-msg", "failing `all-of' item"), +} + +WARNINGS = { + 80000: NC_("schema-msg", "duplicate dict key"), +} + +if not set(ERRORS.keys()).isdisjoint(set(WARNINGS.keys())): + raise ValueError("ERRORS and WARNINGS must be disjoint") + + +TYPE_RE = type(re.compile(r"\A.+\Z")) + +_SENTINEL = object() + +SCHEMA_REF_KEY = "$ref" +"""Key name for schema references (like a symlink within a schema)""" + +SCHEMA_PATH_ROOT = "$root" +"""URI path to the root schema""" + +SCHEMA_PATH_SELF = "$self" +"""URI path to the current schema""" + + +def level_name(level): + name = _level_to_name.get(level) + if name is None: + name = "Level {}".format(level) + return name + + +def problem_message(pr): + if isinstance(pr, ValidationProblem): + code = getattr(pr, "code", None) + else: + code = pr + msg = ERRORS.get(code, None) + if msg is None: + msg = WARNINGS[code] + return msg + + +class ValidationProblem(object): + + __slots__ = ("code", "severity", "hint", "context", "cause", "index") + + def __init__(self, + code=None, + severity=None, + hint=None, + context=None, + cause=None, + index=None): + if code is not None: + # check validity + if code not in ERRORS and code not in WARNINGS: + raise ValueError( + "unknown validation error code: {}".format(code)) + self.code = code + if severity is None: + # autodetermine + if code in ERRORS: + self.severity = ERROR + elif code in WARNINGS: + self.severity = WARNING + else: + assert False + else: + self.severity = severity + else: + raise TypeError("`code' must be given") + self.hint = hint + self.context = context + if cause: + if not isinstance(cause, (list, tuple, set, frozenset)): + cause = (cause, ) + for c in cause: + if not isinstance(c, ValidationProblem): + raise SchemaError( + "can only nest other `ValidationProblem' instances") + self.cause = cause + self.index = index + + def __repr__(self): + try: + msg = " (" + problem_message(self) + ")" + except LookupError: + msg = "" + if self.index is None: + return "ValidationProblem(code={!r}{}, severity={!r}, hint={}, context=[depth={}]{})".format( + self.code, msg, self.severity, self.hint, self.context.depth, self.context) + else: + return "ValidationProblem(code={!r}{}, severity={!r}, hint={}, context=[depth={}]{}, index={})".format( + self.code, msg, self.severity, self.hint, self.context.depth, self.context, self.index) + 
+
+class SchemaError(Exception):
+    """An error within the schema itself"""
+    pass
+
+
+ValidationSettings = collections.namedtuple(
+    "ValidationSettings",
+    ["skip_keys", "break_on_keynames_problems"])
+
+
+class _Schema(dict):
+
+    __slots__ = ("parent", "is_sub_root", "_schema_cache")
+
+    def __init__(self, parent, is_sub_root, *args, **kwds):
+        super().__init__(*args, **kwds)
+        if parent is None or isinstance(parent, _Schema):
+            self.parent = parent
+        else:
+            raise TypeError("`_Schema' or `None' expected")
+        if parent is None:
+            self._schema_cache = {}
+            if not is_sub_root:
+                raise ValueError(
+                    "the root schema must also be a sub-root (aka `$self')")
+            self.is_sub_root = True
+        else:
+            self.is_sub_root = is_sub_root
+
+    @property
+    def ROOT(self):
+        """Get the root schema"""
+        r = self
+        while r.parent is not None:
+            r = r.parent
+        return r
+
+    @property
+    def SELF(self):
+        r = self
+        while not r.is_sub_root:
+            r = r.parent
+        return r
+
+    def copy(self):
+        return _Schema(self.parent, self.is_sub_root, self)
+
+    def get_child(self, name, default=None):
+        return self.ensure_child_schema(self.get(name, default))
+
+    def ensure_child_schema(self, v):
+        if v is None:
+            return None
+        elif isinstance(v, _Schema):
+            return v
+        elif isinstance(v, dict):
+            return _Schema(self, False, v)
+        else:
+            return v
+
+    def ensure_list_of_child_schemata(self, v):
+        if isinstance(v, (list, tuple)):
+            return [_Schema(self, False, i) for i in v]
+        else:
+            return v
+
+    def __eq__(self, other):
+        if not isinstance(other, _Schema):
+            return NotImplemented
+        return (self.parent is other.parent
+                and bool(self.is_sub_root) == bool(other.is_sub_root)
+                and dict(self) == dict(other))
+
+    def __ne__(self, other):
+        #
+        # While the Python 3 default is sensible, implementing __ne__ is
+        # recommended whenever a custom __eq__ is provided
+        # (Raymond Hettinger).
+        #
+        # Do not use `not self == other` because NotImplemented is not
+        # handled properly in some early Python versions (including Py2).
+        #
+        equal = self.__eq__(other)
+        return NotImplemented if equal is NotImplemented else not equal
+
+    def __copy__(self):
+        return _Schema(self.parent, self.is_sub_root, self)
+
+    def __deepcopy__(self, memo):
+        return _Schema(self.parent,
+                       self.is_sub_root,
+                       copy.deepcopy(dict(self), memo))
+
+    def __str__(self):
+        return "<_Schema " + super().__str__() + ">"
+
+    def __repr__(self):
+        return "<_Schema " + super().__repr__() + ">"
+
+    def get_cached_schema(self, key, load_if_needed=True):
+        root = self.ROOT
+        s = root._schema_cache.get(key, None)
+        if s is None and load_if_needed:
+            with get_data_stream(key) as schemastream:
+                # load schema as a new `$self' (i.e.
sub-root is True) + s = _Schema(self, True, configmix.yaml.load(schemastream)) + root._schema_cache[key] = s + return s + + def add_cached_schema(self, key, schema): + r = self.ROOT + assert isinstance(schema, _Schema) + r._schema_cache[key] = schema + + +class Context(object): + + __slots__ = ("_parent", "_key", "_key_index", + "_index", + "root_object", "root_schema", + "_current_object", + "_settings") + + def __init__(self, parent, *, key=_SENTINEL, index=_SENTINEL, + root_object=_SENTINEL, root_schema=_SENTINEL, + current_object=_SENTINEL, + settings=_SENTINEL, + key_index=_SENTINEL): + if parent is None: + if key is not _SENTINEL: + raise TypeError("the root context may not have a key") + if index is not _SENTINEL: + raise TypeError("the root context may not have an index") + if settings is _SENTINEL: + raise TypeError("the root context must have settings") + self.root_object = root_object + if current_object is _SENTINEL: + current_object = root_object + self.root_schema = root_schema + else: + if key is _SENTINEL and index is _SENTINEL: + raise TypeError("one of `key` and `index` must be given in a non-root context") + if root_object is not _SENTINEL: + raise TypeError("non-root context may not have a root object") + if root_schema is not _SENTINEL: + raise TypeError("non-root context may not have a root schema") + if key is not _SENTINEL and index is not _SENTINEL: + raise ValueError("only one of `key` and `index` may be given in a context") + if key_index is not _SENTINEL and key is _SENTINEL: + raise ValueError("when having a `key_index` a `key` also must be given") + self._parent = parent + self._key = key + self._key_index = key_index + self._index = index + self._current_object = current_object + self._settings = settings + + @property + def parent(self): + return self._parent + + @property + def safe_parent(self): + if self.is_root: + raise TypeError("the root context has no parent") + return self.parent + + @property + def root(self): + """Get the root context""" + ctx = self + while not ctx.is_root: + ctx = ctx.parent + return ctx + + @property + def is_root(self): + return not bool(self.parent) + + @property + def key(self): + if self._key is _SENTINEL: + raise AttributeError("no `key' in Context") + return self._key + + @property + def index(self): + if self._index is _SENTINEL: + raise AttributeError("no `index' in Context") + return self._index + + @property + def key_index(self): + if self._key_index is _SENTINEL: + raise AttributeError("no `key_index' in Context") + return self._key_index + + @property + def current_object(self): + if self._current_object is _SENTINEL: + raise AttributeError("no `current_object' in Context") + return self._current_object + + @property + def settings(self): + s = self._settings + return s if s is not _SENTINEL else self.parent.settings + + @property + def depth(self): + if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: + return 0 + n = 0 + ctx = self + while not ctx.is_root: + n += 1 + ctx = ctx.parent + return n + + def __str__(self): + if self._key is _SENTINEL and self._index is _SENTINEL and self.is_root: + return "<ROOT>" + chain = [] + ctx = self + while not ctx.is_root: + if ctx._key is not _SENTINEL: + chain.append(str(ctx.key)) + elif ctx._index is not _SENTINEL: + chain.append("INDEX:{}".format(ctx.index)) + else: + chain.append("") + ctx = ctx.parent + chain.reverse() + return " / ".join(chain) + + def __repr__(self): + return "<Context path=`{}'>".format(str(self)) + + +def _get_one_of(d, *keys, 
default=None, strict=True):
+    """Get the first found key and its value of `keys` from dict `d`.
+
+    """
+    for k in keys:
+        v = d.get(k, _SENTINEL)
+        if v is not _SENTINEL:
+            if strict:
+                #
+                # check that no other key of `keys` besides `k` is
+                # in `d`
+                #
+                other_keys = set(keys)
+                other_keys.remove(k)
+                for k2 in other_keys:
+                    if k2 in d:
+                        raise SchemaError("ambiguous key from: {}".format(
+                            ", ".join(keys)))
+            return k, v
+    return None, default
+
+
+def validate(obj, schema, **kwds):
+    """Validate object `obj` against the *specific* schema `schema`.
+
+    Yields errors and warnings.
+
+    """
+    settings = {
+        "skip_keys": None,
+        "break_on_keynames_problems": True,
+    }
+    settings.update(kwds)
+    if not isinstance(schema, _Schema):
+        if not isinstance(schema, dict):
+            raise SchemaError("Schema must be a dict-alike."
+                              " Got: {!r}".format(schema))
+        schema = _Schema(None, True, schema)
+    context = Context(None, root_object=obj, root_schema=schema,
+                      settings=ValidationSettings(**settings))
+    yield from _validate(obj, schema, context, is_root=True)
+
+
+def _validate(obj, schema, context, is_root=False):
+    """Validate object `obj` against the *specific* schema `schema`.
+
+    Yields errors and warnings.
+
+    """
+    if not isinstance(schema, _Schema):
+        raise SchemaError("Schema must be a `_Schema'."
+                          " Got: {!r}. Context: {!s}".format(schema, context))
+    # 1. Process "cond" or "match"
+    schema = process_schema_conditionals(schema, context)
+    # 2. Process "$ref" schema references
+    schema = process_schema_references(
+        schema, context, check_single_ref_key=not is_root)
+
+    # 3. Real validation
+
+    # check combinator shortcuts without "type" indirection
+    combinator, combinator_schema = _get_one_of(
+        schema, "not", "all-of", "any-of", "one-of")
+    if combinator is None:
+        try:
+            t = schema["type"]
+        except KeyError:
+            raise SchemaError("Schema has no `type' key: {!r}."
+ " Context: {!s}".format(schema, context)) + else: + # + # Construct a temporary schema with the proper indirection for + # the check below + # + t = {"type": {combinator: combinator_schema}} + if combinator_schema is None: + raise SchemaError("a combinator requires a child") + if callable(t): + yield from t(obj, schema, context) + elif t is None: + yield from validate_null(obj, schema, context) + elif isinstance(t, dict): + if len(t) != 1: + raise SchemaError("type dict must be of length 1") + # Check whether a shortcut is already seen above + if combinator is None: + combinator = list(t.keys())[0] + combinator_schema = t[combinator] + if combinator == "not": + yield from validate_not( + obj, schema.ensure_child_schema(combinator_schema), context) + elif combinator == "all-of": + yield from validate_allOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + elif combinator == "any-of": + yield from validate_anyOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + elif combinator == "one-of": + yield from validate_oneOf( + obj, + schema.ensure_list_of_child_schemata(combinator_schema), + context) + else: + raise SchemaError("unknown combinator: {}".format(combinator)) + elif isinstance(t, (list, tuple)): + # a simple list is "any-of" + yield from validate_anyOf( + obj, schema.ensure_list_of_child_schemata(t), context) + elif t in ("dict", "map", "object"): + yield from validate_dict(obj, schema, context) + elif t in ("list", "array",): + yield from validate_list(obj, schema, context) + elif t in ("tuple", "record"): + yield from validate_tuple(obj, schema, context) + elif t in ("set", "frozenset"): + yield from validate_set(obj, schema, context) + elif t in ("string", "str"): + yield from validate_str(obj, schema, context) + elif t in ("deny", ): + yield from validate_deny(obj, schema, context) + elif t in ("accept", ): + yield from validate_accept(obj, schema, context) + elif t in ("none", "null", "nil"): + yield from validate_null(obj, schema, context) + elif t in ("empty", ): + yield from validate_empty(obj, schema, context) + elif t in ("integer", "int"): + yield from validate_integer(obj, schema, context) + elif t in ("float", "real", "double"): + yield from validate_float(obj, schema, context) + elif t in ("number", "num"): + yield from validate_number(obj, schema, context) + elif t in ("bool", "boolean"): + yield from validate_bool(obj, schema, context) + elif t in ("scalar", ): + yield from validate_scalar(obj, schema, context) + elif t in ("binary", ): + yield from validate_binary(obj, schema, context) + elif t in ("timestamp", "datetime"): + yield from validate_timestamp(obj, schema, context) + else: + raise SchemaError("unknown type in schema: {}".format(t)) + + +def _is_in_skip_keys(key, skip_keys): + if not skip_keys: + return False + for sk in skip_keys: + if isinstance(sk, str): + if key == sk: + return True + else: + if sk.search(key): + return True + return False + + +def _is_null_allowed_for_object(obj, schema, context): + if obj is None and schema.get("nullable", False): + return True + return False + + +def _validate_index_constraint(obj, schema, context): + # No evaluation of index constraints for the root context + if context.is_root: + return + try: + index_constraints = schema["index-constraint"] + except KeyError: + return # no constraints + else: + if not isinstance(index_constraints, (list, tuple, set, frozenset)): + index_constraints = [index_constraints] + if not index_constraints: + return + parent = 
context.safe_parent + try: + effective_index = context.index + except AttributeError: + try: + effective_index = context.key_index + except AttributeError: + raise SchemaError("parent container has no usable index") + for idx in index_constraints: + if idx < 0: + idx = len(parent.current_object) + idx + if idx == effective_index: + break + else: + yield ValidationProblem(code=10052, context=context) + + +def validate_dict(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, dict): + yield ValidationProblem(code=10000, hint="got: {}".format(type(obj).__name__), context=context) + return + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10050, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10051, hint=obj, context=context) + schema_keys = schema.get("keys", {}) if schema else {} + seen_keys = set() + schema_keynames = schema.get_child("keyNames", None) + idx = -1 + for key, item in obj.items(): + idx += 1 + if schema_keynames is None: + if not isinstance(key, str): + yield ValidationProblem(code=10003, hint=repr(key), context=context) + else: + # validate the key against given schema + new_context = Context(context, key=key, key_index=idx, current_object=key) + key_probs = list(_validate(key, schema_keynames, new_context)) + if key_probs: + yield ValidationProblem( + code=10034, hint=key, context=context, cause=key_probs) + if context.settings.break_on_keynames_problems: + return + if key in seen_keys: + yield ValidationProblem(code=80000, hint=key, context=context) + else: + seen_keys.add(key) + # XXX FIXME: context: new leaf context with new key for recursion + if key in schema_keys: + new_context = Context(context, key=key, key_index=idx, current_object=item) + yield from _validate(item, schema.ensure_child_schema(schema_keys[key]), new_context) + else: + # check whether additional keys are allowed + additional_keys = schema.get_child("additionalKeys", False) + if isinstance(additional_keys, bool): + if not additional_keys: + if not _is_in_skip_keys(key, context.settings.skip_keys): + yield ValidationProblem(code=10004, hint=str(key), context=context) + else: + if not _is_in_skip_keys(key, context.settings.skip_keys): + # try this as the common schema for all the additional keys + new_context = Context(context, key=key, key_index=idx, current_object=item) + yield from _validate(item, additional_keys, new_context) + # check whether all required keys are seen + try: + required_keys = set(schema.get("required", set())) + except (TypeError, ValueError): + raise SchemaError("`required` must be an iterable") + if not required_keys <= seen_keys: + hs = [str(i) for i in required_keys - seen_keys] + yield ValidationProblem(code=10005, hint=sorted(hs), context=context) + + +def validate_list(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (list, tuple)): + yield ValidationProblem(code=10001, hint="got: {}".format(type(obj).__name__), context=context) + return + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10012, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield 
ValidationProblem(code=10013, hint=obj, context=context) + try: + schema_items = schema.ensure_child_schema(schema["items"]) + except KeyError: + schema_items = _Schema(schema, False, {"type": validate_deny}) + for idx, o in enumerate(obj): + new_context = Context(parent=context, index=idx, current_object=o) + yield from _validate(o, schema_items, new_context) + + +def validate_set(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (set, frozenset)): + yield ValidationProblem(code=10038, hint="got: {}".format(type(obj).__name__), context=context) + return + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10039, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10040, hint=obj, context=context) + try: + schema_items = schema.ensure_child_schema(schema["items"]) + except KeyError: + schema_items = _Schema(schema, False, {"type": validate_deny}) + for o in obj: + new_context = Context(parent=context, key=o, current_object=o) + yield from _validate(o, schema_items, new_context) + + +def validate_tuple(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (list, tuple)): + yield ValidationProblem(code=10014, hint="got: {}".format(type(obj).__name__), context=context) + return + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10015, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10016, hint=obj, context=context) + schema_items = schema.get("items", []) + if not isinstance(schema_items, (list, tuple)): + raise SchemaError("tuple items require a list of schemata in items") + for idx, o in enumerate(obj): + # early exit at maxlen + if maxlen is not None and idx >= maxlen: + break + new_context = Context(parent=context, index=idx, current_object=o) + try: + schema_index = schema.ensure_child_schema(schema_items[idx]) + except IndexError: + additional_items = schema.get_child("additionalItems", False) + if isinstance(additional_items, bool): + if not additional_items: + yield ValidationProblem(code=10017, context=new_context) + else: + yield from _validate(o, additional_items, new_context) + else: + yield from _validate(o, schema_index, new_context) + + +def validate_str(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, str): + yield ValidationProblem(code=10002, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10043, hint=obj, context=context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10006, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10007, hint=obj, context=context) + pattern = schema.get("pattern", None) + if pattern is not None: + if isinstance(pattern, str): + mo = re.search(pattern, obj) + if not mo: + yield 
ValidationProblem(code=10008, context=context) + elif isinstance(pattern, TYPE_RE): + mo = pattern.search(obj) + if not mo: + yield ValidationProblem(code=10008, context=context) + elif callable(pattern): + yield from pattern(obj, schema, context) + else: + raise SchemaError("unknown pattern type") + is_contained = schema.get("is-contained-in-ref", None) + if is_contained is not None: + refobj = try_get_reference(is_contained, + context, + schema, + default=_SENTINEL) + if refobj is _SENTINEL: + yield ValidationProblem(code=10044, context=context) + else: + try: + if obj not in refobj: + yield ValidationProblem(code=10045, context=context) + except TypeError: + yield ValidationProblem(code=10046, context=context) + + +def validate_binary(obj, schema, context): + if not isinstance(obj, (bytes, bytearray)): + yield ValidationProblem(code=10035, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minlen = schema.get("minLength", None) + if minlen: + if len(obj) < minlen: + yield ValidationProblem(code=10036, hint=obj, context=context) + maxlen = schema.get("maxLength", None) + if maxlen is not None: + if len(obj) > maxlen: + yield ValidationProblem(code=10037, hint=obj, context=context) + pattern = schema.get("pattern", None) + if pattern is not None: + if isinstance(pattern, (str, bytes, bytearray)): + if isinstance(pattern, str): + if "'''" not in pattern: + bytes_pattern = ast.literal_eval( + "b'''" + pattern + "'''") + elif '"""' not in pattern: + bytes_pattern = ast.literal_eval( + 'b"""' + pattern + '"""') + else: + raise SchemaError("incompatible bytes pattern") + else: + bytes_pattern = pattern + mo = re.search(bytes_pattern, obj) + if not mo: + yield ValidationProblem(code=10047, context=context) + elif isinstance(pattern, TYPE_RE): + mo = pattern.search(obj) + if not mo: + yield ValidationProblem(code=10047, context=context) + elif callable(pattern): + yield from pattern(obj, schema, context) + else: + raise SchemaError("unknown pattern type") + + +def validate_timestamp(obj, schema, context): + if not isinstance(obj, datetime.datetime): + yield ValidationProblem(code=10041, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_integer(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, int): + yield ValidationProblem(code=10020, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10021, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10022, hint=obj, context=context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10048, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_float(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + 
return + if not isinstance(obj, float): + yield ValidationProblem(code=10023, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10024, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10025, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_number(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, (int, float)): + yield ValidationProblem(code=10030, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + minValue = schema.get("minValue", None) + if minValue is not None and isinstance(obj, float): + minValue *= 1.0 + if minValue is not None and obj < minValue: + yield ValidationProblem(code=10031, hint=obj, context=context) + maxValue = schema.get("maxValue", None) + if maxValue is not None and isinstance(obj, float): + maxValue *= 1.0 + if maxValue is not None and obj > maxValue: + yield ValidationProblem(code=10032, hint=obj, context=context) + enumvalues = schema.get("enum", None) + if enumvalues is not None: + for ev in enumvalues: + if ev == obj: + break + else: + yield ValidationProblem(code=10049, hint=obj, context=context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + else: + raise SchemaError("unknown value validator (only a callable allowed)") + + +def validate_scalar(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + yield from _validate_index_constraint(obj, schema, context) + if obj is None: + yield ValidationProblem(code=10033, hint=obj, context=context) + if isinstance(obj, (dict, list, tuple, set, frozenset)): + yield ValidationProblem(code=10033, hint=obj, context=context) + + +def validate_deny(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + yield ValidationProblem(code=10010, context=context) + + +def validate_accept(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + + +def validate_null(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + if obj is not None: + yield ValidationProblem(code=10011, context=context) + + +def validate_empty(obj, schema, context): + yield from _validate_index_constraint(obj, schema, context) + if obj is None: + return + if isinstance(obj, (dict, list, tuple, set, frozenset)) and not obj: + return + yield ValidationProblem(10018, context=context) + + +def validate_bool(obj, schema, context): + if _is_null_allowed_for_object(obj, schema, context): + return + if not isinstance(obj, bool): + yield ValidationProblem(code=10026, hint=obj, context=context) + else: + yield from _validate_index_constraint(obj, schema, context) + value = schema.get("value", None) + if value is not None: + if callable(value): + yield from value(obj, schema, context) + elif value and not obj: + yield ValidationProblem(code=10027, hint=obj, context=context) + elif not value and obj: + yield ValidationProblem(code=10028, hint=obj, context=context) + + +def validate_allOf(obj, schema, context): + 
if not isinstance(schema, (list, tuple)):
+        raise SchemaError("require a list of schemata for `all-of'")
+    res = []
+    for idx, s in enumerate(schema):
+        assert isinstance(s, _Schema)
+        tr = list(_validate(obj, s, context))
+        if tr:
+            res.append((idx, tr, ))
+    if res:
+        yield ValidationProblem(
+            code=10057,
+            context=context,
+            cause=[
+                ValidationProblem(
+                    code=10058,
+                    context=context,
+                    cause=tr,
+                    index=idx) for (idx, tr) in res])
+
+
+def validate_anyOf(obj, schema, context):
+    if not isinstance(schema, (list, tuple)):
+        raise SchemaError("require a list of schemata for `any-of'")
+    res = []
+    for s in schema:
+        assert isinstance(s, _Schema)
+        tr = list(_validate(obj, s, context))
+        if tr:
+            res.append(tr)
+        else:
+            # success: return immediately without any messages
+            return
+    # otherwise: record all failures
+    if res:
+        yield ValidationProblem(
+            code=10055,
+            context=context,
+            cause=[
+                ValidationProblem(
+                    code=10056,
+                    context=context,
+                    cause=tr) for tr in res])
+
+
+def validate_oneOf(obj, schema, context):
+    if not isinstance(schema, (list, tuple)):
+        raise SchemaError("require a list of schemata for `one-of'")
+    success_res = []
+    failed_res = []
+    for idx, s in enumerate(schema):
+        assert isinstance(s, _Schema)
+        tr = list(_validate(obj, s, context))
+        if tr:
+            failed_res.append((idx, tr, ))
+        else:
+            success_res.append(idx)
+    if len(success_res) == 1:
+        return
+    elif len(success_res) == 0:
+        # otherwise: record all failures
+        if failed_res:
+            yield ValidationProblem(
+                code=10053,
+                context=context,
+                cause=[
+                    ValidationProblem(
+                        code=10054,
+                        context=context,
+                        cause=tr,
+                        index=idx) for (idx, tr) in failed_res])
+    else:
+        # report the indices of the surplus matches in `hint'
+        yield ValidationProblem(code=10019,
+                                context=context,
+                                hint=",".join([str(k) for k in success_res]))
+
+
+def validate_not(obj, schema, context):
+    assert isinstance(schema, _Schema)
+    res = list(_validate(obj, schema, context))
+    if not res:
+        yield ValidationProblem(code=10029, hint=obj, context=context,
+                                cause=res)
+
+
+def process_schema_references(schema, context, check_single_ref_key=True):
+    try:
+        ref = schema[SCHEMA_REF_KEY]
+    except (KeyError, TypeError):
+        return schema
+    # if `$ref' is found it MUST be the only key
+    if check_single_ref_key and len(schema) != 1:
+        raise SchemaError(
+            "`{}' must be the single key if it exists".format(SCHEMA_REF_KEY))
+    schema = try_get_reference(ref, context, schema)
+    if not isinstance(schema, _Schema):
+        raise SchemaError(
+            "dereferenced schema is not a `_Schema': {}".format(ref))
+    schema = copy.deepcopy(schema)
+    return process_schema_references(schema, context, check_single_ref_key=True)
+
+
+def process_schema_conditionals(schema, context):
+    """Lisp-like `cond` to provide schema modifications
+
+    :param schema: the input schema
+    :param context: the validation context with a valid
+        `context.root.root_object`
+    :returns: the processed schema: the schema itself if it is unchanged and
+        a copy of the schema if it has been changed
+
+    """
+    what, conds = _get_one_of(schema, "cond", "match", default=None)
+    if what is None or conds is None:
+        return schema
+    if not isinstance(conds, (list, tuple)):
+        raise SchemaError("the conditions of a cond must be a sequence")
+    if what == "cond":
+        return _process_schema_conditionals_cond(schema, conds, context)
+    elif what == "match":
+        return _process_schema_conditionals_match(schema, conds, context)
+    else:
+        assert False, "unreachable"
+
+
+def _process_schema_conditionals_cond(schema, conds, context):
+    for cond in conds:
+        if not isinstance(cond, dict):
+            raise SchemaError("a single condition must be a dict")
+        if eval_condition(cond, context, schema):
+            rep_type, rep_schema = _get_one_of(
+                cond, "then", "then-replace", "then-merge")
+            rep_schema = schema.ensure_child_schema(rep_schema)
+            if rep_type in ("then", "then-replace"):
+                do_merge = False
+            elif rep_type == "then-merge":
+                do_merge = True
+            else:
+                raise SchemaError("unknown then type: {}".format(rep_type))
+            break
+    else:
+        #
+        # No condition was true: just remove the "cond" to get the
+        # effective schema.
+        #
+        rep_schema = None
+        do_merge = False
+
+    new_schema = schema.copy()
+    del new_schema["cond"]
+    if rep_schema:
+        rep_schema = process_schema_references(rep_schema, context)
+        # this could insert a new nested "cond" or "match" again
+        if do_merge:
+            rep_schema = copy.deepcopy(rep_schema)
+            new_schema = _merge(rep_schema, new_schema)
+        else:
+            new_schema.update(rep_schema)
+    # Recursively apply "cond/match" evaluation to the resulting schema
+    return process_schema_conditionals(new_schema, context)
+
+
+def _process_schema_conditionals_match(schema, conds, context):
+    rep_schemata = []
+    for cond in conds:
+        if not isinstance(cond, dict):
+            raise SchemaError("a single condition must be a dict")
+        if eval_condition(cond, context, schema):
+            rep_type, rep_schema = _get_one_of(
+                cond, "then", "then-replace", "then-merge")
+            rep_schema = schema.ensure_child_schema(rep_schema)
+            if rep_type in ("then", "then-replace"):
+                rep_schemata.append((False, rep_schema))
+            elif rep_type == "then-merge":
+                rep_schemata.append((True, rep_schema))
+            else:
+                raise SchemaError("unknown then type: {}".format(rep_type))
+
+    new_schema = schema.copy()
+    del new_schema["match"]
+    for do_merge, rep_schema in rep_schemata:
+        rep_schema = process_schema_references(rep_schema, context)
+        # this could insert a new nested "cond" or "match" again
+        if do_merge:
+            rep_schema = copy.deepcopy(rep_schema)
+            new_schema = _merge(rep_schema, new_schema)
+        else:
+            new_schema.update(rep_schema)
+    # Recursively apply "cond/match" evaluation to the resulting schema
+    return process_schema_conditionals(new_schema, context)
+
+
+def eval_condition(cond, context, schema):
+    """Evaluate the condition in `cond` and return whether it matched.
+
+    """
+    pred, predval = _get_one_of(
+        cond,
+        "when-ref-true", "when-ref-exists", "when",
+        default=_SENTINEL)
+
+    if pred == "when":
+        # recursive evaluation of `predval` as the real predicate
+        return eval_pred(predval, context, schema)
+    elif pred == "when-ref-true":
+        refobj = try_get_reference(predval, context, schema, default=None)
+        return bool(refobj)
+    elif pred == "when-ref-exists":
+        refobj = try_get_reference(predval, context, schema, default=_SENTINEL)
+        return refobj is not _SENTINEL
+    else:
+        raise SchemaError("unknown condition type: {}".format(pred))
+
+
+def eval_pred(pred, context, schema):
+    if isinstance(pred, dict):
+        combinator, combinator_val = _get_one_of(
+            pred,
+            "not", "all-of", "any-of", "one-of",
+            default=None)
+        if combinator:
+            if combinator == "not":
+                return not eval_pred(combinator_val, context, schema)
+            elif combinator == "all-of":
+                if not isinstance(combinator_val, (list, tuple)):
+                    raise SchemaError("`all-of' requires a list of children")
+                for cv in combinator_val:
+                    if not eval_pred(cv, context, schema):
+                        return False
+                return True
+            elif combinator == "any-of":
+                if not isinstance(combinator_val, (list, tuple)):
+                    raise SchemaError("`any-of' requires a list of children")
+                for cv in combinator_val:
+                    if eval_pred(cv, context, schema):
+                        return True
+                return False
+            elif combinator == "one-of":
+                if not isinstance(combinator_val, (list, tuple)):
+                    raise SchemaError("`one-of' requires a list of children")
+                num_true = 0
+                for cv in combinator_val:
+                    if eval_pred(cv, context, schema):
+                        num_true += 1
+                        # shortcut
+                        if num_true > 1:
+                            return False
+                if num_true == 1:
+                    return True
+                else:
+                    return False
+            else:
+                raise SchemaError(
+                    "unknown logical operator: {}".format(combinator))
+        else:
+            pred_key, pred_val = _get_one_of(
+                pred,
+                "ref-true", "ref-exists", "equals",
+                default=None)
+            if pred_key == "ref-true":
+                refobj = try_get_reference(
+                    pred_val, context, schema, default=None)
+                return bool(refobj)
+            elif pred_key == "ref-exists":
+                refobj = try_get_reference(
+                    pred_val, context, schema, default=_SENTINEL)
+                return refobj is not _SENTINEL
+            elif pred_key == "equals":
+                if not isinstance(pred_val, (list, tuple)):
+                    raise SchemaError("`equals' requires a list as children")
+                if len(pred_val) != 2:
+                    raise SchemaError("`equals' requires a list of len 2")
+                op1 = eval_comparison_operator_operand(
+                    pred_val[0], context, schema)
+                op2 = eval_comparison_operator_operand(
+                    pred_val[1], context, schema)
+                return op1 == op2
+            else:
+                raise SchemaError("unknown predicate: {}".format(pred))
+    elif isinstance(pred, list):
+        # implicit all-of (aka AND)
+        for cv in pred:
+            if not eval_pred(cv, context, schema):
+                return False
+        return True
+    else:
+        return pred
+
+
+def eval_comparison_operator_operand(op, context, schema):
+    if not isinstance(op, dict):
+        raise SchemaError("an operand must be a dict")
+    opkey, opval = _get_one_of(op, "ref", "val", "value")
+    if opkey is None:
+        raise SchemaError("no operand given in {!r}".format(op))
+    if opkey == "ref":
+        return try_get_reference(opval, context, schema)
+    elif opkey in ("val", "value"):
+        return opval
+    else:
+        assert False
+
+
+def try_get_reference(ref, context, schema, default=None):
+    """Get the object referenced in `ref`.
+
+    Use `context` as data/object context and `schema` as the current schema
+    context.
+
+    """
+    uri = rfc3986.URIReference.from_string(ref).normalize()
+    if not uri.scheme:
+        uri = uri.copy_with(scheme="object")
+    if uri.scheme == "object":
+        if ref.startswith("object#"):
+            for attr in ("authority", "path", "query"):
+                if getattr(uri, attr, None) is not None:
+                    raise SchemaError(
+                        "bogus {} in URI reference `{}'".format(attr, ref))
+        if uri.fragment is None:
+            raise SchemaError("fragment required in reference")
+        if not uri.fragment:
+            return context.root.root_object
+        elif uri.fragment == '.':
+            return context.current_object
+        parts = uri.fragment.split('.')  # use '.'
separator as in configmix + if parts[0]: + # absolute + d = context.root.root_object + else: + # relative + d = context.current_object + parts = parts[1:] + c = context # needed to determine relative object references + relative_refs_allowed = True + for part in [urllib.parse.unquote(p) for p in parts]: + if part: + relative_refs_allowed = False + try: + d = d[part] + except (KeyError, IndexError, TypeError): + return default + else: + if not relative_refs_allowed: + raise SchemaError( + "empty part in path to object reference not allowed") + c = c.safe_parent + d = c.current_object + return d + elif uri.scheme == "schema": + if not uri.path or (uri.path == SCHEMA_PATH_SELF): + s = schema.SELF + elif uri.path == SCHEMA_PATH_ROOT: + s = schema.ROOT + else: + s = schema.get_cached_schema(uri.path, load_if_needed=True) + if uri.fragment is None: + raise SchemaError("fragment required in reference") + + if not uri.fragment.startswith('/'): + raise SchemaError("references to parts of a schema must be absolute (begin with `/')") + if uri.fragment == '/': + return s + parts = uri.fragment.split('/') + parent_for_subschema = s + for part in [urllib.parse.unquote(p) for p in parts[1:]]: + try: + v = s[part] + except (KeyError, IndexError, TypeError): + return default + else: + if isinstance(v, _Schema): + pass + elif isinstance(v, dict): + s = _Schema(parent_for_subschema, False, v) + else: + # need not try further + return default + return s + else: + raise SchemaError("Unknown schema reference scheme: {}".format(uri.scheme)) + + +_DEL_VALUE = '{{::DEL::}}' +"""Sigil to mark keys to be deleted in the target when merging""" + + +def _merge(user, default): + """Logically merge the configuration in `user` into `default`. + + :param dict user: + the new configuration that will be logically merged + into `default` + :param dict default: + the base configuration where `user` is logically merged into + :returns: `user` with the necessary amendments from `default`. + If `user` is ``None`` then `default` is returned. + + .. note:: Implementation: The configuration in `user` is + augmented/changed **inplace**. + + If a value in `user` is equal to :data:`._DEL_VALUE` + (``{{::DEL::}}``) the corresponding key will be deleted from the + merged output. + + From http://stackoverflow.com/questions/823196/yaml-merge-in-python + + """ + if user is None: + _filter_deletions(default) + return default + if isinstance(user, dict) and isinstance(default, dict): + for k, v in default.items(): + if k in user: + if user[k] == _DEL_VALUE: + # do not copy and delete the marker + del user[k] + else: + user[k] = _merge_item(user[k], v) + else: + user[k] = v + else: + raise SchemaError("can only merge two dicts on top-level") + _filter_deletions(user) + return user + + +def _merge_item(user, default): + """Recursion helper for :func:`._merge` + + """ + if isinstance(user, dict) and isinstance(default, dict): + for k, v in default.items(): + if k in user: + if user[k] == _DEL_VALUE: + # do not copy and delete the marker + del user[k] + else: + user[k] = _merge_item(user[k], v) + else: + user[k] = v + elif isinstance(user, (list, tuple)) and isinstance(default, (list, tuple)): + for idx, v in enumerate(default): + user.insert(idx, v) + return user + + +def _filter_deletions(d): + """Recursively filter deletions in the dict `d`. + + Deletions have values that equal :data:`._DEL_VALUE`. 
+
+    """
+    if not isinstance(d, dict):
+        return
+    # use a copy of the items because we change `d` while iterating
+    for k, v in list(d.items()):
+        if v == _DEL_VALUE:
+            del d[k]
+        else:
+            if isinstance(d[k], dict):
+                _filter_deletions(d[k])
+
+
+def _log_problem_cause_all(logger, loglevel, level, problems):
+    if not problems:
+        return
+    for pr in problems:
+        logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr)
+        _log_problem_cause_all(logger, loglevel, level+1, pr.cause)
+
+
+def _build_problems_by_level_and_depth(by_level, by_depth, level, problems):
+    for pr in problems:
+        if not pr.cause:
+            continue
+        try:
+            prl = by_level[level]
+        except LookupError:
+            prl = []
+            by_level[level] = prl
+        prl.append(pr)
+
+        depth = pr.context.depth
+        try:
+            prd = by_depth[depth]
+        except LookupError:
+            prd = []
+            by_depth[depth] = prd
+        prd.append(pr)
+        _build_problems_by_level_and_depth(
+            by_level, by_depth, level+1, pr.cause)
+
+
+def _log_problem_cause(logger, loglevel, max_level, max_depth, level, problems):
+    for pr in problems:
+        #
+        # Check whether we will start logging all problems from this
+        # level downwards
+        #
+        if max_level is None or level == max_level:
+            new_max_level = None  # trigger logging
+        else:
+            new_max_level = max_level
+        if max_depth is None or max_depth == pr.context.depth:
+            new_max_depth = None  # trigger logging
+        else:
+            new_max_depth = max_depth
+        if new_max_level is None or new_max_depth is None:
+            logger.log(loglevel, "%s> %r", "-"*((level*2)+2), pr)
+        if pr.cause:
+            _log_problem_cause(
+                logger, loglevel,
+                new_max_level, new_max_depth,
+                level+1, pr.cause)
+
+
+def log_problem_cause(logger, loglevel, debug, level, problems):
+    if not problems:
+        return
+    if debug:
+        _log_problem_cause_all(logger, loglevel, level, problems)
+    else:
+        by_level = {}  # to determine the maximum problem nesting level
+        by_depth = {}  # to determine the maximum context nesting level
+        _build_problems_by_level_and_depth(by_level, by_depth, level, problems)
+
+        max_level = max(by_level.keys())
+        max_depth = max(by_depth.keys())
+
+        _log_problem_cause(
+            logger, loglevel, max_level, max_depth, level, problems)
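
For orientation: ``validate()`` is a generator and reports problems by
yielding ``ValidationProblem`` objects instead of raising.  A minimal usage
sketch with hypothetical data (assumes the package and its dependencies
``rfc3986`` and ``configmix`` are importable)::

    from data_schema import validate, problem_message, level_name

    schema = {
        "type": "dict",
        "keys": {
            "name": {"type": "string", "minLength": 1},
            "port": {"type": "int", "minValue": 1, "maxValue": 65535},
        },
        "required": ["name"],
    }
    config = {"name": "", "port": 70000, "extra": True}

    for pr in validate(config, schema):
        # e.g.: ERROR 10006 min string length not reached
        print(level_name(pr.severity), pr.code, problem_message(pr))
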
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/data_schema/util.py	Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+# :-
+# :Copyright: (c) 2023 Franz Glasner
+# :License: BSD-3-Clause. See LICENSE.txt for details.
+# :-
+r"""Some utility functions for use within the package.
+
+"""
+
+__all__ = ["get_data_stream"]
+
+
+try:
+    from importlib import resources as il_resources
+except ImportError:
+    il_resources = None
+    import pkg_resources
+
+import rfc3986
+import rfc3986.validators
+
+
+def _is_safe_path(path):
+    if any(sep in path for sep in ('\\', ':')):
+        return False
+    if path.startswith("../"):
+        return False
+    if path.endswith("/.."):
+        return False
+    if "/../" in path:
+        return False
+    return True
+
+
+def get_data_stream(uri, basedir=None, basepackage=None):
+    """Resolve `uri` and return a readable binary stream.
+
+    "data:" URIs are resolved as Python package resources for the package
+    given in `basepackage`; by default this is the package where this
+    module lives.  The lookup happens within "<basepackage>.packagedata".
+
+    "file:" URIs are resolved by prepending `basedir` to the URI path.
+
+    The returned stream needs to be closed as usual.
+
+    """
+    u = rfc3986.URIReference.from_string(uri).normalize()
+    if u.scheme == "data":
+        if u.authority or u.query or u.fragment:
+            raise ValueError("invalid data URI: authority, query and "
+                             "fragment MUST be empty")
+        if not rfc3986.validators.path_is_valid(u.path, require=True):
+            raise ValueError("invalid or empty path within a data URI")
+        if u.path.find('%') >= 0:
+            raise ValueError("URI encoded paths not supported")
+        datapackage, sep, datapath = u.path.partition(':')
+        if sep:
+            if not datapackage:
+                datapackage = basepackage
+            if datapath.find(':') >= 0:
+                raise ValueError("colon in an URI's path not supported")
+        else:
+            datapackage = basepackage
+            datapath = u.path
+        # rfc3986 normalizes to absolute paths: just to be sure
+        if "//" in datapath:
+            raise ValueError(
+                "URI path for the `data' scheme contains `//' substring")
+        if not datapath.startswith('/'):
+            if datapackage is None:
+                raise ValueError("missing the data package")
+            if il_resources:
+                datapath_parts = datapath.rsplit('/', 1)
+                datapath_dirs = datapath_parts[:-1]
+                datapath_file = datapath_parts[-1]
+                if datapath_dirs:
+                    datapath_sep = '.'
+                else:
+                    datapath_sep = ''
+                return il_resources.open_binary(
+                    datapackage + '.packagedata' + datapath_sep
+                    + '.'.join(datapath_dirs),  # noqa: E131
+                    datapath_file)
+            else:
+                return pkg_resources.resource_stream(  # noqa:E501 # pylint:disable=used-before-assignment
+                    datapackage, "packagedata/" + datapath)
+        else:
+            raise ValueError(
+                "URI path for the `data' scheme must not be absolute")
+    elif u.scheme == "file":
+        if u.authority or u.query or u.fragment:
+            raise ValueError("invalid file URI: authority, query and "
+                             "fragment MUST be empty")
+        if not rfc3986.validators.path_is_valid(u.path, require=True):
+            raise ValueError("invalid or empty path within a file URI")
+        if u.path.find('%') >= 0:
+            raise ValueError(
+                "percent-encoded paths not supported in data-stream file URI")
+        if not _is_safe_path(u.path):
+            raise ValueError("unsafe path in file URI is not supported")
+        if u.path.startswith('/'):
+            # resolve the file relative to the projectdir
+            if basedir is None:
+                raise TypeError("no base directory in `basedir' given")
+            return open("{}/{}".format(basedir.rstrip("/\\"),
+                                       u.path.lstrip('/')),
+                        "rb")
+        else:
+            raise ValueError("relative file URI not handled")
+    else:
+        raise ValueError("scheme `{}' not supported".format(u.scheme))
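
A short sketch of the two supported URI schemes (paths are illustrative,
modelled on the test layout added in this changeset; the ``data:`` form
assumes the ``tests`` directory is on ``sys.path``, the ``file:`` form
assumes the project root as working directory)::

    from data_schema.util import get_data_stream

    # "data:" URI: a package resource inside <package>.packagedata,
    # here tests/testschematalib/packagedata/test2.schema.yml
    with get_data_stream("data:testschematalib:test2.schema.yml") as f:
        raw = f.read()

    # "file:" URI: the (absolute) URI path is joined onto `basedir`,
    # here tests/schemata/test1.schema.yml
    with get_data_stream("file:/schemata/test1.schema.yml",
                         basedir="tests") as f:
        raw = f.read()
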
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/docs/schema.txt	Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,542 @@
+.. -*- coding: utf-8; mode: rst; indent-tabs-mode: nil -*-
+
+========
+ Schema
+========
+
+Basic ideas
+===========
+
+- Modelled after JSON Schema
+- Declarative, in YAML
+- Uses extended YAML features:
+
+  + loading of (arbitrary) Python objects
+  + use of YAML references
+
+- Python callables can be used directly.
+  They must be iterators and ``yield`` each problem.
+- Returns a list of problems: tries to report as many problems as possible
+  in one go (as far as possible and sensible)
+
+.. seealso:: - https://json-schema.org/understanding-json-schema/index.html
+             - http://rx.codesimply.com/coretypes.html
+
+
+Extra keywords for :py:func:`validate`
+======================================
+
+- ``skip_keys``
+
+  A list of strings or of compiled REs.
+
+  A string item is tested for equality, an RE via :py:meth:`search`
+  -- both against the dict key.
+
+  On a hit that key is ignored completely.  So this is a global
+  ignore list for dict keys.
+
+
+Types
+=====
+
+Designated by ``type`` (required)
+
+
+All schemata except the `Schema combinators`_ also have an optional
+``index-constraint`` attribute.
+
+  Its value is a list of indices at which the element may occur within
+  its parent container (list, sorted dict).
+
+
+dict / map / object
+-------------------
+
+- ``nullable``
+
+  bool (default: False): instead of an empty dict also allow a None/null/nil
+
+- ``keys``
+
+  a `dict` mapping keys to the schemata for the corresponding values of
+  the validated dict
+
+- ``keyNames``
+
+  If present: a schema that the *keys* -- including the `additionalKeys` --
+  must follow.
+
+  Default: equivalent to ``{"type": "string"}``
+
+- ``additionalKeys``
+
+  * bool
+
+    `False`
+      not allowed (default)
+
+      The global ``skip_keys`` is still taken into account, though.
+
+    `True`
+      allowed -- no further schema validation of the contents
+
+      The global ``skip_keys`` is obviously irrelevant here.
+
+  * schema
+
+    validation is done against the given schema
+
+    The global ``skip_keys`` is still taken into account, though.
+
+- ``required``
+
+  a list of strings naming keys that must be present
+
+- ``maxLength``
+- ``minLength``
+
+
+list / array
+------------
+
+- ``nullable``
+
+  bool (default: False): instead of an empty list also allow a None/null/nil
+
+- ``items``
+
+  one schema for *all* items.
+
+- ``maxLength``
+- ``minLength``
+
+
+set / frozenset
+---------------
+
+- ``nullable``
+
+  bool (default: False): instead of an empty set also allow a None/null/nil
+
+- ``items``
+
+  one schema for *all* items
+
+- ``maxLength``
+- ``minLength``
+
+
+tuple / record
+--------------
+
+- ``nullable``
+
+  bool (default: False): instead of an empty list or tuple also allow
+  a None/null/nil
+
+- ``items``
+
+  a list: one dedicated schema *per item*
+
+- ``additionalItems``
+
+  * bool
+
+    `False`
+      not allowed (default)
+
+    `True`
+      allowed -- no further schema validation of the contents
+
+  * schema
+
+    the "additional" items are validated against the given schema
+
+- ``maxLength``
+- ``minLength``
+
+
+string / str
+------------
+
+- ``nullable``
+
+  bool (default: False): instead of an empty string also allow a None/null/nil
+
+- ``enum``
+
+  a list of strings, exactly one of which must match the string
+
+  Note: all other checks (see below) are still performed as well.
+
+- ``is-contained-in-ref``
+
+  The string's value must be contained (Python ``in``) in the referenced
+  object (see `References`_).
+
+- ``maxLength``
+- ``minLength``
+- ``pattern``
+
+  * string
+
+    RE of the accepted pattern
+
+  * compiled RE
+
+    compiled RE of the accepted pattern
+
+  * Callable
+
+
+binary
+------
+
+- ``maxLength``
+- ``minLength``
+- ``pattern``
+
+  * string
+
+    RE of the accepted pattern.  The YAML unicode string value will be
+    converted to a byte-string with :func:`ast.literal_eval` as if it
+    were surrounded by ``b'''<re>'''`` or ``b"""<re>"""``.  If the pattern
+    contains both a ``'''`` and a ``"""`` substring the conversion will
+    fail.
+
+  * bytes, bytearray
+
+    RE of the accepted pattern
+
+  * compiled RE
+
+    compiled RE of the accepted pattern
+
+  * Callable
+
+
+bool / boolean
+--------------
+
+Only **real** boolean values: ``true`` and ``false``
+
+- ``value``
+
+  The accepted value or a validating callable
+
+- ``nullable``
+
+  bool (default: False): instead of a boolean also allow a None/null/nil
+
+
+timestamp / datetime
+--------------------
+
+Only :py:class:`datetime.datetime` is allowed
+
+- ``value``
+
+  Callable that validates the value of the timestamp
+
+
+Callable
+--------
+
+An iterator (e.g. using ``yield``) with the signature
+:py:func:`callable(object, schema, context)`
+
+
+accept
+------
+
+Always validates successfully: accepts everything
+
+
+deny
+----
+
+Never validates successfully: always yields error code 10010
+
+
+:py:obj:`None` / none / null / nil
+----------------------------------
+
+Only the `None` object validates
+
+
+empty
+-----
+
+Allowed are: None, an empty dict, an empty list, an empty set/frozenset
+
+.. note:: Empty strings are **not** allowed.
+
+
+integer / int
+-------------
+
+- ``nullable``
+
+  bool (default: False): also allow a None/null/nil
+
+- ``minValue``
+- ``maxValue``
+- ``value``
+
+  A callable to validate the integer value
+
+- ``enum``
+
+  a list of integers, exactly one of which must equal the given value.
+
+  Note: all other checks (`minValue`, `maxValue`, `value`) are still
+  performed as well.
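+
+The checks are thus independent of each other: a value must satisfy all
+of them.  A small sketch (hypothetical values, using the Python dict form
+of a schema)::
+
+  from data_schema import validate
+
+  schema = {
+      "type": "int",
+      "minValue": 0,
+      "maxValue": 100,
+      "enum": [10, 20, 150],
+  }
+
+  assert not list(validate(10, schema))    # in `enum' and within range
+  assert any(pr.code == 10022              # in `enum', but > maxValue
+             for pr in validate(150, schema))
+  assert any(pr.code == 10048              # within range, not in `enum'
+             for pr in validate(30, schema))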
+
+
+real / double / float
+---------------------
+
+- ``nullable``
+
+  bool (default: False): also allow a None/null/nil
+
+- ``minValue``
+- ``maxValue``
+- ``value``
+
+  a callable to validate the float value
+
+
+number / num
+------------
+
+Any numeric value (int or float)
+
+- ``nullable``
+
+  bool (default: False): also allow a None/null/nil
+
+- ``minValue``
+- ``maxValue``
+- ``value``
+
+  a callable to validate the number
+
+- ``enum``
+
+  a list of numbers, exactly one of which must equal the given value
+
+  Note: all other checks (`minValue`, `maxValue`, `value`) are still
+  performed as well.
+
+
+scalar
+------
+
+Any scalar value: no `None`, no `dict`, no `tuple`, no `list`, no `set`,
+no `frozenset`.
+
+- ``nullable``
+
+  bool (default: False): a None/null/nil is then allowed as well
+
+
+Schema combinators
+------------------
+
+- ``all-of``
+
+  all schemata in the given list must validate
+
+- ``any-of``
+
+  at least one must validate
+
+  After the first successful test all further sub-tests are skipped
+  (aka short-circuit behaviour).
+
+- ``one-of``
+
+  **exactly one** from the list must validate (aka xor)
+
+- ``not``
+
+  the following schema must not validate successfully
+
+
+Conditions
+==========
+
+The ``cond`` key in a schema:
+
+  A Lisp-like `cond`:
+
+  - a list of if-then pairs
+
+    If: ``when``, ``when-ref-true``, ``when-ref-exists``
+
+    Then: ``then``, ``then-merge``
+
+    For ``when``:
+
+      Logical operators:
+
+        ``not``
+
+        ``all-of`` (aka `and`)
+
+        ``any-of`` (aka `or`)
+
+        ``one-of`` (aka `xor`)
+
+      Predicates:
+
+        ``ref-true``, ``ref-exists``, an object in boolean context
+
+      Comparison operator:
+
+        ``equals`` followed by a list of length two acts as the
+        equality operator.
+
+        Possible keys:
+
+          ``ref``: a reference
+
+          ``value`` or ``val``: a value
+
+        e.g. in YAML::
+
+          equals:
+            - ref: object:#my.key
+            - value: "a string value"
+
+    ``when-ref-true`` and ``when-ref-exists`` are simple shorthands for::
+
+      when:
+        ref-true: ...
+
+    and::
+
+      when:
+        ref-exists: ...
+
+  - the *first* matching condition determines a schema via its "then"
+
+    ``then``
+
+      keys in the then-schema *replace* the corresponding keys in the
+      parent schema
+
+    ``then-merge``
+
+      the then-merge schema is *merged into* the parent schema
+
+  - all of this happens recursively
+
+  - if none of the conditions matches, nothing is executed/changed
+
+  - ``when`` -- directly followed by a list -- is a shorthand for
+    ``all-of`` with exactly that list::
+
+      cond:
+        when:
+          - test1
+          - test2
+          - test3
+
+    is equivalent to::
+
+      cond:
+        when:
+          all-of:
+            - test1
+            - test2
+            - test3
+
+.. important:: Schema references are resolved **before** each
+   replace/merge step!
+
+``match`` corresponds to ``cond`` -- with the difference that instead of
+the *first* true condition **all** true conditions are executed:
+
+  first all schemata coming from true conditions are collected, then
+  the schemata are replaced or merged respectively.
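+
+A small Python sketch of how a ``cond`` is processed programmatically,
+mirroring the unit tests (``_Schema`` and ``process_schema_conditionals``
+are internals of this package)::
+
+  import data_schema
+
+  ctx = data_schema.Context(
+      None, root_object={"foo": "bar"}, settings=None)
+  schema = data_schema._Schema(
+      None, True, {"type": None,
+                   "cond": [
+                       {"when": {"ref-true": "#foo"},
+                        "then": {"required": ["r1"]}}
+                   ]})
+
+  r = data_schema.process_schema_conditionals(schema, ctx)
+  assert r["required"] == ["r1"]   # `then' injected/replaced the key
+  assert "cond" not in r           # the processed `cond' is dropped
+
+The YAML example below shows the same mechanics declaratively.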
+
+Example::
+
+  required:
+    - a
+    - b
+  cond:
+    - when:
+        all-of:
+          - not:
+              ref-true: 'object:#p1.p2.p3'
+          - ref-exists: '#p4.p5'
+      then:
+        required: ["foo", "bar"]          # replace existing `required'
+    - when:
+        ref-true: 'object:#p6.p7'
+      then:
+        new-key: "new-val"                # a new key in the containing dict
+      then-merge:
+        required: ["c", "d"]              # add `c' and `d' to `a' and `b'
+    - when: true                          # as the last case: "else"
+      then-replace:
+        required: ["something", "else"]   # replace existing `required'
+
+
+References
+==========
+
+URI syntax
+
+  An adapted and simplified JSON-Pointer syntax (:rfc:`6901`)
+
+Examples:
+
+  - ``object:#wsgi.china_detector.enabled``
+
+    is (because `object` is the default URI scheme) equivalent to:
+
+    ``#wsgi.china_detector.enabled``
+
+    This is an **absolute** reference.
+
+    ``.`` is therefore -- as in :py:mod:`configmix` -- the hierarchy
+    separator for URI fragments in object references
+
+  - ``object:#`` is the root object
+
+  - ``object:#.`` is the current context object (aka the "base")
+
+  - ``object:`` is *invalid*
+
+    A fragment **must** formally be present -- even if it is empty.
+
+  - Relative references *start* with a dot (analogous to Python imports)
+
+    Multiple leading dots are -- as with Python imports -- relative
+    references to parent objects. Trying to address the parent of the
+    root object raises a :py:exc:`TypeError`.
+
+Wherever a schema is allowed, a dict with the single key ``$ref`` is
+allowed as well. This is a reference to another schema, using the URI
+scheme ``schema:``. That other schema may even come from another file:
+
+  - ``schema:$root#/``
+
+    the root element of the root schema
+
+  - ``schema:$self#/``
+
+    the root element of the currently active schema
+
+  - ``schema:data:schemalib:file.schema.yml#/foo``
+
+    the ``foo`` element of the schema `file.schema.yml` loaded via the
+    package data of `schemalib`. That schema then also becomes the new
+    active schema.
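+
+For illustration, a small Python sketch of object-reference resolution
+along the lines of the unit tests (``try_get_reference`` and ``Context``
+are used as in the tests; the data is made up)::
+
+  import data_schema
+
+  ctx = data_schema.Context(
+      None,
+      root_object={"wsgi": {"china_detector": {"enabled": True}}},
+      settings=None)
+  schema = data_schema._Schema(None, True)   # an empty helper schema
+
+  # absolute reference; `object:' is the default URI scheme
+  r = data_schema.try_get_reference(
+      "#wsgi.china_detector.enabled", ctx, schema)
+  assert r is True
+
+  # a non-existing path yields the default (None if not given)
+  r = data_schema.try_get_reference(
+      "#wsgi.nope", ctx, schema, default="fallback")
+  assert r == "fallback"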
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/_config.py	Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,28 @@
+r"""Common configuration for unittests.
+
+Importing this module changes ``sys.path`` as a side effect so that
+``data_schema`` can be imported from the project checkout.
+
+"""
+
+import sys
+import os
+
+import configmix.config
+
+
+PROJECTDIR = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), ".."))
+
+FILEURI_PREFIX = "file:/tests/schemata/"
+
+
+#
+# Side effects
+#
+
+# Allow importing the data_schema package
+sys.path.append(PROJECTDIR)
+
+# Also synthesize a minimal configuration just valid for the unittests
+config = configmix.config.Configuration(
+    projectdir=PROJECTDIR)
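+
+# Illustrative usage from a test module (sketch; the real tests simply
+# do `import _config` for its side effects):
+#
+#   import _config             # appends PROJECTDIR to sys.path
+#   import data_schema         # now importable from the checkout
+#   projectdir = _config.config.getvar_s("projectdir")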
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/schemata/test1.schema.yml	Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,128 @@
+# -*- coding: utf-8; mode: yaml; indent-tabs-mode: nil; -*-
+#
+# This is a sample schema definition file exercising most features
+#
+%YAML 1.1
+---
+
+type: dict
+keys:
+  string-value:
+    type: string
+    minLength: 1
+    maxLength: 255
+    pattern:
+      anyOf:
+        - &NAMERE !!python/object/apply:re.compile ['\A[A-Za-z_][A-Za-z0-9_]*\Z']
+        - &CALLABLE !!python/name:test_schema._test_generic_validator_for_yaml
+
+  binary-value:
+    type: binary
+    minLength: 20
+    maxLength: 256
+    value: *CALLABLE
+
+  int-value:
+    type: int
+    minValue: 0
+    maxValue: 100
+    value: *CALLABLE
+
+  float-value:
+    type: float
+    minValue: 0.0
+    maxValue: 50.0
+    value: *CALLABLE
+
+  numeric-value:
+    type: number
+    minValue: 0
+    maxValue: 1000
+    value: *CALLABLE
+
+  scalar-value:
+    type: scalar
+
+  bool-value:
+    type: bool
+    value: *CALLABLE
+
+  timestamp-value:
+    type: timestamp
+    value: *CALLABLE
+
+  null-value:
+    type: null
+
+  empty-value:
+    type: empty
+
+  any-value:
+    type: accept
+
+  not-any-value-allowed:
+    type: deny
+
+  custom-value:
+    type: *CALLABLE
+
+  dict-value:
+    type: dict
+    keys:
+      key1:
+        type: accept
+      0:
+        type: accept
+      1:
+        type: accept
+    keyNames:
+      oneOf:
+        - type: string
+        - type: integer
+          minValue: 0
+    additionalKeys:
+      type: accept    # equivalent to the simpler: additionalKeys: true
+    required:
+      - key1
+      - 0
+
+  list-value:
+    type: list
+    minLength: 1
+    maxLength: 10
+    items:
+      anyOf:
+        - type: scalar
+        - type: *CALLABLE
+
+  record-value:
+    type: record
+    items:
+      - type: string
+      - type: number
+    minLength: 2
+    maxLength: 2
+    additionalItems: false
+
+  combinator-oneOf:
+    oneOf:
+      - type: int
+      - type: float
+      - type: null
+
+  combinator-allOf:
+    allOf:
+      - &NAME
+        type: string
+        maxLength: 255
+        pattern: *NAMERE
+      - type: *CALLABLE
+
+  combinator-anyOf:
+    anyOf:
+      - *NAME
+      - type: *CALLABLE
+      - null
+
+additionalKeys: false
+required: []
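+
+# Note on the YAML anchors above: `&NAMERE', `&CALLABLE' and `&NAME'
+# are plain YAML anchors; the corresponding `*NAMERE', `*CALLABLE' and
+# `*NAME' aliases reuse the anchored objects (a compiled RE, a validator
+# callable and a sub-schema) in several places of this schema.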
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_schema.py Thu Jul 06 23:41:41 2023 +0200 @@ -0,0 +1,2322 @@ + +import copy +import datetime +import re +import unittest + +import _config + +import configmix.yaml + +import data_schema +import data_schema.util + + +TYPE_RE = type(re.compile(r"\A.+\Z")) + + +def _test_generic_validator_for_yaml(obj, schema, context): + """Callback for loading test1.schema.yml: Always successful""" + yield from () + + +class YAML(unittest.TestCase): + + """Tests to load Python objects from YAML with complex Python-specific + tags. + + .. seealso:: https://pyyaml.org/wiki/PyYAMLDocumentation + + """ + + def test_load_python_name(self): + y = configmix.yaml.load("key: !!python/name:data_schema.validate") + self.assertTrue(callable(y["key"])) + + def test_load_re(self): + y = configmix.yaml.load("key: !!python/object/apply:re.compile\n - '^[0-9]+$'") + self.assertTrue(isinstance(y["key"], TYPE_RE)) + + +class SchemaInYAML(unittest.TestCase): + def test_file(self): + with data_schema.util.get_data_stream( + _config.FILEURI_PREFIX + "test1.schema.yml", + basedir=_config.config.getvar_s("projectdir")) as f: + schema = data_schema._Schema( + None, True, configmix.yaml.load(f)) + + def test_data(self): + with data_schema.util.get_data_stream( + "data:testschematalib:test2.schema.yml") as f: + schema = data_schema._Schema( + None, True, configmix.yaml.load(f)) + + +class SchemaCheck(unittest.TestCase): + + def test_root_creation(self): + schema = data_schema._Schema(None, True) + self.assertIsInstance(schema, dict) + self.assertEqual(0, len(schema)) + self.assertFalse(schema) + + def test_root_creation_wrong(self): + self.assertRaises( + ValueError, + data_schema._Schema, + None, + False) + + def test_root_properties(self): + schema = data_schema._Schema(None, True) + self.assertIsNone(schema.parent) + self.assertIs(schema, schema.ROOT) + self.assertTrue(schema.is_sub_root) + self.assertIs(schema, schema.SELF) + + def test_dict_len_bool(self): + schema = data_schema._Schema(None, True, a=1, b=2) + self.assertTrue(schema) + self.assertEqual(2, len(schema)) + + def test_equality(self): + schema1 = data_schema._Schema(None, True, a=1, b=2) + schema2 = data_schema._Schema(None, True, b=2, a=1) + self.assertEqual(schema1, schema2) + self.assertIsNot(schema1, schema2) + + def test_copy(self): + schema = data_schema._Schema(None, True, type="str") + schema2 = schema.copy() + self.assertEqual(schema, schema2) + + def test_deepcopy(self): + d1 = {} + schema = data_schema._Schema(None, True, type="str", b=d1) + schema2 = copy.deepcopy(schema) + self.assertEqual(schema, schema2) + self.assertIs(schema["b"], d1) + self.assertIsNot(schema2["b"], d1) + self.assertIs(schema.parent, schema2.parent) + self.assertIs(schema.is_sub_root, schema2.is_sub_root) + + def test_nested_copy(self): + d1 = {} + d2 = {} + root_schema = data_schema._Schema(None, True, type="str", b=d1) + child_schema = data_schema._Schema(root_schema, True, type="bool", b=d2) + copied_child = child_schema.copy() + self.assertIs(copied_child.ROOT, root_schema) + self.assertIs(copied_child.SELF, copied_child) + self.assertIsNot(copied_child.SELF, root_schema) + self.assertEqual(child_schema, copied_child) + self.assertIs(copied_child["b"], d2) + + def test_nested_deepcopy(self): + d1 = {} + d2 = {} + root_schema = data_schema._Schema(None, True, type="str", b=d1) + child_schema = data_schema._Schema(root_schema, True, type="bool", b=d2) + copied_child = copy.deepcopy(child_schema) + 
self.assertIs(copied_child.ROOT, root_schema) + self.assertIs(copied_child.SELF, copied_child) + self.assertEqual(child_schema, copied_child) + self.assertIsNot(copied_child["b"], d2) + self.assertNotEqual(root_schema, child_schema) + + +class ContextCheck(unittest.TestCase): + + def test_root_without_settings(self): + self.assertRaises(TypeError, + data_schema.Context, + None, + root_object=object(), + schema=dict()) + + def test_root_context(self): + obj = object() + schema = object() + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertEqual("<ROOT>", str(ctx)) + self.assertTrue(ctx.root_object is obj) + self.assertTrue(ctx.root_schema is schema) + self.assertTrue(ctx.settings is settings) + + def test_parent_of_root_context(self): + obj = object() + schema = object() + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertTrue(ctx.is_root) + self.assertIsNone(ctx.parent) + try: + ctx.safe_parent + except TypeError: + pass + else: + self.fail( + "Context.safe_parent was expected to raise for a root context") + + def test_root_context_init_root_empty(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + self.assertRaises( + TypeError, + data_schema.Context, None, key="key", settings=settings) + self.assertRaises( + TypeError, + data_schema.Context, None, index="key", settings=settings) + + def test_root_context_init_only_one_of_key_index(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises( + ValueError, + data_schema.Context, root, key="key", index="index") + + def test_root_context_init_exactly_one(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises(TypeError, data_schema.Context, root) + + def test_nonroot_rootobj_schema(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + obj = object() + schema = object() + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertEqual("<ROOT>", str(ctx)) + self.assertTrue(ctx.root_object is obj) + self.assertTrue(ctx.root_schema is schema) + self.assertTrue(ctx.settings is settings) + self.assertRaises(TypeError, data_schema.Context, ctx, index=0, + root_object=object()) + self.assertRaises(TypeError, data_schema.Context, ctx, index=0, + root_schema=object()) + + def test_str(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + ctx1 = data_schema.Context(root, key="key1") + ctx2 = data_schema.Context(ctx1, index=2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + + def test_repr(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + ctx1 = data_schema.Context(root, key="key1") + ctx2 = data_schema.Context(ctx1, index=2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("<Context path=`key1 / INDEX:2 / 
key3'>", repr(ctx3)) + + def test_root(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertTrue(root.is_root) + self.assertTrue(root is root.root) + self.assertTrue(root.settings is settings) + ctx1 = data_schema.Context(root, key="key1") + self.assertFalse(ctx1.is_root) + self.assertTrue(ctx1.root is root) + self.assertTrue(ctx1.settings is settings) + ctx2 = data_schema.Context(ctx1, index=2) + self.assertTrue(ctx2.settings is settings) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + self.assertFalse(ctx3.is_root) + self.assertTrue(ctx3.root is root) + self.assertTrue(ctx3.settings is settings) + + def test_extra_settings_in_between(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + settings2 = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertTrue(root.is_root) + self.assertTrue(root is root.root) + self.assertTrue(root.settings is settings) + ctx1 = data_schema.Context(root, key="key1") + self.assertFalse(ctx1.is_root) + self.assertTrue(ctx1.root is root) + self.assertTrue(ctx1.settings is settings) + ctx2 = data_schema.Context(ctx1, index=2, settings=settings2) + self.assertTrue(ctx2.settings is settings2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + self.assertFalse(ctx3.is_root) + self.assertTrue(ctx3.root is root) + self.assertTrue(ctx3.settings is settings2) + + def test_key_xor_index(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises( + ValueError, + data_schema.Context, + root, + index=0, + key="huhu") + + def test_keyindex_requires_key(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + self.assertRaises( + ValueError, + data_schema.Context, + None, + key_index=0, + settings=settings) + + +class SchemaReferences(unittest.TestCase): + + def setUp(self): + self.empty_schema = data_schema._Schema(None, True) + + def test_no_fragment(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + self.assertRaises(data_schema.SchemaError, + data_schema.try_get_reference, + "object:", + ctx, + self.empty_schema) + + def test_empty_fragment(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + r = data_schema.try_get_reference( + "object:#", + ctx, + self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, ctx.current_object) + + def test_point_fragment_with_root(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, ctx.current_object) + + def test_point_fragment_with_current_object(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.current_object) + self.assertIs(r, current_object) + self.assertIsNot(r, ctx.root_object) + + r = 
data_schema.try_get_reference( + "object:#.current", + ctx, + self.empty_schema) + self.assertEqual({"object": "value"}, r) + + r = data_schema.try_get_reference( + "object:#.current.object", + ctx, + self.empty_schema) + self.assertEqual("value", r) + + def test_point_fragment_with_invalid_current_object_refs(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.current_object) + self.assertIs(r, current_object) + + r = data_schema.try_get_reference( + "object:#.non-current", + ctx, + self.empty_schema) + self.assertIsNone(r) + + r = data_schema.try_get_reference( + "object:#.non-current.object", ctx, self.empty_schema) + self.assertIsNone(r) + + r = data_schema.try_get_reference( + "object:#.current.non-object", ctx, self.empty_schema) + self.assertIsNone(r) + + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "object:#.current..", + ctx, + self.empty_schema) + + self.assertRaises( + TypeError, + data_schema.try_get_reference, + "object:#..current.object", + ctx, + self.empty_schema) + + def test_fragment_with_current_object_and_root(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#", ctx, self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, root_object) + + def test_default_schema_ref(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, + settings=None) + r = data_schema.try_get_reference("#foo", ctx, self.empty_schema) + self.assertEqual("bar", r) + r = data_schema.try_get_reference("#bar", ctx, self.empty_schema) + self.assertIsNone(r) + sentinel = object() + r = data_schema.try_get_reference("#bar", ctx, self.empty_schema, + default=sentinel) + self.assertIs(r, sentinel) + + def test_object_schema_ref(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, + settings=None) + r = data_schema.try_get_reference("object:#foo", ctx, + self.empty_schema) + self.assertEqual("bar", r) + r = data_schema.try_get_reference("object:#bar", ctx, + self.empty_schema) + self.assertIsNone(r) + sentinel = object() + r = data_schema.try_get_reference( + "object:#bar", + ctx, + self.empty_schema, + default=sentinel) + self.assertIs(r, sentinel) + + def test_nested_keys(self): + sentinel = object() + ctx = data_schema.Context( + None, + root_object={"foo": "bar", + "k1": {"k2": "v2", + "k3": None}, + "k4": None, + "k5": [1, 2, 3]}, + settings=None) + r = data_schema.try_get_reference( + "#k1.k2", ctx, self.empty_schema) + self.assertEqual("v2", r) + r = data_schema.try_get_reference( + "#k1.k3", ctx, self.empty_schema) + self.assertIsNone(r) + r = data_schema.try_get_reference( + "#k1.k3.fornone", ctx, self.empty_schema) + self.assertIsNone(r) + r = data_schema.try_get_reference( + "#k1.k3.fornone", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "#k5.0", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "#k6", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + + def test_url_quoted_fragment(self): + ctx = 
data_schema.Context( + None, + root_object={"foo": "bar", + "k1": {"k2": "v2", + "k3": None}, + "k4": None, + "k5": [1, 2, 3]}, + settings=None) + r = data_schema.try_get_reference( + "#fo%6F", ctx, self.empty_schema) + self.assertEqual("bar", r) + + def test_no_duplicate_unquoting_in_fragment(self): + ctx = data_schema.Context( + None, + root_object={"fo%o": "bar"}, + settings=None) + r = data_schema.try_get_reference( + "#fo%25o", ctx, self.empty_schema) + self.assertEqual("bar", r) + + def test_schema_ref_must_have_fragment(self): + ctx = data_schema.Context(None, root_schema={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "schema:", + ctx, + self.empty_schema) + + def test_schema_ref_must_have_absolute_fragment(self): + ctx = data_schema.Context(None, root_schema={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "schema:#", + ctx, + self.empty_schema) + + def test_schema_ref_root_schema(self): + schema = data_schema._Schema( + None, True, {"foo": "bar"}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/", ctx, schema) + self.assertIs(r, schema) + + def test_unknown_schema_ref_yet(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "data:#", + ctx, + self.empty_schema) + + def test_schema_not_found(self): + sentinel = object() + root_schema = data_schema._Schema(None, True, {"foo": "bar"}) + ctx = data_schema.Context(None, root_schema=root_schema, + settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/bar", ctx, root_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "schema:#/foo2", ctx, root_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "schema:#/foo3", ctx, root_schema) + self.assertIsNone(r) + + def test_schema_is_found(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/foo2", ctx, schema) + self.assertEqual(subsubschema, r) + + def test_schema_with_trailing_slash_is_found(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/foo2/", ctx, schema) + self.assertIsNone(r) + + def test_schema_is_found_with_quoted_fragment(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/f%6Fo/foo%32", ctx, schema) + self.assertEqual(subsubschema, r) + + +class SchemaConditionals(unittest.TestCase): + + def setUp(self): + self._ctx = data_schema.Context( + None, root_object={"foo": "bar", "foo2": None}, settings=None) + + def test_no_cond(self): + schema = data_schema._Schema(None, True, {"type": None}) + self.assertIs(data_schema.process_schema_conditionals( + schema, self._ctx), + schema) + + def test_cond_is_none(self): + schema = data_schema._Schema(None, True, {"type": 
None, + "cond": None}) + self.assertIs(data_schema.process_schema_conditionals( + schema, self._ctx), + schema) + + def test_ambiguous(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": None, + "match": None}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_cond_not_a_sequence(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": {"type": None}}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_match_not_a_sequence(self): + schema = data_schema._Schema( + None, True, {"type": None, + "match": {"type": None}}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_condline_not_a_dict(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": [None]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_matchline_not_a_dict(self): + schema = {"type": None, + "match": [None]} + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_cond_unknown_predicate(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"unexisting-when-xxxx": None, + "then": {}} + ]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_match_unknown_predicate(self): + schema = data_schema._Schema( + None, True, {"type": None, + "match": [ + {"unexisting-when-xxxx": None, + "then": {}} + ]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_simple_replace_when_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_not_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"not": False}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_merge_when_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "old-key": "here I am", + "cond": [ + {"when": True, + "then-merge": { + "require": ["r1", "r2", "r3"], + "new-key": None, + "old-key": "{{::DEL::}}" + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha", "r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("old-key" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_first_wins_1(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", 
"haha", "hehe"], + "cond": [ + {"when": True, + "then": { + "new-key2": "v2"}}, + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha", "hehe"], r["require"]) + self.assertTrue("new-key2" in r) + self.assertEqual("v2", r["new-key2"]) + self.assertFalse("cond" in r) + + def test_simple_replace_first_wins_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha", "hehe"], + "cond": [ + {"when": False, + "then": { + "new-key2": "v2"}}, + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": False, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertFalse("new-key" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-true": '#foo', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_true_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-true": '#foo'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_is_not_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-true": '#not-a-foo', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertTrue("new-key" not in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_is_not_true_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-true": '#not-a-foo'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertTrue("new-key" not in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists(self): + schema = data_schema._Schema( + None, 
True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-exists": '#foo2', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key3" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-exists": '#foo2'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key3" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_is_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"when-ref-exists": '#foo-not-existing', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertFalse("require" in r) + self.assertFalse("new-key" in r) + self.assertEqual("val", r["new-key3"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_is_false_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"when": {"ref-exists": '#foo-not-existing'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertFalse("require" in r) + self.assertFalse("new-key" in r) + self.assertEqual("val", r["new-key3"]) + self.assertFalse("cond" in r) + + def test_allOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"all-of": [ + True, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_allOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"all-of": [ + True, + {"ref-exists": '#foo-non-existing'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_short_allOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": [ + True, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}], + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_short_allOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": [ + True, + {"ref-exists": '#foo-non-existing'}, + {"ref-true": '#foo'}], + "then": 
{"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_anyOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"any-of": [ + False, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_anyOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"any-of": [ + False, + {"ref-exists": '#foo2-non'}, + {"ref-true": '#foo2'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_oneOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"ref-exists": '#foo2'}, + {"not": {"ref-true": '#foo'}}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_oneOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_oneOf_false_2(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"not": {"ref-exists": '#foo2'}}, + {"not": {"ref-true": '#foo'}}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_match_nothing(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": False, + "then": {"new-key": None}}, + {"when": False, + "then": {"new-key2": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertFalse("new-key" in r) + self.assertFalse("new-key2" in r) + + def test_match_all(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": True, + "then": {"new-key": "value"}}, + {"when": True, + "then": {"new-key2": "value2"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("value", r["new-key"]) + self.assertEqual("value2", r["new-key2"]) + + def test_match_some(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": True, + "then": {"new-key": "value"}}, + {"when": False, + "then": {"new-key2": "value2"}}, + {"when": True, + "then": {"new-key3": "value3"}}, + {"when": False, + "then": {"new-key4": "value4"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("value", r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertEqual("value3", r["new-key3"]) + self.assertFalse("new-key4" in r) + + def test_match_some_merge(self): + schema = data_schema._Schema( + None, + True, + {"match": [ + {"when": True, + "then": {"new-key": [1, 2]}}, + {"when": False, + "then": 
{"new-key2": "value2"}}, + {"when": True, + "then-merge": {"new-key": ["value3"]}}, + {"when": False, + "then": {"new-key3": "value3"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual([1, 2, "value3"], r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertFalse("new-key3" in r) + + def test_match_some_replace(self): + schema = data_schema._Schema( + None, + True, + {"match": [ + {"when": True, + "then": {"new-key": [1, 2]}}, + {"when": False, + "then": {"new-key2": "value2"}}, + {"when": True, + "then-replace": {"new-key": ["value3"]}}, + {"when": True, + "then": {"new-key3": "value3"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual(["value3"], r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertEqual("value3", r["new-key3"]) + + def test_match_some_merge_existing(self): + # the typical case within vlobby: just extend "required" + schema = data_schema._Schema( + None, True, {"required": [1, 2], + "match": [ + {"when": True, + "then": {"required": [3, 4]}}, + {"when": False, + "then": {"required": [0]}}, + {"when": True, + "then-merge": {"required": [5, 6, 7]}}, + {"when": True, + "then-merge": {"required": [4, 8]}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual([3, 4, 5, 6, 7, 4, 8], r["required"]) + + def test_equal_ref_and_value(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}, + {"value": "bar"}]}, + "then-replace": { + "foos": "new-bar"}}] + }) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("new-bar", r["foos"]) + + def test_equal_val_and_ref(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "cond": [{ + "when": { + "equals": [ + {"val": "bar"}, + {"ref": "object:#foo"}]}, + "then-replace": { + "foos": "new-bar"}}] + }) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("new-bar", r["foos"]) + + def test_equal_no_list(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "match": [{ + "when": { + "equals": {"type": None}, + "then-replace": { + "foos": "new-bar"}}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_equal_list_length_mismatch_1(self): + schema = data_schema._Schema( + None, True, {"foo": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}]}, + "then-replace": { + "foo": "new-bar"}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_equal_list_length_mismatch_3(self): + schema = data_schema._Schema( + None, True, {"foo": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}, + {"ref": "object:#foo"}, + {"ref": "object:#foo"}]}, + "then-replace": { + "foo": "new-bar"}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_raise_if_scheme_ref_is_not_the_single_key(self): + schema = data_schema._Schema( + None, True, {"$ref": "schema:#/", + "type": None + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx) + + def test_raise_if_scheme_ref_is_not_the_single_key_root(self): + schema = data_schema._Schema( + None, True, {"$ref": 
"schema:#/subschema", + "subschema": { + "type": None + } + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + r = data_schema.process_schema_references( + schema, + ctx, + check_single_ref_key=False) + self.assertEqual({"type": None}, r) + + def test_recursive_schema_scheme(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/bar" + } + schema = data_schema._Schema( + None, True, {"foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + r = data_schema.process_schema_references( + schema, + ctx, + check_single_ref_key=False) + self.assertEqual({"type": "null"}, r) + + def test_recursive_schema_scheme_raises_if_non_root_is_not_single_key(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/bar", + "type": "dict", + } + schema = data_schema._Schema( + None, True, { + "foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx, + check_single_ref_key=False) + + def test_recursive_schema_scheme_ref_to_non_existing_schema_raises(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/non-bar", + } + schema = data_schema._Schema( + None, True, { + "foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx, + check_single_ref_key=False) + + +class BasicValidation(unittest.TestCase): + + def test_schema_must_be_a_dict_alike(self): + try: + pr = list(data_schema.validate(None, None)) + except data_schema.SchemaError: + pass + else: + self.assertFalse( + "no SchemaError raised when a non-dict given as schema") + + def test_problem_ctor_nonexisting_code(self): + self.assertRaises(ValueError, data_schema.ValidationProblem, code=2) + + def test_problem_ctor_no_code(self): + self.assertRaises(TypeError, data_schema.ValidationProblem, code=None) + + def test_error_ctor(self): + v = data_schema.ValidationProblem(code=10000) + self.assertEqual(data_schema.ERROR, v.severity) + + def test_warning_ctor(self): + v = data_schema.ValidationProblem(code=80000) + self.assertEqual(data_schema.WARNING, v.severity) + + def test_d1(self): + x = list(data_schema.validate({}, {"type": "dict"})) + self.assertEqual(0, len(x)) + + def test_d1_not_nullable(self): + x = list(data_schema.validate(None, {"type": "dict"})) + self.assertEqual(1, len(x)) + self.assertEqual(10000, x[0].code) + + def test_d1_nullable(self): + x = list(data_schema.validate(None, {"type": "dict", + "nullable": True})) + self.assertEqual(0, len(x)) + + def test_d2(self): + x = list(data_schema.validate([], {"type": "map"})) + self.assertEqual(1, len(x)) + self.assertEqual(data_schema.ERROR, x[0].severity) + self.assertEqual(10000, x[0].code) + + def test_d3(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "required": ["key2"]})) + self.assertEqual(2, len(x)) + self.assertEqual(data_schema.ERROR, x[0].severity) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + self.assertEqual(data_schema.ERROR, x[1].severity) + self.assertEqual(10005, x[1].code) + self.assertEqual(["key2"], x[1].hint) + + def test_d4(self): + x = list(data_schema.validate( + 
{"key": "value"}, + {"type": "dict", + "keys": { + "key": {"type": "string"}, + }, + "required": ["key"]})) + self.assertEqual(0, len(x)) + + def test_d5(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": False})) + self.assertEqual(1, len(x)) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + + def test_d5_2(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=["key"])) + self.assertEqual(0, len(x)) + + def test_d5_3(self): + x = list(data_schema.validate( + {"key": "value", + "key2": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=[re.compile(r"\Akey\d*\Z")])) + self.assertEqual(0, len(x)) + + def test_d5_4(self): + x = list(data_schema.validate( + {"key": "value", + "key2": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=[re.compile(r"\A__.+"), re.compile(r"\Akey\d+\Z")])) + self.assertEqual(1, len(x)) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + + def test_d6(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": True})) + self.assertEqual(0, len(x)) + + def test_d7(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": { + "type": "string"}})) + self.assertEqual(0, len(x)) + + def test_d8(self): + x = list(data_schema.validate( + {"key": 1234}, + {"type": "dict", + "additionalKeys": { + "type": "string"}})) + self.assertEqual(1, len(x)) + self.assertEqual(10002, x[0].code) + self.assertEqual(1234, x[0].hint) + + def test_d8_2(self): + x = list(data_schema.validate( + {"key": 1234}, + {"type": "dict", + "additionalKeys": { + "type": "string"}}, + skip_keys=["key"])) + self.assertEqual(0, len(x)) + + def test_d9_non_string_keys(self): + pr = list(data_schema.validate( + {0: "value"}, + {"type": "dict", + "additionalKeys": True})) + self.assertEqual(1, len(pr)) + self.assertEqual(10003, pr[0].code) + + def test_d10_int_dict_keys(self): + pr = list(data_schema.validate( + {1: "value", 2: "value2"}, + {"type": "dict", + "keys": { + 1: {"type": "string"}}, + "additionalKeys": True, + "keyNames": {"type": "int"}})) + self.assertEqual(0, len(pr)) + + def test_error_message(self): + self.assertEqual("dict expected", + data_schema.problem_message(10000)) + pr = data_schema.ValidationProblem(code=10000) + self.assertEqual("dict expected", data_schema.problem_message(pr)) + + self.assertEqual("duplicate dict key", + data_schema.problem_message(80000)) + pr = data_schema.ValidationProblem(code=80000) + self.assertEqual("duplicate dict key", + data_schema.problem_message(pr)) + + self.assertRaises(KeyError, data_schema.problem_message, 1234) + + def test_str_enum(self): + pr = list(data_schema.validate( + "e1", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "e2", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(0, len(pr)) + + def test_str_not_in_enum(self): + pr = list(data_schema.validate( + "e3", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10043, pr[0].code) + + def test_str_minlen(self): + pr = list(data_schema.validate( + "", + {"type": "string", + "minLength": 0})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "", + {"type": "string", + "minLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10006, pr[0].code) + + pr = 
list(data_schema.validate( + "x", + {"type": "string", + "minLength": 1})) + self.assertEqual(0, len(pr)) + + def test_str_maxlen(self): + pr = list(data_schema.validate( + "", + {"type": "string", + "maxLength": 0})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "x", + {"type": "string", + "maxLength": 0})) + self.assertEqual(1, len(pr)) + self.assertEqual(10007, pr[0].code) + + pr = list(data_schema.validate( + "x", + {"type": "string", + "maxLength": 1})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + b"x", + {"type": "string", + "maxLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10002, pr[0].code) + + @staticmethod + def _pattern_check_function(obj, schema, context=None): + if obj == " 5 ": + yield data_schema.ValidationProblem(code=10009) + + def test_str_re(self): + pr = list(data_schema.validate( + "abc", + {"type": "string", + "pattern": r'\A[0-9]+\Z'})) + self.assertEqual(1, len(pr)) + self.assertEqual(10008, pr[0].code) + + pr = list(data_schema.validate( + "123", + {"type": "string", + "pattern": re.compile(r'\A[a-z]+\Z')})) + self.assertEqual(1, len(pr)) + self.assertEqual(10008, pr[0].code) + + pr = list(data_schema.validate( + "123", + {"type": "string", + "pattern": self._pattern_check_function})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + " 5 ", + {"type": "string", + "pattern": self._pattern_check_function})) + self.assertEqual(1, len(pr)) + self.assertEqual(10009, pr[0].code) + + def test_binary_basic(self): + pr = list(data_schema.validate( + b"", + {"type": "binary"})) + self.assertEqual(0, len(pr)) + + def test_str_is_not_binary(self): + pr = list(data_schema.validate( + "", + {"type": "binary"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10035, pr[0].code) + + @staticmethod + def _binary_pattern_check_function(obj, schema, context=None): + if obj != b"\x00": + yield data_schema.ValidationProblem(code=10009) + + def test_binary_pattern_check(self): + pr = list(data_schema.validate( + b"\x00", + {"type": "binary", + "pattern": self._binary_pattern_check_function})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + b"\x01", + {"type": "binary", + "pattern": self._binary_pattern_check_function})) + self.assertEqual(1, len(pr)) + self.assertEqual(10009, pr[0].code) + + def test_binary_re_str_match(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": u"\\x00+"})) + self.assertEqual(0, len(pr)) + + def test_binary_re_bytes_match(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": b"\x00+"})) + self.assertEqual(0, len(pr)) + + def test_binary_re_str_mismatch(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": u"\\x01+"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10047, pr[0].code) + + def test_binary_re_bytes_mismatch(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": b"\x01+"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10047, pr[0].code) + + def test_binary_length(self): + pr = list(data_schema.validate( + b"", + {"type": "binary", + "minLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10036, pr[0].code) + + pr = list(data_schema.validate( + b"1", + {"type": "binary", + "maxLength": 0})) + self.assertEqual(1, len(pr)) + self.assertEqual(10037, pr[0].code) + + def test_deny(self): + pr = list(data_schema.validate("abc", {"type": "deny"})) + self.assertEqual(1, 
len(pr)) + self.assertEqual(10010, pr[0].code) + + def test_accept(self): + pr = list(data_schema.validate("abc", {"type": "accept"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "accept"})) + self.assertEqual(0, len(pr)) + + def test_null(self): + pr = list(data_schema.validate(None, {"type": "none"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "null"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "nil"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": None})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate({}, {"type": None})) + self.assertEqual(1, len(pr)) + self.assertEqual(10011, pr[0].code) + + def test_l1(self): + pr = list(data_schema.validate([], {"type": "list"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(tuple(), {"type": "list"})) + self.assertEqual(0, len(pr)) + + def test_l1_not_nullable(self): + pr = list(data_schema.validate(None, {"type": "list"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10001, pr[0].code) + + def test_l1_nullable(self): + pr = list(data_schema.validate( + None, {"type": "list", "nullable": True})) + self.assertEqual(0, len(pr)) + + def test_l2_default_schema_for_items_is_deny(self): + pr = list(data_schema.validate(["a", "b", "c"], {"type": "list"})) + self.assertEqual(3, len(pr)) + for i in range(0, 3): + self.assertEqual(10010, pr[i].code) + + def test_l3_schema_for_items(self): + pr = list(data_schema.validate( + ["a", "b", "c"], + {"type": "array", + "items": {"type": "string"}})) + self.assertEqual(0, len(pr)) + + def test_t1(self): + pr = list(data_schema.validate([], {"type": "tuple"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(tuple(), {"type": "tuple"})) + self.assertEqual(0, len(pr)) + + def test_t1_not_nullable(self): + pr = list(data_schema.validate(None, {"type": "tuple"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10014, pr[0].code) + + def test_t1_nullable(self): + pr = list(data_schema.validate( + None, {"type": "tuple", "nullable": True})) + self.assertEqual(0, len(pr)) + + def test_t2(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}, + {"type": "accept"}]})) + self.assertEqual(0, len(pr)) + + def test_t3(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}, + {"type": "deny"}]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10010, pr[0].code) + self.assertEqual(2, pr[0].context.index) + + def test_t4(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10017, pr[0].code) + self.assertEqual(2, pr[0].context.index) + + def test_t5(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": True})) + self.assertEqual(0, len(pr)) + + def test_t6(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": { + "type": "dict", + "keys": {"key": {"type": "string"}} + }})) + self.assertEqual(0, len(pr)) + + def test_t7(self): + # do not check anything 
+    def test_t7(self):
+        # do not check anything that exceeds maxLength
+        pr = list(data_schema.validate(
+            ["a", None, {"key": "value"}, {"key": "value"}, {"key2": "value"}],
+            {"type": "tuple",
+             "maxLength": 4,
+             "items": [
+                 {"type": "string"},
+                 {"type": None}],
+             "additionalItems": {
+                 "type": "dict",
+                 "keys": {"key": {"type": "string"}}
+             }}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10016, pr[0].code)
+
+    def test_t8(self):
+        # do not check anything that exceeds maxLength
+        pr = list(data_schema.validate(
+            ["a", None, {"key": "value"}, {"key": "value"}, {"key2": "value"}],
+            {"type": "tuple",
+             "minLength": 6,
+             "maxLength": 4,
+             "items": [
+                 {"type": "string"},
+                 {"type": None}],
+             "additionalItems": {
+                 "type": "dict",
+                 "keys": {"key": {"type": "string"}}
+             }}))
+        self.assertEqual(2, len(pr))
+        self.assertEqual(10015, pr[0].code)
+        self.assertEqual(10016, pr[1].code)
+
+    def test_set1(self):
+        # minLength is satisfied and every item matches an alternative
+        pr = list(data_schema.validate(
+            set(["a", None, "b"]),
+            {"type": "set",
+             "minLength": 3,
+             "items": {"any-of": [
+                 {"type": "string"},
+                 {"type": None}]}}
+            ))
+        self.assertEqual(0, len(pr))
+
+    def test_set1_not_nullable(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": "set"}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10038, pr[0].code)
+
+    def test_set1_nullable(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": "set", "nullable": True}))
+        self.assertEqual(0, len(pr))
+
+    def test_set2(self):
+        # minLength is greater than the number of items
+        pr = list(data_schema.validate(
+            set(["a", None, "b"]),
+            {"type": "set",
+             "minLength": 4,
+             "items": {"any-of": [
+                 {"type": "string"},
+                 {"type": None}]}}
+            ))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10039, pr[0].code)
+
+    def test_set3(self):
+        # maxLength is exceeded
+        pr = list(data_schema.validate(
+            set(["a", None, "b"]),
+            {"type": "set",
+             "maxLength": 2,
+             "items": {"any-of": [
+                 {"type": "string"},
+                 {"type": None}]}}
+            ))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10040, pr[0].code)
+
+    def test_set4_itemschema(self):
+        pr = list(data_schema.validate(
+            set(["a", None, "b"]),
+            {"type": "set",
+             "items": {"any-of": [
+                 {"type": "string"},
+                 {"type": "int"}]}}
+            ))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10055, pr[0].code)
+        self.assertEqual(2, len(pr[0].cause))
+        self.assertEqual(10056, pr[0].cause[0].code)
+        self.assertEqual(1, len(pr[0].cause[0].cause))
+        self.assertEqual(10002, pr[0].cause[0].cause[0].code)
+        self.assertEqual(10056, pr[0].cause[1].code)
+        self.assertEqual(1, len(pr[0].cause[1].cause))
+        self.assertEqual(10020, pr[0].cause[1].cause[0].code)
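+
+    # Note: `empty' accepts None and empty containers (list, tuple, set,
+    # frozenset, dict), but NOT the empty string (see test_empty below).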
+
+    def test_empty(self):
+        pr = list(data_schema.validate(None, {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate([], {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(["a"], {"type": "empty"}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10018, pr[0].code)
+
+        pr = list(data_schema.validate(tuple(), {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(set(), {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(frozenset(), {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(tuple(["a"]), {"type": "empty"}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10018, pr[0].code)
+
+        pr = list(data_schema.validate({}, {"type": "empty"}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate({"key": "value"}, {"type": "empty"}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10018, pr[0].code)
+
+        pr = list(data_schema.validate("", {"type": "empty"}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10018, pr[0].code)
+
+    # Combinators: `all-of' requires every subschema to match, `any-of' at
+    # least one, `one-of' exactly one.  A plain list given as "type" is a
+    # shorthand for `any-of'.
+    def test_allOf(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "all-of": [
+                    {"type": None},
+                    {"type": "accept"},
+                ]
+            }}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "all-of": [
+                    {"type": None},
+                    {"type": "accept"},
+                    {"type": "deny"},
+                ]
+            }}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10057, pr[0].code)
+        self.assertEqual(1, len(pr[0].cause))
+        self.assertEqual(10058, pr[0].cause[0].code)
+        self.assertEqual(1, len(pr[0].cause[0].cause))
+        self.assertEqual(10010, pr[0].cause[0].cause[0].code)
+
+    def test_anyOf(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "any-of": [
+                    {"type": "deny"},
+                    {"type": None},
+                ]
+            }}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "any-of": [
+                    {"type": "string"},
+                    {"type": "deny"},
+                ]
+            }}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10055, pr[0].code)
+        self.assertEqual(2, len(pr[0].cause))
+        self.assertEqual(10056, pr[0].cause[0].code)
+        self.assertEqual(1, len(pr[0].cause[0].cause))
+        self.assertEqual(10002, pr[0].cause[0].cause[0].code)
+        self.assertEqual(10056, pr[0].cause[1].code)
+        self.assertEqual(1, len(pr[0].cause[1].cause))
+        self.assertEqual(10010, pr[0].cause[1].cause[0].code)
+
+    def test_anyOf_with_list(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": [
+                {"type": "deny"},
+                {"type": None},
+            ]}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(
+            None,
+            {"type": [
+                {"type": "string"},
+                {"type": "deny"},
+            ]}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10055, pr[0].code)
+        self.assertEqual(2, len(pr[0].cause))
+        self.assertEqual(10056, pr[0].cause[0].code)
+        self.assertEqual(1, len(pr[0].cause[0].cause))
+        self.assertEqual(10002, pr[0].cause[0].cause[0].code)
+        self.assertEqual(10056, pr[0].cause[1].code)
+        self.assertEqual(1, len(pr[0].cause[1].cause))
+        self.assertEqual(10010, pr[0].cause[1].cause[0].code)
+
+    def test_oneOf(self):
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "one-of": [
+                    {"type": "deny"},
+                    {"type": None},
+                ]
+            }}))
+        self.assertEqual(0, len(pr))
+
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "one-of": [
+                    {"type": "string"},
+                    {"type": "deny"},
+                ]
+            }}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10053, pr[0].code)
+        self.assertEqual(2, len(pr[0].cause))
+        self.assertEqual(10054, pr[0].cause[0].code)
+        self.assertEqual(1, len(pr[0].cause[0].cause))
+        self.assertEqual(10002, pr[0].cause[0].cause[0].code)
+        self.assertEqual(10054, pr[0].cause[1].code)
+        self.assertEqual(1, len(pr[0].cause[1].cause))
+        self.assertEqual(10010, pr[0].cause[1].cause[0].code)
+
+        pr = list(data_schema.validate(
+            None,
+            {"type": {
+                "one-of": [
+                    {"type": "string"},
+                    {"type": "deny"},
+                    {"type": "empty"},
+                    {"type": None},
+                ]
+            }}))
+        self.assertEqual(1, len(pr))
+        self.assertEqual(10019, pr[0].code)
+        self.assertEqual("2,3", pr[0].hint)
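+
+    # `not' inverts the wrapped schema: problem 10029 is reported exactly
+    # when the inner schema matched; a double `not' cancels out.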
{"type": { + "not": { + "type": { + "not": { + "type": "empty"}}}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2, + {"type": { + "not": { + "type": "int"}}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + 1, + {"type": { + "not": { + "type": { + "not": { + "type": "int"}}}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"type": { + "not": { + "type": "int"}}})) + self.assertEqual(0, len(pr)) + + def test_not_shortcut(self): + pr = list(data_schema.validate( + None, + {"not": { + "type": "empty"}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + None, + {"not": { + "not": { + "type": "empty"}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2, + {"not": { + "type": "int"}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + 1, + {"not": { + "not": { + "type": "int"}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"not": { + "type": "int"}})) + self.assertEqual(0, len(pr)) + + def test_integer(self): + pr = list(data_schema.validate(1, {"type": "integer"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(1, {"type": "float"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10023, pr[0].code) + + pr = list(data_schema.validate( + 2, + {"type": "int", + "minValue": 3, + "maxValue": 1})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10021, pr[0].code) + self.assertEqual(10022, pr[1].code) + + def test_float(self): + pr = list(data_schema.validate(1.8, {"type": "float"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(1, {"type": "float"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10023, pr[0].code) + + pr = list(data_schema.validate( + 2.0, + {"type": "real", + "minValue": 2.1, + "maxValue": 1.9})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10024, pr[0].code) + self.assertEqual(10025, pr[1].code) + + def test_number(self): + pr = list(data_schema.validate(1.8, {"type": "number"})) + self.assertEqual(0, len(pr)) + pr = list(data_schema.validate(1, {"type": "num"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"type": "number", + "minValue": 3, + "maxValue": 1.3})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10031, pr[0].code) + self.assertEqual(10032, pr[1].code) + + pr = list(data_schema.validate({}, {"type": "number"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10030, pr[0].code) + + def test_bool(self): + pr = list(data_schema.validate(True, {"type": "bool"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(True, {"type": "boolean", + "value": True})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(True, {"type": "boolean", + "value": False})) + self.assertEqual(1, len(pr)) + self.assertEqual(10028, pr[0].code) + + pr = list(data_schema.validate(False, {"type": "boolean"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "boolean", + "value": False})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "boolean", + "value": True})) + self.assertEqual(1, len(pr)) + self.assertEqual(10027, pr[0].code) + + def test_bool_real(self): + pr = list(data_schema.validate([1, 2], {"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + pr = list(data_schema.validate([], 
{"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + pr = list(data_schema.validate(None, {"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + @staticmethod + def _check_value_ts(obj, schema, context): + if obj == datetime.datetime.fromtimestamp(0): + yield data_schema.ValidationProblem( + code=10042, hint=obj, context=context) + + def test_timestamp(self): + pr = list(data_schema.validate( + datetime.datetime.utcnow(), + {"type": "timestamp"})) + + pr = list(data_schema.validate( + datetime.datetime.fromtimestamp(180000), + {"type": "datetime", + "value": self._check_value_ts})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + datetime.datetime.fromtimestamp(0), + {"type": "datetime", + "value": self._check_value_ts})) + self.assertEqual(1, len(pr)) + self.assertEqual(10042, pr[0].code) + + def test_scalar(self): + pr = list(data_schema.validate(1, {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate("", {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(datetime.datetime.utcnow(), + {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate({}, {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate([], {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(tuple(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(set(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(frozenset(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate( + None, + {"type": { + "one-of": [ + {"type": "scalar"}, + {"type": None}, + ]}})) + self.assertEqual(0, len(pr)) + + +if __name__ == "__main__": + unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/testschematalib/__init__.py Thu Jul 06 23:41:41 2023 +0200
@@ -0,0 +1,1 @@
+# make schematalib a package
