| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 | from collections.abc import Mappingfrom typing import Anyfrom core.file.file_obj import FileVarfrom .segments import (    ArrayAnySegment,    FileSegment,    FloatSegment,    IntegerSegment,    NoneSegment,    ObjectSegment,    Segment,    StringSegment,)from .types import SegmentTypefrom .variables import (    ArrayFileVariable,    ArrayNumberVariable,    ArrayObjectVariable,    ArrayStringVariable,    FileVariable,    FloatVariable,    IntegerVariable,    ObjectVariable,    SecretVariable,    StringVariable,    Variable,)def build_variable_from_mapping(m: Mapping[str, Any], /) -> Variable:    if (value_type := m.get('value_type')) is None:        raise ValueError('missing value type')    if not m.get('name'):        raise ValueError('missing name')    if (value := m.get('value')) is None:        raise ValueError('missing value')    match value_type:        case SegmentType.STRING:            return StringVariable.model_validate(m)        case SegmentType.SECRET:            return SecretVariable.model_validate(m)        case SegmentType.NUMBER if isinstance(value, int):            return IntegerVariable.model_validate(m)        case SegmentType.NUMBER if isinstance(value, float):            return FloatVariable.model_validate(m)        case SegmentType.NUMBER if not isinstance(value, float | int):            raise ValueError(f'invalid number value {value}')        case SegmentType.FILE:            return FileVariable.model_validate(m)        case SegmentType.OBJECT if isinstance(value, dict):            return ObjectVariable.model_validate(                {**m, 'value': {k: build_variable_from_mapping(v) for k, v in value.items()}}            )        case SegmentType.ARRAY_STRING if isinstance(value, list):            return ArrayStringVariable.model_validate({**m, 'value': [build_variable_from_mapping(v) for v in value]})        case SegmentType.ARRAY_NUMBER if isinstance(value, list):            return ArrayNumberVariable.model_validate({**m, 'value': [build_variable_from_mapping(v) for v in value]})        case SegmentType.ARRAY_OBJECT if isinstance(value, list):            return ArrayObjectVariable.model_validate({**m, 'value': [build_variable_from_mapping(v) for v in value]})        case SegmentType.ARRAY_FILE if isinstance(value, list):            return ArrayFileVariable.model_validate({**m, 'value': [build_variable_from_mapping(v) for v in value]})    raise ValueError(f'not supported value type {value_type}')def build_segment(value: Any, /) -> Segment:    if value is None:        return NoneSegment()    if isinstance(value, str):        return StringSegment(value=value)    if isinstance(value, int):        return IntegerSegment(value=value)    if isinstance(value, float):        return FloatSegment(value=value)    if isinstance(value, dict):        # TODO: Limit the depth of the object        obj = {k: build_segment(v) for k, v in value.items()}        return ObjectSegment(value=obj)    if isinstance(value, list):        # TODO: Limit the depth of the array        elements = [build_segment(v) for v in value]        return ArrayAnySegment(value=elements)    if isinstance(value, FileVar):        return FileSegment(value=value)    raise ValueError(f'not supported value {value}')
 |