2025-02-13 14:52:26 +01:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
import abc
|
2025-02-13 14:52:26 +01:00
|
|
|
import typing as t
|
2025-02-13 14:47:39 +01:00
|
|
|
|
2025-02-13 15:02:59 +01:00
|
|
|
import sqlglot
|
2025-02-13 14:47:39 +01:00
|
|
|
from sqlglot import expressions as exp
|
2025-02-13 14:52:26 +01:00
|
|
|
from sqlglot.errors import SchemaError
|
2025-02-13 14:53:43 +01:00
|
|
|
from sqlglot.helper import dict_depth
|
2025-02-13 14:52:26 +01:00
|
|
|
from sqlglot.trie import in_trie, new_trie
|
|
|
|
|
|
|
|
if t.TYPE_CHECKING:
    # Only needed for type checking; avoids a hard runtime dependency on the
    # dataframe subpackage.
    from sqlglot.dataframe.sql.types import StructType

# Forms accepted wherever a column mapping is expected; normalized by
# `ensure_column_mapping` below.
ColumnMapping = t.Union[t.Dict, str, StructType, t.List]

# Table expression arg names, ordered from least to most qualified
# (table, db, catalog).
TABLE_ARGS = ("this", "db", "catalog")

T = t.TypeVar("T")
|
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
|
|
|
|
class Schema(abc.ABC):
    """Abstract base class for database schemas"""

    @abc.abstractmethod
    def add_table(
        self, table: exp.Table | str, column_mapping: t.Optional[ColumnMapping] = None
    ) -> None:
        """
        Register or update a table. Some implementing classes may require column information to also be provided.

        Args:
            table: table expression instance or string representing the table.
            column_mapping: a column mapping that describes the structure of the table.
        """

    @abc.abstractmethod
    def column_names(self, table: exp.Table | str, only_visible: bool = False) -> t.List[str]:
        """
        Get the column names for a table.

        Args:
            table: the `Table` expression instance.
            only_visible: whether to include invisible columns.

        Returns:
            The list of column names.
        """

    @abc.abstractmethod
    def get_column_type(self, table: exp.Table | str, column: exp.Column) -> exp.DataType:
        """
        Get the :class:`sqlglot.exp.DataType` type of a column in the schema.

        Args:
            table: the source table.
            column: the target column.

        Returns:
            The resulting column type.
        """

    @property
    def supported_table_args(self) -> t.Tuple[str, ...]:
        """
        Table arguments this schema support, e.g. `("this", "db", "catalog")`
        """
        # Not abstract so subclasses that never qualify tables may skip it,
        # but there is no sensible default either.
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
|
class AbstractMappingSchema(t.Generic[T]):
    """Base functionality for schemas backed by a nested mapping.

    The mapping is keyed, outermost to innermost, by catalog, db and table
    name (depending on its depth). A trie over the reversed key paths is kept
    alongside it so partially-qualified table names can be resolved.
    """

    def __init__(
        self,
        mapping: dict | None = None,
    ) -> None:
        self.mapping = mapping or {}
        self.mapping_trie = self._build_trie(self.mapping)
        # Lazily computed by `supported_table_args`.
        self._supported_table_args: t.Tuple[str, ...] = tuple()

    def _build_trie(self, schema: t.Dict) -> t.Dict:
        # Key paths are reversed so lookups proceed from the least to the most
        # qualified part (table, then db, then catalog).
        # NOTE: the loop variable is deliberately not named `t`, which would
        # shadow the module-wide `typing as t` alias.
        return new_trie(
            tuple(reversed(keys)) for keys in flatten_schema(schema, depth=self._depth())
        )

    def _depth(self) -> int:
        return dict_depth(self.mapping)

    @property
    def supported_table_args(self) -> t.Tuple[str, ...]:
        """Table args supported by this mapping, derived from its nesting depth.

        Raises:
            SchemaError: if the mapping is deeper than catalog.db.table.
        """
        if not self._supported_table_args and self.mapping:
            depth = self._depth()

            if not depth:  # None
                self._supported_table_args = tuple()
            elif 1 <= depth <= 3:
                self._supported_table_args = TABLE_ARGS[:depth]
            else:
                raise SchemaError(f"Invalid mapping shape. Depth: {depth}")

        return self._supported_table_args

    def table_parts(self, table: exp.Table) -> t.List[str]:
        """Return the non-empty name parts of `table`, least qualified first."""
        # READ_CSV "tables" are identified solely by the file name they read.
        if isinstance(table.this, exp.ReadCSV):
            return [table.this.name]
        return [table.text(part) for part in TABLE_ARGS if table.text(part)]

    def find(
        self, table: exp.Table, trie: t.Optional[t.Dict] = None, raise_on_missing: bool = True
    ) -> t.Optional[T]:
        """Resolve `table` in the mapping, completing unambiguous partial matches.

        Args:
            table: the table expression to look up.
            trie: optional trie to search instead of this schema's own.
            raise_on_missing: whether an ambiguous or missing entry raises.

        Returns:
            The mapped value for the table, or None if it cannot be resolved.

        Raises:
            SchemaError: if the partial name matches several entries and
                `raise_on_missing` is True.
        """
        parts = self.table_parts(table)[0 : len(self.supported_table_args)]
        value, trie = in_trie(self.mapping_trie if trie is None else trie, parts)

        if value == 0:
            # No entry shares this prefix.
            return None
        elif value == 1:
            # Prefix match only: acceptable iff it completes to exactly one entry.
            possibilities = flatten_schema(trie, depth=dict_depth(trie) - 1)
            if len(possibilities) == 1:
                parts.extend(possibilities[0])
            else:
                message = ", ".join(".".join(parts) for parts in possibilities)
                if raise_on_missing:
                    raise SchemaError(f"Ambiguous mapping for {table}: {message}.")
                return None
        return self._nested_get(parts, raise_on_missing=raise_on_missing)

    def _nested_get(
        self, parts: t.Sequence[str], d: t.Optional[t.Dict] = None, raise_on_missing=True
    ) -> t.Optional[t.Any]:
        # `parts` is least-qualified-first, while the mapping nests
        # most-qualified-first, hence the reversal.
        return _nested_get(
            d or self.mapping,
            *zip(self.supported_table_args, reversed(parts)),
            raise_on_missing=raise_on_missing,
        )
