"""
The module defines the `JsonExtractor` class, which is used to extract data from a string in JSON format.
"""
import collections
import collections.abc
import decimal
import json
import types
import typing

from frozendict import frozendict

from formatron import extractor, schemas
# Public API of this module.
__all__ = ["JsonExtractor"]
# Regex fragment spliced into the punctuation rules below so that each of them
# also accepts surrounding whitespace (space, tab, newline, carriage return).
SPACE_NONTERMINAL = "[ \t\n\r]*"

# KBNF rules for generic JSON values that are prepended to every generated grammar.
# The string is a raw f-string: doubled braces are literal braces and backslashes
# reach the grammar text unchanged.
# NOTE(review): the `comma` and `colon` rules and the closing quotes were missing
# from this listing. They are reconstructed here following the pattern of the
# `*_begin`/`*_end` rules, because `array` and `object` reference them -- confirm
# against the upstream grammar.
GRAMMAR_HEADER = rf"""integer ::= #"-?(0|[1-9]\\d*)";
number ::= #"-?(0|[1-9]\\d*)(\\.\\d+)?([eE][+-]?\\d+)?";
string ::= #'"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}})*"';
boolean ::= "true"|"false";
null ::= "null";
array ::= array_begin (json_value (comma json_value)*)? array_end;
object ::= object_begin (string colon json_value (comma string colon json_value)*)? object_end;
json_value ::= number|string|boolean|null|array|object;
comma ::= #"{SPACE_NONTERMINAL},{SPACE_NONTERMINAL}";
colon ::= #"{SPACE_NONTERMINAL}:{SPACE_NONTERMINAL}";
object_begin ::= #"\\{{{SPACE_NONTERMINAL}";
object_end ::= #"{SPACE_NONTERMINAL}\\}}";
array_begin ::= #"\\[{SPACE_NONTERMINAL}";
array_end ::= #"{SPACE_NONTERMINAL}\\]";
"""

# Registered generator callables; the grammar generator tries them in list order.
_type_to_nonterminals = []
def register_generate_nonterminal_def(
        generate_nonterminal_def: typing.Callable[
            [typing.Type, str],
            typing.Optional[typing.Tuple[str,
                                         typing.List[typing.Tuple[typing.Type, str]]]]]) -> None:
    """
    Register a callable to generate nonterminal definition from a type.
    The callable returns (nonterminal_definition, [(sub_type, sub_nonterminal), ...])
    if the type is supported by this callable, otherwise None.
    [(sub_type, sub_nonterminal), ...] are the types and nonterminals used in nonterminal_definition that may need
    to be generated in the grammar too.

    Args:
        generate_nonterminal_def: A callable to generate nonterminal definition from a type.
    """
    # NOTE(review): the `def` line was missing from this listing; the function name is
    # taken from the registration calls later in the file -- confirm no decorator was lost.
    _type_to_nonterminals.append(generate_nonterminal_def)
def schema(current: typing.Type, nonterminal: str):
    """
    Generate the object rule for a `schemas.schema.Schema` subclass.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(field_info, field_nonterminal), ...]) when `current` is a Schema subclass,
        otherwise None. Each field gets the nonterminal `<nonterminal>_<field name>`.
    """
    # Parameterized generics are ruled out before issubclass() is called on them.
    is_plain_class = isinstance(current, type) and not isinstance(current, types.GenericAlias)
    if not (is_plain_class and issubclass(current, schemas.schema.Schema)):
        return None
    named_fields = [(name, f"{nonterminal}_{name}", info) for name, info in current.fields().items()]
    members = " comma ".join(f"'\"{name}\"' colon {sub_nonterminal}" for name, sub_nonterminal, _ in named_fields)
    pending = [(info, sub_nonterminal) for _, sub_nonterminal, info in named_fields]
    return f"{nonterminal} ::= object_begin {members} object_end;\n", pending
def field_info(current: typing.Type, nonterminal: str):
    """
    Generate the rule for a `schemas.schema.FieldInfo`.

    Args:
        current: The candidate object.
        nonterminal: The nonterminal assigned to the field.

    Returns:
        None when `current` is not a FieldInfo. For a required field, an empty definition that
        forwards the field's annotation under the same nonterminal. Otherwise a rule making
        `<nonterminal>_required` optional, with the annotation forwarded under that name.
    """
    if not isinstance(current, schemas.schema.FieldInfo):
        return None
    if current.required:
        # No rule of its own: the annotation is generated directly under `nonterminal`.
        return "", [(current.annotation, nonterminal)]
    inner_nonterminal = f"{nonterminal}_required"
    definition = f"{nonterminal} ::= {inner_nonterminal}?;\n"
    return definition, [(current.annotation, inner_nonterminal)]
