This module contains utilities for creating schemas from JSON schemas.
    9from urllib.parse 
import urldefrag, urljoin
 
   11import jsonschema.validators
 
   12from pydantic 
import typing
 
   14from formatron 
import schemas
 
   15from referencing 
import Registry, Resource
 
   18    __slots__ = (
"_annotation",)
 
   20    def __init__(self, annotation: typing.Type, required:bool):
 
   22        Initialize the field information. 
   25            annotation: The type annotation of the field. 
   26            required: Whether the field is required for the schema. 
   32    def annotation(self) -> typing.Type[typing.Any] | None:
 
   34        Get the type annotation of the field. 
   41        Check if the field is required for the schema. 
   49    Create a Schema object from a JSON schema object. 
   51    This function takes a JSON schema and converts it into a Schema object that can be used 
   52    for data validation and serialization. Currently, only the following JSON Schema features are supported: 
   55    - `minLength, maxLength, pattern` keywords for string type 
   56    - `substringOf` keyword for string type 
   57    - `minimum, maximum, exclusiveMinimum, exclusiveMaximum` keywords for number type and integer type 
   59      - optionally with `minItems`, `maxItems`, `prefixItems` constraints 
   60    - `properties` keyword 
   61      - Due to implementation limitations, we always assume `additionalProperties` is false. 
 
   62      - Note that `properties` is optional for object type. 
   63    - `enum` and `const` keyword 
 
   64      - This includes advanced enum types such as array and object. 
   65      - Note that if both `enum`(or `const`) and `type` are present, `type` will be ignored. 
   68      - This currently does not support factoring out common parts of the subschemas(like https://json-schema.org/understanding-json-schema/reference/combining#factoringschemas) 
   69    - Schema references ($ref and $dynamicRef) 
   70      - Hence, all types of schema identifications(`$defs`, `$id`, `$anchor`, `$dynamicAnchor`) are supported. 
   71      - This includes recursive schema references. 
   72        - Recursive array references(like \[\[\[\[...\]\]\]\]) are not supported yet. 
   73      - Due to implementation limitations, duplicate constraint keywords in both referrers and referents are not allowed. 
   74        - This bound is expected to be loosened in future versions of Formatron where "easily mergeable" constraint keywords will be merged.     
   77    - The input schema must be a valid JSON Schema according to the JSON Schema Draft 2020-12 standard 
   78    - The root schema's type must be exactly "object" or "array" or both 
    - The schema must have valid '$id' and '$schema' fields
   80    - All references must be resolvable within the given schema and registry 
   83        schema: A dictionary representing a valid JSON schema.  
   84        registry: A Registry object containing additional schema definitions.  
   85                                       Defaults to an empty Registry. 
   88        schemas.schema.Schema: A Schema object representing the input JSON schema. 
   91        jsonschema.exceptions.ValidationError: If the input schema is not a valid JSON Schema. 
   92        ValueError: If there are issues with schema references, constraints or requirements. 
   94    registry = copy.deepcopy(registry)
 
   95    schema = copy.deepcopy(schema)
 
   97    registry = Resource.from_contents(schema) @ registry
 
   98    json_schema_id_to_schema = {}
 
  108    Adapted from https://github.com/python-jsonschema/referencing/blob/main/referencing/_core.py#L667. 
  110    if not ref.startswith(
"#"):
 
  111        uri, _ = urldefrag(urljoin(uri, ref))
 
  116        root_type = schema[
"type"]
 
  117        if isinstance(root_type, str):
 
  118            if root_type 
not in [
"object", 
"array"]:
 
  119                raise ValueError(
"Root schema type must be 'object' or 'array'")
 
  120        elif isinstance(root_type, list):
 
  121            if not set(root_type).issubset({
"object", 
"array"}):
 
  122                raise ValueError(
"Root schema type must be 'object', 'array', or both")
 
  124            raise ValueError(
"Invalid 'type' specification in root schema")
 
  125    jsonschema.validate(instance=schema, schema=jsonschema.validators.Draft202012Validator.META_SCHEMA)
 
 
  129    Recursively handle all types needed to fully determine the type of a schema 
  131    schema_id = id(schema)
 
  132    if schema_id 
in json_schema_id_to_schema: 
 
  133        return json_schema_id_to_schema[schema_id]
 
  134    if isinstance(schema, dict):
 
  135        _inferred_type = 
_infer_type(schema, json_schema_id_to_schema)
 
  136        if "properties" in schema:
 
 
  138            properties = schema[
"properties"]
 
  139            required = schema.get(
"required", [])
 
  140            for _property 
in properties:
 
  142        return _inferred_type
 
  145    args = typing.get_args(object_type)
 
  147        arg = typing.get_origin(arg) 
or arg
 
  150    return object_type.fields()
 
 
  152def _handle_anyOf(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  153    allowed_keys = {
"anyOf", 
"$id", 
"$schema"}
 
  154    assert set(schema.keys()).issubset(allowed_keys), 
"Only 'anyOf', '$id', and '$schema' are allowed when 'anyOf' is present" 
  156    for item 
in schema[
"anyOf"]:
 
  158    return typing.Union[tuple(new_list)]
 
  160def _infer_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type[typing.Any | 
None]:
 
  162    Infer more specific types. 
  164    if "anyOf" in schema:
 
  166    obtained_type = 
_obtain_type(schema, json_schema_id_to_schema)
 
  167    if obtained_type 
is None:
 
  168        obtained_type = typing.Union[str, float, int, bool, 
None, list[typing.Any]]
 
  170    origin = typing.get_origin(obtained_type)
 
  171    if origin 
is typing.Union 
or origin 
is typing.Literal 
or origin 
is list:
 
  172        args = typing.get_args(obtained_type)
 
 
  174        args = [obtained_type]
 
  177    for i, arg 
in enumerate(args):
 
 
  184        elif arg 
is int 
or arg 
is float:
 
  186    if typing.get_origin(obtained_type) 
is typing.Union:
 
  187        obtained_type = typing.Union[tuple(args)]
 
  188    elif typing.get_origin(obtained_type) 
is typing.Literal:
 
 
  189        obtained_type = typing.Literal[tuple(args)]
 
  191        obtained_type = args[0]
 
  192    json_schema_id_to_schema[id(schema)] = obtained_type
 
  195def _get_literal(schema: dict[str, typing.Any]) -> typing.Any:
 
  196    if "enum" in schema 
and "const" in schema:
 
  197        raise ValueError(
"JSON schema cannot contain both 'enum' and 'const' keywords")
 
  198    return tuple(schema[
"enum"]) 
if "enum" in schema 
else schema.get(
"const")
 
  200def _handle_literal(literal: typing.Any, obtained_type: typing.Type, schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  202    if not isinstance(literal, tuple):
 
  204    literal = frozendict.deepfreeze(literal)
 
  205    literal_type = typing.Literal[literal]
 
  210    Handle string type with metadata such as maxLength, minLength, and pattern. 
  213    if "maxLength" in schema:
 
  214        metadata[
"max_length"] = schema[
"maxLength"]
 
  215    if "minLength" in schema:
 
  216        metadata[
"min_length"] = schema[
"minLength"]
 
  217    if "pattern" in schema:
 
  218        metadata[
"pattern"] = schema[
"pattern"]
 
  219    if "substringOf" in schema:
 
  220        metadata[
"substring_of"] = schema[
"substringOf"]
 
 
  228    Handle numeric types (int or float) with metadata such as minimum, maximum, exclusiveMinimum, and exclusiveMaximum. 
 
  231    if "minimum" in schema:
 
  232        metadata[
"ge"] = schema[
"minimum"]
 
  233    if "maximum" in schema:
 
  234        metadata[
"le"] = schema[
"maximum"]
 
  235    if "exclusiveMinimum" in schema:
 
  236        metadata[
"gt"] = schema[
"exclusiveMinimum"]
 
  237    if "exclusiveMaximum" in schema:
 
  238        metadata[
"lt"] = schema[
"exclusiveMaximum"]
 
 
  246def _create_custom_type(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
  250        "from_json": classmethod(
lambda cls, x: json.loads(x)),
 
  251        "fields": classmethod(
lambda cls: fields)
 
  254    json_schema_id_to_schema[id(schema)] = new_type
 
  257def _handle_list_metadata(schema: dict[str, typing.Any], json_schema_id_to_schema: dict[int, typing.Type]) -> typing.Type:
 
 
  259    Handle cases where the obtained type is a list 
  262    if "minItems" in schema:
 
  263        metadata[
"min_length"] = schema[
"minItems"]
 
  264    if "maxItems" in schema:
 
  265        metadata[
"max_length"] = schema[
"maxItems"]
 
  266    if "prefixItems" in schema:
 
  268    item_type = typing.Any
 
  269    if "items" in schema:
 
  270        if schema[
"items"] == 
False:
 
  271            metadata[
"additional_items"] = 
False 
  274            if item_type 
is None:
 
  275                item_type = typing.Any
 
  277        if "additional_items" not in metadata:
 
  278            metadata[
"additional_items"] = 
True 
  280    return list[item_type]
 
 
  283def _obtain_type(schema: dict[str, typing.Any], json_schema_id_to_schema:dict[int, typing.Type]) -> typing.Type[typing.Any|
None]:
 
  285    Directly obtain type information from this schema's type keyword. 
  287    if "type" not in schema:
 
  290        json_type = schema[
"type"]
 
  291        if json_type == 
"string":
 
 
  293        elif json_type == 
"number":
 
  294            obtained_type = float
 
  295        elif json_type == 
"integer":
 
  297        elif json_type == 
"boolean":
 
  299        elif json_type == 
"null":
 
  300            obtained_type = type(
None)
 
  301        elif json_type == 
"array":
 
  303        elif json_type == 
"object":
 
  304            if "properties" in schema:
 
  305                obtained_type = object
 
  307                obtained_type = dict[str, typing.Any]
 
  308        elif isinstance(json_type, collections.abc.Sequence):
 
  310            for item 
in json_type:
 
  311                new_schema = schema.copy()
 
  312                new_schema[
"type"] = item
 
  313                new_list.append(
_obtain_type(new_schema, json_schema_id_to_schema))
 
  314            obtained_type = typing.Union[tuple(new_list)]
 
  316            raise TypeError(f
"Unsupported type in json schema: {json_type}")
 
  318    if literal 
is not None:
 
  319        return _handle_literal(literal, obtained_type, schema, json_schema_id_to_schema)
 
 
  324    keys = [
"$ref", 
"$dynamicRef"]
 
  325    if id(schema) 
in memo: 
 
  327    if isinstance(schema, list):
 
  331    elif isinstance(schema, dict):
 
  336                for ref_key, ref_value 
in schema[key].items():
 
  339        for key, value 
in schema.items():
 
  342def _merge_key(schema:dict[str, typing.Any], ref_key:str, reference_value:typing.Any):
 
  343    if ref_key 
not in schema:
 
  344        schema[ref_key] = reference_value
 
  346    if schema[ref_key] 
is reference_value:
 
  348    if isinstance(schema[ref_key], dict) 
and isinstance(reference_value, dict):
 
  349        for new_ref_key, new_ref_value 
in reference_value.items():
 
  350            _merge_key(schema[ref_key], new_ref_key, new_ref_value)
 
  352    if ref_key 
in (
"$id", 
"$schema"):
 
  355    if isinstance(schema[ref_key], (str, int, float, bool)) 
and isinstance(reference_value, (str, int, float, bool)):
 
  356        if schema[ref_key] == reference_value:
 
  358    raise ValueError(f
"Duplicate keys in schema referenced by {ref_key} in JSON schema: {schema} is not supported")
 
 
  362    if id(schema) 
in memo:
 
  365    if isinstance(schema, list):
 
  370        schema.extend(new_list)
 
  371    if isinstance(schema, dict):
 
  374        resolver = registry.resolver(base_uri)
 
  375        keys = [
"$ref", 
"$dynamicRef"]
 
  379        for key, value 
in schema.items():
 
 
  384    resolved = resolver.lookup(schema[key])
 
  385    if resolved.contents 
is schema:
 
  386        raise ValueError(f
"Circular self reference detected in JSON schema: {schema}")
 
  387    schema[key] = resolved.contents