|
|
|
|
|
|
|
|
|
|
|
|
class MappingSchema(AbstractMappingSchema[t.Dict[str, str]], Schema):
    """
    Schema based on a nested mapping.

    Args:
        schema (dict): Mapping in one of the following forms:
            1. {table: {col: type}}
            2. {db: {table: {col: type}}}
            3. {catalog: {db: {table: {col: type}}}}
            4. None - Tables will be added later
        visible (dict): Optional mapping of which columns in the schema are visible. If not provided, all columns
            are assumed to be visible. The nesting should mirror that of the schema:
            1. {table: set(*cols)}}
            2. {db: {table: set(*cols)}}}
            3. {catalog: {db: {table: set(*cols)}}}}
        dialect (str): The dialect to be used for custom type mappings.
    """

    def __init__(
        self,
        schema: t.Optional[t.Dict] = None,
        visible: t.Optional[t.Dict] = None,
        dialect: t.Optional[str] = None,
    ) -> None:
        self.dialect = dialect
        self.visible = visible or {}
        # Caches raw type strings -> parsed DataType expressions so each
        # distinct type string is only parsed once.
        self._type_mapping_cache: t.Dict[str, exp.DataType] = {}
        super().__init__(self._normalize(schema or {}))

    @classmethod
    def from_mapping_schema(cls, mapping_schema: MappingSchema) -> MappingSchema:
        """Build a new MappingSchema from another instance's state."""
        return MappingSchema(
            schema=mapping_schema.mapping,
            visible=mapping_schema.visible,
            dialect=mapping_schema.dialect,
        )

    def copy(self, **kwargs) -> MappingSchema:
        """Return a copy of this schema; `kwargs` override constructor arguments."""
        return MappingSchema(
            **{  # type: ignore
                "schema": self.mapping.copy(),
                "visible": self.visible.copy(),
                "dialect": self.dialect,
                **kwargs,
            }
        )

    def _normalize(self, schema: t.Dict) -> t.Dict:
        """
        Converts all identifiers in the schema into lowercase, unless they're quoted.

        Args:
            schema: the schema to normalize.

        Returns:
            The normalized schema mapping.
        """
        # Depth minus one: the innermost level holds the columns themselves.
        flattened_schema = flatten_schema(schema, depth=dict_depth(schema) - 1)

        normalized_mapping: t.Dict = {}
        for keys in flattened_schema:
            columns = _nested_get(schema, *zip(keys, keys))
            assert columns is not None

            normalized_keys = [self._normalize_name(key) for key in keys]
            for column_name, column_type in columns.items():
                _nested_set(
                    normalized_mapping,
                    normalized_keys + [self._normalize_name(column_name)],
                    column_type,
                )

        return normalized_mapping

    def add_table(
        self, table: exp.Table | str, column_mapping: t.Optional[ColumnMapping] = None
    ) -> None:
        """
        Register or update a table. Updates are only performed if a new column mapping is provided.

        Args:
            table: the `Table` expression instance or string representing the table.
            column_mapping: a column mapping that describes the structure of the table.
        """
        table_ = self._ensure_table(table)
        column_mapping = ensure_column_mapping(column_mapping)
        schema = self.find(table_, raise_on_missing=False)

        # An already-registered table with no new columns is left untouched.
        if schema and not column_mapping:
            return

        _nested_set(
            self.mapping,
            list(reversed(self.table_parts(table_))),
            column_mapping,
        )
        # The trie must be rebuilt to reflect the new entry.
        self.mapping_trie = self._build_trie(self.mapping)

    def _normalize_name(self, name: str) -> str:
        """Lowercase `name` unless it parses as a quoted identifier."""
        try:
            identifier: t.Optional[exp.Expression] = sqlglot.parse_one(
                name, read=self.dialect, into=exp.Identifier
            )
        except Exception:
            # Parsing failed (e.g. reserved word or odd characters); fall back
            # to treating the raw string as an unquoted identifier.
            identifier = exp.to_identifier(name)
        assert isinstance(identifier, exp.Identifier)

        if identifier.quoted:
            return identifier.name
        return identifier.name.lower()

    def _depth(self) -> int:
        # The columns themselves are a mapping, but we don't want to include those
        return super()._depth() - 1

    def _ensure_table(self, table: exp.Table | str) -> exp.Table:
        """Coerce `table` to a Table expression, raising SchemaError on failure."""
        table_ = exp.to_table(table)

        if not table_:
            raise SchemaError(f"Not a valid table '{table}'")

        return table_

    def column_names(self, table: exp.Table | str, only_visible: bool = False) -> t.List[str]:
        """
        Get the column names for a table.

        Args:
            table: the `Table` expression instance or string representing the table.
            only_visible: whether to restrict the result to visible columns.

        Returns:
            The list of column names; empty if the table is unknown.
        """
        table_ = self._ensure_table(table)
        schema = self.find(table_)

        if schema is None:
            return []

        if not only_visible or not self.visible:
            return list(schema)

        visible = self._nested_get(self.table_parts(table_), self.visible)
        return [col for col in schema if col in visible]  # type: ignore

    def get_column_type(self, table: exp.Table | str, column: exp.Column | str) -> exp.DataType:
        """
        Get the :class:`sqlglot.exp.DataType` type of a column in the schema.

        Args:
            table: the source table.
            column: the target column, as an expression or a plain name.

        Returns:
            The resulting column type, or UNKNOWN if the table is not registered.

        Raises:
            SchemaError: if the table cannot be converted, or the stored column
                type is neither a string nor a DataType.
        """
        column_name = column if isinstance(column, str) else column.name
        table_ = exp.to_table(table)
        if table_:
            table_schema = self.find(table_, raise_on_missing=False)
            if table_schema:
                column_type = table_schema.get(column_name)

                if isinstance(column_type, exp.DataType):
                    return column_type
                elif isinstance(column_type, str):
                    return self._to_data_type(column_type.upper())
                raise SchemaError(f"Unknown column type '{column_type}'")
            return exp.DataType(this=exp.DataType.Type.UNKNOWN)
        raise SchemaError(f"Could not convert table '{table}'")

    def _to_data_type(self, schema_type: str) -> exp.DataType:
        """
        Convert a type represented as a string to the corresponding :class:`sqlglot.exp.DataType` object.

        Args:
            schema_type: the type we want to convert.

        Returns:
            The resulting expression type.
        """
        if schema_type not in self._type_mapping_cache:
            try:
                expression = exp.maybe_parse(schema_type, into=exp.DataType, dialect=self.dialect)
                if expression is None:
                    raise ValueError(f"Could not parse {schema_type}")
                self._type_mapping_cache[schema_type] = expression  # type: ignore
            except AttributeError:
                raise SchemaError(f"Failed to convert type {schema_type}")

        return self._type_mapping_cache[schema_type]
