Formatron v0.4.2
Formatron empowers everyone to control the output format of language models with minimal overhead.
Loading...
Searching...
No Matches
json.py
Go to the documentation of this file.
1"""
2The module defines the `JsonExtractor` class, which is used to extract data from a string in JSON format.
3"""
4import collections
5import decimal
6import types
7import typing
8
9from frozendict import frozendict
10
11from formatron import extractor, schemas
12
13__all__ = ["JsonExtractor"]
14
# Regex fragment matching any (possibly empty) run of space, tab, newline and
# carriage return. It is a normal (non-raw) string, so the escapes are real
# control characters by the time they are interpolated below.
15SPACE_NONTERMINAL = "[ \t\n\r]*"
# Grammar preamble that `_generate_kbnf_grammar` places first in its output:
# JSON scalars, generic array/object/json_value rules and the punctuation
# rules (each padded with SPACE_NONTERMINAL). This is a raw f-string:
# backslashes reach the grammar text unchanged, and `{{` / `}}` produce
# literal braces.
17GRAMMAR_HEADER = rf"""integer ::= #"-?(0|[1-9]\\d*)";
18number ::= #"-?(0|[1-9]\\d*)(\\.\\d+)?([eE][+-]?\\d+)?";
19string ::= #'"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}})*"';
20boolean ::= "true"|"false";
21null ::= "null";
22array ::= array_begin (json_value (comma json_value)*)? array_end;
23object ::= object_begin (string colon json_value (comma string colon json_value)*)? object_end;
24json_value ::= number|string|boolean|null|array|object;
25comma ::= #"{SPACE_NONTERMINAL},{SPACE_NONTERMINAL}";
26colon ::= #"{SPACE_NONTERMINAL}:{SPACE_NONTERMINAL}";
27object_begin ::= #"\\{{{SPACE_NONTERMINAL}";
28object_end ::= #"{SPACE_NONTERMINAL}\\}}";
29array_begin ::= #"\\[{SPACE_NONTERMINAL}";
30array_end ::= #"{SPACE_NONTERMINAL}\\]";
31"""
# Registered generator callables; they are appended on registration and tried
# in that order whenever a type needs a grammar rule.
32_type_to_nonterminals = []
33
34
36 generate_nonterminal_def: typing.Callable[
37 [typing.Type, str],
38 typing.Optional[typing.Tuple[str,
39 typing.List[typing.Tuple[typing.Type, str]]]]]) -> None:
40 """
41 Register a callable to generate nonterminal definition from a type.
42 The callable returns (nonterminal_definition, [(sub_type, sub_nonterminal), ...])
43 if the type is supported by this callable, otherwise None.
44 [(sub_type, sub_nonterminal), ...] are the types and nonterminals used in nonterminal_definition that may need
45 to be generated in the grammar too.
46
47 Args:
48 generate_nonterminal_def: A callable to generate nonterminal definition from a type.
49 """
50 _type_to_nonterminals.append(generate_nonterminal_def)
51
52
def schema(current: typing.Type, nonterminal: str):
    """
    Generate the object rule for a `schemas.schema.Schema` subclass.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal the rule is defined for.

    Returns:
        (rule, [(field_info, field_nonterminal), ...]) when `current` is a plain class
        deriving from `schemas.schema.Schema`, otherwise None. Each field is emitted as
        its quoted name, `colon`, and a `<nonterminal>_<field>` nonterminal.
    """
    # Parametrised aliases such as list[int] are excluded before issubclass is consulted.
    is_plain_class = isinstance(current, type) and not isinstance(current, types.GenericAlias)
    if not (is_plain_class and issubclass(current, schemas.schema.Schema)):
        return None
    members = [(name, info, f"{nonterminal}_{name}") for name, info in current.fields().items()]
    body = " comma ".join(f"'\"{name}\"' colon {member}" for name, _, member in members)
    pending = [(info, member) for _, info, member in members]
    return f"{nonterminal} ::= object_begin {body} object_end;\n", pending
68
def field_info(current: typing.Type, nonterminal: str):
    """
    Unwrap a `schemas.schema.FieldInfo` into the annotation it carries.

    Args:
        current: The candidate object.
        nonterminal: The nonterminal assigned to the field.

    Returns:
        None when `current` is not a `FieldInfo`. For a required field, an empty rule and
        the annotation queued under the same nonterminal. Otherwise a rule making
        `<nonterminal>_required` optional, with the annotation queued under that name.
    """
    if not isinstance(current, schemas.schema.FieldInfo):
        return None
    if current.required:
        # No rule of its own: the annotation defines `nonterminal` directly.
        return "", [(current.annotation, nonterminal)]
    # NOTE(review): only the value becomes optional here — confirm the enclosing
    # object rule is expected to still demand the key and colon.
    wrapped = f"{nonterminal}_required"
    return f"{nonterminal} ::= {wrapped}?;\n", [(current.annotation, wrapped)]
