This module contains utilities for creating schemas from JSON schemas.
9from urllib.parse
import urldefrag, urljoin
11import jsonschema.validators
12from pydantic
import typing
14from formatron
import schemas
15from referencing
import Registry, Resource
18 __slots__ = (
"_annotation",)
20 def __init__(self, annotation: typing.Type, required:bool):
22 Initialize the field information.
25 annotation: The type annotation of the field.
26 required: Whether the field is required for the schema.
32 def annotation(self) -> typing.Type[typing.Any] | None:
34 Get the type annotation of the field.
41 Check if the field is required for the schema.
49 Create a Schema object from a JSON schema object.
51 This function takes a JSON schema and converts it into a Schema object that can be used
52 for data validation and serialization. Currently, only the following JSON Schema features are supported:
55 - `minLength, maxLength, pattern` keywords for string type
56 - `substringOf` keyword for string type
57 - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type
59 - optionally with `minItems`, `maxItems`, `prefixItems` constraints
60 - `properties` keyword
61 - Due to implementation limitations, we always assume `additionalProperties` is false.
62 - Note that `properties` is optional for object type.
    - `enum` and `const` keywords
        - This includes advanced enum types such as array and object.
        - Note that if both `enum` (or `const`) and `type` are present, `type` will be ignored.
68 - This currently does not support factoring out common parts of the subschemas(like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas)
    - Schema references (`$ref` and `$dynamicRef`)
        - Hence, all types of schema identification (`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported.
        - This includes recursive schema references.
        - Recursive array references (like \[\[\[\[...\]\]\]\]) are not supported yet.
        - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed.
            - This restriction is expected to be loosened in future versions of Formatron, where "easily mergeable" constraint keywords will be merged.
    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard.
    - The root schema's type must be exactly "object" or "array" or both.
    - The schema must have valid '$id' and '$schema' fields.
    - All references must be resolvable within the given schema and registry.
83 schema: A dictionary representing a valid JSON schema.
84 registry: A Registry object containing additional schema definitions.
85 Defaults to an empty Registry.
88 schemas.schema.Schema: A Schema object representing the input JSON schema.
91 jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema.
92 ValueError: If there are issues with schema references, constraints or requirements.
94 registry = copy.deepcopy(registry)
95 schema = copy.deepcopy(schema)
97 registry = Resource.from_contents(schema) @ registry
98 json_schema_id_to_schema = {}
108 Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667.
110 if not ref.startswith(
"#"):
111 uri, _ = urldefrag(urljoin(uri, ref))
116 root_type = schema[
"type"]
117 if isinstance(root_type, str):
118 if root_type
not in [
"object",
"array"]:
119 raise ValueError(
"Root schema type must be 'object' or 'array'")
120 elif isinstance(root_type, list):
121 if not set(root_type).issubset({
"object",
"array"}):
122 raise ValueError(
"Root schema type must be 'object', 'array', or both")
124 raise ValueError(
"Invalid 'type' specification in root schema")
125 jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
129 Recursively handle all types needed to fully determine the type of a schema
131 schema_id = id(schema)
132 if schema_id
in json_schema_id_to_schema:
133 return json_schema_id_to_schema[schema_id]
134 if isinstance(schema, dict):
135 _inferred_type =
_infer_type(schema, json_schema_id_to_schema)
136 if "properties" in schema:
138 properties = schema[
"properties"]
139 required = schema.get(
"required", [])
140 for _property
in properties:
142 return _inferred_type
145 args = typing.get_args(object_type)
149 return object_type.fields()
151def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
152 allowed_keys = {
"anyOf",
"$id",
"$schema"}
153 assert set(schema.keys()).issubset(allowed_keys),
"Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present"
155 for item
in schema[
"anyOf"]:
157 return typing.Union[tuple(new_list)]
159def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any |
None]:
161 Infer more specific types.
163 if "anyOf" in schema:
165 obtained_type =
_obtain_type(schema, json_schema_id_to_schema)
166 if obtained_type
is None:
167 obtained_type = typing.Union[str, float, int, bool,
None, list[typing.Any]]
169 origin = typing.get_origin(obtained_type)
170 if origin
is typing.Union
or origin
is typing.Literal
or origin
is list:
171 args = typing.get_args(obtained_type)
173 args = [obtained_type]
176 for i, arg
in enumerate(args):
183 elif arg
is int
or arg
is float:
185 if typing.get_origin(obtained_type)
is typing.Union:
186 obtained_type = typing.Union[tuple(args)]
187 elif typing.get_origin(obtained_type)
is typing.Literal:
188 obtained_type = typing.Literal[tuple(args)]
190 obtained_type = args[0]
191 json_schema_id_to_schema[id(schema)] = obtained_type
194def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
195 if "enum" in schema
and "const" in schema:
196 raise ValueError(
"JSON schema cannot contain both 'enum' and 'const' keywords")
197 return tuple(schema[
"enum"])
if "enum" in schema
else schema.get(
"const")
199def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
201 if not isinstance(literal, tuple):
203 literal = frozendict.deepfreeze(literal)
204 literal_type = typing.Literal[literal]
209 Handle string type with metadata such as maxLength, minLength, and pattern.
212 if "maxLength" in schema:
213 metadata[
"max_length"] = schema[
"maxLength"]
214 if "minLength" in schema:
215 metadata[
"min_length"] = schema[
"minLength"]
216 if "pattern" in schema:
217 metadata[
"pattern"] = schema[
"pattern"]
218 if "substringOf" in schema:
219 metadata[
"substring_of"] = schema[
"substringOf"]
227 Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum.
230 if "minimum" in schema:
231 metadata[
"ge"] = schema[
"minimum"]
232 if "maximum" in schema:
233 metadata[
"le"] = schema[
"maximum"]
234 if "exclusiveMinimum" in schema:
235 metadata[
"gt"] = schema[
"exclusiveMinimum"]
236 if "exclusiveMaximum" in schema:
237 metadata[
"lt"] = schema[
"exclusiveMaximum"]
245def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
249 "from_json": classmethod(
lambda cls, x: json.loads(x)),
250 "fields": classmethod(
lambda cls: fields)
253 json_schema_id_to_schema[id(schema)] = new_type
256def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
258 Handle cases where the obtained type is a list
261 if "minItems" in schema:
262 metadata[
"min_length"] = schema[
"minItems"]
263 if "maxItems" in schema:
264 metadata[
"max_length"] = schema[
"maxItems"]
265 if "prefixItems" in schema:
267 item_type = typing.Any
268 if "items" in schema:
269 if schema[
"items"] ==
False:
270 metadata[
"additional_items"] =
False
273 if item_type
is None:
274 item_type = typing.Any
276 if "additional_items" not in metadata:
277 metadata[
"additional_items"] =
True
279 return list[item_type]
282def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|
None]:
284 Directly obtain type information from this schema's type keyword.
286 if "type" not in schema:
289 json_type = schema[
"type"]
290 if json_type ==
"string":
292 elif json_type ==
"number":
293 obtained_type = float
294 elif json_type ==
"integer":
296 elif json_type ==
"boolean":
298 elif json_type ==
"null":
299 obtained_type = type(
None)
300 elif json_type ==
"array":
302 elif json_type ==
"object":
303 if "properties" in schema:
304 obtained_type = object
306 obtained_type = dict[str, typing.Any]
307 elif isinstance(json_type, collections.abc.Sequence):
309 for item
in json_type:
310 new_schema = schema.copy()
311 new_schema[
"type"] = item
312 new_list.append(
_obtain_type(new_schema, json_schema_id_to_schema))
313 obtained_type = typing.Union[tuple(new_list)]
315 raise TypeError(f
"Unsupported type in json schema: {json_type}")
317 if literal
is not None:
318 return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
323 keys = [
"$ref",
"$dynamicRef"]
324 if id(schema)
in memo:
326 if isinstance(schema, list):
330 elif isinstance(schema, dict):
335 for ref_key, ref_value
in schema[key].items():
338 for key, value
in schema.items():
341def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
342 if ref_key
not in schema:
343 schema[ref_key] = reference_value
345 if schema[ref_key]
is reference_value:
347 if isinstance(schema[ref_key], dict)
and isinstance(reference_value, dict):
348 for new_ref_key, new_ref_value
in reference_value.items():
349 _merge_key(schema[ref_key], new_ref_key, new_ref_value)
351 if ref_key
in (
"$id",
"$schema"):
354 if isinstance(schema[ref_key], (str, int, float, bool))
and isinstance(reference_value, (str, int, float, bool)):
355 if schema[ref_key] == reference_value:
357 raise ValueError(f
"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
361 if id(schema)
in memo:
364 if isinstance(schema, list):
369 schema.extend(new_list)
370 if isinstance(schema, dict):
373 resolver = registry.resolver(base_uri)
374 keys = [
"$ref",
"$dynamicRef"]
378 for key, value
in schema.items():
383 resolved = resolver.lookup(schema[key])
384 if resolved.contents
is schema:
385 raise ValueError(f
"Circular self reference detected in JSON schema: {schema}")
386 schema[key] = resolved.contents