Formatron v0.4.9
Formatron empowers everyone to control the output format of language models with minimal overhead.
Loading...
Searching...
No Matches
json_schema.py
Go to the documentation of this file.
1"""
2This module contains utilities for creating schemas from JSON schemas.
3"""
4
5import collections
6import collections.abc
7import copy
8import json
9from urllib.parse import urldefrag, urljoin
10import frozendict
11import jsonschema.validators
12from pydantic import typing
13import jsonschema
14from formatron import schemas
15from referencing import Registry, Resource
16
18 __slots__ = ("_annotation",)
19
20 def __init__(self, annotation: typing.Type, required:bool):
21 """
22 Initialize the field information.
23
24 Args:
25 annotation: The type annotation of the field.
26 required: Whether the field is required for the schema.
27 """
28 self._annotation = annotation
29 self._required = required
31 @property
32 def annotation(self) -> typing.Type[typing.Any] | None:
33 """
34 Get the type annotation of the field.
35 """
36 return self._annotation
37
@property
def required(self) -> bool:
    """
    Whether this field was declared as required when it was constructed.
    """
    return self._required
44
45_counter = 0
46
def create_schema(schema: dict[str, typing.Any], registry=Registry()) -> schemas.schema.Schema:
    """
    Create a Schema object from a JSON schema object.

    This function takes a JSON schema and converts it into a Schema object that can be used
    for data validation and serialization. Currently, only the following JSON Schema features are supported:

    - `type` keyword
    - `minLength, maxLength, pattern` keywords for string type
    - `substringOf` keyword for string type
    - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type
    - `items` keyword
      - optionally with `minItems`, `maxItems`, `prefixItems` constraints
    - `properties` keyword
      - Due to implementation limitations, we always assume `additionalProperties` is false.
      - Note that `properties` is optional for object type.
    - `enum` and `const` keyword
      - This includes advanced enum types such as array and object.
      - Note that if both `enum`(or `const`) and `type` are present, `type` will be ignored.
    - `required` keyword
    - `anyOf` keyword
      - This currently does not support factoring out common parts of the subschemas
        (like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas)
    - Schema references ($ref and $dynamicRef)
      - Hence, all types of schema identifications(`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
      - This includes recursive schema references.
      - Recursive array references(like `[[[[...]]]]`) are not supported yet.
      - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
        - This bound is expected to be loosened in future versions of Formatron where "easily mergeable"
          constraint keywords will be merged.

    Requirements:
    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard
    - The root schema's type must be exactly "object" or "array" or both
    - The schema must have a valid '$id' and '$schema' fields
    - All references must be resolvable within the given schema and registry

    Args:
        schema: A dictionary representing a valid JSON schema.
        registry: A Registry object containing additional schema definitions.
            Defaults to an empty Registry.

    Returns:
        schemas.schema.Schema: A Schema object representing the input JSON schema.

    Raises:
        jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
        ValueError: If there are issues with schema references, constraints or requirements.
    """
    # Reference resolution and merging rewrite dictionaries in place, so both
    # inputs are copied first and the caller's objects are never touched.
    registry = copy.deepcopy(registry)
    schema = copy.deepcopy(schema)
    # Enforce the documented requirements before any resolution work starts.
    _validate_json_schema(schema)
    if "$id" not in schema:
        # The root '$id' is the base URI for reference resolution; report its
        # absence as the documented ValueError rather than a bare KeyError.
        raise ValueError("The root JSON schema must contain an '$id' field")
    registry = Resource.from_contents(schema) @ registry
    json_schema_id_to_schema = {}
    memo = set()
    # Pass 1: replace every $ref/$dynamicRef string with the referenced contents.
    _recursive_resolve_reference(schema["$id"], schema, registry, memo)
    # Pass 2: fold the referenced contents into their referrers. The memo is
    # reset because it records visited nodes per traversal.
    memo.clear()
    _merge_referenced_schema(schema, memo)
    # Pass 3: convert the now reference-free schema into Formatron types.
    return _convert_json_schema_to_our_schema(schema, json_schema_id_to_schema)
