Formatron v0.4.11
Formatron empowers everyone to control the output format of language models with minimal overhead.
Loading...
Searching...
No Matches
json.py
Go to the documentation of this file.
1"""
2The module defines the `JsonExtractor` class, which is used to extract data from a string in JSON format.
3"""
4import collections
5import decimal
6import types
7import typing
8
9from frozendict import frozendict
10
11from formatron import extractor, schemas
12from formatron.formats.utils import escape_identifier, from_str_to_kbnf_str
13
14__all__ = ["JsonExtractor"]
15
16SPACE_NONTERMINAL = "[ \t\n\r]*"
18GRAMMAR_HEADER = rf"""integer ::= #"-?(0|[1-9][0-9]*)";
19number ::= #"-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][+-]?[0-9]+)?";
20string ::= #'"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}})*"';
21boolean ::= "true"|"false";
22null ::= "null";
23array ::= array_begin (json_value (comma json_value)*)? array_end;
24object ::= object_begin (string colon json_value (comma string colon json_value)*)? object_end;
25json_value ::= number|string|boolean|null|array|object;
26comma ::= #"{SPACE_NONTERMINAL},{SPACE_NONTERMINAL}";
27colon ::= #"{SPACE_NONTERMINAL}:{SPACE_NONTERMINAL}";
28object_begin ::= #"\\{{{SPACE_NONTERMINAL}";
29object_end ::= #"{SPACE_NONTERMINAL}\\}}";
30array_begin ::= #"\\[{SPACE_NONTERMINAL}";
31array_end ::= #"{SPACE_NONTERMINAL}\\]";
32"""
33
34_type_to_nonterminals = []
35
36
37
39 generate_nonterminal_def: typing.Callable[
40 [typing.Type, str],
41 typing.Optional[typing.Tuple[str,
42 typing.List[typing.Tuple[typing.Type, str]]]]]) -> None:
43 """
44 Register a callable to generate nonterminal definition from a type.
45 The callable returns (nonterminal_definition, [(sub_type, sub_nonterminal), ...])
46 if the type is supported by this callable, otherwise None.
47 [(sub_type, sub_nonterminal), ...] are the types and nonterminals used in nonterminal_definition that may need
48 to be generated in the grammar too.
49
50 Args:
51 generate_nonterminal_def: A callable to generate nonterminal definition from a type.
52 """
53 _type_to_nonterminals.append(generate_nonterminal_def)
54
55
def schema(current: typing.Type, nonterminal: str):
    """
    Generate the object rule for a `schemas.schema.Schema` subclass.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(field_info, field_nonterminal), ...]) when `current` is a plain class
        (not a generic alias) deriving from `schemas.schema.Schema`, otherwise None.
    """
    is_plain_class = isinstance(current, type) and not isinstance(current, types.GenericAlias)
    if not (is_plain_class and issubclass(current, schemas.schema.Schema)):
        return None
    members = []
    pending = []
    for name, info in current.fields().items():
        # Each field gets its own nonterminal derived from the parent's name.
        member_nonterminal = escape_identifier(f"{nonterminal}_{name}")
        members.append(f"{from_str_to_kbnf_str(name)} colon {member_nonterminal}")
        pending.append((info, member_nonterminal))
    body = " comma ".join(members)
    return f"{nonterminal} ::= object_begin {body} object_end;\n", pending
73
def field_info(current: typing.Type, nonterminal: str):
    """
    Unwrap a `schemas.schema.FieldInfo` into its annotation.

    A required field forwards its annotation under the same nonterminal.
    A non-required field defines `nonterminal` as an optional reference to
    `{nonterminal}_required`, which then carries the annotation.

    Args:
        current: The candidate object.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(annotation, sub_nonterminal)]) for a FieldInfo, otherwise None.
    """
    if not isinstance(current, schemas.schema.FieldInfo):
        return None
    inner_type = current.annotation
    if not current.required:
        wrapped = f"{nonterminal}_required"
        return f"{nonterminal} ::= {wrapped}?;\n", [(inner_type, wrapped)]
    return "", [(inner_type, nonterminal)]