def builtin_list(current: typing.Type, nonterminal: str):
    """
    Generate the array rule for sequence types.

    Args:
        current: The candidate type, bare (`list`) or parameterized (`typing.List[int]`).
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(element_type, "<nonterminal>_value")]) when `current` is a sequence type,
        otherwise None. The element type is `typing.Any` when `current` has no type arguments.
    """
    original = typing.get_origin(current)
    if original is None:
        original = current
    # NOTE(review): the class passed to issubclass() was cut off in this listing;
    # collections.abc.Sequence is reconstructed from the typing.Sequence test beside it
    # and the file's otherwise unused `collections` import -- confirm.
    if original is typing.Sequence or (
            isinstance(original, type) and issubclass(original, collections.abc.Sequence)):
        new_nonterminal = f"{nonterminal}_value"
        args = typing.get_args(current)
        annotation = args[0] if args else typing.Any
        return f"{nonterminal} ::= array_begin ({new_nonterminal} (comma {new_nonterminal})*)? array_end;\n", \
            [(annotation, new_nonterminal)]
    return None
def builtin_dict(current: typing.Type, nonterminal: str):
    """
    Generate the object rule for mapping types with string keys.

    Args:
        current: The candidate type, bare (`dict`) or parameterized (`typing.Dict[str, int]`).
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(value_type, "<nonterminal>_value")]) when `current` is a mapping type,
        otherwise None. The value type is `typing.Any` when `current` has no type arguments.

    Raises:
        AssertionError: If the key type argument is not a subclass of `str`.
    """
    original = typing.get_origin(current)
    if original is None:
        original = current
    # NOTE(review): the class passed to issubclass() was cut off in this listing;
    # collections.abc.Mapping is reconstructed from the typing.Mapping test beside it
    # and the file's otherwise unused `collections` import -- confirm.
    if original is typing.Mapping or (
            isinstance(original, type) and issubclass(original, collections.abc.Mapping)):
        new_nonterminal = f"{nonterminal}_value"
        args = typing.get_args(current)
        if not args:
            value = typing.Any
        else:
            # JSON object keys are always strings, so only str keys can be represented.
            assert issubclass(
                args[0], str), f"{args[0]} is not string!"
            value = args[1]
        return f"{nonterminal} ::=" \
               f" object_begin (string colon {new_nonterminal} (comma string colon {new_nonterminal})*)?" \
               f" object_end;\n", \
            [(value, new_nonterminal)]
    return None
def builtin_tuple(current: typing.Type, nonterminal: str):
    """
    Generate the fixed-length array rule for tuple types.

    Args:
        current: The candidate type, e.g. `typing.Tuple[int, str]` or a tuple subclass.
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(element_type, "<nonterminal>_<index>"), ...]) when `current` is a tuple type,
        otherwise None. One nonterminal is created per type argument, in order.
    """
    if typing.get_origin(current) is tuple or isinstance(current, type) and issubclass(current, tuple):
        args = typing.get_args(current)
        new_nonterminals = [f"{nonterminal}_{i}" for i in range(len(args))]
        # Return a real list, as the registration contract declares, not a one-shot zip iterator.
        return f"{nonterminal} ::=array_begin {' comma '.join(new_nonterminals)} array_end;\n", \
            list(zip(args, new_nonterminals))
    return None
def builtin_union(current: typing.Type, nonterminal: str):
    """
    Generate the alternation rule for union types.

    Both `typing.Union[...]` / `typing.Optional[...]` and, where the interpreter provides
    `types.UnionType`, unions written as `X | Y` are recognized.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(member_type, "<nonterminal>_<index>"), ...]) when `current` is a union,
        otherwise None.

    Raises:
        AssertionError: If the union has no members.
    """
    origin = typing.get_origin(current)
    # `X | Y` has origin types.UnionType, which only exists on newer interpreters.
    pep604_union = getattr(types, "UnionType", None)
    if origin is typing.Union or (pep604_union is not None and origin is pep604_union):
        args = typing.get_args(current)
        assert args, f"{current} from {nonterminal} cannot be an empty union!"
        new_nonterminals = [f"{nonterminal}_{i}" for i in range(len(args))]
        # Return a real list, as the registration contract declares, not a one-shot zip iterator.
        return f"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", list(zip(args, new_nonterminals))
    return None