105
106def _resolve_new_url(uri: str, ref: str) -> str:
107 """
108 Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
109 """
110 if not ref.startswith("#"):
111 uri, _ = urldefrag(urljoin(uri, ref))
112 return uri
113
114def _validate_json_schema(schema: dict[str, typing.Any]) -> None:
115 if "type" in schema:
116 root_type = schema["type"]
117 if isinstance(root_type, str):
118 if root_type not in ["object", "array"]:
119 raise ValueError("Root schema type must be 'object' or 'array'")
120 elif isinstance(root_type, list):
121 if not set(root_type).issubset({"object", "array"}):
122 raise ValueError("Root schema type must be 'object', 'array', or both")
123 else:
124 raise ValueError("Invalid 'type' specification in root schema")
125 jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
126
127def _convert_json_schema_to_our_schema(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type])->typing.Type:
128 """
129 Recursively handle all types needed to fully determine the type of a schema
130 """
131 schema_id = id(schema)
132 if schema_id in json_schema_id_to_schema: # Circular reference
133 return json_schema_id_to_schema[schema_id]
134 if isinstance(schema, dict):
135 _inferred_type = _infer_type(schema, json_schema_id_to_schema)
136 if "properties" in schema:
137 fields = _extract_fields_from_object_type(json_schema_id_to_schema[schema_id])
138 properties = schema["properties"]
139 required = schema.get("required", [])
140 for _property in properties:
141 fields[_property] = FieldInfo(_convert_json_schema_to_our_schema(properties[_property], json_schema_id_to_schema), required=_property in required)
142 return _inferred_type
143
144def _extract_fields_from_object_type(object_type:typing.Type):
145 args = typing.get_args(object_type)
146 for arg in args:
147 if isinstance(arg, type) and issubclass(arg, schemas.schema.Schema):
148 return arg.fields()
149 return object_type.fields()
150
151def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
152 allowed_keys = {"anyOf", "$id", "$schema"}
153 assert set(schema.keys()).issubset(allowed_keys), "Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present"
154 new_list = []
155 for item in schema["anyOf"]:
156 new_list.append(_convert_json_schema_to_our_schema(item, json_schema_id_to_schema))
157 return typing.Union[tuple(new_list)]
158
def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any | None]:
    """
    Infer more specific types.

    Starts from the type obtained from the `type`/`enum`/`const` keywords (or a
    broad default union when none is given) and refines each member: `object`
    becomes a generated schema class, while `list`, `str`, `int` and `float`
    pick up their constraint metadata. The result is recorded in
    `json_schema_id_to_schema` under `id(schema)`. Schemas using `anyOf` are
    delegated to `_handle_anyOf` and are not recorded here.
    """
    if "anyOf" in schema:
        return _handle_anyOf(schema, json_schema_id_to_schema)
    base_type = _obtain_type(schema, json_schema_id_to_schema)
    if base_type is None:
        base_type = typing.Union[str, float, int, bool, None, list[typing.Any]]
    origin = typing.get_origin(base_type)
    expandable = origin is typing.Union or origin is typing.Literal or origin is list
    members = list(typing.get_args(base_type)) if expandable else []
    if not members:
        # Not a parameterized form we expand: treat the type as its own sole member.
        members = [base_type]
    refined = []
    for member in members:
        if member is object:
            member = _create_custom_type(schema, json_schema_id_to_schema)
        elif member is list:
            member = _handle_list_metadata(schema, json_schema_id_to_schema)
        elif member is str:
            member = _handle_str_with_metadata(schema)
        elif member is int or member is float:
            member = _handle_numeric_with_metadata(schema, member)
        refined.append(member)
    if origin is typing.Union:
        result = typing.Union[tuple(refined)]
    elif origin is typing.Literal:
        result = typing.Literal[tuple(refined)]
    else:
        result = refined[0]
    json_schema_id_to_schema[id(schema)] = result
    return result
