83 line = [f
"{nonterminal} ::= ",
"object_begin "]
86 for field, _field_info
in current.fields().items():
87 field_name = f
"{nonterminal}_{field}"
88 field_name = escape_identifier(field_name)
90 fields.append(f
"{key} colon {field_name}")
91 result.append((_field_info, field_name))
92 line.append(
" comma ".join(fields))
93 line.append(
" object_end;\n")
94 return "".join(line), result
97 def field_info(current: typing.Type, nonterminal: str):
99 annotation = current.annotation
101 return "", [(annotation, nonterminal)]
102 new_nonterminal = f
"{nonterminal}_required"
103 return f
"{nonterminal} ::= {new_nonterminal}?;\n", [(annotation, new_nonterminal)]
106 def string_metadata(current: typing.Type, nonterminal: str):
107 min_length = current.metadata.get(
"min_length")
108 max_length = current.metadata.get(
"max_length")
109 pattern = current.metadata.get(
"pattern")
110 substring_of = current.metadata.get(
"substring_of")
114 temp_pattern = pattern.replace(
r'\^',
'').replace(
r'\$',
'').replace(
r'\\A',
'').replace(
r'\\z',
'')
116 if '^' in temp_pattern
or '$' in temp_pattern
or '\A' in temp_pattern
or '\z' in temp_pattern:
118 raise ValueError(f
"Pattern '{pattern}' contains unescaped anchors (^, $, \\A, \\z) which are not allowed")
120 print(f
"Warning: pattern '{pattern}' contains unescaped anchors (^, $, \\A, \\z) which are not allowed in schema {current} from {nonterminal}")
121 pattern = pattern.strip(
'^$')
122 pattern = repr(pattern)[1:-1]
124 assert not (min_length
or max_length
or substring_of),
"pattern is mutually exclusive with min_length, max_length and substring_of"
126 if min_length
or max_length
or substring_of:
127 print(f
"Warning: pattern is mutually exclusive with min_length, max_length and substring_of in schema {current} from {nonterminal}")
133 assert not (min_length
or max_length
or pattern),
"substring_of is mutually exclusive with min_length, max_length and pattern"
135 if min_length
or max_length
or pattern:
136 print(f
"Warning: substring_of is mutually exclusive with min_length, max_length and pattern in schema {current} from {nonterminal}")
141 (
True,
False): f
"{{{min_length},}}",
142 (
False,
True): f
"{{0,{max_length}}}",
143 (
True,
True): f
"{{{min_length},{max_length}}}"
145 repetition = repetition_map.get((min_length
is not None, max_length
is not None))
146 if repetition
is not None:
147 return fr
"""{nonterminal} ::= #'{SPACE_NONTERMINAL}"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}}){repetition}"';
149 if pattern
is not None:
150 pattern = pattern.replace(
"'",
"\\'")
151 return f
"""{nonterminal} ::= #'{SPACE_NONTERMINAL}"{pattern}"';\n""", []
152 if substring_of
is not None:
153 return f
"""{nonterminal} ::= #'{SPACE_NONTERMINAL}' '"' #substrs{repr(substring_of)} '"';\n""", []
155 def number_metadata(current: typing.Type, nonterminal: str):
156 gt = current.metadata.get(
"gt")
157 ge = current.metadata.get(
"ge")
158 lt = current.metadata.get(
"lt")
159 le = current.metadata.get(
"le")
168 for (condition, value), prefix
in prefix_map.items():
169 if condition
is not None and condition == value:
170 if issubclass(current.type, int):
171 return f
"""{nonterminal} ::= #'{SPACE_NONTERMINAL}{prefix}[1-9][0-9]*';\n""", []
172 elif issubclass(current.type, float):
173 return f
"""{nonterminal} ::= #'{SPACE_NONTERMINAL}{prefix}[1-9][0-9]*(\\\\.[0-9]+)?([eE][+-]?[0-9]+)?';\n""", []
175 raise ValueError(f
"{current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
177 print(f
"Warning: {current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
178 return "", [(current.type, nonterminal)]
180 def sequence_metadata(current: typing.Type, nonterminal: str):
181 min_items = current.metadata.get(
"min_length")
182 max_items = current.metadata.get(
"max_length")
183 prefix_items = current.metadata.get(
"prefix_items")
184 additional_items = current.metadata.get(
"additional_items")
185 if max_items
is not None and prefix_items
is not None and max_items <= len(prefix_items):
186 prefix_items = prefix_items[:max_items+1]
190 if not additional_items:
191 if min_items > len(prefix_items):
192 raise ValueError(f
"min_items {min_items} is greater than the number of prefix_items {len(prefix_items)} and additional_items is not allowed")
193 max_items = len(prefix_items)
194 if min_items
is not None or max_items
is not None:
195 new_nonterminal = f
"{nonterminal}_item"
197 if min_items
is None:
199 if min_items == 0
and max_items
is None and prefix_items
is None:
200 return "", [(current.type, new_nonterminal)]
201 prefix_items_nonterminals = [f
"{new_nonterminal}_{i}" for i
in range(len(prefix_items))]
if prefix_items
else []
202 prefix_items_parts = []
203 if prefix_items
is not None:
204 for i
in range(max(min_items,1), len(prefix_items)+1):
205 prefix_items_parts.append(prefix_items_nonterminals[:i])
207 ebnf_rules.append(f
"{nonterminal} ::= array_begin array_end;")
208 if max_items
is None:
210 min_items_part =
' comma '.join([new_nonterminal] * (min_items - 1))
211 ebnf_rules.append(f
"{nonterminal} ::= array_begin {min_items_part} comma {new_nonterminal}+ array_end;")
212 elif len(prefix_items_parts) >= min_items:
213 for prefix_items_part
in prefix_items_parts:
214 prefix_items_part =
' comma '.join(prefix_items_part)
215 ebnf_rules.append(f
"{nonterminal} ::= array_begin {prefix_items_part} (comma {new_nonterminal})* array_end;")
217 min_items_part =
' comma '.join([new_nonterminal] * (min_items - len(prefix_items_nonterminals)-1))
219 min_items_part =
"comma " + min_items_part
220 prefix_items_part =
' comma '.join(prefix_items_nonterminals)
221 ebnf_rules.append(f
"{nonterminal} ::= array_begin {prefix_items_part} {min_items_part} comma {new_nonterminal}+ array_end;")
222 elif min_items == 0
and not prefix_items:
223 for i
in range(min_items, max_items + 1):
224 items =
' comma '.join([new_nonterminal] * i)
225 ebnf_rules.append(f
"{nonterminal} ::= array_begin {items} array_end;")
227 prefix_items_num = len(prefix_items_nonterminals)
229 for prefix_items_part
in prefix_items_parts:
230 prefix_items_part =
' comma '.join(prefix_items_part)
231 ebnf_rules.append(f
"{nonterminal} ::= array_begin {prefix_items_part} array_end;")
232 min_items_part =
' comma '.join([new_nonterminal] * (min_items - prefix_items_num))
233 prefix_items_part =
' comma '.join(prefix_items_nonterminals)
234 if min_items_part
and prefix_items_part:
235 ebnf_rules.append(f
"{nonterminal}_min ::= {prefix_items_part} comma {min_items_part};")
237 ebnf_rules.append(f
"{nonterminal}_min ::= {min_items_part};")
238 elif prefix_items_part:
239 ebnf_rules.append(f
"{nonterminal}_min ::= {prefix_items_part};")
241 common = max(min_items, prefix_items_num)
242 for i
in range(1, max_items + 1 - common):
243 items =
' comma '.join([new_nonterminal] * i)
244 ebnf_rules.append(f
"{nonterminal} ::= array_begin {nonterminal}_min comma {items} array_end;")
246 args = typing.get_args(current.type)
251 item_type = typing.Any
253 return "\n".join(ebnf_rules) +
"\n", list(zip(prefix_items, prefix_items_nonterminals)) + [(item_type, new_nonterminal)]
254 return "\n".join(ebnf_rules) +
"\n", [(item_type, new_nonterminal)]
257 def is_sequence_like(current: typing.Type) -> bool:
259 Check if the given type is sequence-like.
261 This function returns True for:
265 - Any subclass of collections.abc.Sequence
270 current: The type to check.
273 bool: True if the type is sequence-like, False otherwise.
275 original = typing.get_origin(current)
279 original
is typing.Sequence
or
280 original
is typing.List
or
281 original
is typing.Tuple
or
282 (isinstance(original, type)
and (issubclass(original, collections.abc.Sequence)
or
283 issubclass(original, list)
or
284 issubclass(original, tuple)))
287 def metadata(current: typing.Type, nonterminal: str):
289 original = typing.get_origin(current.type)
291 original = current.type
292 if not current.metadata:
293 return "", [(current.type, nonterminal)]
294 if isinstance(current.type, type)
and issubclass(current.type, str):
295 return string_metadata(current, nonterminal)
296 elif isinstance(current.type, type)
and issubclass(current.type, (int, float)):
297 return number_metadata(current, nonterminal)
298 elif is_sequence_like(original):
299 return sequence_metadata(current, nonterminal)
302 def builtin_sequence(current: typing.Type, nonterminal: str):
303 original = typing.get_origin(current)
306 if is_sequence_like(original):
307 new_nonterminal = f
"{nonterminal}_value"
308 annotation = typing.get_args(current)
310 annotation = typing.Any
312 annotation = annotation[0]
313 return f
"{nonterminal} ::= array_begin ({new_nonterminal} (comma {new_nonterminal})*)? array_end;\n", \
314 [(annotation, new_nonterminal)]
317 def builtin_dict(current: typing.Type, nonterminal: str):
318 original = typing.get_origin(current)
321 if original
is typing.Mapping
or isinstance(original, type)
and issubclass(original,
322 collections.abc.Mapping):
323 new_nonterminal = f
"{nonterminal}_value"
324 args = typing.get_args(current)
329 args[0], str), f
"{args[0]} is not string!"
331 if value
is typing.Any:
332 return f
"{nonterminal} ::= object;\n", []
333 return f
"{nonterminal} ::=" \
334 f
" object_begin (string colon {new_nonterminal} (comma string colon {new_nonterminal})*)?" \
336 [(value, new_nonterminal)]
def builtin_tuple(current: typing.Type, nonterminal: str):
    """
    Emit a fixed-length array rule for a tuple type, one nonterminal per position.

    Args:
        current: The type to translate. It is handled when its origin is ``tuple``
            (e.g. ``typing.Tuple[int, str]``) or when it is a class derived from ``tuple``.
        nonterminal: Name of the rule to define; element rules are named
            ``{nonterminal}_{index}``.

    Returns:
        ``None`` when ``current`` is not a tuple type. Otherwise a pair of
        the rule text and a list of ``(element_type, element_nonterminal)`` entries
        that still need their own definitions.
    """
    if typing.get_origin(current) is tuple or isinstance(current, type) and issubclass(current, tuple):
        args = typing.get_args(current)
        new_nonterminals = [f"{nonterminal}_{i}" for i in range(len(args))]
        # Materialise the pairs: a bare zip object can only be consumed once and
        # cannot be indexed, measured or concatenated like the lists returned by
        # the other generators in this file.
        return f"{nonterminal} ::=array_begin {' comma '.join(new_nonterminals)} array_end;\n", \
            list(zip(args, new_nonterminals))
    return None
350 def builtin_union(current: typing.Type, nonterminal: str):
351 if typing.get_origin(current)
is typing.Union:
352 args = typing.get_args(current)
353 assert args, f
"{current} from {nonterminal} cannot be an empty union!"
354 new_nonterminals = []
356 for i, arg
in enumerate(args):
358 new_nonterminals.append(f
"{nonterminal}_{i}")
359 return f
"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", zip(result, new_nonterminals)
361 def builtin_literal(current: typing.Type, nonterminal: str):
362 if typing.get_origin(current)
is typing.Literal:
363 args = typing.get_args(current)
364 assert args, f
"{current} from {nonterminal} cannot be an empty literal!"
367 for i, arg
in enumerate(args):
368 if isinstance(arg, str):
370 elif isinstance(arg, bool):
371 new_items.append(f
'#"{SPACE_NONTERMINAL}{str(arg).lower()}"')
372 elif isinstance(arg, int):
373 new_items.append(f
'#"{SPACE_NONTERMINAL}{str(arg)}"')
374 elif isinstance(arg, float):
375 new_items.append(f
'#"{SPACE_NONTERMINAL}{str(arg)}"')
377 new_items.append(
"null")
378 elif isinstance(arg, tuple):
379 for j,item
in enumerate(arg):
380 new_nonterminal = f
"{nonterminal}_{i}_{j}"
381 result.append((typing.Literal[item], new_nonterminal))
382 new_item = f
"(array_begin {' comma '.join(map(lambda x:x[1], result))} array_end)"
383 new_items.append(new_item)
384 elif isinstance(arg, frozendict):
385 for key, value
in arg.items():
386 new_nonterminal = f
"{nonterminal}_{i}_{key}"
387 result.append((typing.Literal[value], new_nonterminal))
388 new_item = f
"object_begin {' comma '.join(map(lambda x:x[1], result))} object_end"
389 new_items.append(new_item)
391 new_nonterminal = f
"{nonterminal}_{i}"
392 result.append((arg, new_nonterminal))
393 new_items.append(new_nonterminal)
394 return f
"{nonterminal} ::= {' | '.join(new_items)};\n", result
396 def builtin_simple_types(current: typing.Type, nonterminal: str):
397 if isinstance(current, type)
and issubclass(current, bool):
398 return f
"{nonterminal} ::= boolean;\n", []
399 elif isinstance(current, type)
and issubclass(current, int):
400 return f
"{nonterminal} ::= integer;\n", []
401 elif isinstance(current, type)
and issubclass(current, float):
402 return f
"{nonterminal} ::= number;\n", []
403 elif isinstance(current, type)
and issubclass(current, decimal.Decimal):
404 return f
"{nonterminal} ::= number;\n", []
405 elif isinstance(current, type)
and issubclass(current, str):
406 return f
"{nonterminal} ::= string;\n", []
407 elif isinstance(current, type)
and issubclass(current, type(
None)):
408 return f
"{nonterminal} ::= null;\n", []
409 elif current
is typing.Any:
410 return f
"{nonterminal} ::= json_value;\n", []
411 elif isinstance(current, typing.NewType):
412 current: typing.NewType
413 return "", [(current.__supertype__, nonterminal)]
425def _generate_kbnf_grammar(schema: schemas.schema.Schema|collections.abc.Sequence, start_nonterminal: str) -> str:
427 Generate a KBNF grammar string from a schema for JSON format.
430 schema: The schema to generate a grammar for.
431 start_nonterminal: The start nonterminal of the grammar. Default is "start".