Formatron v0.4.11
Formatron empowers everyone to control the output format of language models with minimal overhead.
Loading...
Searching...
No Matches
json_schema.py
Go to the documentation of this file.
1"""
2This module contains utilities for creating schemas from JSON schemas.
3"""
4
5import collections
6import collections.abc
7import copy
8import json
9from urllib.parse import urldefrag, urljoin
10import frozendict
11import jsonschema.validators
12from pydantic import typing
13import jsonschema
14from formatron import schemas
15from referencing import Registry, Resource
16
18 __slots__ = ("_annotation",)
19
20 def __init__(self, annotation: typing.Type, required:bool):
21 """
22 Initialize the field information.
23
24 Args:
25 annotation: The type annotation of the field.
26 required: Whether the field is required for the schema.
27 """
28 self._annotation = annotation
29 self._required = required
    @property
    def annotation(self) -> typing.Type[typing.Any] | None:
        """
        Get the type annotation of the field.

        Returns:
            The annotation object that was passed to ``__init__``.
        """
        return self._annotation
37
    @property
    def required(self) -> bool:
        """
        Check if the field is required for the schema.

        Returns:
            The ``required`` flag that was passed to ``__init__``.
        """
        return self._required
44
# Running suffix that _create_custom_type embeds in each generated class name
# and then increments, so successively generated names differ.
_counter = 0
46
def create_schema(schema: dict[str, typing.Any], registry=Registry()) -> schemas.schema.Schema:
    r"""
    Create a Schema object from a JSON schema object.

    This function takes a JSON schema and converts it into a Schema object that can be used
    for data validation and serialization. Currently, only the following JSON Schema features are supported:

    - `type` keyword
    - `minLength, maxLength, pattern` keywords for string type
    - `substringOf` keyword for string type
    - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type
    - `items` keyword
      - optionally with `minItems`, `maxItems`, `prefixItems` constraints
    - `properties` keyword
      - Due to implementation limitations, we always assume `additionalProperties` is false.
      - Note that `properties` is optional for object type.
    - `enum` and `const` keyword
      - This includes advanced enum types such as array and object.
      - Note that if both `enum`(or `const`) and `type` are present, `type` will be ignored.
    - `required` keyword
    - `anyOf` keyword
      - This currently does not support factoring out common parts of the subschemas(like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas)
    - Schema references ($ref and $dynamicRef)
      - Hence, all types of schema identifications(`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
      - This includes recursive schema references.
      - Recursive array references(like \[\[\[\[...\]\]\]\]) are not supported yet.
      - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
        - This bound is expected to be loosened in future versions of Formatron where "easily mergeable" constraint keywords will be merged.

    Requirements:
    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard
    - The root schema's type must be exactly "object" or "array" or both
    - The schema must have a valid '$id' and '$schema' fields
    - All references must be resolvable within the given schema and registry

    Args:
        schema: A dictionary representing a valid JSON schema.
        registry: A Registry object containing additional schema definitions.
            Defaults to an empty Registry.

    Returns:
        schemas.schema.Schema: A Schema object representing the input JSON schema.

    Raises:
        jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
        ValueError: If there are issues with schema references, constraints or requirements.
    """
    # Work on private copies: the steps below rewrite the schema in place.
    registry = copy.deepcopy(registry)
    schema = copy.deepcopy(schema)
    # Enforce the documented requirements before any reference resolution.
    _validate_json_schema(schema)
    # Register the root schema so references into it can be looked up.
    registry = Resource.from_contents(schema) @ registry
    json_schema_id_to_schema = {}
    memo = set()
    # Pass 1: replace every $ref/$dynamicRef value with the referenced contents.
    _recursive_resolve_reference(schema["$id"], schema, registry, memo)
    # Pass 2: merge the referenced contents into the referring schemas.
    memo.clear()
    _merge_referenced_schema(schema, memo)
    # Pass 3: convert the merged JSON schema into Formatron types.
    return _convert_json_schema_to_our_schema(schema, json_schema_id_to_schema)