|
|
|
|
|
|
|
|
|
2025-02-13 14:52:26 +01:00
|
|
|
def ensure_schema(schema: t.Any) -> Schema:
    """Coerce `schema` into a Schema, wrapping a raw mapping in a MappingSchema."""
    return schema if isinstance(schema, Schema) else MappingSchema(schema)
|
|
|
|
|
|
|
|
|
2025-02-13 14:52:26 +01:00
|
|
|
def ensure_column_mapping(mapping: t.Optional[ColumnMapping]):
    """Normalize the accepted column mapping forms into a plain dict.

    Accepts a dict (returned unchanged), a "name: type, ..." string, a
    DataFrame StructType-like object, a list of column names (each mapped to
    None), or None (yields an empty mapping).

    Raises:
        ValueError: if `mapping` is of an unsupported type.
    """
    if mapping is None:
        return {}
    if isinstance(mapping, dict):
        return mapping
    if isinstance(mapping, str):
        columns = {}
        for entry in (chunk.strip() for chunk in mapping.split(",")):
            pieces = entry.split(":")
            columns[pieces[0].strip()] = pieces[1].strip()
        return columns
    # Check if mapping looks like a DataFrame StructType
    if hasattr(mapping, "simpleString"):
        return {struct_field.name: struct_field.dataType.simpleString() for struct_field in mapping}  # type: ignore
    if isinstance(mapping, list):
        return {entry.strip(): None for entry in mapping}
    raise ValueError(f"Invalid mapping provided: {type(mapping)}")
