Formatron v0.4.9
Formatron empowers everyone to control the output format of language models with minimal overhead.
json.py
1"""
2The module defines the `JsonExtractor` class, which is used to extract data from a string in JSON format.
3"""
4import collections
5import decimal
6import types
7import typing
8
9from frozendict import frozendict
10
11from formatron import extractor, schemas
12
13__all__ = ["JsonExtractor"]
14
15SPACE_NONTERMINAL = "[ \t\n\r]*"
17GRAMMAR_HEADER = rf"""integer ::= #"-?(0|[1-9][0-9]*)";
18number ::= #"-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][+-]?[0-9]+)?";
19string ::= #'"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}})*"';
20boolean ::= "true"|"false";
21null ::= "null";
22array ::= array_begin (json_value (comma json_value)*)? array_end;
23object ::= object_begin (string colon json_value (comma string colon json_value)*)? object_end;
24json_value ::= number|string|boolean|null|array|object;
25comma ::= #"{SPACE_NONTERMINAL},{SPACE_NONTERMINAL}";
26colon ::= #"{SPACE_NONTERMINAL}:{SPACE_NONTERMINAL}";
27object_begin ::= #"\\{{{SPACE_NONTERMINAL}";
28object_end ::= #"{SPACE_NONTERMINAL}\\}}";
29array_begin ::= #"\\[{SPACE_NONTERMINAL}";
30array_end ::= #"{SPACE_NONTERMINAL}\\]";
31"""
32_type_to_nonterminals = []
33
34
35def register_generate_nonterminal_def(
36 generate_nonterminal_def: typing.Callable[
37 [typing.Type, str],
38 typing.Optional[typing.Tuple[str,
39 typing.List[typing.Tuple[typing.Type, str]]]]]) -> None:
40 """
41 Register a callable that generates a nonterminal definition from a type.
42 The callable returns (nonterminal_definition, [(sub_type, sub_nonterminal), ...])
43 if the type is supported by this callable, otherwise None.
44 [(sub_type, sub_nonterminal), ...] are the types and nonterminals used in nonterminal_definition that may need
45 to be generated in the grammar too.
46
47 Args:
48 generate_nonterminal_def: A callable to generate nonterminal definition from a type.
49 """
50 _type_to_nonterminals.append(generate_nonterminal_def)
51
52
53def _register_all_predefined_types():
54 def schema(current: typing.Type, nonterminal: str):
55 if isinstance(current, type) and not isinstance(current, types.GenericAlias) \
56 and issubclass(current, schemas.schema.Schema):
57 line = [f"{nonterminal} ::= ", "object_begin "]
58 result = []
59 fields = []
60 for field, _field_info in current.fields().items():
61 field_name = f"{nonterminal}_{field}"
62 fields.append(f"'\"{field}\"' colon {field_name}")
63 result.append((_field_info, field_name))
64 line.append(" comma ".join(fields))
65 line.append(" object_end;\n")
66 return "".join(line), result
67 return None
68
69 def field_info(current: typing.Type, nonterminal: str):
70 if isinstance(current, schemas.schema.FieldInfo):
71 annotation = current.annotation
72 if current.required:
73 return "", [(annotation, nonterminal)]
74 new_nonterminal = f"{nonterminal}_required"
75 return f"{nonterminal} ::= {new_nonterminal}?;\n", [(annotation, new_nonterminal)]
76 return None
77
78 def string_metadata(current: typing.Type, nonterminal: str):
79 min_length = current.metadata.get("min_length")
80 max_length = current.metadata.get("max_length")
81 pattern = current.metadata.get("pattern")
82 substring_of = current.metadata.get("substring_of")
83 if pattern:
84 assert not (min_length or max_length or substring_of), "pattern is mutually exclusive with min_length, max_length and substring_of"
85 if substring_of:
86 assert not (min_length or max_length or pattern), "substring_of is mutually exclusive with min_length, max_length and pattern"
87 repetition_map = {
88 (True, False): f"{{{min_length},}}",
89 (False, True): f"{{0,{max_length}}}",
90 (True, True): f"{{{min_length},{max_length}}}"
91 }
92 repetition = repetition_map.get((min_length is not None, max_length is not None))
93 if repetition is not None:
94 return fr"""{nonterminal} ::= #'"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}}){repetition}"';
95""", []
96 if pattern is not None:
97 pattern = pattern.replace("'", "\\'")
98 return f"""{nonterminal} ::= #'"{pattern}"';\n""", []
99 if substring_of is not None:
100 return f"""{nonterminal} ::= '"' #substrs{repr(substring_of)} '"';\n""", []
101
102 def number_metadata(current: typing.Type, nonterminal: str):
103 gt = current.metadata.get("gt")
104 ge = current.metadata.get("ge")
105 lt = current.metadata.get("lt")
106 le = current.metadata.get("le")
107
108 prefix_map = {
109 (gt, 0): "",
110 (ge, 0): "0|",
111 (lt, 0): "-",
112 (le, 0): "0|-",
113 }
114
115 for (condition, value), prefix in prefix_map.items():
116 if condition is not None and condition == value:
117 if issubclass(current.type, int):
118 return f"""{nonterminal} ::= #'{prefix}[1-9][0-9]*';\n""", []
119 elif issubclass(current.type, float):
120 return f"""{nonterminal} ::= #'{prefix}[1-9][0-9]*(\\.[0-9]+)?([eE][+-]?[0-9]+)?';\n""", []
121
122 raise ValueError(f"{current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
123
124 def sequence_metadata(current: typing.Type, nonterminal: str):
125 min_items = current.metadata.get("min_length")
126 max_items = current.metadata.get("max_length")
127 prefix_items = current.metadata.get("prefix_items")
128 additional_items = current.metadata.get("additional_items")
129 if max_items is not None and prefix_items is not None and max_items <= len(prefix_items): # truncate prefix items
130 prefix_items = prefix_items[:max_items+1]
131 if prefix_items:
132 if not min_items: # json schema defaults to 0
133 min_items = 0
134 if not additional_items:
135 if min_items > len(prefix_items):
136 raise ValueError(f"min_items {min_items} is greater than the number of prefix_items {len(prefix_items)} and additional_items is not allowed")
137 max_items = len(prefix_items)
138 if min_items is not None or max_items is not None: # prefix items will set min
139 new_nonterminal = f"{nonterminal}_item"
140 ebnf_rules = []
141 if min_items is None:
142 min_items = 0
143 if min_items == 0 and max_items is None and prefix_items is None: # no special handling needed
144 return "", [(current.type, new_nonterminal)]
145 prefix_items_nonterminals = [f"{new_nonterminal}_{i}" for i in range(len(prefix_items))] if prefix_items else []
146 prefix_items_parts = [] # contains the sequence of nonterminals for prefix items from min_items to len(prefix_items)
147 if prefix_items is not None:
148 for i in range(max(min_items,1), len(prefix_items)+1):
149 prefix_items_parts.append(prefix_items_nonterminals[:i])
150 if min_items == 0: # EMPTY_PREFIX_ITEMS_ALLOWED
151 ebnf_rules.append(f"{nonterminal} ::= array_begin array_end;")
152 if max_items is None: # unbounded
153 if not prefix_items:
154 min_items_part = ' comma '.join([new_nonterminal] * (min_items - 1))
155 ebnf_rules.append(f"{nonterminal} ::= array_begin {min_items_part} comma {new_nonterminal}+ array_end;")
156 elif len(prefix_items_parts) >= min_items: # this part assumes prefix items are not empty, so we need the EMPTY_PREFIX_ITEMS_ALLOWED check above
157 for prefix_items_part in prefix_items_parts:
158 prefix_items_part = ' comma '.join(prefix_items_part)
159 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} (comma {new_nonterminal})* array_end;")
160 else:
161 min_items_part = ' comma '.join([new_nonterminal] * (min_items - len(prefix_items_nonterminals)-1))
162 if min_items_part:
163 min_items_part = "comma " + min_items_part
164 prefix_items_part = ' comma '.join(prefix_items_nonterminals)
165 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} {min_items_part} comma {new_nonterminal}+ array_end;")
166 elif min_items == 0 and not prefix_items: # TAG: ONLY_MAX_ITEMS
167 for i in range(min_items, max_items + 1):
168 items = ' comma '.join([new_nonterminal] * i)
169 ebnf_rules.append(f"{nonterminal} ::= array_begin {items} array_end;")
170 else:
171 prefix_items_num = len(prefix_items_nonterminals)
172 if prefix_items:
173 for prefix_items_part in prefix_items_parts:
174 prefix_items_part = ' comma '.join(prefix_items_part)
175 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} array_end;")
176 min_items_part = ' comma '.join([new_nonterminal] * (min_items - prefix_items_num))
177 prefix_items_part = ' comma '.join(prefix_items_nonterminals)
178 if min_items_part and prefix_items_part:
179 ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part} comma {min_items_part};")
180 elif min_items_part:
181 ebnf_rules.append(f"{nonterminal}_min ::= {min_items_part};")
182 elif prefix_items_part:
183 ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part};")
184 # sanity check: if prefix_items_part and min_items_part are both empty, we would have taken the ONLY_MAX_ITEMS branch above
185 common = max(min_items, prefix_items_num)
186 for i in range(1, max_items + 1 - common):
187 items = ' comma '.join([new_nonterminal] * i)
188 ebnf_rules.append(f"{nonterminal} ::= array_begin {nonterminal}_min comma {items} array_end;")
189 # Handle the item type
190 args = typing.get_args(current.type)
191 if args:
192 item_type = args[0]
193 else:
194 # If args is empty, default to Any
195 item_type = typing.Any
196 if prefix_items:
197 return "\n".join(ebnf_rules) + "\n", list(zip(prefix_items, prefix_items_nonterminals)) + [(item_type, new_nonterminal)]
198 return "\n".join(ebnf_rules) + "\n", [(item_type, new_nonterminal)]
199 return None
200
201 def is_sequence_like(current: typing.Type) -> bool:
202 """
203 Check if the given type is sequence-like.
204
205 This function returns True for:
206 - typing.Sequence
207 - typing.List
208 - typing.Tuple
209 - Any subclass of collections.abc.Sequence
210 - list
211 - tuple
212
213 Args:
214 current: The type to check.
215
216 Returns:
217 bool: True if the type is sequence-like, False otherwise.
218 """
219 original = typing.get_origin(current)
220 if original is None:
221 original = current
222 return (
223 original is typing.Sequence or
224 original is typing.List or
225 original is typing.Tuple or
226 (isinstance(original, type) and (issubclass(original, collections.abc.Sequence) or
227 issubclass(original, list) or
228 issubclass(original, tuple)))
229 )
230
231 def metadata(current: typing.Type, nonterminal: str):
232 if isinstance(current, schemas.schema.TypeWithMetadata):
233 original = typing.get_origin(current.type)
234 if original is None:
235 original = current.type
236 if not current.metadata:
237 return "", [(current.type, nonterminal)]
238 if isinstance(current.type, type) and issubclass(current.type, str):
239 return string_metadata(current, nonterminal)
240 elif isinstance(current.type, type) and issubclass(current.type, (int, float)):
241 return number_metadata(current, nonterminal)
242 elif is_sequence_like(original):
243 return sequence_metadata(current, nonterminal)
244 return None
245
246 def builtin_sequence(current: typing.Type, nonterminal: str):
247 original = typing.get_origin(current)
248 if original is None:
249 original = current
250 if is_sequence_like(original):
251 new_nonterminal = f"{nonterminal}_value"
252 annotation = typing.get_args(current)
253 if not annotation:
254 annotation = typing.Any
255 else:
256 annotation = annotation[0]
257 return f"{nonterminal} ::= array_begin ({new_nonterminal} (comma {new_nonterminal})*)? array_end;\n", \
258 [(annotation, new_nonterminal)]
259 return None
260
261 def builtin_dict(current: typing.Type, nonterminal: str):
262 original = typing.get_origin(current)
263 if original is None:
264 original = current
265 if original is typing.Mapping or isinstance(original, type) and issubclass(original,
266 collections.abc.Mapping):
267 new_nonterminal = f"{nonterminal}_value"
268 args = typing.get_args(current)
269 if not args:
270 value = typing.Any
271 else:
272 assert issubclass(
273 args[0], str), f"{args[0]} is not string!"
274 value = args[1]
275 return f"{nonterminal} ::=" \
276 f" object_begin (string colon {new_nonterminal} (comma string colon {new_nonterminal})*)?" \
277 f" object_end;\n", \
278 [(value, new_nonterminal)]
279 return None
280
281 def builtin_tuple(current: typing.Type, nonterminal: str):
282 if typing.get_origin(current) is tuple or isinstance(current, type) and issubclass(current, tuple):
283 args = typing.get_args(current)
284 new_nonterminals = []
285 result = []
286 for i, arg in enumerate(args):
287 result.append(arg)
288 new_nonterminals.append(f"{nonterminal}_{i}")
289 return f"{nonterminal} ::=array_begin {' comma '.join(new_nonterminals)} array_end;\n", \
290 zip(result, new_nonterminals)
291
292 def builtin_union(current: typing.Type, nonterminal: str):
293 if typing.get_origin(current) is typing.Union:
294 args = typing.get_args(current)
295 assert args, f"{current} from {nonterminal} cannot be an empty union!"
296 new_nonterminals = []
297 result = []
298 for i, arg in enumerate(args):
299 result.append(arg)
300 new_nonterminals.append(f"{nonterminal}_{i}")
301 return f"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", zip(result, new_nonterminals)
302
303 def builtin_literal(current: typing.Type, nonterminal: str):
304 if typing.get_origin(current) is typing.Literal:
305 args = typing.get_args(current)
306 assert args, f"{current} from {nonterminal} cannot be an empty literal!"
307 new_items = []
308 result = []
309 for i, arg in enumerate(args):
310 if isinstance(arg, str):
311 new_items.append(f'"\\"{repr(arg)[1:-1]}\\""')
312 elif isinstance(arg, bool):
313 new_items.append(f'"{str(arg).lower()}"')
314 elif isinstance(arg, int):
315 new_items.append(f'"{str(arg)}"')
316 elif isinstance(arg, float):
317 new_items.append(f'"{str(arg)}"')
318 elif arg is None:
319 new_items.append("null")
320 elif isinstance(arg, tuple):
321 for j,item in enumerate(arg):
322 new_nonterminal = f"{nonterminal}_{i}_{j}"
323 result.append((typing.Literal[item], new_nonterminal))
324 new_item = f"(array_begin {' comma '.join(map(lambda x:x[1], result))} array_end)"
325 new_items.append(new_item)
326 elif isinstance(arg, frozendict):
327 for key, value in arg.items():
328 new_nonterminal = f"{nonterminal}_{i}_{key}"
329 result.append((typing.Literal[value], new_nonterminal))
330 new_item = f"object_begin {' comma '.join(map(lambda x:x[1], result))} object_end"
331 new_items.append(new_item)
332 else:
333 new_nonterminal = f"{nonterminal}_{i}"
334 result.append((arg, new_nonterminal))
335 new_items.append(new_nonterminal)
336 return f"{nonterminal} ::= {' | '.join(new_items)};\n", result
337
338 def builtin_simple_types(current: typing.Type, nonterminal: str):
339 if isinstance(current, type) and issubclass(current, bool):
340 return f"{nonterminal} ::= boolean;\n", []
341 elif isinstance(current, type) and issubclass(current, int):
342 return f"{nonterminal} ::= integer;\n", []
343 elif isinstance(current, type) and issubclass(current, float):
344 return f"{nonterminal} ::= number;\n", []
345 elif isinstance(current, type) and issubclass(current, decimal.Decimal):
346 return f"{nonterminal} ::= number;\n", []
347 elif isinstance(current, type) and issubclass(current, str):
348 return f"{nonterminal} ::= string;\n", []
349 elif isinstance(current, type) and issubclass(current, type(None)):
350 return f"{nonterminal} ::= null;\n", []
351 elif current is typing.Any:
352 return f"{nonterminal} ::= json_value;\n", []
353 elif isinstance(current, typing.NewType):
354 current: typing.NewType
355 return "", [(current.__supertype__, nonterminal)]
356
357 register_generate_nonterminal_def(builtin_simple_types)
358 register_generate_nonterminal_def(schema)
359 register_generate_nonterminal_def(field_info)
360 register_generate_nonterminal_def(metadata)
361 register_generate_nonterminal_def(builtin_tuple)
362 register_generate_nonterminal_def(builtin_literal)
363 register_generate_nonterminal_def(builtin_union)
364 register_generate_nonterminal_def(builtin_sequence)
365 register_generate_nonterminal_def(builtin_dict)
366
367def _generate_kbnf_grammar(schema: schemas.schema.Schema|collections.abc.Sequence, start_nonterminal: str) -> str:
368 """
369 Generate a KBNF grammar string from a schema for JSON format.
370
371 Args:
372 schema: The schema to generate a grammar for.
373 start_nonterminal: The start nonterminal of the grammar. Default is "start".
374
375 Returns:
376 The generated KBNF grammar string.
377 """
378 type_id_to_nonterminal = {
379 id(int): "integer",
380 id(float): "number",
381 id(str): "string",
382 id(bool): "boolean",
383 id(type(None)): "null",
384 id(list): "array",
385 id(dict): "object",
386 id(typing.Any): "json_value",
387 }
388 result = [GRAMMAR_HEADER]
389 nonterminals = set()
390 stack = [(schema, start_nonterminal)]
391 while stack:
392 (current, nonterminal) = stack.pop()
393 type_id = id(current)
394 if type_id in type_id_to_nonterminal:
395 line = f"{nonterminal} ::= {type_id_to_nonterminal[type_id]};\n"
396 result.append(line)
397 continue
398 type_id_to_nonterminal[type_id] = nonterminal
399 for i in _type_to_nonterminals:
400 value = i(current, nonterminal)
401 if value is not None:
402 line, to_stack = value
403 result.append(line)
404 stack.extend(to_stack)
405 nonterminals.add(nonterminal)
406 break
407 else:
408 raise TypeError(
409 f"{current} from {nonterminal} is not supported in json_generators!")
410 return "".join(result)
411
412
414 """
415 An extractor that loads json data to an object from a string.
416 """
417
418 def __init__(self, nonterminal: str, capture_name: typing.Optional[str], schema: schemas.schema.Schema|collections.abc.Sequence,
419 to_object: typing.Callable[[str], schemas.schema.Schema]):
420 """
421 Create a json extractor from a given schema or a list of supported types.
422
423 Currently, the following data types are supported:
424
425 - bool
426 - int
427 - positive int
428 - negative int
429 - nonnegative int
430 - nonpositive int
431 - float
432 - positive float
433 - negative float
434 - nonnegative float
435 - nonpositive float
436 - str
437 - optionally with min_length, max_length and pattern constraints
438 - length is measured in the number of UTF-8 characters after JSON parsing
439 - *Warning*: a very large difference between min_length and max_length can lead to enormous memory consumption!
440 - pattern is mutually exclusive with min_length and max_length
441 - pattern will be compiled to a regular expression so all caveats of regular expressions apply
442 - pattern currently is automatically anchored at both ends
443 - the generated json could be invalid if the pattern allows invalid content between the json string's quotes.
444 - for example, `pattern=".*"` will allow '\"' to appear in the json string, which is forbidden by the JSON standard.
445 - also supports the substring_of constraint, which constrains the string to be a substring of a given string
446 - the generated json could be invalid if the given string contains invalid content when put into the json string's quotes.
447 - for example, `substring_of="abc\""` will allow '\"' to appear in the json string, which is forbidden by the JSON standard.
448 - NoneType
449 - typing.Any
450 - Subclasses of collections.abc.Mapping[str,T] and typing.Mapping[str,T] where T is a supported type,
451 - Subclasses of collections.abc.Sequence[T] and typing.Sequence[T] where T is a supported type.
452 - optionally with `minItems`, `maxItems`, `prefixItems` constraints
453 - *Warning*: a very large difference between minItems and maxItems can lead to very slow performance!
454 - *Warning*: By JSON schema definition, prefixItems by default allows both additional items and missing prefix items, which may not be the desired behavior and can lead to very slow performance if prefixItems is long!
455 - tuple[T1,T2,...] where T1,T2,... are supported types. The order, type and number of elements will be preserved.
456 - typing.Literal[x1,x2,...] where x1, x2, ... are instances of int, string, bool or NoneType, or another typing.Literal[y1,y2,...]
457 - typing.Union[T1,T2,...] where T1,T2,... are supported types.
458 - schemas.Schema where all its fields' data types are supported. Recursive schema definitions are supported as well.
459 - *Warning*: while non-required fields are supported, they can lead to very slow performance and/or enormous memory consumption if there are too many of them!
460
461 Args:
462 nonterminal: The nonterminal representing the extractor.
463 capture_name: The capture name of the extractor, or `None` if the extractor does not capture.
464 schema: The schema.
465 to_object: A callable to convert the extracted string to a schema instance.
466 """
467 super().__init__(nonterminal, capture_name)
468 self._to_object = to_object
469 self._rule_str = _generate_kbnf_grammar(schema, self.nonterminal)
470 def extract(self, input_str: str) -> typing.Optional[tuple[str, schemas.schema.Schema]]:
471 """
472 Extract a schema instance from a string.
473
474 Args:
475 input_str: The input string to extract from.
476
477 Returns:
478 A tuple of the remaining string and the extracted schema instance, or `None` if extraction failed.
479 """
480
481 # Ensure the input string starts with '{' or '[' after stripping leading whitespace
482 input_str = input_str.lstrip()
483 if not input_str.startswith(('{', '[')):
484 return None
485
486 # Variables to track the balance of brackets and the position in the string
487 bracket_count = 0
488 position = 0
489 in_string = False
490 escape_next = False
491 start_char = input_str[0]
492 end_char = '}' if start_char == '{' else ']'
493
494 # Iterate over the string to find where the JSON object or array ends
495 for char in input_str:
496 if not in_string:
497 if char == start_char:
498 bracket_count += 1
499 elif char == end_char:
500 bracket_count -= 1
501 elif char == '"':
502 in_string = True
503 else:
504 if char == '"' and not escape_next:
505 in_string = False
506 elif char == '\\':
507 escape_next = not escape_next
508 else:
509 escape_next = False
510
511 # Move to the next character
512 position += 1
513
514 # If brackets are balanced and we're not in a string, stop processing
515 if bracket_count == 0 and not in_string:
516 break
517 else:
518 return None
519 # The position now points to the character after the closing '}' or ']', so we slice up to position
520 json_str = input_str[:position]
521 remaining_str = input_str[position:]
522 # Return the unparsed remainder of the string and the decoded JSON object
523 return remaining_str, self._to_object(json_str)
524
525 @property
526 def kbnf_definition(self):
527 return self._rule_str
528
529
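Usage sketch: the snippet below constructs a JsonExtractor by hand to show how grammar generation and extract() fit together. The import path formatron.formats.json, the nonterminal name and the use of json.loads as the to_object callable are assumptions made for this illustration; in typical Formatron usage the extractor is created for you by the higher-level formatter API.

import json
from formatron.formats.json import JsonExtractor  # import path assumed

# list[int] is handled by the built-in sequence generator registered above;
# json.loads converts the matched JSON text into a Python object.
numbers = JsonExtractor("start", "numbers", list[int], json.loads)

# kbnf_definition contains GRAMMAR_HEADER plus rules roughly like:
#   start ::= array_begin (start_value (comma start_value)*)? array_end;
#   start_value ::= integer;
print(numbers.kbnf_definition)

# extract() returns (remaining_text, parsed_object), or None on failure.
print(numbers.extract('[1, 2, 3] and some trailing text'))
# -> (' and some trailing text', [1, 2, 3])
print(numbers.extract('not json'))
# -> None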
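Extension sketch: register_generate_nonterminal_def is the hook for adding types that the built-in generators do not handle. The generator below is hypothetical (uuid.UUID support is not part of this module) and only illustrates the documented protocol: return (nonterminal_definition, [(sub_type, sub_nonterminal), ...]) when the type is claimed, or None so the other registered generators get a chance.

import typing
import uuid
from formatron.formats.json import register_generate_nonterminal_def  # import path assumed

def uuid_string(current: typing.Type, nonterminal: str):
    # Hypothetical generator: claim the type only when it is a uuid.UUID subclass.
    if isinstance(current, type) and issubclass(current, uuid.UUID):
        rule = (f"{nonterminal} ::= #'\"[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}"
                f"-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\"';\n")
        return rule, []  # the rule references no sub-nonterminals that need generating
    return None

register_generate_nonterminal_def(uuid_string)

Because _generate_kbnf_grammar tries generators in registration order and stops at the first non-None result, a generator appended after _register_all_predefined_types() only sees types that the built-in generators decline.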