82
def string_metadata(current: typing.Type, nonterminal: str):
    """
    Generate a string rule from `min_length`/`max_length`, `pattern` or `substring_of` metadata.

    Args:
        current: An object exposing `metadata` (a mapping of constraints).
        nonterminal: The nonterminal to define.

    Returns:
        (rule, []) for the first applicable constraint (length bounds, then pattern,
        then substring_of), or None when none of them is present.

    Raises:
        AssertionError: If `pattern` or `substring_of` is combined with the other constraints.
    """
    constraints = current.metadata
    min_length = constraints.get("min_length")
    max_length = constraints.get("max_length")
    pattern = constraints.get("pattern")
    substring_of = constraints.get("substring_of")
    if pattern:
        assert not min_length and not max_length and not substring_of, \
            "pattern is mutually exclusive with min_length, max_length and substring_of"
    if substring_of:
        assert not min_length and not max_length and not pattern, \
            "substring_of is mutually exclusive with min_length, max_length and pattern"
    # Build the regex repetition suffix from whichever length bounds are present.
    if min_length is not None and max_length is not None:
        repetition = f"{{{min_length},{max_length}}}"
    elif min_length is not None:
        repetition = f"{{{min_length},}}"
    elif max_length is not None:
        repetition = f"{{0,{max_length}}}"
    else:
        repetition = None
    if repetition is not None:
        # One JSON string character: an unescaped character, a simple escape, or a \uXXXX escape.
        json_char = r"""([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{4})"""
        return f"""{nonterminal} ::= #'"{json_char}{repetition}"';\n""", []
    if pattern is not None:
        # The pattern is embedded in a single-quoted regex literal, so escape single quotes.
        escaped_pattern = pattern.replace("'", "\\'")
        return f"""{nonterminal} ::= #'"{escaped_pattern}"';\n""", []
    if substring_of is not None:
        return f"""{nonterminal} ::= '"' #substrs{repr(substring_of)} '"';\n""", []
    return None
