Formatron v0.4.2
Formatron empowers everyone to control the output format of language models with minimal overhead.
Loading...
Searching...
No Matches
json_schema.py
Go to the documentation of this file.
1"""
2This module contains utilities for creating schemas from JSON schemas.
3"""
4
5import collections
6import collections.abc
7import copy
8import inspect
9import json
10from urllib.parse import urldefrag, urljoin
11import frozendict
12import jsonschema.validators
13from pydantic import typing
14import jsonschema
15from formatron import schemas
16from referencing import Registry, Resource
17
19 __slots__ = ("_annotation",)
20
    def __init__(self, annotation: typing.Type, required:bool):
        """
        Initialize the field information.

        Args:
            annotation: The type annotation of the field.
            required: Whether the field is required for the schema.
        """
        # Both values are stored as-is and exposed through read-only properties.
        self._annotation = annotation
        self._required = required
    @property
    def annotation(self) -> typing.Type[typing.Any] | None:
        """
        Get the type annotation of the field.

        Returns:
            The annotation object given to the constructor, unchanged.
        """
        return self._annotation
37
    @property
    def required(self) -> bool:
        """
        Check if the field is required for the schema.

        Returns:
            The `required` flag given to the constructor, unchanged.
        """
        return self._required
44
# Number of schema classes generated so far by `_create_custom_type`; the
# current value is embedded in each generated class name (`__json_schema_<n>`)
# and incremented after every creation.
_counter = 0
46
def create_schema(schema: dict[str, typing.Any], registry=Registry()) -> schemas.schema.Schema:
    """
    Create a Schema object from a JSON schema object.

    This function takes a JSON schema and converts it into a Schema object that can be used
    for data validation and serialization. Currently, only the following JSON Schema features are supported:

    - `type` keyword
    - `items` keyword
    - `properties` keyword
      - Due to implementation limitations, we always assume `additionalProperties` is false.
    - `enum` and `const` keyword
      - This includes advanced enum types such as array and object.
      - Note that if both `enum`(or `const`) and `type` are present, `type` will be ignored.
    - `required` keyword
    - Schema references ($ref and $dynamicRef)
      - Hence, all types of schema identifications(`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
      - This includes recursive schema references.
      - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
        - This bound is expected to be loosened in future versions of Formatron where "easily mergeable" constraint keywords will be merged.

    Requirements:
    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard
    - The root schema's type must be exactly "object"
    - The schema must have a valid '$id' and '$schema' fields
    - All references must be resolvable within the given schema and registry

    Args:
        schema: A dictionary representing a valid JSON schema.
        registry: A Registry object containing additional schema definitions.
                  Defaults to an empty Registry.

    Returns:
        schemas.schema.Schema: A Schema object representing the input JSON schema.

    Raises:
        jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
        ValueError: If there are issues with schema references, constraints or requirements.
    """
    # Work on private copies: the steps below rewrite the schema in place, and
    # the default `registry` instance is shared between calls.
    registry = copy.deepcopy(registry)
    schema = copy.deepcopy(schema)
    # Enforce the documented requirements (root type "object", valid against the
    # Draft 2020-12 meta-schema) before any reference processing starts.
    _validate_json_schema(schema)
    # Register the root schema so references into it can be looked up.
    registry = Resource.from_contents(schema) @ registry
    json_schema_id_to_schema = {}
    memo = set()
    # Pass 1: replace every $ref/$dynamicRef value with the referenced schema object.
    _recursive_resolve_reference(schema["$id"], schema, registry, memo)
    # The memo holds visited object ids; reset it so pass 2 walks the full tree again.
    memo.clear()
    # Pass 2: merge the referenced schemas' keywords into their referrers.
    _merge_referenced_schema(schema, memo)
    # Pass 3: convert the now reference-free schema into Formatron types.
    return _convert_json_schema_to_our_schema(schema, json_schema_id_to_schema)
97
def _resolve_new_url(uri: str, ref: str) -> str:
    """
    Compute the base URI that applies after following `ref` from `uri`.

    Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.

    Args:
        uri: The current base URI.
        ref: A reference (or `$id` value) to resolve against `uri`.

    Returns:
        `uri` unchanged when `ref` is a pure fragment (starts with "#");
        otherwise `ref` joined onto `uri` with any fragment removed.
    """
    # A fragment-only reference stays within the current document.
    if ref.startswith("#"):
        return uri
    joined = urljoin(uri, ref)
    return urldefrag(joined).url
