Formatron v0.5.0
Formatron empowers everyone to control the output format of language models with minimal overhead.
json.py
Go to the documentation of this file.
1"""
2This module defines the `JsonExtractor` class, which extracts data in JSON format from a string.
3"""
4import collections
5import decimal
6import types
7import typing
8
9from frozendict import frozendict
10
11from formatron import extractor, schemas
12from formatron.formats.utils import escape_identifier
13
14
15__all__ = ["JsonExtractor", "strict_schema"]
16
17
18"""
19Whether to raise an error if the grammar cannot be precisely constructed from the schema.
20True by default.
21
22If set to False, heuristics are used instead, and the resulting grammar may not fully capture the schema's constraints.
23"""
24strict_schema = True
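Because `strict_schema` is an ordinary module-level flag, callers can relax it before a grammar is built. A minimal sketch (module path as packaged in this repository):

```python
from formatron.formats import json as formats_json

# Downgrade unsupported-constraint errors to printed warnings;
# the metadata handlers below consult this flag at grammar-generation time.
formats_json.strict_schema = False
```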
26SPACE_NONTERMINAL = "[ \t\n]*"
28GRAMMAR_HEADER = rf"""integer ::= #"{SPACE_NONTERMINAL}-?(0|[1-9][0-9]*)";
29number ::= #"{SPACE_NONTERMINAL}-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][+-]?[0-9]+)?";
30string ::= #'{SPACE_NONTERMINAL}"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt]|\\\\u[0-9A-Fa-f]{{4}})*"';
31boolean ::= #"{SPACE_NONTERMINAL}(true|false)";
32null ::= #"{SPACE_NONTERMINAL}null";
33array ::= array_begin (json_value (comma json_value)*)? array_end;
34object ::= object_begin (string colon json_value (comma string colon json_value)*)? object_end;
35json_value ::= number|string|boolean|null|array|object;
36comma ::= #"{SPACE_NONTERMINAL},";
37colon ::= #"{SPACE_NONTERMINAL}:";
38object_begin ::= #"{SPACE_NONTERMINAL}\\{{";
39object_end ::= #"{SPACE_NONTERMINAL}\\}}";
40array_begin ::= #"{SPACE_NONTERMINAL}\\[";
41array_end ::= #"{SPACE_NONTERMINAL}\\]";
42"""
43
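For orientation, after f-string interpolation the escape-free header rules read as sketched below (the whitespace class comes from `SPACE_NONTERMINAL`; the rules containing doubled backslashes and doubled braces additionally rely on KBNF's own regex-string escaping):

```
integer ::= #"[ \t\n]*-?(0|[1-9][0-9]*)";
boolean ::= #"[ \t\n]*(true|false)";
null ::= #"[ \t\n]*null";
comma ::= #"[ \t\n]*,";
```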
44def from_str_to_kbnf_str(s: str) -> str:
45 """
46 Convert a string to a kbnf string.
47
48 Args:
49 s: The string to convert.
50
51 Returns:
52 The kbnf string.
53 """
54 s = f"\"{repr(s)[1:-1]}\""
55 return f"#'{SPACE_NONTERMINAL}{s}'"
56
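A quick sketch of the conversion for a plain key (the leading whitespace class again comes from `SPACE_NONTERMINAL`):

```python
from_str_to_kbnf_str("name")
# -> #'[ \t\n]*"name"'
# i.e. a KBNF regex terminal matching optional whitespace followed by the quoted key "name".
```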
57_type_to_nonterminals = []
58
59
60
61def register_generate_nonterminal_def(
62 generate_nonterminal_def: typing.Callable[
63 [typing.Type, str],
64 typing.Optional[typing.Tuple[str,
65 typing.List[typing.Tuple[typing.Type, str]]]]]) -> None:
66 """
67 Register a callable to generate nonterminal definition from a type.
68 The callable should return (nonterminal_definition, [(sub_type, sub_nonterminal), ...])
69 if it supports the type, and None otherwise.
70 The (sub_type, sub_nonterminal) pairs list the types and nonterminals referenced by nonterminal_definition
71 that may themselves need rules generated in the grammar.
72
73 Args:
74 generate_nonterminal_def: A callable to generate nonterminal definition from a type.
75 """
76 _type_to_nonterminals.append(generate_nonterminal_def)
77
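A hedged sketch of how a caller might plug a custom handler into this hook. The `ipaddress.IPv4Address` mapping below is purely illustrative and not part of formatron; it only follows the documented contract (return a rule plus referenced sub-nonterminals, or None):

```python
import ipaddress

from formatron.formats.json import register_generate_nonterminal_def


def ipv4_nonterminal(current, nonterminal):
    # Illustrative only: render IPv4Address-typed values as a dotted-quad JSON string.
    if isinstance(current, type) and issubclass(current, ipaddress.IPv4Address):
        rule = f"{nonterminal} ::= #'[ \\t\\n]*\"([0-9]{{1,3}}\\\\.){{3}}[0-9]{{1,3}}\"';\n"
        return rule, []  # no referenced sub-nonterminals, hence the empty list
    return None  # unsupported here; let the next registered callable try


register_generate_nonterminal_def(ipv4_nonterminal)
```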
78
79def _register_all_predefined_types():
80 def schema(current: typing.Type, nonterminal: str):
81 if isinstance(current, type) and not isinstance(current, types.GenericAlias) \
82 and issubclass(current, schemas.schema.Schema):
83 line = [f"{nonterminal} ::= ", "object_begin "]
84 result = []
85 fields = []
86 for field, _field_info in current.fields().items():
87 field_name = f"{nonterminal}_{field}"
88 field_name = escape_identifier(field_name)
89 key = from_str_to_kbnf_str(field)
90 fields.append(f"{key} colon {field_name}")
91 result.append((_field_info, field_name))
92 line.append(" comma ".join(fields))
93 line.append(" object_end;\n")
94 return "".join(line), result
95 return None
96
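As a worked sketch, for a hypothetical `Schema` subclass with fields `name: str` and `age: int`, generated under the start nonterminal `start` (and assuming `escape_identifier` leaves these names unchanged), this handler would emit:

```
start ::= object_begin #'[ \t\n]*"name"' colon start_name comma #'[ \t\n]*"age"' colon start_age object_end;
```

with the field infos queued as `(FieldInfo(name), "start_name")` and `(FieldInfo(age), "start_age")` for further generation.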
97 def field_info(current: typing.Type, nonterminal: str):
98 if isinstance(current, schemas.schema.FieldInfo):
99 annotation = current.annotation
100 if current.required:
101 return "", [(annotation, nonterminal)]
102 new_nonterminal = f"{nonterminal}_required"
103 return f"{nonterminal} ::= {new_nonterminal}?;\n", [(annotation, new_nonterminal)]
104 return None
105
106 def string_metadata(current: typing.Type, nonterminal: str):
107 min_length = current.metadata.get("min_length")
108 max_length = current.metadata.get("max_length")
109 pattern = current.metadata.get("pattern")
110 substring_of = current.metadata.get("substring_of")
111 if pattern:
112 # Check if pattern contains unescaped anchors (^ or $)
113 # First strip escaped anchors so only genuinely unescaped ones remain for the check below
114 temp_pattern = pattern.replace(r'\^', '').replace(r'\$', '').replace(r'\\A', '').replace(r'\\z', '')
115 # Check for unescaped anchors
116 if '^' in temp_pattern or '$' in temp_pattern or '\A' in temp_pattern or '\z' in temp_pattern:
117 if strict_schema:
118 raise ValueError(f"Pattern '{pattern}' contains unescaped anchors (^, $, \\A, \\z) which are not allowed")
119 else:
120 print(f"Warning: pattern '{pattern}' contains unescaped anchors (^, $, \\A, \\z) which are not allowed in schema {current} from {nonterminal}")
121 pattern = pattern.strip('^$')
122 pattern = repr(pattern)[1:-1]
123 if strict_schema:
124 assert not (min_length or max_length or substring_of), "pattern is mutually exclusive with min_length, max_length and substring_of"
125 else:
126 if min_length or max_length or substring_of:
127 print(f"Warning: pattern is mutually exclusive with min_length, max_length and substring_of in schema {current} from {nonterminal}")
128 min_length = None
129 max_length = None
130 substring_of = None
131 if substring_of:
132 if strict_schema:
133 assert not (min_length or max_length or pattern), "substring_of is mutually exclusive with min_length, max_length and pattern"
134 else:
135 if min_length or max_length or pattern:
136 print(f"Warning: substring_of is mutually exclusive with min_length, max_length and pattern in schema {current} from {nonterminal}")
137 min_length = None
138 max_length = None
139 pattern = None
140 repetition_map = {
141 (True, False): f"{{{min_length},}}",
142 (False, True): f"{{0,{max_length}}}",
143 (True, True): f"{{{min_length},{max_length}}}"
144 }
145 repetition = repetition_map.get((min_length is not None, max_length is not None))
146 if repetition is not None:
147 return fr"""{nonterminal} ::= #'{SPACE_NONTERMINAL}"([^\\\\"\u0000-\u001f]|\\\\["\\\\bfnrt/]|\\\\u[0-9A-Fa-f]{{4}}){repetition}"';
148""", []
149 if pattern is not None:
150 pattern = pattern.replace("'", "\\'")
151 return f"""{nonterminal} ::= #'{SPACE_NONTERMINAL}"{pattern}"';\n""", []
152 if substring_of is not None:
153 return f"""{nonterminal} ::= #'{SPACE_NONTERMINAL}' '"' #substrs{repr(substring_of)} '"';\n""", []
154
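Worked sketches for a `str` field generated as nonterminal `x`: `pattern="[a-z]+"` and `substring_of="hello"` yield the rules below, while min_length/max_length bounds instead reuse the JSON string-character alternation with a bounded repetition such as `{1,10}`:

```
x ::= #'[ \t\n]*"[a-z]+"';
x ::= #'[ \t\n]*' '"' #substrs'hello' '"';
```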
155 def number_metadata(current: typing.Type, nonterminal: str):
156 gt = current.metadata.get("gt")
157 ge = current.metadata.get("ge")
158 lt = current.metadata.get("lt")
159 le = current.metadata.get("le")
160
161 prefix_map = {
162 (gt, 0): "",
163 (ge, 0): "0|",
164 (lt, 0): "-",
165 (le, 0): "0|-",
166 }
167
168 for (condition, value), prefix in prefix_map.items():
169 if condition is not None and condition == value:
170 if issubclass(current.type, int):
171 return f"""{nonterminal} ::= #'{SPACE_NONTERMINAL}{prefix}[1-9][0-9]*';\n""", []
172 elif issubclass(current.type, float):
173 return f"""{nonterminal} ::= #'{SPACE_NONTERMINAL}{prefix}[1-9][0-9]*(\\\\.[0-9]+)?([eE][+-]?[0-9]+)?';\n""", []
174 if strict_schema:
175 raise ValueError(f"{current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
176 else:
177 print(f"Warning: {current.type.__name__} metadata {current.metadata} is not supported in json_generators!")
178 return "", [(current.type, nonterminal)]
179
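Note that the keys of `prefix_map` pair each bound with 0, so only bounds equal to zero are handled here; any other bound falls through to the error or warning at the end of this function. For example, an `int` field generated as nonterminal `x`:

```
ge=0:  x ::= #'[ \t\n]*0|[1-9][0-9]*';
gt=0:  x ::= #'[ \t\n]*[1-9][0-9]*';
```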
180 def sequence_metadata(current: typing.Type, nonterminal: str):
181 min_items = current.metadata.get("min_length")
182 max_items = current.metadata.get("max_length")
183 prefix_items = current.metadata.get("prefix_items")
184 additional_items = current.metadata.get("additional_items")
185 if max_items is not None and prefix_items is not None and max_items <= len(prefix_items): # truncate prefix items
186 prefix_items = prefix_items[:max_items+1]
187 if prefix_items:
188 if not min_items: # json schema defaults to 0
189 min_items = 0
190 if not additional_items:
191 if min_items > len(prefix_items):
192 raise ValueError(f"min_items {min_items} is greater than the number of prefix_items {len(prefix_items)} and additional_items is not allowed")
193 max_items = len(prefix_items)
194 if min_items is not None or max_items is not None: # if prefix_items were given, min_items has already been set above, so this branch covers them too
195 new_nonterminal = f"{nonterminal}_item"
196 ebnf_rules = []
197 if min_items is None:
198 min_items = 0
199 if min_items == 0 and max_items is None and prefix_items is None: # no special handling needed
200 return "", [(current.type, new_nonterminal)]
201 prefix_items_nonterminals = [f"{new_nonterminal}_{i}" for i in range(len(prefix_items))] if prefix_items else []
202 prefix_items_parts = [] # contains the sequence of nonterminals for prefix items from min_items to len(prefix_items)
203 if prefix_items is not None:
204 for i in range(max(min_items,1), len(prefix_items)+1):
205 prefix_items_parts.append(prefix_items_nonterminals[:i])
206 if min_items == 0: # EMPTY_PREFIX_ITEMS_ALLOWED
207 ebnf_rules.append(f"{nonterminal} ::= array_begin array_end;")
208 if max_items is None: # unbounded
209 if not prefix_items:
210 min_items_part = ' comma '.join([new_nonterminal] * (min_items - 1))
211 ebnf_rules.append(f"{nonterminal} ::= array_begin {min_items_part} comma {new_nonterminal}+ array_end;")
212 elif len(prefix_items_parts) >= min_items: # this part assumes prefix items are not empty, so we need the EMPTY_PREFIX_ITEMS_ALLOWED check above
213 for prefix_items_part in prefix_items_parts:
214 prefix_items_part = ' comma '.join(prefix_items_part)
215 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} (comma {new_nonterminal})* array_end;")
216 else:
217 min_items_part = ' comma '.join([new_nonterminal] * (min_items - len(prefix_items_nonterminals)-1))
218 if min_items_part:
219 min_items_part = "comma " + min_items_part
220 prefix_items_part = ' comma '.join(prefix_items_nonterminals)
221 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} {min_items_part} comma {new_nonterminal}+ array_end;")
222 elif min_items == 0 and not prefix_items: # TAG: ONLY_MAX_ITEMS
223 for i in range(min_items, max_items + 1):
224 items = ' comma '.join([new_nonterminal] * i)
225 ebnf_rules.append(f"{nonterminal} ::= array_begin {items} array_end;")
226 else:
227 prefix_items_num = len(prefix_items_nonterminals)
228 if prefix_items:
229 for prefix_items_part in prefix_items_parts:
230 prefix_items_part = ' comma '.join(prefix_items_part)
231 ebnf_rules.append(f"{nonterminal} ::= array_begin {prefix_items_part} array_end;")
232 min_items_part = ' comma '.join([new_nonterminal] * (min_items - prefix_items_num))
233 prefix_items_part = ' comma '.join(prefix_items_nonterminals)
234 if min_items_part and prefix_items_part:
235 ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part} comma {min_items_part};")
236 elif min_items_part:
237 ebnf_rules.append(f"{nonterminal}_min ::= {min_items_part};")
238 elif prefix_items_part:
239 ebnf_rules.append(f"{nonterminal}_min ::= {prefix_items_part};")
240 # sanity check: if prefix_items_part and min_items_part were both empty, we would be in the ONLY_MAX_ITEMS branch above
241 common = max(min_items, prefix_items_num)
242 for i in range(1, max_items + 1 - common):
243 items = ' comma '.join([new_nonterminal] * i)
244 ebnf_rules.append(f"{nonterminal} ::= array_begin {nonterminal}_min comma {items} array_end;")
245 # Handle the item type
246 args = typing.get_args(current.type)
247 if args:
248 item_type = args[0]
249 else:
250 # If args is empty, default to Any
251 item_type = typing.Any
252 if prefix_items:
253 return "\n".join(ebnf_rules) + "\n", list(zip(prefix_items, prefix_items_nonterminals)) + [(item_type, new_nonterminal)]
254 return "\n".join(ebnf_rules) + "\n", [(item_type, new_nonterminal)]
255 return None
256
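A worked sketch: a `list[int]` field with `min_length=2` (JSON Schema `minItems`), no upper bound and no `prefix_items`, generated as nonterminal `x`, takes the unbounded branch and produces:

```
x ::= array_begin x_item comma x_item+ array_end;
```

with `(int, "x_item")` queued so that `x_item ::= integer;` is emitted later.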
257 def is_sequence_like(current: typing.Type) -> bool:
258 """
259 Check if the given type is sequence-like.
260
261 This function returns True for:
262 - typing.Sequence
263 - typing.List
264 - typing.Tuple
265 - Any subclass of collections.abc.Sequence
266 - list
267 - tuple
268
269 Args:
270 current: The type to check.
271
272 Returns:
273 bool: True if the type is sequence-like, False otherwise.
274 """
275 original = typing.get_origin(current)
276 if original is None:
277 original = current
278 return (
279 original is typing.Sequence or
280 original is typing.List or
281 original is typing.Tuple or
282 (isinstance(original, type) and (issubclass(original, collections.abc.Sequence) or
283 issubclass(original, list) or
284 issubclass(original, tuple)))
285 )
286
287 def metadata(current: typing.Type, nonterminal: str):
288 if isinstance(current, schemas.schema.TypeWithMetadata):
289 original = typing.get_origin(current.type)
290 if original is None:
291 original = current.type
292 if not current.metadata:
293 return "", [(current.type, nonterminal)]
294 if isinstance(current.type, type) and issubclass(current.type, str):
295 return string_metadata(current, nonterminal)
296 elif isinstance(current.type, type) and issubclass(current.type, (int, float)):
297 return number_metadata(current, nonterminal)
298 elif is_sequence_like(original):
299 return sequence_metadata(current, nonterminal)
300 return None
301
302 def builtin_sequence(current: typing.Type, nonterminal: str):
303 original = typing.get_origin(current)
304 if original is None:
305 original = current
306 if is_sequence_like(original):
307 new_nonterminal = f"{nonterminal}_value"
308 annotation = typing.get_args(current)
309 if not annotation:
310 annotation = typing.Any
311 else:
312 annotation = annotation[0]
313 return f"{nonterminal} ::= array_begin ({new_nonterminal} (comma {new_nonterminal})*)? array_end;\n", \
314 [(annotation, new_nonterminal)]
315 return None
316
317 def builtin_dict(current: typing.Type, nonterminal: str):
318 original = typing.get_origin(current)
319 if original is None:
320 original = current
321 if original is typing.Mapping or isinstance(original, type) and issubclass(original,
322 collections.abc.Mapping):
323 new_nonterminal = f"{nonterminal}_value"
324 args = typing.get_args(current)
325 if not args:
326 value = typing.Any
327 else:
328 assert issubclass(
329 args[0], str), f"{args[0]} is not string!"
330 value = args[1]
331 if value is typing.Any:
332 return f"{nonterminal} ::= object;\n", []
333 return f"{nonterminal} ::=" \
334 f" object_begin (string colon {new_nonterminal} (comma string colon {new_nonterminal})*)?" \
335 f" object_end;\n", \
336 [(value, new_nonterminal)]
337 return None
338
339 def builtin_tuple(current: typing.Type, nonterminal: str):
340 if typing.get_origin(current) is tuple or isinstance(current, type) and issubclass(current, tuple):
341 args = typing.get_args(current)
342 new_nonterminals = []
343 result = []
344 for i, arg in enumerate(args):
345 result.append(arg)
346 new_nonterminals.append(f"{nonterminal}_{i}")
347 return f"{nonterminal} ::=array_begin {' comma '.join(new_nonterminals)} array_end;\n", \
348 zip(result, new_nonterminals)
349
350 def builtin_union(current: typing.Type, nonterminal: str):
351 if typing.get_origin(current) is typing.Union:
352 args = typing.get_args(current)
353 assert args, f"{current} from {nonterminal} cannot be an empty union!"
354 new_nonterminals = []
355 result = []
356 for i, arg in enumerate(args):
357 result.append(arg)
358 new_nonterminals.append(f"{nonterminal}_{i}")
359 return f"{nonterminal} ::= {' | '.join(new_nonterminals)};\n", zip(result, new_nonterminals)
360
361 def builtin_literal(current: typing.Type, nonterminal: str):
362 if typing.get_origin(current) is typing.Literal:
363 args = typing.get_args(current)
364 assert args, f"{current} from {nonterminal} cannot be an empty literal!"
365 new_items = []
366 result = []
367 for i, arg in enumerate(args):
368 if isinstance(arg, str):
369 new_items.append(from_str_to_kbnf_str(arg))
370 elif isinstance(arg, bool):
371 new_items.append(f'#"{SPACE_NONTERMINAL}{str(arg).lower()}"')
372 elif isinstance(arg, int):
373 new_items.append(f'#"{SPACE_NONTERMINAL}{str(arg)}"')
374 elif isinstance(arg, float):
375 new_items.append(f'#"{SPACE_NONTERMINAL}{str(arg)}"')
376 elif arg is None:
377 new_items.append("null")
378 elif isinstance(arg, tuple):
379 for j,item in enumerate(arg):
380 new_nonterminal = f"{nonterminal}_{i}_{j}"
381 result.append((typing.Literal[item], new_nonterminal))
382 new_item = f"(array_begin {' comma '.join(map(lambda x:x[1], result))} array_end)"
383 new_items.append(new_item)
384 elif isinstance(arg, frozendict):
385 for key, value in arg.items():
386 new_nonterminal = f"{nonterminal}_{i}_{key}"
387 result.append((typing.Literal[value], new_nonterminal))
388 new_item = f"object_begin {' comma '.join(map(lambda x:x[1], result))} object_end"
389 new_items.append(new_item)
390 else:
391 new_nonterminal = f"{nonterminal}_{i}"
392 result.append((arg, new_nonterminal))
393 new_items.append(new_nonterminal)
394 return f"{nonterminal} ::= {' | '.join(new_items)};\n", result
395
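Worked sketches for the two handlers above, with nonterminal `x`: `typing.Optional[int]` (i.e. `Union[int, None]`) and `typing.Literal["yes", "no", None]` produce:

```
x ::= x_0 | x_1;
x ::= #'[ \t\n]*"yes"' | #'[ \t\n]*"no"' | null;
```

where `x_0` and `x_1` are queued as `(int, "x_0")` and `(NoneType, "x_1")` and later reduce to `integer` and `null`.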
396 def builtin_simple_types(current: typing.Type, nonterminal: str):
397 if isinstance(current, type) and issubclass(current, bool):
398 return f"{nonterminal} ::= boolean;\n", []
399 elif isinstance(current, type) and issubclass(current, int):
400 return f"{nonterminal} ::= integer;\n", []
401 elif isinstance(current, type) and issubclass(current, float):
402 return f"{nonterminal} ::= number;\n", []
403 elif isinstance(current, type) and issubclass(current, decimal.Decimal):
404 return f"{nonterminal} ::= number;\n", []
405 elif isinstance(current, type) and issubclass(current, str):
406 return f"{nonterminal} ::= string;\n", []
407 elif isinstance(current, type) and issubclass(current, type(None)):
408 return f"{nonterminal} ::= null;\n", []
409 elif current is typing.Any:
410 return f"{nonterminal} ::= json_value;\n", []
411 elif isinstance(current, typing.NewType):
412 current: typing.NewType
413 return "", [(current.__supertype__, nonterminal)]
414
415 register_generate_nonterminal_def(builtin_simple_types)
416 register_generate_nonterminal_def(schema)
417 register_generate_nonterminal_def(field_info)
418 register_generate_nonterminal_def(metadata)
419 register_generate_nonterminal_def(builtin_tuple)
420 register_generate_nonterminal_def(builtin_literal)
421 register_generate_nonterminal_def(builtin_union)
422 register_generate_nonterminal_def(builtin_sequence)
423 register_generate_nonterminal_def(builtin_dict)
424
425def _generate_kbnf_grammar(schema: schemas.schema.Schema|collections.abc.Sequence, start_nonterminal: str) -> str:
426 """
427 Generate a KBNF grammar string from a schema for JSON format.
428
429 Args:
430 schema: The schema to generate a grammar for.
431 start_nonterminal: The start nonterminal of the grammar. Default is "start".
432
433 Returns:
434 The generated KBNF grammar string.
435 """
436 type_id_to_nonterminal = {
437 id(int): "integer",
438 id(float): "number",
439 id(str): "string",
440 id(bool): "boolean",
441 id(type(None)): "null",
442 id(list): "array",
443 id(dict): "object",
444 id(typing.Any): "json_value",
445 }
446 result = [GRAMMAR_HEADER]
447 nonterminals = set()
448 stack = [(schema, start_nonterminal)]
449 while stack:
450 (current, nonterminal) = stack.pop()
451 type_id = id(current)
452 if type_id in type_id_to_nonterminal:
453 line = f"{nonterminal} ::= {type_id_to_nonterminal[type_id]};\n"
454 result.append(line)
455 continue
456 type_id_to_nonterminal[type_id] = nonterminal
457 for i in _type_to_nonterminals:
458 value = i(current, nonterminal)
459 if value is not None:
460 line, to_stack = value
461 result.append(line)
462 stack.extend(to_stack)
463 nonterminals.add(nonterminal)
464 break
465 else:
466 raise TypeError(
467 f"{current} from {nonterminal} is not supported in json_generators!")
468 return "".join(result)
469
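A small sketch of the short-circuit above: types whose `id` is pre-seeded in `type_id_to_nonterminal` only get an alias onto the header nonterminals, so the whole grammar for a bare `int` is the header plus one line:

```python
print(_generate_kbnf_grammar(int, "start"))
# ...GRAMMAR_HEADER rules...
# start ::= integer;
```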
470
472 """
473 An extractor that loads json data to an object from a string.
474 """
475
476 def __init__(self, nonterminal: str, capture_name: typing.Optional[str], schema: schemas.schema.Schema|collections.abc.Sequence,
477 to_object: typing.Callable[[str], schemas.schema.Schema]):
478 """
479 Create a json extractor from a given schema or a list of supported types.
480
481 Currently, the following data types are supported:
482
483 - bool
484 - int
485 - positive int
486 - negative int
487 - nonnegative int
488 - nonpositive int
489 - float
490 - positive float
491 - negative float
492 - nonnegative float
493 - nonpositive float
494 - str
495 - optionally with min_length, max_length and pattern constraints
496 - length is measured as the number of UTF-8 characters after JSON parsing
497 - *Warning*: a large difference between min_length and max_length can lead to enormous memory consumption!
498 - pattern is mutually exclusive with min_length, max_length and substring_of
499 - pattern will be compiled to a regular expression, so all caveats of regular expressions apply
500 - pattern is currently anchored automatically at both ends
501 - the generated json could be invalid if the pattern allows invalid content between the json string's quotes.
502 - for example, `pattern=".*"` would allow '\"' to appear in the json string, which is forbidden by the JSON standard.
503 - also supports the substring_of constraint, which constrains the string to be a substring of a given string
504 - the generated json could be invalid if the given string contains invalid content when put into the json string's quotes.
505 - for example, `substring_of="abc\""` would allow '\"' to appear in the json string, which is forbidden by the JSON standard.
506 - NoneType
507 - typing.Any
508 - Subclasses of collections.abc.Mapping[str,T] and typing.Mapping[str,T] where T is a supported type,
509 - Subclasses of collections.abc.Sequence[T] and typing.Sequence[T] where T is a supported type.
510 - optionally with `minItems`, `maxItems`, `prefixItems` constraints
511 - *Warning*: a large difference between minItems and maxItems can lead to very slow performance!
512 - *Warning*: by the JSON Schema definition, prefixItems by default allows both additional items and missing prefix items, which may not be the desired behavior and can lead to very slow performance if prefixItems is long!
513 - tuple[T1,T2,...] where T1,T2,... are supported types. The order, type and number of elements will be preserved.
514 - typing.Literal[x1,x2,...] where x1, x2, ... are instances of int, string, bool or NoneType, or another typing.Literal[y1,y2,...]
515 - typing.Union[T1,T2,...] where T1,T2,... are supported types.
516 - schemas.Schema where all its fields' data types are supported. Recursive schema definitions are supported as well.
517 - *Warning*: while non-required fields are supported, they can lead to very slow performance and/or enormous memory consumption if there are too many of them!
518
519 Args:
520 nonterminal: The nonterminal representing the extractor.
521 capture_name: The capture name of the extractor, or `None` if the extractor does not capture.
522 schema: The schema.
523 to_object: A callable to convert the extracted string to a schema instance.
524 """
525 super().__init__(nonterminal, capture_name)
526 self._to_object = to_object
527 self._rule_str = _generate_kbnf_grammar(schema, self.nonterminal)
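A minimal usage sketch, assuming direct construction (in typical use, the library's higher-level formatter-builder API creates these extractors); here the schema is a plain `typing.Dict[str, int]` and the standard `json` module does the decoding, while the exact nonterminal decoration is handled by the `NonterminalExtractor` base class:

```python
import json
import typing

from formatron.formats.json import JsonExtractor

ext = JsonExtractor("json_obj", None, typing.Dict[str, int], json.loads)

# The KBNF grammar: the shared header rules plus rules for this extractor's nonterminal.
print(ext.kbnf_definition)

# extract() returns the unconsumed remainder and the decoded object (see extract below).
print(ext.extract('{"a": 1} and some trailing text'))
# -> (' and some trailing text', {'a': 1})
```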
528 def extract(self, input_str: str) -> typing.Optional[tuple[str, schemas.schema.Schema]]:
529 """
530 Extract a schema instance from a string.
531
532 Args:
533 input_str: The input string to extract from.
534
535 Returns:
536 A tuple of the remaining string and the extracted schema instance, or `None` if extraction failed.
537 """
538
539 # Ensure the input string starts with '{' or '[' after stripping leading whitespace
540 input_str = input_str.lstrip()
541 if not input_str.startswith(('{', '[')):
542 return None
543
544 # Variables to track the balance of brackets and the position in the string
545 bracket_count = 0
546 position = 0
547 in_string = False
548 escape_next = False
549 start_char = input_str[0]
550 end_char = '}' if start_char == '{' else ']'
551
552 # Iterate over the string to find where the JSON object or array ends
553 for char in input_str:
554 if not in_string:
555 if char == start_char:
556 bracket_count += 1
557 elif char == end_char:
558 bracket_count -= 1
559 elif char == '"':
560 in_string = True
561 else:
562 if char == '"' and not escape_next:
563 in_string = False
564 elif char == '\\':
565 escape_next = not escape_next
566 else:
567 escape_next = False
568
569 # Move to the next character
570 position += 1
571
572 # If brackets are balanced and we're not in a string, stop processing
573 if bracket_count == 0 and not in_string:
574 break
575 else:
576 return None
578 # The position now points just past the closing '}' or ']', so we slice up to it
578 json_str = input_str[:position]
579 remaining_str = input_str[position:]
580 # Return the unparsed remainder of the string and the decoded JSON object
581 return remaining_str, self._to_object(json_str)
582
583 @property
584 def kbnf_definition(self):
585 return self._rule_str
586
587