def builtin_literal(current: typing.Type, nonterminal: str):
    """
    Generate the alternation rule for `typing.Literal[...]`.

    Each literal argument becomes one alternative:

    - str: a terminal holding the JSON encoding of the string, double quotes included.
    - bool: the terminal "true" or "false".
    - int, float: a terminal holding `str(value)`.
    - None: the `null` nonterminal.
    - tuple: an array whose elements are each generated as `typing.Literal[item]`.
    - frozendict: an object whose values are each generated as `typing.Literal[value]`.
    - anything else: forwarded as a type under `<nonterminal>_<index>`.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, [(sub_type, sub_nonterminal), ...]) when `current` is a Literal, otherwise None.

    Raises:
        AssertionError: If the literal has no arguments.
    """
    if typing.get_origin(current) is not typing.Literal:
        return None
    args = typing.get_args(current)
    assert args, f"{current} from {nonterminal} cannot be an empty literal!"
    new_items = []
    result = []
    for i, arg in enumerate(args):
        if isinstance(arg, str):
            # The terminal must match the JSON form of the string (double-quoted, JSON escapes),
            # not Python's repr. Backslashes and double quotes are then escaped once more so the
            # text can sit inside a double-quoted grammar terminal.
            # NOTE(review): assumes grammar terminals accept `\\` and `\"` escapes -- confirm.
            encoded = json.dumps(arg, ensure_ascii=False)
            escaped = encoded.replace("\\", "\\\\").replace('"', '\\"')
            new_items.append(f'"{escaped}"')
        elif isinstance(arg, bool):
            # bool is tested before int because bool is a subclass of int.
            new_items.append(f'"{str(arg).lower()}"')
        elif isinstance(arg, (int, float)):
            new_items.append(f'"{str(arg)}"')
        elif arg is None:
            new_items.append("null")
        elif isinstance(arg, tuple):
            # Collect only this tuple's nonterminals; `result` also holds earlier arguments' entries.
            members = []
            for j, item in enumerate(arg):
                new_nonterminal = f"{nonterminal}_{i}_{j}"
                result.append((typing.Literal[item], new_nonterminal))
                members.append(new_nonterminal)
            new_items.append(f"(array_begin {' comma '.join(members)} array_end)")
        elif isinstance(arg, frozendict):
            # Each member is emitted as `"key" colon value`, in the same form the schema rule uses.
            members = []
            for key, value in arg.items():
                new_nonterminal = f"{nonterminal}_{i}_{key}"
                result.append((typing.Literal[value], new_nonterminal))
                members.append(f"'\"{key}\"' colon {new_nonterminal}")
            new_items.append(f"(object_begin {' comma '.join(members)} object_end)")
        else:
            new_nonterminal = f"{nonterminal}_{i}"
            result.append((arg, new_nonterminal))
            new_items.append(new_nonterminal)
    return f"{nonterminal} ::= {' | '.join(new_items)};\n", result
def builtin_simple_types(current: typing.Type, nonterminal: str):
    """
    Generate the rule for scalar types, `typing.Any` and `typing.NewType` aliases.

    Args:
        current: The candidate type.
        nonterminal: The nonterminal to define for `current`.

    Returns:
        (definition, []) for bool, int, float, decimal.Decimal, str, NoneType (or subclasses)
        and `typing.Any`. ("", [(supertype, nonterminal)]) for a NewType alias, so its supertype
        is generated under the same nonterminal. None for anything else.
    """
    if isinstance(current, type):
        # Order matters: bool must be tested before int because bool is a subclass of int.
        scalar_rules = (
            (bool, "boolean"),
            (int, "integer"),
            (float, "number"),
            (decimal.Decimal, "number"),
            (str, "string"),
            (type(None), "null"),
        )
        for base, rule in scalar_rules:
            if issubclass(current, base):
                return f"{nonterminal} ::= {rule};\n", []
    if current is typing.Any:
        return f"{nonterminal} ::= json_value;\n", []
    # NewType aliases are recognized by their `__supertype__` attribute rather than by
    # isinstance(): typing.NewType is a plain function on Python 3.9, where isinstance()
    # against it raises TypeError.
    supertype = getattr(current, "__supertype__", None)
    if supertype is not None:
        return "", [(supertype, nonterminal)]
    return None