194def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
195 if "enum" in schema and "const" in schema:
196 raise ValueError("JSON schema cannot contain both 'enum' and 'const' keywords")
197 return tuple(schema["enum"]) if "enum" in schema else schema.get("const")
198
def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Build a `typing.Literal` from the value(s) given by `enum` or `const`.

    A value that is not already a tuple is wrapped into a one-element tuple,
    and the whole tuple is passed through `frozendict.deepfreeze` before it is
    used to parameterize `typing.Literal`.
    """
    # TODO: validate literal against obtained_type
    candidates = literal if isinstance(literal, tuple) else (literal,)
    return typing.Literal[frozendict.deepfreeze(candidates)]
206
207def _handle_str_with_metadata(schema: dict[str, typing.Any]) -> typing.Type:
208 """
209 Handle string type with metadata such as maxLength, minLength, and pattern.
210 """
211 metadata = {}
212 if "maxLength" in schema:
213 metadata["max_length"] = schema["maxLength"]
214 if "minLength" in schema:
215 metadata["min_length"] = schema["minLength"]
216 if "pattern" in schema:
217 metadata["pattern"] = schema["pattern"]
218 if "substringOf" in schema:
219 metadata["substring_of"] = schema["substringOf"]
220
221 if metadata:
222 return schemas.schema.TypeWithMetadata(str, metadata)
223 return str
224
225def _handle_numeric_with_metadata(schema: dict[str, typing.Any], numeric_type: typing.Type) -> typing.Type:
226 """
227 Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum.
228 """
229 metadata = {}
230 if "minimum" in schema:
231 metadata["ge"] = schema["minimum"]
232 if "maximum" in schema:
233 metadata["le"] = schema["maximum"]
234 if "exclusiveMinimum" in schema:
235 metadata["gt"] = schema["exclusiveMinimum"]
236 if "exclusiveMaximum" in schema:
237 metadata["lt"] = schema["exclusiveMaximum"]
238
239 if metadata:
240 return schemas.schema.TypeWithMetadata(numeric_type, metadata)
241 return numeric_type
242
244
def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Generate a fresh `Schema` subclass for an object schema and register it.

    The class is named with the module-level counter and starts with an empty
    field table that callers fill in afterwards through `fields()`. It is
    stored in `json_schema_id_to_schema` under `id(schema)`.
    """
    global _counter
    field_table = {}
    type_name = f"__json_schema_{_counter}"
    namespace = {
        "from_json": classmethod(lambda cls, x: json.loads(x)),
        # Every call hands back the same dict object, so later insertions are
        # visible through the class.
        "fields": classmethod(lambda cls: field_table),
    }
    generated = type(type_name, (schemas.schema.Schema,), namespace)
    _counter += 1
    json_schema_id_to_schema[id(schema)] = generated
    return generated