105
def _validate_json_schema(schema: dict[str, typing.Any]) -> None:
    """
    Check that `schema` is usable as a root schema.

    Args:
        schema: The root JSON schema dictionary.

    Raises:
        ValueError: If the `type` keyword is missing or is not exactly "object".
        jsonschema.exceptions.ValidationError: If the schema does not validate
            against the Draft 2020-12 meta-schema.
    """
    root_type = schema.get("type")
    if root_type != "object":
        raise ValueError("Root schema must have type 'object'")
    meta_schema = jsonschema.validators.Draft202012Validator.META_SCHEMA
    jsonschema.validate(instance=schema, schema=meta_schema)
110
def _convert_json_schema_to_our_schema(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type])->typing.Type:
    """
    Recursively handle all types needed to fully determine the type of a schema.

    Args:
        schema: The (reference-free) JSON schema node to convert.
        json_schema_id_to_schema: Cache mapping `id()` of already-converted
            schema nodes to their resulting types; it is updated in place.

    Returns:
        The type inferred for `schema`. A node that is already cached returns
        its cached type, which is how circular references terminate.
    """
    node_id = id(schema)
    if node_id in json_schema_id_to_schema:  # Circular reference
        return json_schema_id_to_schema[node_id]
    if not isinstance(schema, dict):
        # NOTE(review): non-dict nodes are not converted and yield None here - confirm intent.
        return None
    # Inferring the type registers this node in the cache *before* its
    # properties are visited, so self-referential properties hit the cache.
    inferred = _infer_type(schema, json_schema_id_to_schema)
    if "properties" in schema:
        field_map = _extract_fields_from_object_type(json_schema_id_to_schema[node_id])
        required_names = schema.get("required", [])
        for name, sub_schema in schema["properties"].items():
            field_type = _convert_json_schema_to_our_schema(sub_schema, json_schema_id_to_schema)
            field_map[name] = FieldInfo(field_type, required=name in required_names)
    return inferred
127
def _extract_fields_from_object_type(object_type:typing.Type):
    """
    Return the mutable field mapping of the schema class behind `object_type`.

    Args:
        object_type: Either a schema class itself or a generic/union type whose
            type arguments include a schema class.

    Returns:
        The result of `fields()` on the first type argument that is a
        `schemas.schema.Schema` subclass, or on `object_type` itself when no
        such argument exists.
    """
    for candidate in typing.get_args(object_type):
        if not isinstance(candidate, type):
            continue
        if issubclass(candidate, schemas.schema.Schema):
            return candidate.fields()
    return object_type.fields()
134
def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any | None]:
    """
    Infer more specific types.

    Starts from the type read directly off this schema level and refines it:
    untyped and object-like results are replaced by a generated schema class,
    and a bare list is parameterized with its `items` type.

    Args:
        schema: The JSON schema node being converted.
        json_schema_id_to_schema: Cache of converted nodes keyed by `id()`;
            the final type for `schema` is stored in it.

    Returns:
        The refined type for `schema`.
    """
    resolved = _obtain_type(schema, json_schema_id_to_schema)
    needs_custom_type = (
        resolved is None
        or resolved is object
        or object in typing.get_args(resolved)
    )
    if needs_custom_type:
        resolved = _create_custom_type(resolved, schema, json_schema_id_to_schema)
    if resolved is typing.List and "items" in schema:
        resolved = typing.List[
            _convert_json_schema_to_our_schema(schema["items"], json_schema_id_to_schema)
        ]
    json_schema_id_to_schema[id(schema)] = resolved
    return resolved
148
def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
    """
    Extract the literal constraint (`enum` or `const`) of a schema, if any.

    Args:
        schema: The JSON schema node to inspect.

    Returns:
        A tuple of the `enum` values, or the bare `const` value, or None when
        neither keyword is present (a `const` of null also yields None).

    Raises:
        ValueError: If both `enum` and `const` are present.
    """
    has_enum = "enum" in schema
    if has_enum and "const" in schema:
        raise ValueError("JSON schema cannot contain both 'enum' and 'const' keywords")
    if has_enum:
        return tuple(schema["enum"])
    return schema.get("const")