# Register the built-in generators.
# NOTE(review): only these two registrations are present in this listing. The other
# generators defined in this file (schema, field_info, builtin_list, builtin_dict,
# builtin_tuple, builtin_union) are not registered here -- restore their registrations
# and confirm the intended order, since generators are tried in registration order.
register_generate_nonterminal_def(builtin_simple_types)
register_generate_nonterminal_def(builtin_literal)
def _generate_kbnf_grammar(schema: schemas.schema.Schema, start_nonterminal: str) -> str:
    """
    Generate a KBNF grammar string from a schema for JSON format.

    The schema is expanded depth-first. Each (type, nonterminal) pair is offered to the
    registered generators in registration order, and the first one that does not return None
    supplies the rule and the further pairs to expand. An object that has already been expanded
    is not expanded again; the new nonterminal is aliased to the earlier one instead.

    Args:
        schema: The schema to generate a grammar for.
        start_nonterminal: The start nonterminal of the grammar.

    Returns:
        The generated KBNF grammar string.

    Raises:
        TypeError: If no registered generator supports an encountered type.
    """
    # Seeded with the generic rules that GRAMMAR_HEADER already defines.
    type_id_to_nonterminal = {
        id(int): "integer",
        id(float): "number",
        id(str): "string",
        id(bool): "boolean",
        id(type(None)): "null",
        id(list): "array",
        id(dict): "object",
    }
    result = [GRAMMAR_HEADER]
    # id() is only unique among live objects. Every expanded object is kept alive here so a
    # later object cannot reuse a collected object's id and be aliased to the wrong rule.
    expanded = []
    stack = [(schema, start_nonterminal)]
    while stack:
        current, nonterminal = stack.pop()
        type_id = id(current)
        if type_id in type_id_to_nonterminal:
            result.append(f"{nonterminal} ::= {type_id_to_nonterminal[type_id]};\n")
            continue
        # Recorded before expansion so a recursive reference to `current` becomes an alias.
        type_id_to_nonterminal[type_id] = nonterminal
        expanded.append(current)
        for generator in _type_to_nonterminals:
            value = generator(current, nonterminal)
            if value is not None:
                line, to_stack = value
                result.append(line)
                stack.extend(to_stack)
                break
        else:
            raise TypeError(
                f"{current} from {nonterminal} is not supported in json_generators!")
    return "".join(result)
"""
An extractor that loads json data to an object from a string.
"""
def __init__(self, nonterminal: str, capture_name: typing.Optional[str], schema: schemas.schema.Schema,
             to_object: typing.Callable[[str], schemas.schema.Schema]):
    """
    Create a json extractor from a given schema.

    The grammar for `schema` is generated once, here. Type support comes from the generators
    registered in this module; those defined in this file cover:

    - bool, int, float, decimal.Decimal, str, NoneType and typing.Any
    - typing.NewType aliases of a supported type
    - mapping types with str keys, e.g. typing.Mapping[str, T], where T is a supported type
    - sequence types, e.g. typing.Sequence[T], where T is a supported type
    - tuple[T1, T2, ...] where T1, T2, ... are supported types; one element per type, in order
    - typing.Literal[x1, x2, ...] where each x is a str, bool, int, float, None, tuple or frozendict
    - typing.Union[T1, T2, ...] where T1, T2, ... are supported types
    - schemas.Schema subclasses whose field types are supported; a type that is reached again
      is referenced rather than expanded a second time

    Args:
        nonterminal: The nonterminal representing the extractor.
        capture_name: The capture name of the extractor, or `None` if the extractor does not capture.
        schema: The schema the grammar is generated from.
        to_object: A callable to convert the extracted string to a schema instance.
    """
    super().__init__(nonterminal, capture_name)
    grammar = _generate_kbnf_grammar(schema, self.nonterminal)
    self._rule_str = grammar
    self._to_object = to_object
def extract(self, input_str: str) -> typing.Optional[tuple[str, schemas.schema.Schema]]:
    """
    Extract a schema instance from a string.

    Leading whitespace is skipped, then the text is scanned for the `}` that balances the
    opening `{`. Braces inside double-quoted strings are ignored.

    Args:
        input_str: The input string to extract from.

    Returns:
        A tuple of the remaining string and the extracted schema instance, or `None` if the
        input does not start with `{` or the object never closes.
    """
    text = input_str.lstrip()
    if not text.startswith('{'):
        return None

    depth = 0
    inside_string = False
    escaped = False
    for index, char in enumerate(text):
        if inside_string:
            if char == '"' and not escaped:
                inside_string = False
            elif char == '\\':
                # Consecutive backslashes cancel in pairs.
                escaped = not escaped
            else:
                escaped = False
        elif char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
        elif char == '"':
            inside_string = True

        if depth == 0 and not inside_string:
            # `index` is the closing brace, so the object is everything up to and including it.
            end = index + 1
            return text[end:], self._to_object(text[:end])
    # Ran out of input before the braces balanced.
    return None
@property
def kbnf_definition(self):
    """
    The KBNF grammar text stored on this extractor as `_rule_str`.
    """
    return self._rule_str