255
256def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
257 """
258 Handle cases where the obtained type is a list
259 """
260 metadata = {}
261 if "minItems" in schema:
262 metadata["min_length"] = schema["minItems"]
263 if "maxItems" in schema:
264 metadata["max_length"] = schema["maxItems"]
265 if "prefixItems" in schema:
266 metadata["prefix_items"] = tuple(_convert_json_schema_to_our_schema(i, json_schema_id_to_schema) for i in schema["prefixItems"])
267 item_type = typing.Any
268 if "items" in schema:
269 if schema["items"] == False:
270 metadata["additional_items"] = False
271 else:
272 item_type = _convert_json_schema_to_our_schema(schema["items"], json_schema_id_to_schema)
273 if item_type is None:
274 item_type = typing.Any
275 if metadata:
276 if "additional_items" not in metadata:
277 metadata["additional_items"] = True
278 return schemas.schema.TypeWithMetadata(list, metadata)
279 return list[item_type]
281
282def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|None]:
283 """
284 Directly obtain type information from this schema's type keyword.
285 """
286 if "type" not in schema:
287 obtained_type = None
288 else:
289 json_type = schema["type"]
290 if json_type == "string":
291 obtained_type = str
292 elif json_type == "number":
293 obtained_type = float
294 elif json_type == "integer":
295 obtained_type = int
296 elif json_type == "boolean":
297 obtained_type = bool
298 elif json_type == "null":
299 obtained_type = type(None)
300 elif json_type == "array":
301 obtained_type = list
302 elif json_type == "object":
303 if "properties" in schema:
304 obtained_type = object
305 else:
306 obtained_type = dict[str, typing.Any]
307 elif isinstance(json_type, collections.abc.Sequence):
308 new_list = []
309 for item in json_type:
310 new_schema = schema.copy()
311 new_schema["type"] = item
312 new_list.append(_obtain_type(new_schema, json_schema_id_to_schema))
313 obtained_type = typing.Union[tuple(new_list)]
314 else:
315 raise TypeError(f"Unsupported type in json schema: {json_type}")
316 literal = _get_literal(schema)
317 if literal is not None:
318 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
319 return obtained_type
320
321
322def _merge_referenced_schema(schema: dict[str, typing.Any], memo: set[int]):
323 keys = ["$ref", "$dynamicRef"]
324 if id(schema) in memo: # Circular reference
325 return None
326 if isinstance(schema, list):
327 memo.add(id(schema))
328 for item in schema:
329 _merge_referenced_schema(item, memo)
330 elif isinstance(schema, dict):
331 memo.add(id(schema))
332 for key in keys:
333 if key in schema:
334 _merge_referenced_schema(schema[key], memo) # ensure no unmerged references
335 for ref_key, ref_value in schema[key].items():
336 _merge_key(schema, ref_key, ref_value)
337 del schema[key]
338 for key, value in schema.items():
339 _merge_referenced_schema(value, memo)
340
341def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
342 if ref_key not in schema:
343 schema[ref_key] = reference_value
344 return None
345 if schema[ref_key] is reference_value:
346 return None
347 if isinstance(schema[ref_key], dict) and isinstance(reference_value, dict):
348 for new_ref_key, new_ref_value in reference_value.items():
349 _merge_key(schema[ref_key], new_ref_key, new_ref_value)
350 return None
351 if ref_key in ("$id", "$schema"):
352 # For $id and $schema, keep the original value
353 return None
354 if isinstance(schema[ref_key], (str, int, float, bool)) and isinstance(reference_value, (str, int, float, bool)):
355 if schema[ref_key] == reference_value:
356 return None
357 raise ValueError(f"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
358
359
360def _recursive_resolve_reference(base_uri: str, schema: typing.Any, registry: Registry, memo: set[int]):
361 if id(schema) in memo:
362 return schema
363 memo.add(id(schema))
364 if isinstance(schema, list):
365 new_list = []
366 for item in schema:
367 new_list.append(_recursive_resolve_reference(base_uri, item, registry, memo))
368 schema.clear()
369 schema.extend(new_list)
370 if isinstance(schema, dict):
371 if "$id" in schema:
372 base_uri = _resolve_new_url(base_uri, schema["$id"])
373 resolver = registry.resolver(base_uri)
374 keys = ["$ref", "$dynamicRef"]
375 for key in keys:
376 if key in schema:
377 _resolve_reference(schema, key, resolver)
378 for key, value in schema.items():
379 _recursive_resolve_reference(base_uri, value, registry, memo)
380 return schema
381
382def _resolve_reference(schema: dict[str, typing.Any], key: str, resolver: typing.Any):
383 resolved = resolver.lookup(schema[key])
384 if resolved.contents is schema:
385 raise ValueError(f"Circular self reference detected in JSON schema: {schema}")
386 schema[key] = resolved.contents
typing.Type[typing.Any]|None annotation(self)
Get the type annotation of the field.
__init__(self, typing.Type annotation, bool required)
Initialize the field information.
bool required(self)
Check if the field is required for the schema.
An abstract field info that describes a data field in a schema.
Definition schema.py:13
An abstract schema that describes some data.
Definition schema.py:91
typing.Any _get_literal(dict[str, typing.Any] schema)
typing.Type _create_custom_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
_resolve_reference(dict[str, typing.Any] schema, str key, typing.Any resolver)
typing.Type _handle_str_with_metadata(dict[str, typing.Any] schema)
Handle string type with metadata such as maxLength, minLength, and pattern.
_merge_referenced_schema(dict[str, typing.Any] schema, set[int] memo)
None _validate_json_schema(dict[str, typing.Any] schema)
typing.Type[typing.Any|None] _obtain_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Directly obtain type information from this schema's type keyword.
typing.Type _convert_json_schema_to_our_schema(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Recursively handle all types needed to fully determine the type of a schema.
_merge_key(dict[str, typing.Any] schema, str ref_key, typing.Any reference_value)
typing.Type _handle_anyOf(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
typing.Type _handle_literal(typing.Any literal, typing.Type obtained_type, dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
str _resolve_new_url(str uri, str ref)
Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core....
_recursive_resolve_reference(str base_uri, typing.Any schema, Registry registry, set[int] memo)
schemas.schema.Schema create_schema(dict[str, typing.Any] schema, registry=Registry())
Create a Schema object from a JSON schema object.
typing.Type[typing.Any|None] _infer_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Infer more specific types.
typing.Type _handle_list_metadata(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Handle cases where the obtained type is a list.
typing.Type _handle_numeric_with_metadata(dict[str, typing.Any] schema, typing.Type numeric_type)
Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum,...
_extract_fields_from_object_type(typing.Type object_type)