153
def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Build a `typing.Literal` type from a schema's literal constraint.

    Args:
        literal: A tuple of allowed values (from `enum`) or a single value
            (from `const`).
        obtained_type: The type derived from the `type` keyword; currently unused.
        schema: The JSON schema node; currently unused.
        json_schema_id_to_schema: The conversion cache; currently unused.

    Returns:
        A `typing.Literal[...]` over the deep-frozen allowed values.
    """
    # TODO: validate literal against obtained_type
    values = literal if isinstance(literal, tuple) else (literal,)
    # Deep-freeze the values before handing them to typing.Literal.
    frozen_values = frozendict.deepfreeze(values)
    return typing.Literal[frozen_values]
161
def _create_custom_type(obtained_type: typing.Type|None, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Generate a fresh Schema subclass for an object-like schema and register it.

    Args:
        obtained_type: None (no `type` keyword), `object`, or a union whose
            members include `object`.
        schema: The JSON schema node the new class stands for.
        json_schema_id_to_schema: Conversion cache; the resulting type is
            stored under `id(schema)`.

    Returns:
        The registered type: a union of all JSON value types plus the new class
        when `obtained_type` is None, the original union with `object` swapped
        for the new class, or the new class on its own.
    """
    global _counter
    # Shared by the closure below; callers fill this dict in after creation.
    fields = {}
    class_name = f"__json_schema_{_counter}"
    namespace = {
        "from_json": classmethod(lambda cls, x: json.loads(x)),
        "fields": classmethod(lambda cls: fields),
    }
    new_type = type(class_name, (schemas.schema.Schema,), namespace)
    _counter += 1

    if obtained_type is None:
        # No `type` keyword: any JSON value is allowed.
        resolved = typing.Union[str, float, int, bool, None, typing.List[typing.Any], new_type]
    elif object in typing.get_args(obtained_type):
        kept_members = tuple(
            member for member in typing.get_args(obtained_type) if member is not object
        )
        resolved = typing.Union[kept_members + (new_type,)]
    else:
        resolved = new_type
    json_schema_id_to_schema[id(schema)] = resolved
    return resolved
178
179
180def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|None]:
181 """
182 Directly obtain type information from this schema level.
183 """
184
185 if "type" not in schema:
186 obtained_type = None
187 else:
188 json_type = schema["type"]
189 if json_type == "string":
190 obtained_type = str
191 elif json_type == "number":
192 obtained_type = float
193 elif json_type == "integer":
194 obtained_type = int
195 elif json_type == "boolean":
196 obtained_type = bool
197 elif json_type == "null":
198 obtained_type = type(None)
199 elif json_type == "array":
200 obtained_type = typing.List
201 elif json_type == "object":
202 obtained_type = object
203 elif isinstance(json_type, collections.abc.Sequence):
204 new_list = []
205 for item in json_type:
206 new_schema = schema.copy()
207 new_schema["type"] = item
208 new_list.append(_obtain_type(new_schema, json_schema_id_to_schema))
209 obtained_type = typing.Union[tuple(new_list)]
210 else:
211 raise TypeError(f"Unsupported type in json schema: {json_type}")
212 literal = _get_literal(schema)
213 if literal is not None:
214 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
215 return obtained_type
217
218
219
220
def _merge_referenced_schema(schema: dict[str, typing.Any], memo: set[int]):
    """
    Fold every resolved `$ref`/`$dynamicRef` target into its referrer, in place.

    Expects reference values to have already been replaced by the referenced
    schema objects. Walks lists and dicts recursively; other values are ignored.

    Args:
        schema: The schema node (dict, list, or any other JSON value) to process.
        memo: Ids of containers already visited, used to stop on cycles.

    Raises:
        ValueError: Propagated from `_merge_key` when referrer and referent
            carry conflicting values for the same keyword.
    """
    node_id = id(schema)
    if node_id in memo:  # Circular reference
        return None
    if isinstance(schema, list):
        memo.add(node_id)
        for element in schema:
            _merge_referenced_schema(element, memo)
        return None
    if not isinstance(schema, dict):
        return None
    memo.add(node_id)
    for ref_keyword in ("$ref", "$dynamicRef"):
        if ref_keyword not in schema:
            continue
        referent = schema[ref_keyword]
        _merge_referenced_schema(referent, memo)  # ensure no unmerged references
        for ref_key, ref_value in referent.items():
            _merge_key(schema, ref_key, ref_value)
        del schema[ref_keyword]
    for value in schema.values():
        _merge_referenced_schema(value, memo)