106
def number_metadata(current: typing.Type, nonterminal: str):
    """
    Generate a sign-constrained number rule from `gt`/`ge`/`lt`/`le` metadata.

    Only bounds equal to 0 are supported. When several bounds equal 0, the last
    one in the order gt, ge, lt, le decides the prefix. Note that the float
    pattern requires a leading digit in 1-9, so magnitudes below 1 (e.g. 0.5)
    are not matched.

    Args:
        current: An object exposing `metadata` (a mapping) and `type` (an int or float subclass).
        nonterminal: The nonterminal to define.

    Returns:
        (rule, []) describing the constrained number.

    Raises:
        ValueError: If no bound equals 0, or `current.type` is neither an int nor a float subclass.
    """
    sign_prefix = None
    for bound_name, prefix in (("gt", ""), ("ge", "0|"), ("lt", "-"), ("le", "0|-")):
        bound = current.metadata.get(bound_name)
        if bound is not None and bound == 0:
            sign_prefix = prefix  # later bounds override earlier ones
    if sign_prefix is not None:
        if issubclass(current.type, int):
            return f"""{nonterminal} ::= #'{sign_prefix}[1-9][0-9]*';\n""", []
        if issubclass(current.type, float):
            return f"""{nonterminal} ::= #'{sign_prefix}[1-9][0-9]*(\\.[0-9]+)?([eE][+-]?[0-9]+)?';\n""", []
    raise ValueError(f"{current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
128
def sequence_metadata(current: typing.Type, nonterminal: str):
    """
    Generate array rules for a sequence type constrained by metadata.

    Reads `min_length`, `max_length`, `prefix_items` and `additional_items`
    from `current.metadata`.

    Args:
        current: An object exposing `metadata` (a mapping) and `type` (the sequence type).
        nonterminal: The nonterminal to define.

    Returns:
        (rules, [(sub_type, sub_nonterminal), ...]), or None when the metadata
        yields neither a minimum nor a maximum item count.

    Raises:
        ValueError: If `min_length` exceeds the number of prefix items while
            additional items are not allowed.
    """
    min_items = current.metadata.get("min_length")
    max_items = current.metadata.get("max_length")
    prefix_items = current.metadata.get("prefix_items")
    additional_items = current.metadata.get("additional_items")
    if max_items is not None and prefix_items is not None and max_items <= len(prefix_items):  # truncate prefix items
        # Keep exactly max_items prefix items; keeping one more would allow an
        # array longer than max_items.
        prefix_items = prefix_items[:max_items]
    if prefix_items:
        if not min_items:  # json schema defaults to 0
            min_items = 0
        if not additional_items:
            if min_items > len(prefix_items):
                raise ValueError(f"min_items {min_items} is greater than the number of prefix_items {len(prefix_items)} and additional_items is not allowed")
            max_items = len(prefix_items)
    if min_items is not None or max_items is not None:  # prefix items will set min
        new_nonterminal = f"{nonterminal}_item"
        ebnf_rules = []
        if min_items is None:
            min_items = 0
        if min_items == 0 and max_items is None and prefix_items is None:  # no special handling needed
            # Delegate to the unconstrained type under the SAME nonterminal so
            # that `nonterminal` itself ends up defined in the grammar.
            return "", [(current.type, nonterminal)]
        prefix_items_nonterminals = [f"{new_nonterminal}_{i}" for i in range(len(prefix_items))] if prefix_items else []
        prefix_items_parts = []  # contains the sequence of nonterminals for prefix items from min_items to len(prefix_items)
        if prefix_items is not None:
            for i in range(max(min_items, 1), len(prefix_items) + 1):
                prefix_items_parts.append(prefix_items_nonterminals[:i])
        if min_items == 0:  # EMPTY_PREFIX_ITEMS_ALLOWED
            ebnf_rules.append(f"{nonterminal} ::= array_begin array_end;")
        if max_items is None:  # unbounded
            if not prefix_items:
                min_items_part = ' comma '.join([new_nonterminal] * (min_items - 1))
                ebnf_rules.append(f"{nonterminal} ::= array_begin {min_items_part} comma {new_nonterminal}+ array_end;")
            elif len(prefix_items_parts) >= min_items:  # this part assumes prefix items are not empty, so we need the EMPTY_PREFIX_ITEMS_ALLOWED check above
                for prefix_items_part in prefix_items_parts:
                    prefix_items_part = ' comma '.join(prefix_items_part)
                    ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} (comma {new_nonterminal})* array_end;")
            else:
                min_items_part = ' comma '.join([new_nonterminal] * (min_items - len(prefix_items_nonterminals) - 1))
                if min_items_part:
                    min_items_part = "comma " + min_items_part
                prefix_items_part = ' comma '.join(prefix_items_nonterminals)
                ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} {min_items_part} comma {new_nonterminal}+ array_end;")
        elif min_items == 0 and not prefix_items:  # TAG: ONLY_MAX_ITEMS
            for i in range(min_items, max_items + 1):
                items = ' comma '.join([new_nonterminal] * i)
                ebnf_rules.append(f"{nonterminal} ::= array_begin {items} array_end;")
        else:
            prefix_items_num = len(prefix_items_nonterminals)
            if prefix_items:
                for prefix_items_part in prefix_items_parts:
                    prefix_items_part = ' comma '.join(prefix_items_part)
                    ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} array_end;")
            min_items_part = ' comma '.join([new_nonterminal] * (min_items - prefix_items_num))
            prefix_items_part = ' comma '.join(prefix_items_nonterminals)
            if min_items_part and prefix_items_part:
                ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part} comma {min_items_part};")
            elif min_items_part:
                ebnf_rules.append(f"{nonterminal}_min ::= {min_items_part};")
            elif prefix_items_part:
                ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part};")
            # sanity check: if prefix_items_part and min_items_part are both empty, we would be in the ONLY_MAX_ITEMS branch above
            common = max(min_items, prefix_items_num)
            for i in range(1, max_items + 1 - common):
                items = ' comma '.join([new_nonterminal] * i)
                ebnf_rules.append(f"{nonterminal} ::= array_begin {nonterminal}_min comma {items} array_end;")
        # Handle the item type
        args = typing.get_args(current.type)
        if args:
            item_type = args[0]
        else:
            # If args is empty, default to Any
            item_type = typing.Any
        if prefix_items:
            return "\n".join(ebnf_rules) + "\n", list(zip(prefix_items, prefix_items_nonterminals)) + [(item_type, new_nonterminal)]
        return "\n".join(ebnf_rules) + "\n", [(item_type, new_nonterminal)]
    return None