|
|
|
|
|
|
|
|
|
2025-02-13 14:53:43 +01:00
|
|
|
def flatten_schema(
    schema: t.Dict, depth: int, keys: t.Optional[t.List[str]] = None
) -> t.List[t.List[str]]:
    """Collect the key paths of length `depth` in a nested mapping.

    Args:
        schema: the nested mapping to traverse.
        depth: how many levels of nesting to descend into.
        keys: prefix of keys accumulated so far (used by the recursion).

    Returns:
        A list of key paths, each a list of `depth` keys.
    """
    prefix = keys or []
    paths: t.List[t.List[str]] = []

    for key, subtree in schema.items():
        if depth == 1:
            paths.append(prefix + [key])
        elif depth > 1:
            # Descend one level, extending the current prefix.
            paths.extend(flatten_schema(subtree, depth - 1, prefix + [key]))

    return paths
|
|
|
|
|
|
|
|
|
|
|
|
def _nested_get(
|
|
|
|
d: t.Dict, *path: t.Tuple[str, str], raise_on_missing: bool = True
|
|
|
|
) -> t.Optional[t.Any]:
|
2025-02-13 14:47:39 +01:00
|
|
|
"""
|
|
|
|
Get a value for a nested dictionary.
|
|
|
|
|
|
|
|
Args:
|
2025-02-13 14:52:26 +01:00
|
|
|
d: the dictionary to search.
|
|
|
|
*path: tuples of (name, key), where:
|
2025-02-13 14:47:39 +01:00
|
|
|
`key` is the key in the dictionary to get.
|
|
|
|
`name` is a string to use in the error if `key` isn't found.
|
|
|
|
|
|
|
|
Returns:
|
2025-02-13 14:52:26 +01:00
|
|
|
The value or None if it doesn't exist.
|
2025-02-13 14:47:39 +01:00
|
|
|
"""
|
|
|
|
for name, key in path:
|
2025-02-13 14:52:26 +01:00
|
|
|
d = d.get(key) # type: ignore
|
2025-02-13 14:47:39 +01:00
|
|
|
if d is None:
|
|
|
|
if raise_on_missing:
|
|
|
|
name = "table" if name == "this" else name
|
2025-02-13 14:53:43 +01:00
|
|
|
raise ValueError(f"Unknown {name}: {key}")
|
2025-02-13 14:47:39 +01:00
|
|
|
return None
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
2025-02-13 14:52:26 +01:00
|
|
|
def _nested_set(d: t.Dict, keys: t.List[str], value: t.Any) -> t.Dict:
|
2025-02-13 14:47:39 +01:00
|
|
|
"""
|
|
|
|
In-place set a value for a nested dictionary
|
|
|
|
|
2025-02-13 14:52:26 +01:00
|
|
|
Example:
|
2025-02-13 14:47:39 +01:00
|
|
|
>>> _nested_set({}, ["top_key", "second_key"], "value")
|
|
|
|
{'top_key': {'second_key': 'value'}}
|
2025-02-13 14:52:26 +01:00
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
>>> _nested_set({"top_key": {"third_key": "third_value"}}, ["top_key", "second_key"], "value")
|
|
|
|
{'top_key': {'third_key': 'third_value', 'second_key': 'value'}}
|
|
|
|
|
2025-02-13 14:52:26 +01:00
|
|
|
Args:
|
|
|
|
d: dictionary to update.
|
|
|
|
keys: the keys that makeup the path to `value`.
|
|
|
|
value: the value to set in the dictionary for the given key path.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
The (possibly) updated dictionary.
|
2025-02-13 14:47:39 +01:00
|
|
|
"""
|
|
|
|
if not keys:
|
2025-02-13 14:52:26 +01:00
|
|
|
return d
|
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
if len(keys) == 1:
|
|
|
|
d[keys[0]] = value
|
2025-02-13 14:52:26 +01:00
|
|
|
return d
|
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
subd = d
|
|
|
|
for key in keys[:-1]:
|
|
|
|
if key not in subd:
|
|
|
|
subd = subd.setdefault(key, {})
|
|
|
|
else:
|
|
|
|
subd = subd[key]
|
2025-02-13 14:52:26 +01:00
|
|
|
|
2025-02-13 14:47:39 +01:00
|
|
|
subd[keys[-1]] = value
|
|
|
|
return d
|