76
def builtin_list(current: typing.Type, nonterminal: str):
    """
    Generate a homogeneous array rule for a sequence type.

    Args:
        current: The candidate type, parametrised or bare.
        nonterminal: The nonterminal the rule is defined for.

    Returns:
        (rule, [(element_type, "<nonterminal>_value")]) when `current` (or its origin) is
        `typing.Sequence` or a subclass of `collections.abc.Sequence`, otherwise None.
        A bare sequence type uses `typing.Any` as its element type.
    """
    origin = typing.get_origin(current)
    container = current if origin is None else origin
    is_sequence = container is typing.Sequence or (
        isinstance(container, type) and issubclass(container, collections.abc.Sequence))
    if not is_sequence:
        return None
    params = typing.get_args(current)
    element_type = params[0] if params else typing.Any
    element = f"{nonterminal}_value"
    rule = f"{nonterminal} ::= array_begin ({element} (comma {element})*)? array_end;\n"
    return rule, [(element_type, element)]
92
def builtin_dict(current: typing.Type, nonterminal: str):
    """
    Generate a string-keyed object rule for a mapping type.

    Args:
        current: The candidate type, parametrised or bare.
        nonterminal: The nonterminal the rule is defined for.

    Returns:
        (rule, [(value_type, "<nonterminal>_value")]) when `current` (or its origin) is
        `typing.Mapping` or a subclass of `collections.abc.Mapping`, otherwise None.
        A bare mapping type uses `typing.Any` as its value type.

    Raises:
        AssertionError: If the mapping is parametrised with a key type that is not a
            subclass of `str`.
    """
    origin = typing.get_origin(current)
    container = current if origin is None else origin
    is_mapping = container is typing.Mapping or (
        isinstance(container, type) and issubclass(container, collections.abc.Mapping))
    if not is_mapping:
        return None
    params = typing.get_args(current)
    if params:
        # The emitted rule always uses `string` for keys, so other key types are rejected.
        assert issubclass(
            params[0], str), f"{params[0]} is not string!"
        value_type = params[1]
    else:
        value_type = typing.Any
    entry = f"{nonterminal}_value"
    rule = (f"{nonterminal} ::="
            f" object_begin (string colon {entry} (comma string colon {entry})*)?"
            f" object_end;\n")
    return rule, [(value_type, entry)]
112
def builtin_tuple(current: typing.Type, nonterminal: str):
    """
    Generate a fixed-length array rule for a tuple type.

    Each type argument gets its own `<nonterminal>_<index>` nonterminal, so the order,
    type and number of elements are fixed by the rule.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal the rule is defined for.

    Returns:
        (rule, [(element_type, element_nonterminal), ...]) when `current` is a
        parametrised `tuple` or a subclass of `tuple`, otherwise None. The second item is
        a real list (not a one-shot iterator) so callers may inspect it more than once.
    """
    is_tuple = typing.get_origin(current) is tuple or (
        isinstance(current, type) and issubclass(current, tuple))
    if not is_tuple:
        return None
    element_types = typing.get_args(current)
    element_nonterminals = [f"{nonterminal}_{i}" for i, _ in enumerate(element_types)]
    rule = f"{nonterminal} ::=array_begin {' comma '.join(element_nonterminals)} array_end;\n"
    return rule, list(zip(element_types, element_nonterminals))
