This module contains utilities for creating schemas from JSON schemas.
9from urllib.parse
import urldefrag, urljoin
11import jsonschema.validators
12from pydantic
import typing
14from formatron
import schemas
15from referencing
import Registry, Resource
18 __slots__ = (
"_annotation",)
20 def __init__(self, annotation: typing.Type, required:bool):
22 Initialize the field information.
25 annotation: The type annotation of the field.
26 required: Whether the field is required for the schema.
32 def annotation(self) -> typing.Type[typing.Any] | None:
34 Get the type annotation of the field.
41 Check if the field is required for the schema.
49 Create a Schema object from a JSON schema object.
51 This function takes a JSON schema and converts it into a Schema object that can be used
52 for data validation and serialization. Currently, only the following JSON Schema features are supported:
55 - `minLength, maxLength, pattern` keywords for string type
56 - `substringOf` keyword for string type
57 - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type
59 - optionally with `minItems`, `maxItems`, `prefixItems` constraints
60 - `properties` keyword
61 - Due to implementation limitations, we always assume `additionalProperties` is false.
62 - Note that `properties` is optional for object type.
63 - `enum` and `const` keyword
64 - This includes advanced enum types such as array and object.
 - Note that if both `enum` (or `const`) and `type` are present, `type` will be ignored.
 - This currently does not support factoring out common parts of the subschemas (like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas)