105
106def _resolve_new_url(uri: str, ref: str) -> str:
107 """
108 Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
109 """
110 if not ref.startswith("#"):
111 uri, _ = urldefrag(urljoin(uri, ref))
112 return uri
113
114def _validate_json_schema(schema: dict[str, typing.Any]) -> None:
115 if "type" in schema:
116 root_type = schema["type"]
117 if isinstance(root_type, str):
118 if root_type not in ["object", "array"]:
119 raise ValueError("Root schema type must be 'object' or 'array'")
120 elif isinstance(root_type, list):
121 if not set(root_type).issubset({"object", "array"}):
122 raise ValueError("Root schema type must be 'object', 'array', or both")
123 else:
124 raise ValueError("Invalid 'type' specification in root schema")
125 jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
126
127def _convert_json_schema_to_our_schema(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type])->typing.Type:
128 """
129 Recursively handle all types needed to fully determine the type of a schema
130 """
131 schema_id = id(schema)
132 if schema_id in json_schema_id_to_schema: # Circular reference
133 return json_schema_id_to_schema[schema_id]
134 if isinstance(schema, dict):
135 _inferred_type = _infer_type(schema, json_schema_id_to_schema)
136 if "properties" in schema:
137 fields = _extract_fields_from_object_type(json_schema_id_to_schema[schema_id])
138 properties = schema["properties"]
139 required = schema.get("required", [])
140 for _property in properties:
141 fields[_property] = FieldInfo(_convert_json_schema_to_our_schema(properties[_property], json_schema_id_to_schema), required=_property in required)
142 return _inferred_type
143
144def _extract_fields_from_object_type(object_type:typing.Type):
145 args = typing.get_args(object_type)
146 for arg in args:
147 arg = typing.get_origin(arg) or arg
148 if isinstance(arg, type) and issubclass(arg, schemas.schema.Schema):
149 return arg.fields()
150 return object_type.fields()
151
152def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
153 allowed_keys = {"anyOf", "$id", "$schema"}
154 assert set(schema.keys()).issubset(allowed_keys), "Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present"
155 new_list = []
156 for item in schema["anyOf"]:
157 new_list.append(_convert_json_schema_to_our_schema(item, json_schema_id_to_schema))
158 return typing.Union[tuple(new_list)]
159
def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any | None]:
    """
    Infer more specific types.

    Starts from the type given by ``_obtain_type`` and replaces the bare
    ``object``, ``list``, ``str``, ``int`` and ``float`` members with the
    richer types built by the dedicated helpers. The final type is recorded
    in ``json_schema_id_to_schema`` under ``id(schema)``. Schemas containing
    ``anyOf`` are delegated entirely to ``_handle_anyOf``.
    """
    if "anyOf" in schema:
        return _handle_anyOf(schema, json_schema_id_to_schema)
    base_type = _obtain_type(schema, json_schema_id_to_schema)
    if base_type is None:
        # No usable type information: fall back to this union.
        base_type = typing.Union[str, float, int, bool, None, list[typing.Any]]
    origin = typing.get_origin(base_type)
    members = None
    if origin is typing.Union or origin is typing.Literal or origin is list:
        members = typing.get_args(base_type)
    members = list(members) if members else [base_type]
    for index, member in enumerate(members):
        if member is object:
            members[index] = _create_custom_type(schema, json_schema_id_to_schema)
        elif member is list:
            members[index] = _handle_list_metadata(schema, json_schema_id_to_schema)
        elif member is str:
            members[index] = _handle_str_with_metadata(schema)
        elif member is int or member is float:
            members[index] = _handle_numeric_with_metadata(schema, member)
    # Reassemble using the same outer form the base type had.
    if origin is typing.Union:
        refined = typing.Union[tuple(members)]
    elif origin is typing.Literal:
        refined = typing.Literal[tuple(members)]
    else:
        refined = members[0]
    json_schema_id_to_schema[id(schema)] = refined
    return refined