205
def is_sequence_like(current: typing.Type) -> bool:
    """
    Tell whether a type should be treated as a sequence.

    The generic origin of `current` is inspected when it has one; otherwise
    `current` itself is. The result is True for `typing.Sequence`,
    `typing.List`, `typing.Tuple`, and any class deriving from
    `collections.abc.Sequence`, `list` or `tuple`.

    Args:
        current: The type to check.

    Returns:
        bool: True if the type is sequence-like, False otherwise.
    """
    origin = typing.get_origin(current)
    candidate = current if origin is None else origin
    if candidate is typing.Sequence or candidate is typing.List or candidate is typing.Tuple:
        return True
    if not isinstance(candidate, type):
        return False
    return issubclass(candidate, (collections.abc.Sequence, list, tuple))
235
def metadata(current: typing.Type, nonterminal: str):
    """
    Dispatch a `schemas.schema.TypeWithMetadata` to the matching metadata generator.

    Empty metadata forwards the wrapped type under the same nonterminal.
    Otherwise str subclasses, int/float subclasses and sequence-like types are
    handed to the string, number and sequence generators respectively.

    Args:
        current: The candidate object.
        nonterminal: The nonterminal to define.

    Returns:
        The chosen generator's result, or None when `current` is not a
        TypeWithMetadata or its wrapped type has no metadata generator.
    """
    if not isinstance(current, schemas.schema.TypeWithMetadata):
        return None
    wrapped = current.type
    origin = typing.get_origin(wrapped)
    if origin is None:
        origin = wrapped
    if not current.metadata:
        return "", [(wrapped, nonterminal)]
    if isinstance(wrapped, type):
        if issubclass(wrapped, str):
            return string_metadata(current, nonterminal)
        if issubclass(wrapped, (int, float)):
            return number_metadata(current, nonterminal)
    if is_sequence_like(origin):
        return sequence_metadata(current, nonterminal)
    return None
250
def builtin_sequence(current: typing.Type, nonterminal: str):
    """
    Generate a homogeneous array rule for a sequence-like type.

    The element type is the first type argument of `current`, or `typing.Any`
    when there are no type arguments.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(element_type, element_nonterminal)]) for sequence-like types, otherwise None.
    """
    origin = typing.get_origin(current)
    if not is_sequence_like(current if origin is None else origin):
        return None
    element_nonterminal = f"{nonterminal}_value"
    type_args = typing.get_args(current)
    element_type = type_args[0] if type_args else typing.Any
    rule = f"{nonterminal} ::= array_begin ({element_nonterminal} (comma {element_nonterminal})*)? array_end;\n"
    return rule, [(element_type, element_nonterminal)]
265
def builtin_dict(current: typing.Type, nonterminal: str):
    """
    Generate an object rule for a string-keyed mapping type.

    The value type is the second type argument of `current`, or `typing.Any`
    when there are no type arguments.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(value_type, value_nonterminal)]) for `typing.Mapping` and
        `collections.abc.Mapping` subclasses, otherwise None.

    Raises:
        AssertionError: If the key type argument is not a subclass of str.
    """
    origin = typing.get_origin(current)
    if origin is None:
        origin = current
    is_mapping = origin is typing.Mapping or (
        isinstance(origin, type) and issubclass(origin, collections.abc.Mapping))
    if not is_mapping:
        return None
    value_nonterminal = f"{nonterminal}_value"
    type_args = typing.get_args(current)
    if type_args:
        assert issubclass(
            type_args[0], str), f"{type_args[0]} is not string!"
        value_type = type_args[1]
    else:
        value_type = typing.Any
    rule = (f"{nonterminal} ::="
            f" object_begin (string colon {value_nonterminal} (comma string colon {value_nonterminal})*)?"
            f" object_end;\n")
    return rule, [(value_type, value_nonterminal)]
