"""This module contains utilities for creating schemas from JSON schemas."""
import collections.abc
import copy
import json
from urllib.parse import urldefrag, urljoin

import frozendict
import jsonschema.validators
from pydantic import typing
from referencing import Registry, Resource

from formatron import schemas
19 __slots__ = (
"_annotation",)
21 def __init__(self, annotation: typing.Type, required:bool):
23 Initialize the field information.
26 annotation: The type annotation of the field.
32 def annotation(self) -> typing.Type[typing.Any] | None:
34 Get the type annotation of the field.
41 Check if the field is required for the schema.
49 Create a Schema object from a JSON schema object.
51 This function takes a JSON schema and converts it into a Schema object that can be used
52 for data validation and serialization. Currently, only the following JSON Schema features are supported:
56 - `properties` keyword
57 - Due to implementation limitations, we always assume `additionalProperties` is false.
58 - `enum` and `const` keyword
59 - This includes advanced enum types such as array and object.
        - Note that if both `enum` (or `const`) and `type` are present, `type` will be ignored.
62 - Schema references ($ref and $dynamicRef)
        - Hence, all forms of schema identification (`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
64 - This includes recursive schema references.
65 - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
        - This restriction is expected to be relaxed in future versions of Formatron, where "easily mergeable" constraint keywords will be merged.
69 - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard
70 - The root schema's type must be exactly "object"
    - The schema must have valid '$id' and '$schema' fields
72 - All references must be resolvable within the given schema and registry
75 schema: A dictionary representing a valid JSON schema.
76 registry: A Registry object containing additional schema definitions.
77 Defaults to an empty Registry.
80 schemas.schema.Schema: A Schema object representing the input JSON schema.
83 jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
84 ValueError: If there are issues with schema references, constraints or requirements.
86 registry = copy.deepcopy(registry)
87 schema = copy.deepcopy(schema)
89 registry = Resource.from_contents(schema) @ registry
90 json_schema_id_to_schema = {}
100 Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
102 if not ref.startswith(
"#"):
103 uri, _ = urldefrag(urljoin(uri, ref))
107 if "type" not in schema
or schema[
"type"] !=
"object":
108 raise ValueError(
"Root schema must have type 'object'")
109 jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
113 Recursively handle all types needed to fully determine the type of a schema
115 schema_id = id(schema)
116 if schema_id
in json_schema_id_to_schema:
117 return json_schema_id_to_schema[schema_id]
118 if isinstance(schema, dict):
119 _inferred_type =
_infer_type(schema, json_schema_id_to_schema)
120 if "properties" in schema:
122 properties = schema[
"properties"]
123 required = schema.get(
"required", [])
124 for _property
in properties:
126 return _inferred_type
129 args = typing.get_args(object_type)
133 return object_type.fields()
135def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any |
None]:
137 Infer more specific types.
139 obtained_type =
_obtain_type(schema, json_schema_id_to_schema)
140 args = typing.get_args(obtained_type)
141 if obtained_type
is None or obtained_type
is object
or object
in args:
143 if obtained_type
is typing.List
and "items" in schema:
145 obtained_type = typing.List[item_type]
146 json_schema_id_to_schema[id(schema)] = obtained_type
149def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
150 if "enum" in schema
and "const" in schema:
151 raise ValueError(
"JSON schema cannot contain both 'enum' and 'const' keywords")
152 return tuple(schema[
"enum"])
if "enum" in schema
else schema.get(
"const")
154def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
156 if not isinstance(literal, tuple):
158 literal = frozendict.deepfreeze(literal)
159 literal_type = typing.Literal[literal]
162def _create_custom_type(obtained_type: typing.Type|
None, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
166 "from_json": classmethod(
lambda cls, x: json.loads(x)),
167 "fields": classmethod(
lambda cls: fields)
171 if obtained_type
is None:
172 json_schema_id_to_schema[id(schema)] = typing.Union[str, float, int, bool,
None, typing.List[typing.Any], new_type]
173 elif object
in typing.get_args(obtained_type):
174 json_schema_id_to_schema[id(schema)] = typing.Union[tuple(item
for item
in typing.get_args(obtained_type)
if item
is not object)+(new_type,)]
176 json_schema_id_to_schema[id(schema)] = new_type
177 return json_schema_id_to_schema[id(schema)]
180def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|
None]:
182 Directly obtain type information from this schema level.
185 if "type" not in schema:
188 json_type = schema[
"type"]
189 if json_type ==
"string":
191 elif json_type ==
"number":
192 obtained_type = float
193 elif json_type ==
"integer":
195 elif json_type ==
"boolean":
197 elif json_type ==
"null":
198 obtained_type = type(
None)
199 elif json_type ==
"array":
200 obtained_type = typing.List
201 elif json_type ==
"object":
202 obtained_type = object
203 elif isinstance(json_type, collections.abc.Sequence):
205 for item
in json_type:
206 new_schema = schema.copy()
207 new_schema[
"type"] = item
208 new_list.append(
_obtain_type(new_schema, json_schema_id_to_schema))
209 obtained_type = typing.Union[tuple(new_list)]
211 raise TypeError(f
"Unsupported type in json schema: {json_type}")
213 if literal
is not None:
214 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
222 keys = [
"$ref",
"$dynamicRef"]
223 if id(schema)
in memo:
225 if isinstance(schema, list):
229 elif isinstance(schema, dict):
234 for ref_key, ref_value
in schema[key].items():
237 for key, value
in schema.items():
240def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
241 if ref_key
not in schema:
242 schema[ref_key] = reference_value
244 if schema[ref_key]
is reference_value:
246 if isinstance(schema[ref_key], dict)
and isinstance(reference_value, dict):
247 for new_ref_key, new_ref_value
in reference_value.items():
248 _merge_key(schema[ref_key], new_ref_key, new_ref_value)
250 if ref_key
in (
"$id",
"$schema"):
253 if isinstance(schema[ref_key], (str, int, float, bool))
and isinstance(reference_value, (str, int, float, bool)):
254 if schema[ref_key] == reference_value:
256 raise ValueError(f
"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
260 if id(schema)
in memo:
263 if isinstance(schema, list):
268 schema.extend(new_list)
269 if isinstance(schema, dict):
272 resolver = registry.resolver(base_uri)
273 keys = [
"$ref",
"$dynamicRef"]
277 for key, value
in schema.items():
282 resolved = resolver.lookup(schema[key])
283 if resolved.contents
is schema:
284 raise ValueError(f
"Circular self reference detected in JSON schema: {schema}")
285 schema[key] = resolved.contents