195def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
196 if "enum" in schema and "const" in schema:
197 raise ValueError("JSON schema cannot contain both 'enum' and 'const' keywords")
198 return tuple(schema["enum"]) if "enum" in schema else schema.get("const")
199
def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Build a ``typing.Literal`` type from the given literal value(s).

    A non-tuple ``literal`` is wrapped in a one-element tuple; the tuple is
    passed through ``frozendict.deepfreeze`` before being used as the Literal
    parameters. ``obtained_type``, ``schema`` and ``json_schema_id_to_schema``
    are currently not used.
    """
    # TODO: validate literal against obtained_type
    values = literal if isinstance(literal, tuple) else (literal,)
    frozen_values = frozendict.deepfreeze(values)
    return typing.Literal[frozen_values]
207
208def _handle_str_with_metadata(schema: dict[str, typing.Any]) -> typing.Type:
209 """
210 Handle string type with metadata such as maxLength, minLength, and pattern.
211 """
212 metadata = {}
213 if "maxLength" in schema:
214 metadata["max_length"] = schema["maxLength"]
215 if "minLength" in schema:
216 metadata["min_length"] = schema["minLength"]
217 if "pattern" in schema:
218 metadata["pattern"] = schema["pattern"]
219 if "substringOf" in schema:
220 metadata["substring_of"] = schema["substringOf"]
221
222 if metadata:
223 return schemas.schema.TypeWithMetadata(str, metadata)
224 return str
225
226def _handle_numeric_with_metadata(schema: dict[str, typing.Any], numeric_type: typing.Type) -> typing.Type:
227 """
228 Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum.
229 """
230 metadata = {}
231 if "minimum" in schema:
232 metadata["ge"] = schema["minimum"]
233 if "maximum" in schema:
234 metadata["le"] = schema["maximum"]
235 if "exclusiveMinimum" in schema:
236 metadata["gt"] = schema["exclusiveMinimum"]
237 if "exclusiveMaximum" in schema:
238 metadata["lt"] = schema["exclusiveMaximum"]
239
240 if metadata:
241 return schemas.schema.TypeWithMetadata(numeric_type, metadata)
242 return numeric_type
243
245
def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
    """
    Create a fresh Schema subclass for an object schema and register it.

    The class is named ``__json_schema_<n>`` using the module-level counter,
    which is incremented afterwards. Its ``fields()`` classmethod returns a
    dict that starts empty, and ``from_json`` parses its argument with
    ``json.loads``. The class is stored in ``json_schema_id_to_schema`` under
    ``id(schema)`` before being returned.
    """
    global _counter
    field_table = {}
    class_name = f"__json_schema_{_counter}"
    namespace = {
        "from_json": classmethod(lambda cls, x: json.loads(x)),
        # Every call hands out the same dict object.
        "fields": classmethod(lambda cls: field_table),
    }
    generated = type(class_name, (schemas.schema.Schema,), namespace)
    _counter += 1
    json_schema_id_to_schema[id(schema)] = generated
    return generated
256
257def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
258 """
259 Handle cases where the obtained type is a list
260 """
261 metadata = {}
262 if "minItems" in schema:
263 metadata["min_length"] = schema["minItems"]
264 if "maxItems" in schema:
265 metadata["max_length"] = schema["maxItems"]
266 if "prefixItems" in schema:
267 metadata["prefix_items"] = tuple(_convert_json_schema_to_our_schema(i, json_schema_id_to_schema) for i in schema["prefixItems"])
268 item_type = typing.Any
269 if "items" in schema:
270 if schema["items"] == False:
271 metadata["additional_items"] = False
272 else:
273 item_type = _convert_json_schema_to_our_schema(schema["items"], json_schema_id_to_schema)
274 if item_type is None:
275 item_type = typing.Any
276 if metadata:
277 if "additional_items" not in metadata:
278 metadata["additional_items"] = True
279 return schemas.schema.TypeWithMetadata(list, metadata)
280 return list[item_type]
282
283def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|None]:
284 """
285 Directly obtain type information from this schema's type keyword.
286 """
287 if "type" not in schema:
288 obtained_type = None
289 else:
290 json_type = schema["type"]
291 if json_type == "string":
292 obtained_type = str
293 elif json_type == "number":
294 obtained_type = float
295 elif json_type == "integer":
296 obtained_type = int
297 elif json_type == "boolean":
298 obtained_type = bool
299 elif json_type == "null":
300 obtained_type = type(None)
301 elif json_type == "array":
302 obtained_type = list
303 elif json_type == "object":
304 if "properties" in schema:
305 obtained_type = object
306 else:
307 obtained_type = dict[str, typing.Any]
308 elif isinstance(json_type, collections.abc.Sequence):
309 new_list = []
310 for item in json_type:
311 new_schema = schema.copy()
312 new_schema["type"] = item
313 new_list.append(_obtain_type(new_schema, json_schema_id_to_schema))
314 obtained_type = typing.Union[tuple(new_list)]
315 else:
316 raise TypeError(f"Unsupported type in json schema: {json_type}")
317 literal = _get_literal(schema)
318 if literal is not None:
319 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
320 return obtained_type
321
322
323def _merge_referenced_schema(schema: dict[str, typing.Any], memo: set[int]):
324 keys = ["$ref", "$dynamicRef"]
325 if id(schema) in memo: # Circular reference
326 return None
327 if isinstance(schema, list):
328 memo.add(id(schema))
329 for item in schema:
330 _merge_referenced_schema(item, memo)
331 elif isinstance(schema, dict):
332 memo.add(id(schema))
333 for key in keys:
334 if key in schema:
335 _merge_referenced_schema(schema[key], memo) # ensure no unmerged references
336 for ref_key, ref_value in schema[key].items():
337 _merge_key(schema, ref_key, ref_value)
338 del schema[key]
339 for key, value in schema.items():
340 _merge_referenced_schema(value, memo)
341
342def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
343 if ref_key not in schema:
344 schema[ref_key] = reference_value
345 return None
346 if schema[ref_key] is reference_value:
347 return None
348 if isinstance(schema[ref_key], dict) and isinstance(reference_value, dict):
349 for new_ref_key, new_ref_value in reference_value.items():
350 _merge_key(schema[ref_key], new_ref_key, new_ref_value)
351 return None
352 if ref_key in ("$id", "$schema"):
353 # For $id and $schema, keep the original value
354 return None
355 if isinstance(schema[ref_key], (str, int, float, bool)) and isinstance(reference_value, (str, int, float, bool)):
356 if schema[ref_key] == reference_value:
357 return None
358 raise ValueError(f"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
359
360
def _recursive_resolve_reference(base_uri: str, schema: typing.Any, registry: Registry, memo: set[int]):
    """
    Replace every ``$ref``/``$dynamicRef`` in ``schema`` with the referenced contents, in place.

    Args:
        base_uri: URI against which references at this level are resolved;
            updated when a dict declares its own ``$id``.
        schema: The node to process (any JSON value).
        registry: Registry used to obtain a resolver for the current base URI.
        memo: ``id()`` values of nodes already visited, which stops circular
            structures from being walked twice.

    Returns:
        The same ``schema`` object that was passed in.
    """
    node_id = id(schema)
    if node_id in memo:
        return schema
    memo.add(node_id)
    if isinstance(schema, list):
        resolved_items = [
            _recursive_resolve_reference(base_uri, item, registry, memo)
            for item in schema
        ]
        schema[:] = resolved_items
    if isinstance(schema, dict):
        if "$id" in schema:
            base_uri = _resolve_new_url(base_uri, schema["$id"])
        resolver = registry.resolver(base_uri)
        for keyword in ("$ref", "$dynamicRef"):
            if keyword in schema:
                _resolve_reference(schema, keyword, resolver)
        # The values walked here include any contents just substituted above.
        for child in schema.values():
            _recursive_resolve_reference(base_uri, child, registry, memo)
    return schema
382
383def _resolve_reference(schema: dict[str, typing.Any], key: str, resolver: typing.Any):
384 resolved = resolver.lookup(schema[key])
385 if resolved.contents is schema:
386 raise ValueError(f"Circular self reference detected in JSON schema: {schema}")
387 schema[key] = resolved.contents
typing.Type[typing.Any]|None annotation(self)
Get the type annotation of the field.
__init__(self, typing.Type annotation, bool required)
Initialize the field information.
bool required(self)
Check if the field is required for the schema.
An abstract field info that describes a data field in a schema.
Definition schema.py:13
An abstract schema that describes some data.
Definition schema.py:91
typing.Any _get_literal(dict[str, typing.Any] schema)
typing.Type _create_custom_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
_resolve_reference(dict[str, typing.Any] schema, str key, typing.Any resolver)
typing.Type _handle_str_with_metadata(dict[str, typing.Any] schema)
Handle string type with metadata such as maxLength, minLength, and pattern.
_merge_referenced_schema(dict[str, typing.Any] schema, set[int] memo)
None _validate_json_schema(dict[str, typing.Any] schema)
typing.Type[typing.Any|None] _obtain_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Directly obtain type information from this schema's type keyword.
typing.Type _convert_json_schema_to_our_schema(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Recursively handle all types needed to fully determine the type of a schema.
_merge_key(dict[str, typing.Any] schema, str ref_key, typing.Any reference_value)
typing.Type _handle_anyOf(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
typing.Type _handle_literal(typing.Any literal, typing.Type obtained_type, dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
str _resolve_new_url(str uri, str ref)
Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
_recursive_resolve_reference(str base_uri, typing.Any schema, Registry registry, set[int] memo)
schemas.schema.Schema create_schema(dict[str, typing.Any] schema, registry=Registry())
Create a Schema object from a JSON schema object.
typing.Type[typing.Any|None] _infer_type(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Infer more specific types.
typing.Type _handle_list_metadata(dict[str, typing.Any] schema, dict[int, typing.Type] json_schema_id_to_schema)
Handle cases where the obtained type is a list.
typing.Type _handle_numeric_with_metadata(dict[str, typing.Any] schema, typing.Type numeric_type)
Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum.
_extract_fields_from_object_type(typing.Type object_type)