285
def builtin_tuple(current: typing.Type, nonterminal: str):
    """
    Generate a fixed-layout array rule for a tuple type.

    Every type argument gets its own positional nonterminal `{nonterminal}_{i}`.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(element_type, element_nonterminal), ...]) for tuple types, otherwise None.
        The pairs are returned as a list (not a one-shot iterator), matching the
        contract documented for registered generators.
    """
    if typing.get_origin(current) is tuple or isinstance(current, type) and issubclass(current, tuple):
        args = typing.get_args(current)
        new_nonterminals = [f"{nonterminal}_{i}" for i in range(len(args))]
        return f"{nonterminal} ::=array_begin {' comma '.join(new_nonterminals)} array_end;\n", \
            list(zip(args, new_nonterminals))
    return None
296
297 def builtin_union(current: typing.Type, nonterminal: str):
298 if typing.get_origin(current) is typing.Union:
299 args = typing.get_args(current)
300 assert args, f"{current} from {nonterminal} cannot be an empty union!"
301 new_nonterminals = []
302 result = []
303 for i, arg in enumerate(args):
304 result.append(arg)
305 new_nonterminals.append(f"{nonterminal}_{i}")
306 return f"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", zip(result, new_nonterminals)
307
def builtin_literal(current: typing.Type, nonterminal: str):
    """
    Generate an alternation rule for a `typing.Literal[...]` type.

    str, bool, int, float and None arguments are emitted inline. Tuple and
    frozendict arguments become array/object alternatives whose members are
    expanded through their own nonterminals. Any other argument is forwarded
    under `{nonterminal}_{i}`.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define.

    Returns:
        (rule, [(sub_type, sub_nonterminal), ...]) for literal types, otherwise None.
    """
    if typing.get_origin(current) is typing.Literal:
        args = typing.get_args(current)
        assert args, f"{current} from {nonterminal} cannot be an empty literal!"
        new_items = []
        result = []
        for i, arg in enumerate(args):
            if isinstance(arg, str):
                new_items.append(from_str_to_kbnf_str(arg))
            elif isinstance(arg, bool):  # must precede int: bool is an int subclass
                new_items.append(f'"{str(arg).lower()}"')
            elif isinstance(arg, int):
                new_items.append(f'"{str(arg)}"')
            elif isinstance(arg, float):
                new_items.append(f'"{str(arg)}"')
            elif arg is None:
                new_items.append("null")
            elif isinstance(arg, tuple):
                # Collect only THIS tuple's members; `result` also holds the
                # members of earlier alternatives and must not be joined here.
                item_nonterminals = []
                for j, item in enumerate(arg):
                    new_nonterminal = f"{nonterminal}_{i}_{j}"
                    result.append((typing.Literal[item], new_nonterminal))
                    item_nonterminals.append(new_nonterminal)
                new_item = f"(array_begin {' comma '.join(item_nonterminals)} array_end)"
                new_items.append(new_item)
            elif isinstance(arg, frozendict):
                # Emit `key colon value` members, as the schema object rule does,
                # and escape key-derived nonterminals since keys are arbitrary strings.
                members = []
                for key, value in arg.items():
                    new_nonterminal = escape_identifier(f"{nonterminal}_{i}_{key}")
                    result.append((typing.Literal[value], new_nonterminal))
                    members.append(f"{from_str_to_kbnf_str(key)} colon {new_nonterminal}")
                new_item = f"object_begin {' comma '.join(members)} object_end"
                new_items.append(new_item)
            else:
                new_nonterminal = f"{nonterminal}_{i}"
                result.append((arg, new_nonterminal))
                new_items.append(new_nonterminal)
        return f"{nonterminal} ::= {' | '.join(new_items)};\n", result
    return None