69 - Schema references ($ref and $dynamicRef)
 - Hence, all types of schema identification (`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
71 - This includes recursive schema references.
 - Recursive array references (like \[\[\[\[...\]\]\]\]) are not supported yet.
73 - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
74 - This bound is expected to be loosened in future versions of Formatron where "easily mergeable" constraint keywords will be merged.
77 - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard
78 - The root schema's type must be exactly "object" or "array" or both
 - The schema must have valid '$id' and '$schema' fields
80 - All references must be resolvable within the given schema and registry
83 schema: A dictionary representing a valid JSON schema.
84 registry: A Registry object containing additional schema definitions.
85 Defaults to an empty Registry.
88 schemas.schema.Schema: A Schema object representing the input JSON schema.
91 jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
92 ValueError: If there are issues with schema references, constraints or requirements.
94 registry = copy.deepcopy(registry)
95 schema = copy.deepcopy(schema)
97 registry = Resource.from_contents(schema) @ registry
98 json_schema_id_to_schema = {}
108 Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
110 if not ref.startswith(
"#"):
111 uri, _ = urldefrag(urljoin(uri, ref))
116 root_type = schema[
"type"]
117 if isinstance(root_type, str):
118 if root_type
not in [
"object",
"array"]:
119 raise ValueError(
"Root schema type must be 'object' or 'array'")
120 elif isinstance(root_type, list):
121 if not set(root_type).issubset({
"object",
"array"}):
122 raise ValueError(
"Root schema type must be 'object', 'array', or both")
124 raise ValueError(
"Invalid 'type' specification in root schema")
125 jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
129 Recursively handle all types needed to fully determine the type of a schema
131 schema_id = id(schema)
132 if schema_id
in json_schema_id_to_schema:
133 return json_schema_id_to_schema[schema_id]
134 if isinstance(schema, dict):
135 _inferred_type =
_infer_type(schema, json_schema_id_to_schema)
136 if "properties" in schema:
138 properties = schema[
"properties"]
139 required = schema.get(
"required", [])
140 for _property
in properties:
142 return _inferred_type
145 args = typing.get_args(object_type)
147 arg = typing.get_origin(arg)
or arg
150 return object_type.fields()
152def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
153 allowed_keys = {
"anyOf",
"$id",
"$schema"}
154 assert set(schema.keys()).issubset(allowed_keys),
"Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present"
156 for item
in schema[
"anyOf"]:
158 return typing.Union[tuple(new_list)]
160def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any |
None]:
162 Infer more specific types.
164 if "anyOf" in schema:
166 obtained_type =
_obtain_type(schema, json_schema_id_to_schema)
167 if obtained_type
is None:
168 obtained_type = typing.Union[str, float, int, bool,
None, list[typing.Any]]
170 origin = typing.get_origin(obtained_type)
171 if origin
is typing.Union
or origin
is typing.Literal
or origin
is list:
172 args = typing.get_args(obtained_type)
174 args = [obtained_type]
177 for i, arg
in enumerate(args):
184 elif arg
is int
or arg
is float:
186 if typing.get_origin(obtained_type)
is typing.Union:
187 obtained_type = typing.Union[tuple(args)]
188 elif typing.get_origin(obtained_type)
is typing.Literal:
189 obtained_type = typing.Literal[tuple(args)]
191 obtained_type = args[0]
192 json_schema_id_to_schema[id(schema)] = obtained_type
195def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
196 if "enum" in schema
and "const" in schema:
197 raise ValueError(
"JSON schema cannot contain both 'enum' and 'const' keywords")
198 return tuple(schema[
"enum"])
if "enum" in schema
else schema.get(
"const")
200def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
202 if not isinstance(literal, tuple):
204 literal = frozendict.deepfreeze(literal)
205 literal_type = typing.Literal[literal]
210 Handle string type with metadata such as maxLength, minLength, and pattern.
213 if "maxLength" in schema:
214 metadata[
"max_length"] = schema[
"maxLength"]
215 if "minLength" in schema:
216 metadata[
"min_length"] = schema[
"minLength"]
217 if "pattern" in schema:
218 metadata[
"pattern"] = schema[
"pattern"]
219 if "substringOf" in schema:
220 metadata[
"substring_of"] = schema[
"substringOf"]
228 Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum.
231 if "minimum" in schema:
232 metadata[
"ge"] = schema[
"minimum"]
233 if "maximum" in schema:
234 metadata[
"le"] = schema[
"maximum"]
235 if "exclusiveMinimum" in schema:
236 metadata[
"gt"] = schema[
"exclusiveMinimum"]
237 if "exclusiveMaximum" in schema:
238 metadata[
"lt"] = schema[
"exclusiveMaximum"]
246def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
250 "from_json": classmethod(
lambda cls, x: json.loads(x)),
251 "fields": classmethod(
lambda cls: fields)
254 json_schema_id_to_schema[id(schema)] = new_type
257def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
259 Handle cases where the obtained type is a list
262 if "minItems" in schema:
263 metadata[
"min_length"] = schema[
"minItems"]
264 if "maxItems" in schema:
265 metadata[
"max_length"] = schema[
"maxItems"]
266 if "prefixItems" in schema:
268 item_type = typing.Any
269 if "items" in schema:
270 if schema[
"items"] ==
False:
271 metadata[
"additional_items"] =
False
274 if item_type
is None:
275 item_type = typing.Any
277 if "additional_items" not in metadata:
278 metadata[
"additional_items"] =
True
280 return list[item_type]
283def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|
None]:
285 Directly obtain type information from this schema's type keyword.
287 if "type" not in schema:
290 json_type = schema[
"type"]
291 if json_type ==
"string":
293 elif json_type ==
"number":
294 obtained_type = float
295 elif json_type ==
"integer":
297 elif json_type ==
"boolean":
299 elif json_type ==
"null":
300 obtained_type = type(
None)
301 elif json_type ==
"array":
303 elif json_type ==
"object":
304 if "properties" in schema:
305 obtained_type = object
307 obtained_type = dict[str, typing.Any]
308 elif isinstance(json_type, collections.abc.Sequence):
310 for item
in json_type:
311 new_schema = schema.copy()
312 new_schema[
"type"] = item
313 new_list.append(
_obtain_type(new_schema, json_schema_id_to_schema))
314 obtained_type = typing.Union[tuple(new_list)]
316 raise TypeError(f
"Unsupported type in json schema: {json_type}")
318 if literal
is not None:
319 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
324 keys = [
"$ref",
"$dynamicRef"]
325 if id(schema)
in memo:
327 if isinstance(schema, list):
331 elif isinstance(schema, dict):
336 for ref_key, ref_value
in schema[key].items():
339 for key, value
in schema.items():
342def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
343 if ref_key
not in schema:
344 schema[ref_key] = reference_value
346 if schema[ref_key]
is reference_value:
348 if isinstance(schema[ref_key], dict)
and isinstance(reference_value, dict):
349 for new_ref_key, new_ref_value
in reference_value.items():
350 _merge_key(schema[ref_key], new_ref_key, new_ref_value)
352 if ref_key
in (
"$id",
"$schema"):
355 if isinstance(schema[ref_key], (str, int, float, bool))
and isinstance(reference_value, (str, int, float, bool)):
356 if schema[ref_key] == reference_value:
358 raise ValueError(f
"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
362 if id(schema)
in memo:
365 if isinstance(schema, list):
370 schema.extend(new_list)
371 if isinstance(schema, dict):
374 resolver = registry.resolver(base_uri)
375 keys = [
"$ref",
"$dynamicRef"]
379 for key, value
in schema.items():
384 resolved = resolver.lookup(schema[key])
385 if resolved.contents
is schema:
386 raise ValueError(f
"Circular self reference detected in JSON schema: {schema}")
387 schema[key] = resolved.contents