comparison configmix/yaml.py @ 291:edf5cc1ffd26

Provide an optional "strict" keyword flag to all YAML load functions to detect and prevent duplicate keys within a single YAML document
author Franz Glasner <f.glasner@feldmann-mg.com>
date Wed, 10 Feb 2021 14:47:41 +0100
parents ff964825a75a
children eed16a1ec8f3
comparison
equal deleted inserted replaced
290:aec97edf7945 291:edf5cc1ffd26
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # :- 2 # :-
3 # :Copyright: (c) 2015-2020, Franz Glasner. All rights reserved. 3 # :Copyright: (c) 2015-2021, Franz Glasner. All rights reserved.
4 # :License: 3-clause BSD. See LICENSE.txt for details. 4 # :License: 3-clause BSD. See LICENSE.txt for details.
5 # :- 5 # :-
6 """Simple wrapper for :mod:`yaml` to support all-unicode strings when 6 """Simple wrapper for :mod:`yaml` to support all-unicode strings when
7 loading configuration files. 7 loading configuration files.
8 8
40 This is against YAML specs but within configuration files it seems 40 This is against YAML specs but within configuration files it seems
41 more natural. 41 more natural.
42 42
43 """ 43 """
44 44
45 def __init__(self, *args, **kwds):
46 strict = kwds.pop("strict", False)
47 self.__allow_duplicate_keys = not strict
48 yaml.Loader.__init__(self, *args, **kwds)
49
45 def construct_yaml_str(self, node): 50 def construct_yaml_str(self, node):
46 return self.construct_scalar(node) 51 return self.construct_scalar(node)
47 52
48 if OrderedDict: 53 #
49 54 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
50 # 55 # (MIT License)
51 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 56 #
52 # (MIT License) 57
53 # 58 def construct_yaml_map(self, node):
54 59 data = DictImpl()
55 def construct_yaml_map(self, node): 60 yield data
56 data = OrderedDict() 61 value = self.construct_mapping(node)
57 yield data 62 data.update(value)
58 value = self.construct_mapping(node) 63
59 data.update(value) 64 def construct_mapping(self, node, deep=False):
60 65 if isinstance(node, yaml.MappingNode):
61 def construct_mapping(self, node, deep=False): 66 self.flatten_mapping(node)
62 if isinstance(node, yaml.MappingNode): 67 else:
63 self.flatten_mapping(node) 68 raise yaml.constructor.ConstructorError(
64 else: 69 None,
65 raise yaml.constructor.ConstructorError( 70 None,
66 None, 71 'expected a mapping node, but found %s' % node.id,
67 None, 72 node.start_mark)
68 'expected a mapping node, but found %s' % node.id, 73
69 node.start_mark) 74 mapping = DictImpl()
70 75 for key_node, value_node in node.value:
71 mapping = OrderedDict() 76 key = self.construct_object(key_node, deep=deep)
72 for key_node, value_node in node.value: 77 try:
73 key = self.construct_object(key_node, deep=deep) 78 hash(key)
74 try: 79 except TypeError as err:
75 hash(key) 80 raise yaml.constructor.ConstructorError(
76 except TypeError as err: 81 'while constructing a mapping', node.start_mark,
77 raise yaml.constructor.ConstructorError( 82 'found unacceptable key (%s)' % (err,
78 'while constructing a mapping', node.start_mark, 83 key_node.start_mark)
79 'found unacceptable key (%s)' % (err, 84 )
80 key_node.start_mark) 85 value = self.construct_object(value_node, deep=deep)
81 ) 86 if not self.__allow_duplicate_keys and key in mapping:
82 value = self.construct_object(value_node, deep=deep) 87 raise yaml.constructor.ConstructorError(
83 mapping[key] = value 88 'while constructing a mapping', node.start_mark,
84 return mapping 89 'found duplicate key %r (%s)' % (key,
90 key_node.start_mark)
91 )
92 mapping[key] = value
93 return mapping
85 94
86 95
87 ConfigLoader.add_constructor( 96 ConfigLoader.add_constructor(
88 u("tag:yaml.org,2002:str"), 97 u("tag:yaml.org,2002:str"),
89 ConfigLoader.construct_yaml_str) 98 ConfigLoader.construct_yaml_str)
90 if OrderedDict: 99 ConfigLoader.add_constructor(
91 ConfigLoader.add_constructor( 100 u("tag:yaml.org,2002:map"),
92 u("tag:yaml.org,2002:map"), 101 ConfigLoader.construct_yaml_map)
93 ConfigLoader.construct_yaml_map) 102 ConfigLoader.add_constructor(
94 ConfigLoader.add_constructor( 103 u("tag:yaml.org,2002:omap"),
95 u("tag:yaml.org,2002:omap"), 104 ConfigLoader.construct_yaml_map)
96 ConfigLoader.construct_yaml_map)
97 105
98 106
99 class ConfigSafeLoader(yaml.SafeLoader): 107 class ConfigSafeLoader(yaml.SafeLoader):
100 108
101 """A safe YAML loader, which makes all ``!!str`` strings to Unicode. 109 """A safe YAML loader, which makes all ``!!str`` strings to Unicode.
106 This is against YAML specs but within configuration files it seems 114 This is against YAML specs but within configuration files it seems
107 more natural. 115 more natural.
108 116
109 """ 117 """
110 118
119 def __init__(self, *args, **kwds):
120 strict = kwds.pop("strict", False)
121 self.__allow_duplicate_keys = not strict
122 yaml.SafeLoader.__init__(self, *args, **kwds)
123
111 def construct_yaml_str(self, node): 124 def construct_yaml_str(self, node):
112 return self.construct_scalar(node) 125 return self.construct_scalar(node)
113 126
114 if OrderedDict: 127 #
115 128 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
116 # 129 # (MIT License)
117 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 130 #
118 # (MIT License) 131
119 # 132 def construct_yaml_map(self, node):
120 133 data = DictImpl()
121 def construct_yaml_map(self, node): 134 yield data
122 data = OrderedDict() 135 value = self.construct_mapping(node)
123 yield data 136 data.update(value)
124 value = self.construct_mapping(node) 137
125 data.update(value) 138 def construct_mapping(self, node, deep=False):
126 139 if isinstance(node, yaml.MappingNode):
127 def construct_mapping(self, node, deep=False): 140 self.flatten_mapping(node)
128 if isinstance(node, yaml.MappingNode): 141 else:
129 self.flatten_mapping(node) 142 raise yaml.constructor.ConstructorError(
130 else: 143 None,
131 raise yaml.constructor.ConstructorError( 144 None,
132 None, 145 'expected a mapping node, but found %s' % node.id,
133 None, 146 node.start_mark)
134 'expected a mapping node, but found %s' % node.id, 147
135 node.start_mark) 148 mapping = DictImpl()
136 149 for key_node, value_node in node.value:
137 mapping = OrderedDict() 150 key = self.construct_object(key_node, deep=deep)
138 for key_node, value_node in node.value: 151 try:
139 key = self.construct_object(key_node, deep=deep) 152 hash(key)
140 try: 153 except TypeError as err:
141 hash(key) 154 raise yaml.constructor.ConstructorError(
142 except TypeError as err: 155 'while constructing a mapping', node.start_mark,
143 raise yaml.constructor.ConstructorError( 156 'found unacceptable key (%s)' % (err,
144 'while constructing a mapping', node.start_mark, 157 key_node.start_mark)
145 'found unacceptable key (%s)' % (err, 158 )
146 key_node.start_mark) 159 value = self.construct_object(value_node, deep=deep)
147 ) 160 if not self.__allow_duplicate_keys and key in mapping:
148 value = self.construct_object(value_node, deep=deep) 161 raise yaml.constructor.ConstructorError(
149 mapping[key] = value 162 'while constructing a mapping', node.start_mark,
150 return mapping 163 'found duplicate key %r (%s)' % (key,
164 key_node.start_mark)
165 )
166 mapping[key] = value
167 return mapping
151 168
152 169
153 ConfigSafeLoader.add_constructor( 170 ConfigSafeLoader.add_constructor(
154 u("tag:yaml.org,2002:str"), 171 u("tag:yaml.org,2002:str"),
155 ConfigSafeLoader.construct_yaml_str) 172 ConfigSafeLoader.construct_yaml_str)
156 if OrderedDict: 173 ConfigSafeLoader.add_constructor(
157 ConfigSafeLoader.add_constructor( 174 u("tag:yaml.org,2002:map"),
158 u("tag:yaml.org,2002:map"), 175 ConfigSafeLoader.construct_yaml_map)
159 ConfigSafeLoader.construct_yaml_map) 176 ConfigSafeLoader.add_constructor(
160 ConfigSafeLoader.add_constructor( 177 u("tag:yaml.org,2002:omap"),
161 u("tag:yaml.org,2002:omap"), 178 ConfigSafeLoader.construct_yaml_map)
162 ConfigSafeLoader.construct_yaml_map) 179
163 180
164 181 def config_loader_factory(strict=False):
165 def load(stream, Loader=ConfigLoader): 182 def _real_factory(*args, **kwds):
183 kwds["strict"] = strict
184 return ConfigLoader(*args, **kwds)
185 return _real_factory
186
187
188 def config_safe_loader_factory(strict=False):
189 def _real_factory(*args, **kwds):
190 kwds["strict"] = strict
191 return ConfigSafeLoader(*args, **kwds)
192 return _real_factory
193
194
195 def load(stream, Loader=None, strict=False):
166 """Parse the given `stream` and return a Python object constructed 196 """Parse the given `stream` and return a Python object constructed
167 from for the first document in the stream. 197 from for the first document in the stream.
168 198
169 """ 199 If `strict` is `True` then duplicate mapping keys within a YAML
200 document are detected and prevented. If a `Loader` is given then
201 `strict` does not apply.
202
203 """
204 if Loader is None:
205 Loader = config_loader_factory(strict=strict)
170 data = yaml.load(stream, Loader) 206 data = yaml.load(stream, Loader)
171 # Map an empty document to an empty dict 207 # Map an empty document to an empty dict
172 if data is None: 208 if data is None:
173 return DictImpl() 209 return DictImpl()
174 if not isinstance(data, DictImpl): 210 if not isinstance(data, DictImpl):
175 raise TypeError("YAML root object must be a mapping") 211 raise TypeError("YAML root object must be a mapping")
176 return data 212 return data
177 213
178 214
179 def load_all(stream, Loader=ConfigLoader): 215 def load_all(stream, Loader=None, strict=False):
180 """Parse the given `stream` and return a sequence of Python objects 216 """Parse the given `stream` and return a sequence of Python objects
181 corresponding to the documents in the `stream`. 217 corresponding to the documents in the `stream`.
182 218
183 """ 219 If `strict` is `True` then duplicate mapping keys within a YAML
220 document are detected and prevented. If a `Loader` is given then
221 `strict` does not apply.
222
223 """
224 if Loader is None:
225 Loader = config_loader_factory(strict=strict)
184 data_all = yaml.load_all(stream, Loader) 226 data_all = yaml.load_all(stream, Loader)
185 rdata = [] 227 rdata = []
186 for data in data_all: 228 for data in data_all:
187 if data is None: 229 if data is None:
188 rdata.append(DictImpl()) 230 rdata.append(DictImpl())
191 raise TypeError("YAML root object must be a mapping") 233 raise TypeError("YAML root object must be a mapping")
192 rdata.append(data) 234 rdata.append(data)
193 return rdata 235 return rdata
194 236
195 237
196 def safe_load(stream): 238 def safe_load(stream, strict=False):
197 """Parse the given `stream` and return a Python object constructed 239 """Parse the given `stream` and return a Python object constructed
198 from for the first document in the stream. 240 from for the first document in the stream.
199 241
200 Recognizes only standard YAML tags and cannot construct an 242 Recognizes only standard YAML tags and cannot construct an
201 arbitrary Python object. 243 arbitrary Python object.
202 244
203 """ 245 If `strict` is `True` then duplicate mapping keys within a YAML document
204 data = yaml.load(stream, Loader=ConfigSafeLoader) 246 are detected and prevented.
247
248 """
249 data = yaml.load(stream,
250 Loader=config_safe_loader_factory(strict=strict))
205 # Map an empty document to an empty dict 251 # Map an empty document to an empty dict
206 if data is None: 252 if data is None:
207 return DictImpl() 253 return DictImpl()
208 if not isinstance(data, DictImpl): 254 if not isinstance(data, DictImpl):
209 raise TypeError("YAML root object must be a mapping") 255 raise TypeError("YAML root object must be a mapping")
210 return data 256 return data
211 257
212 258
213 def safe_load_all(stream): 259 def safe_load_all(stream, strict=False):
214 """Return the list of all decoded YAML documents in the file `stream`. 260 """Return the list of all decoded YAML documents in the file `stream`.
215 261
216 Recognizes only standard YAML tags and cannot construct an 262 Recognizes only standard YAML tags and cannot construct an
217 arbitrary Python object. 263 arbitrary Python object.
218 264
219 """ 265 If `strict` is `True` then duplicate mapping keys within a YAML document
220 data_all = yaml.load_all(stream, Loader=ConfigSafeLoader) 266 are detected and prevented.
267
268 """
269 data_all = yaml.load_all(stream,
270 Loader=config_safe_loader_factory(strict=strict))
221 rdata = [] 271 rdata = []
222 for data in data_all: 272 for data in data_all:
223 if data is None: 273 if data is None:
224 rdata.append(DictImpl()) 274 rdata.append(DictImpl())
225 else: 275 else: