This module contains utilities for creating schemas from JSON schemas.
    9from urllib.parse 
import urldefrag, urljoin
 
   11import jsonschema.validators
 
   12from pydantic 
import typing
 
   14from formatron 
import schemas
 
   15from referencing 
import Registry, Resource
 
   18    __slots__ = (
"_annotation",)
 
   20    def __init__(self, annotation: typing.Type, required:bool):
 
   22        Initialize the field information. 
   25            annotation: The type annotation of the field. 
   26            required: Whether the field is required for the schema. 
   32    def annotation(self) -> typing.Type[typing.Any] | None:
 
   34        Get the type annotation of the field. 
   41        Check if the field is required for the schema. 
   49    Create a Schema object from a JSON schema object. 
   51    This function takes a JSON schema and converts it into a Schema object that can be used 
   52    for data validation and serialization. Currently, only the following JSON Schema features are supported: 
   55    - `minLength, maxLength, pattern` keywords for string type 
   56    - `substringOf` keyword for string type 
   57    - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type 
   59      - optionally with `minItems`, `maxItems`, `prefixItems` constraints 
   60    - `properties` keyword 
   61      - Due to implementation limitations, we always assume `additionalProperties` is false. 
 
   62      - Note that `properties` is optional for object type. 
   63    - `enum` and `const` keyword 
 
   64      - This includes advanced enum types such as array and object. 
   65      - Note that if both `enum`(or `const`) and `type` are present, `type` will be ignored. 
      - This currently does not support factoring out common parts of the subschemas (like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas)
   69    - Schema references ($ref and $dynamicRef) 
   70      - Hence, all types of schema identifications(`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported. 
   71      - This includes recursive schema references. 
   72        - Recursive array references(like \[\[\[\[...\]\]\]\]) are not supported yet. 
   73      - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed. 
   74        - This bound is expected to be loosened in future versions of Formatron where "easily mergeable" constraint keywords will be merged.     
   77    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard 
   78    - The root schema's type must be exactly "object" or "array" or both 
    - The schema must have valid '$id' and '$schema' fields
   80    - All references must be resolvable within the given schema and registry 
   83        schema: A dictionary representing a valid JSON schema.  
   84        registry: A Registry object containing additional schema definitions.  
   85                                       Defaults to an empty Registry. 
   88        schemas.schema.Schema: A Schema object representing the input JSON schema. 
   91        jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema. 
   92        ValueError: If there are issues with schema references, constraints or requirements. 
   94    registry = copy.deepcopy(registry)
 
   95    schema = copy.deepcopy(schema)
 
   97    registry = Resource.from_contents(schema) @ registry
 
   98    json_schema_id_to_schema = {}
 
  108    Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667. 
  110    if not ref.startswith(
"#"):
 
  111        uri, _ = urldefrag(urljoin(uri, ref))
 
  116        root_type = schema[
"type"]
 
  117        if isinstance(root_type, str):
 
  118            if root_type 
not in [
"object", 
"array"]:
 
  119                raise ValueError(
"Root schema type must be 'object' or 'array'")
 
  120        elif isinstance(root_type, list):
 
  121            if not set(root_type).issubset({
"object", 
"array"}):
 
  122                raise ValueError(
"Root schema type must be 'object', 'array', or both")
 
  124            raise ValueError(
"Invalid 'type' specification in root schema")
 
  125    jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
 
 
  129    Recursively handle all types needed to fully determine the type of a schema 
  131    schema_id = id(schema)
 
  132    if schema_id 
in json_schema_id_to_schema: 
 
  133        return json_schema_id_to_schema[schema_id]
 
  134    if isinstance(schema, dict):
 
  135        _inferred_type = 
_infer_type(schema, json_schema_id_to_schema)
 
  136        if "properties" in schema:
 
 
  138            properties = schema[
"properties"]
 
  139            required = schema.get(
"required", [])
 
  140            for _property 
in properties:
 
  142        return _inferred_type
 
  145    args = typing.get_args(object_type)
 
  149    return object_type.fields()
 
 
  151def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  152    allowed_keys = {
"anyOf", 
"$id", 
"$schema"}
 
  153    assert set(schema.keys()).issubset(allowed_keys), 
"Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present" 
  155    for item 
in schema[
"anyOf"]:
 
  157    return typing.Union[tuple(new_list)]
 
  159def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any | 
None]:
 
  161    Infer more specific types. 
  163    if "anyOf" in schema:
 
  165    obtained_type = 
_obtain_type(schema, json_schema_id_to_schema)
 
  166    if obtained_type 
is None:
 
  167        obtained_type = typing.Union[str, float, int, bool, 
None, list[typing.Any]]
 
  169    origin = typing.get_origin(obtained_type)
 
  170    if origin 
is typing.Union 
or origin 
is typing.Literal 
or origin 
is list:
 
  171        args = typing.get_args(obtained_type)
 
 
  173        args = [obtained_type]
 
  176    for i, arg 
in enumerate(args):
 
 
  183        elif arg 
is int 
or arg 
is float:
 
  185    if typing.get_origin(obtained_type) 
is typing.Union:
 
  186        obtained_type = typing.Union[tuple(args)]
 
  187    elif typing.get_origin(obtained_type) 
is typing.Literal:
 
 
  188        obtained_type = typing.Literal[tuple(args)]
 
  190        obtained_type = args[0]
 
  191    json_schema_id_to_schema[id(schema)] = obtained_type
 
  194def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
 
  195    if "enum" in schema 
and "const" in schema:
 
  196        raise ValueError(
"JSON schema cannot contain both 'enum' and 'const' keywords")
 
  197    return tuple(schema[
"enum"]) 
if "enum" in schema 
else schema.get(
"const")
 
  199def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  201    if not isinstance(literal, tuple):
 
  203    literal = frozendict.deepfreeze(literal)
 
  204    literal_type = typing.Literal[literal]
 
  209    Handle string type with metadata such as maxLength, minLength, and pattern. 
  212    if "maxLength" in schema:
 
  213        metadata[
"max_length"] = schema[
"maxLength"]
 
  214    if "minLength" in schema:
 
  215        metadata[
"min_length"] = schema[
"minLength"]
 
  216    if "pattern" in schema:
 
  217        metadata[
"pattern"] = schema[
"pattern"]
 
  218    if "substringOf" in schema:
 
  219        metadata[
"substring_of"] = schema[
"substringOf"]
 
 
  227    Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum. 
 
  230    if "minimum" in schema:
 
  231        metadata[
"ge"] = schema[
"minimum"]
 
  232    if "maximum" in schema:
 
  233        metadata[
"le"] = schema[
"maximum"]
 
  234    if "exclusiveMinimum" in schema:
 
  235        metadata[
"gt"] = schema[
"exclusiveMinimum"]
 
  236    if "exclusiveMaximum" in schema:
 
  237        metadata[
"lt"] = schema[
"exclusiveMaximum"]
 
 
  245def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  249        "from_json": classmethod(
lambda cls, x: json.loads(x)),
 
  250        "fields": classmethod(
lambda cls: fields)
 
  253    json_schema_id_to_schema[id(schema)] = new_type
 
  256def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
 
  258    Handle cases where the obtained type is a list 
  261    if "minItems" in schema:
 
  262        metadata[
"min_length"] = schema[
"minItems"]
 
  263    if "maxItems" in schema:
 
  264        metadata[
"max_length"] = schema[
"maxItems"]
 
  265    if "prefixItems" in schema:
 
  267    item_type = typing.Any
 
  268    if "items" in schema:
 
  269        if schema[
"items"] == 
False:
 
  270            metadata[
"additional_items"] = 
False 
  273            if item_type 
is None:
 
  274                item_type = typing.Any
 
  276        if "additional_items" not in metadata:
 
  277            metadata[
"additional_items"] = 
True 
  279    return list[item_type]
 
 
  282def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|
None]:
 
  284    Directly obtain type information from this schema's type keyword. 
  286    if "type" not in schema:
 
  289        json_type = schema[
"type"]
 
  290        if json_type == 
"string":
 
 
  292        elif json_type == 
"number":
 
  293            obtained_type = float
 
  294        elif json_type == 
"integer":
 
  296        elif json_type == 
"boolean":
 
  298        elif json_type == 
"null":
 
  299            obtained_type = type(
None)
 
  300        elif json_type == 
"array":
 
  302        elif json_type == 
"object":
 
  303            if "properties" in schema:
 
  304                obtained_type = object
 
  306                obtained_type = dict[str, typing.Any]
 
  307        elif isinstance(json_type, collections.abc.Sequence):
 
  309            for item 
in json_type:
 
  310                new_schema = schema.copy()
 
  311                new_schema[
"type"] = item
 
  312                new_list.append(
_obtain_type(new_schema, json_schema_id_to_schema))
 
  313            obtained_type = typing.Union[tuple(new_list)]
 
  315            raise TypeError(f
"Unsupported type in json schema: {json_type}")
 
  317    if literal 
is not None:
 
  318        return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
 
 
  323    keys = [
"$ref", 
"$dynamicRef"]
 
  324    if id(schema) 
in memo: 
 
  326    if isinstance(schema, list):
 
  330    elif isinstance(schema, dict):
 
  335                for ref_key, ref_value 
in schema[key].items():
 
  338        for key, value 
in schema.items():
 
  341def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
 
  342    if ref_key 
not in schema:
 
  343        schema[ref_key] = reference_value
 
  345    if schema[ref_key] 
is reference_value:
 
  347    if isinstance(schema[ref_key], dict) 
and isinstance(reference_value, dict):
 
  348        for new_ref_key, new_ref_value 
in reference_value.items():
 
  349            _merge_key(schema[ref_key], new_ref_key, new_ref_value)
 
  351    if ref_key 
in (
"$id", 
"$schema"):
 
  354    if isinstance(schema[ref_key], (str, int, float, bool)) 
and isinstance(reference_value, (str, int, float, bool)):
 
  355        if schema[ref_key] == reference_value:
 
  357    raise ValueError(f
"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
 
 
  361    if id(schema) 
in memo:
 
  364    if isinstance(schema, list):
 
  369        schema.extend(new_list)
 
  370    if isinstance(schema, dict):
 
  373        resolver = registry.resolver(base_uri)
 
  374        keys = [
"$ref", 
"$dynamicRef"]
 
  378        for key, value 
in schema.items():
 
 
  383    resolved = resolver.lookup(schema[key])
 
  384    if resolved.contents 
is schema:
 
  385        raise ValueError(f
"Circular self reference detected in JSON schema: {schema}")
 
  386    schema[key] = resolved.contents