342
343 def builtin_simple_types(current: typing.Type, nonterminal: str):
344 if isinstance(current, type) and issubclass(current, bool):
345 return f"{nonterminal} ::= boolean;\n", []
346 elif isinstance(current, type) and issubclass(current, int):
347 return f"{nonterminal} ::= integer;\n", []
348 elif isinstance(current, type) and issubclass(current, float):
349 return f"{nonterminal} ::= number;\n", []
350 elif isinstance(current, type) and issubclass(current, decimal.Decimal):
351 return f"{nonterminal} ::= number;\n", []
352 elif isinstance(current, type) and issubclass(current, str):
353 return f"{nonterminal} ::= string;\n", []
354 elif isinstance(current, type) and issubclass(current, type(None)):
355 return f"{nonterminal} ::= null;\n", []
356 elif current is typing.Any:
357 return f"{nonterminal} ::= json_value;\n", []
358 elif isinstance(current, typing.NewType):
359 current: typing.NewType
360 return "", [(current.__supertype__, nonterminal)]
361
362 register_generate_nonterminal_def(builtin_simple_types)
367 register_generate_nonterminal_def(builtin_literal)
369 register_generate_nonterminal_def(builtin_sequence)
371
def _generate_kbnf_grammar(schema: schemas.schema.Schema|collections.abc.Sequence, start_nonterminal: str) -> str:
    """
    Generate a KBNF grammar string from a schema for JSON format.

    Types are expanded depth-first from `schema`. Each type is offered to the
    registered generators in registration order and the first non-None result
    is used. A type that was already expanded is emitted as an alias to the
    nonterminal it was first bound to.

    Args:
        schema: The schema to generate a grammar for.
        start_nonterminal: The start nonterminal of the grammar.

    Returns:
        The generated KBNF grammar string.

    Raises:
        TypeError: If no registered generator supports an encountered type.
    """
    type_id_to_nonterminal = {
        id(int): "integer",
        id(float): "number",
        id(str): "string",
        id(bool): "boolean",
        id(type(None)): "null",
        id(list): "array",
        id(dict): "object",
        id(typing.Any): "json_value",
    }
    result = [GRAMMAR_HEADER]
    # id() is only unique among objects that are alive at the same time. Hold a
    # reference to every expanded type so a later, unrelated object cannot
    # reuse a collected type's id and hit a stale cache entry.
    expanded_types = []
    stack = [(schema, start_nonterminal)]
    while stack:
        (current, nonterminal) = stack.pop()
        type_id = id(current)
        if type_id in type_id_to_nonterminal:
            line = f"{nonterminal} ::= {type_id_to_nonterminal[type_id]};\n"
            result.append(line)
            continue
        type_id_to_nonterminal[type_id] = nonterminal
        expanded_types.append(current)
        for i in _type_to_nonterminals:
            value = i(current, nonterminal)
            if value is not None:
                line, to_stack = value
                result.append(line)
                stack.extend(to_stack)
                break
        else:
            raise TypeError(
                f"{current} from {nonterminal} is not supported in json_generators!")
    return "".join(result)
416
417
419 """
420 An extractor that loads json data to an object from a string.
421 """
422
def __init__(self, nonterminal: str, capture_name: typing.Optional[str], schema: schemas.schema.Schema|collections.abc.Sequence,
             to_object: typing.Callable[[str], schemas.schema.Schema]):
    """
    Create a json extractor from a given schema or a list of supported types.

    Supported data types:

    - bool
    - int and float, each optionally constrained to be positive, negative,
      nonnegative or nonpositive
    - str
        - optionally with min_length, max_length and pattern constraints
            - length is measured in UTF-8 character number after json parsing
            - *Warning*: too large a difference between min_length and max_length can lead to enormous memory consumption!
            - pattern is mutually exclusive with min_length and max_length
            - pattern is compiled to a regular expression, so all caveats of regular expressions apply
            - pattern is currently anchored automatically at both ends
            - the generated json could be invalid if the pattern allows invalid content between the json string's quotes;
              for example, `pattern=".*"` allows '\"' to appear in the json string, which the JSON standard forbids
        - also supports the substring_of constraint, which restricts the string to be a substring of a given string
            - the generated json could be invalid if the given string contains content that is invalid between the json string's quotes;
              for example, `substring_of="abc\""` allows '\"' to appear in the json string, which the JSON standard forbids
    - NoneType
    - typing.Any
    - Subclasses of collections.abc.Mapping[str,T] and typing.Mapping[str,T] where T is a supported type
    - Subclasses of collections.abc.Sequence[T] and typing.Sequence[T] where T is a supported type
        - optionally with `minItems`, `maxItems`, `prefixItems` constraints
        - *Warning*: too large a difference between minItems and maxItems can lead to very slow performance!
        - *Warning*: by json schema definition, prefixItems by default allows additional items and missing items in the prefixItems, which may not be the desired behavior and can lead to very slow performance if prefixItems is long!
    - tuple[T1,T2,...] where T1,T2,... are supported types; the order, type and number of elements are preserved
    - typing.Literal[x1,x2,...] where x1, x2, ... are instances of int, string, bool or NoneType, or another typing.Literal[y1,y2,...]
    - typing.Union[T1,T2,...] where T1,T2,... are supported types
    - schemas.Schema where all its fields' data types are supported; recursive schema definitions are supported as well
        - *Warning*: non-required fields are supported, but too many of them can lead to very slow performance and/or enormous memory consumption!

    Args:
        nonterminal: The nonterminal representing the extractor.
        capture_name: The capture name of the extractor, or `None` if the extractor does not capture.
        schema: The schema.
        to_object: A callable to convert the extracted string to a schema instance.
    """
    super().__init__(nonterminal, capture_name)
    self._to_object = to_object
    # The grammar is generated once, rooted at this extractor's nonterminal.
    self._rule_str = _generate_kbnf_grammar(schema, self.nonterminal)