239
def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
    """
    Merge one keyword of a referenced schema into the referring schema, in place.

    Args:
        schema: The referring schema being updated.
        ref_key: The keyword taken from the referenced schema.
        reference_value: The referenced schema's value for `ref_key`.

    Raises:
        ValueError: If both schemas define `ref_key` with values that cannot be
            reconciled (anything other than identical objects, two dicts,
            `$id`/`$schema`, or equal scalars).
    """
    if ref_key not in schema:
        schema[ref_key] = reference_value
        return None
    current = schema[ref_key]
    if current is reference_value:
        return None
    if isinstance(current, dict) and isinstance(reference_value, dict):
        # Two dicts are merged keyword by keyword.
        for nested_key, nested_value in reference_value.items():
            _merge_key(current, nested_key, nested_value)
        return None
    if ref_key in ("$id", "$schema"):
        # For $id and $schema, keep the original value
        return None
    scalar_types = (str, int, float, bool)
    both_scalar = isinstance(current, scalar_types) and isinstance(reference_value, scalar_types)
    if both_scalar and current == reference_value:
        return None
    raise ValueError(f"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
257
258
259def _recursive_resolve_reference(base_uri: str, schema: typing.Any, registry: Registry, memo: set[int]):
260 if id(schema) in memo:
261 return schema
262 memo.add(id(schema))
263 if isinstance(schema, list):
264 new_list = []
265 for item in schema:
266 new_list.append(_recursive_resolve_reference(base_uri, item, registry, memo))
267 schema.clear()
268 schema.extend(new_list)
269 if isinstance(schema, dict):
270 if "$id" in schema:
271 base_uri = _resolve_new_url(base_uri, schema["$id"])
272 resolver = registry.resolver(base_uri)
273 keys = ["$ref", "$dynamicRef"]
274 for key in keys:
275 if key in schema:
276 _resolve_reference(schema, key, resolver)
277 for key, value in schema.items():
278 _recursive_resolve_reference(base_uri, value, registry, memo)
279 return schema
280
def _resolve_reference(schema: dict[str, typing.Any], key: str, resolver: typing.Any):
    """
    Replace the reference stored under `key` with the schema it points to.

    Args:
        schema: The schema holding the reference; it is updated in place.
        key: The reference keyword to resolve (`$ref` or `$dynamicRef`).
        resolver: Object whose `lookup(ref)` result exposes the referenced
            schema as `.contents`.

    Raises:
        ValueError: If the reference resolves to `schema` itself.
    """
    target = resolver.lookup(schema[key]).contents
    if target is schema:
        raise ValueError(f"Circular self reference detected in JSON schema: {schema}")
    schema[key] = target
typing.Type[typing.Any]|None annotation(self)
Get the type annotation of the field.
__init__(self, typing.Type annotation, bool required)
Initialize the field information.
bool required(self)
Check if the field is required for the schema.
An abstract schema that describes some data.
Definition schema.py:48
typing.Any _get_literal(dict[str, typing.Any] schema)
_resolve_reference(dict[str, typing.Any] schema, str key, typing.Any resolver)
typing.Type _create_custom_type(typing.Type|None obtained_type, dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
_merge_referenced_schema(dict[str, typing.Any] schema, set[int] memo)
None _validate_json_schema(dict[str, typing.Any] schema)
typing.Type[typing.Any|None] _obtain_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Directly obtain type information from this schema level.
typing.Type _convert_json_schema_to_our_schema(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Recursively handle all types needed to fully determine the type of a schema.
_merge_key(dict[str, typing.Any] schema, str ref_key, typing.Any reference_value)
typing.Type _handle_literal(typing.Any literal, typing.Type obtained_type, dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
str _resolve_new_url(str uri, str ref)
Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core....
_recursive_resolve_reference(str base_uri, typing.Any schema, Registry registry, set[int] memo)
schemas.schema.Schema create_schema(dict[str, typing.Any] schema, registry=Registry())
Create a Schema object from a JSON schema object.
typing.Type[typing.Any|None] _infer_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Infer more specific types.
_extract_fields_from_object_type(typing.Type object_type)