123
124 def builtin_union(current: typing.Type, nonterminal: str):
125 if typing.get_origin(current) is typing.Union:
126 args = typing.get_args(current)
127 assert args, f"{current} from {nonterminal} cannot be an empty union!"
128 new_nonterminals = []
129 result = []
130 for i, arg in enumerate(args):
131 result.append(arg)
132 new_nonterminals.append(f"{nonterminal}_{i}")
133 return f"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", zip(result, new_nonterminals)
134
def builtin_literal(current: typing.Type, nonterminal: str):
    """
    Generate an alternation rule for `typing.Literal[...]`.

    Every literal argument becomes one alternative:

    - str: a terminal matching the JSON encoding of the string, quotes included.
    - bool / int / float: a terminal matching the lowercase or decimal text.
    - None: the `null` nonterminal.
    - tuple: a fixed array whose elements are `Literal[item]` sub-rules.
    - frozendict: a fixed object whose values are `Literal[value]` sub-rules.
    - anything else: queued as-is under `<nonterminal>_<index>`.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal the rule is defined for.

    Returns:
        (rule, [(sub_type, sub_nonterminal), ...]) when `current` is a `typing.Literal`,
        otherwise None.

    Raises:
        AssertionError: If the literal has no arguments.
    """
    import json  # local import: only this generator needs JSON string encoding

    if typing.get_origin(current) is not typing.Literal:
        return None
    args = typing.get_args(current)
    assert args, f"{current} from {nonterminal} cannot be an empty literal!"

    def json_string_terminal(text: str) -> str:
        # JSON-encode first so the terminal matches what a JSON document contains,
        # then escape for the surrounding double-quoted grammar terminal.
        # NOTE(review): assumes KBNF terminals unescape `\\` and `\"` — confirm
        # against the KBNF syntax reference.
        encoded = json.dumps(text, ensure_ascii=False)
        escaped = encoded.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    new_items = []
    result = []
    for i, arg in enumerate(args):
        if isinstance(arg, str):
            new_items.append(json_string_terminal(arg))
        elif isinstance(arg, bool):
            # bool must be tested before int: True/False are also int instances.
            new_items.append(f'"{str(arg).lower()}"')
        elif isinstance(arg, (int, float)):
            new_items.append(f'"{arg}"')
        elif arg is None:
            new_items.append("null")
        elif isinstance(arg, tuple):
            # Collect this tuple's nonterminals separately; `result` also holds
            # entries that belong to earlier alternatives.
            element_nonterminals = []
            for j, item in enumerate(arg):
                new_nonterminal = f"{nonterminal}_{i}_{j}"
                result.append((typing.Literal[item], new_nonterminal))
                element_nonterminals.append(new_nonterminal)
            new_items.append(f"(array_begin {' comma '.join(element_nonterminals)} array_end)")
        elif isinstance(arg, frozendict):
            members = []
            # Members are numbered by position so arbitrary key text never ends up
            # inside a nonterminal name.
            for j, (key, value) in enumerate(arg.items()):
                new_nonterminal = f"{nonterminal}_{i}_{j}"
                result.append((typing.Literal[value], new_nonterminal))
                members.append(f"{json_string_terminal(str(key))} colon {new_nonterminal}")
            new_items.append(f"(object_begin {' comma '.join(members)} object_end)")
        else:
            new_nonterminal = f"{nonterminal}_{i}"
            result.append((arg, new_nonterminal))
            new_items.append(new_nonterminal)
    return f"{nonterminal} ::= {' | '.join(new_items)};\n", result
169
170 def builtin_simple_types(current: typing.Type, nonterminal: str):
171 if isinstance(current, type) and issubclass(current, bool):
172 return f"{nonterminal} ::= boolean;\n", []
173 elif isinstance(current, type) and issubclass(current, int):
174 return f"{nonterminal} ::= integer;\n", []
175 elif isinstance(current, type) and issubclass(current, float):
176 return f"{nonterminal} ::= number;\n", []
177 elif isinstance(current, type) and issubclass(current, decimal.Decimal):
178 return f"{nonterminal} ::= number;\n", []
179 elif isinstance(current, type) and issubclass(current, str):
180 return f"{nonterminal} ::= string;\n", []
181 elif isinstance(current, type) and issubclass(current, type(None)):
182 return f"{nonterminal} ::= null;\n", []
183 elif current is typing.Any:
184 return f"{nonterminal} ::= json_value;\n", []
185 elif isinstance(current, typing.NewType):
186 current: typing.NewType
187 return "", [(current.__supertype__, nonterminal)]
188
189 register_generate_nonterminal_def(builtin_simple_types)
193 register_generate_nonterminal_def(builtin_literal)
197
198
def _generate_kbnf_grammar(schema: schemas.schema.Schema, start_nonterminal: str) -> str:
    """
    Generate a KBNF grammar string from a schema for JSON format.

    The grammar starts with `GRAMMAR_HEADER`. Types are then expanded depth-first from
    an explicit stack: each (type, nonterminal) pair is offered to the registered
    generators in registration order, and the first one that accepts it contributes a
    rule plus further pairs to expand. A type that was already expanded is not expanded
    again; its new nonterminal is aliased to the one assigned first, which is what lets
    recursive schemas terminate.

    Args:
        schema: The schema to generate a grammar for.
        start_nonterminal: The nonterminal assigned to `schema`, i.e. the start of the
            grammar. It has no default and must always be supplied.

    Returns:
        The generated KBNF grammar string.

    Raises:
        TypeError: If no registered generator supports a type reachable from `schema`.
    """
    type_id_to_nonterminal = {
        id(int): "integer",
        id(float): "number",
        id(str): "string",
        id(bool): "boolean",
        id(type(None)): "null",
        id(list): "array",
        id(dict): "object",
    }
    rules = [GRAMMAR_HEADER]
    # id() is only unique among live objects. Holding every expanded object until the
    # function returns stops a recycled id from aliasing an unrelated, later type.
    expanded = []
    stack = [(schema, start_nonterminal)]
    while stack:
        current, nonterminal = stack.pop()
        type_id = id(current)
        known = type_id_to_nonterminal.get(type_id)
        if known is not None:
            rules.append(f"{nonterminal} ::= {known};\n")
            continue
        # Recorded before expansion so self-references resolve to this nonterminal.
        type_id_to_nonterminal[type_id] = nonterminal
        expanded.append(current)
        for generate in _type_to_nonterminals:
            value = generate(current, nonterminal)
            if value is not None:
                line, to_stack = value
                rules.append(line)
                stack.extend(to_stack)
                break
        else:
            raise TypeError(
                f"{current} from {nonterminal} is not supported in json_generators!")
    return "".join(rules)