def extract(self, input_str: str) -> typing.Optional[tuple[str, schemas.schema.Schema]]:
    """
    Extract a schema instance from a string.

    Leading whitespace is skipped. The text must then start with '{' or '['.
    The end of the JSON value is found by balancing the opening bracket
    against its closing counterpart while ignoring brackets inside string
    literals.

    Args:
        input_str: The input string to extract from.

    Returns:
        A tuple of the remaining string and the extracted schema instance, or `None` if extraction failed.
    """
    text = input_str.lstrip()
    if not text.startswith(('{', '[')):
        return None

    opener = text[0]
    closer = '}' if opener == '{' else ']'
    depth = 0
    inside_string = False
    escaped = False

    for index, char in enumerate(text):
        if inside_string:
            if char == '"' and not escaped:
                inside_string = False
            elif char == '\\':
                # Consecutive backslashes toggle: an escaped backslash does not escape what follows.
                escaped = not escaped
            else:
                escaped = False
        elif char == opener:
            depth += 1
        elif char == closer:
            depth -= 1
        elif char == '"':
            inside_string = True

        if depth == 0 and not inside_string:
            # `index` is the closing bracket; split right after it.
            end = index + 1
            return text[end:], self._to_object(text[:end])
    # Ran out of input before the brackets balanced.
    return None
529
@property
def kbnf_definition(self):
    """Return the KBNF grammar string generated when the extractor was constructed."""
    grammar = self._rule_str
    return grammar
533
534
An extractor that extracts data corresponding to a nonterminal.
Definition extractor.py:98
str nonterminal(self)
Get the nonterminal of the extractor.
Definition extractor.py:121
An extractor that loads json data to an object from a string.
Definition json.py:431
typing.Optional[tuple[str, schemas.schema.Schema]] extract(self, str input_str)
Extract a schema instance from a string.
Definition json.py:502
__init__(self, str nonterminal, typing.Optional[str] capture_name, schemas.schema.Schema|collections.abc.Sequence schema, typing.Callable[[str], schemas.schema.Schema] to_object)
Create a json extractor from a given schema or a list of supported types.
Definition json.py:489
An abstract field info that describes a data field in a schema.
Definition schema.py:13
An abstract schema that describes some data.
Definition schema.py:91
_register_all_predefined_types()
Definition json.py:60
None register_generate_nonterminal_def(typing.Callable[[typing.Type, str], typing.Optional[typing.Tuple[str, typing.List[typing.Tuple[typing.Type, str]]]]] generate_nonterminal_def)
Register a callable to generate nonterminal definition from a type.
Definition json.py:56
str _generate_kbnf_grammar(schemas.schema.Schema|collections.abc.Sequence schema, str start_nonterminal)
Generate a KBNF grammar string from a schema for JSON format.
Definition json.py:392