242
243
245 """
246 An extractor that loads json data to an object from a string.
247 """
248
def __init__(self, nonterminal: str, capture_name: typing.Optional[str], schema: schemas.schema.Schema,
             to_object: typing.Callable[[str], schemas.schema.Schema]):
    """
    Build a JSON extractor whose grammar is derived from `schema`.

    Supported data types:

    - bool, int, float, string, NoneType and typing.Any
    - Subclasses of collections.abc.Mapping[str,T] and typing.Mapping[str,T] where T is a supported type
    - Subclasses of collections.abc.Sequence[T] and typing.Sequence[T] where T is a supported type
    - tuple[T1,T2,...] where T1,T2,... are supported types; order, type and number of elements are preserved
    - typing.Literal[x1,x2,...] where x1,x2,... are instances of int, string, bool or NoneType,
      or another typing.Literal[y1,y2,...]
    - typing.Union[T1,T2,...] where T1,T2,... are supported types
    - schemas.Schema where all field types are supported, including recursive schema definitions

    Args:
        nonterminal: The nonterminal representing the extractor.
        capture_name: The capture name of the extractor, or `None` if the extractor does not capture.
        schema: The schema the KBNF grammar is generated from.
        to_object: A callable to convert the extracted string to a schema instance.
    """
    super().__init__(nonterminal, capture_name)
    # The grammar is generated once, up front, and served by `kbnf_definition`.
    self._rule_str = _generate_kbnf_grammar(schema, self.nonterminal)
    self._to_object = to_object
def extract(self, input_str: str) -> typing.Optional[tuple[str, schemas.schema.Schema]]:
    """
    Extract a schema instance from the leading JSON object of a string.

    Leading whitespace is skipped, then the text is scanned for the `}` that balances
    the opening `{`. Braces inside JSON strings are ignored, and backslash escapes are
    tracked so an escaped quote does not end a string.

    Args:
        input_str: The input string to extract from.

    Returns:
        A tuple of the remaining string and the extracted schema instance, or `None` if
        the text does not start with `{` or the object is never closed.
    """
    text = input_str.lstrip()
    if not text.startswith('{'):
        return None

    depth = 0
    inside_string = False
    escaped = False
    end = None
    for index, char in enumerate(text):
        if inside_string:
            if char == '"' and not escaped:
                inside_string = False
            elif char == '\\':
                # Toggling makes a doubled backslash cancel itself out.
                escaped = not escaped
            else:
                escaped = False
        elif char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
        elif char == '"':
            inside_string = True

        if depth == 0 and not inside_string:
            # One past the closing brace.
            end = index + 1
            break

    if end is None:
        return None
    return text[end:], self._to_object(text[:end])
329
@property
def kbnf_definition(self):
    """Return the KBNF grammar string that was generated when the extractor was created."""
    return self._rule_str
333
334
An extractor that extracts data corresponding to a nonterminal.
Definition extractor.py:98
str nonterminal(self)
Get the nonterminal of the extractor.
Definition extractor.py:121
An extractor that loads json data to an object from a string.
Definition json.py:253
__init__(self, str nonterminal, typing.Optional[str] capture_name, schemas.schema.Schema schema, typing.Callable[[str], schemas.schema.Schema] to_object)
Create a json extractor from a given schema.
Definition json.py:281
typing.Optional[tuple[str, schemas.schema.Schema]] extract(self, str input_str)
Extract a schema instance from a string.
Definition json.py:294
An abstract schema that describes some data.
Definition schema.py:48
_register_all_predefined_types()
Definition json.py:57
None register_generate_nonterminal_def(typing.Callable[[typing.Type, str], typing.Optional[typing.Tuple[str, typing.List[typing.Tuple[typing.Type, str]]]]] generate_nonterminal_def)
Register a callable to generate nonterminal definition from a type.
Definition json.py:53
str _generate_kbnf_grammar(schemas.schema.Schema schema, str start_nonterminal)
Generate a KBNF grammar string from a schema for JSON format.
Definition json.py:215