Edit on GitHub

sqlglot.helper

  1from __future__ import annotations
  2
  3import inspect
  4import logging
  5import re
  6import sys
  7import typing as t
  8from collections.abc import Collection
  9from contextlib import contextmanager
 10from copy import copy
 11from enum import Enum
 12from itertools import count
 13
 14if t.TYPE_CHECKING:
 15    from sqlglot import exp
 16    from sqlglot._typing import A, E, T
 17    from sqlglot.expressions import Expression
 18
 19
 20CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
 21PYTHON_VERSION = sys.version_info[:2]
 22logger = logging.getLogger("sqlglot")
 23
 24
 25class AutoName(Enum):
 26    """
 27    This is used for creating Enum classes where `auto()` is the string form
 28    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
 29
 30    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
 31    """
 32
 33    def _generate_next_value_(name, _start, _count, _last_values):
 34        return name
 35
 36
 37class classproperty(property):
 38    """
 39    Similar to a normal property but works for class methods
 40    """
 41
 42    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
 43        return classmethod(self.fget).__get__(None, owner)()  # type: ignore
 44
 45
 46def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
 47    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
 48    try:
 49        return seq[index]
 50    except IndexError:
 51        return None
 52
 53
 54@t.overload
 55def ensure_list(value: t.Collection[T]) -> t.List[T]:
 56    ...
 57
 58
 59@t.overload
 60def ensure_list(value: T) -> t.List[T]:
 61    ...
 62
 63
 64def ensure_list(value):
 65    """
 66    Ensures that a value is a list, otherwise casts or wraps it into one.
 67
 68    Args:
 69        value: The value of interest.
 70
 71    Returns:
 72        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
 73    """
 74    if value is None:
 75        return []
 76    if isinstance(value, (list, tuple)):
 77        return list(value)
 78
 79    return [value]
 80
 81
 82@t.overload
 83def ensure_collection(value: t.Collection[T]) -> t.Collection[T]:
 84    ...
 85
 86
 87@t.overload
 88def ensure_collection(value: T) -> t.Collection[T]:
 89    ...
 90
 91
 92def ensure_collection(value):
 93    """
 94    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
 95
 96    Args:
 97        value: The value of interest.
 98
 99    Returns:
100        The value if it's a collection, or else the value wrapped in a list.
101    """
102    if value is None:
103        return []
104    return (
105        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
106    )
107
108
109def csv(*args: str, sep: str = ", ") -> str:
110    """
111    Formats any number of string arguments as CSV.
112
113    Args:
114        args: The string arguments to format.
115        sep: The argument separator.
116
117    Returns:
118        The arguments formatted as a CSV string.
119    """
120    return sep.join(arg for arg in args if arg)
121
122
123def subclasses(
124    module_name: str,
125    classes: t.Type | t.Tuple[t.Type, ...],
126    exclude: t.Type | t.Tuple[t.Type, ...] = (),
127) -> t.List[t.Type]:
128    """
129    Returns all subclasses for a collection of classes, possibly excluding some of them.
130
131    Args:
132        module_name: The name of the module to search for subclasses in.
133        classes: Class(es) we want to find the subclasses of.
134        exclude: Class(es) we want to exclude from the returned list.
135
136    Returns:
137        The target subclasses.
138    """
139    return [
140        obj
141        for _, obj in inspect.getmembers(
142            sys.modules[module_name],
143            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
144        )
145    ]
146
147
148def apply_index_offset(
149    this: exp.Expression,
150    expressions: t.List[E],
151    offset: int,
152) -> t.List[E]:
153    """
154    Applies an offset to a given integer literal expression.
155
156    Args:
157        this: The target of the index.
158        expressions: The expression the offset will be applied to, wrapped in a list.
159        offset: The offset that will be applied.
160
161    Returns:
162        The original expression with the offset applied to it, wrapped in a list. If the provided
163        `expressions` argument contains more than one expression, it's returned unaffected.
164    """
165    if not offset or len(expressions) != 1:
166        return expressions
167
168    expression = expressions[0]
169
170    from sqlglot import exp
171    from sqlglot.optimizer.annotate_types import annotate_types
172    from sqlglot.optimizer.simplify import simplify
173
174    if not this.type:
175        annotate_types(this)
176
177    if t.cast(exp.DataType, this.type).this not in (
178        exp.DataType.Type.UNKNOWN,
179        exp.DataType.Type.ARRAY,
180    ):
181        return expressions
182
183    if not expression.type:
184        annotate_types(expression)
185    if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES:
186        logger.warning("Applying array index offset (%s)", offset)
187        expression = simplify(exp.Add(this=expression, expression=exp.Literal.number(offset)))
188        return [expression]
189
190    return expressions
191
192
193def camel_to_snake_case(name: str) -> str:
194    """Converts `name` from camelCase to snake_case and returns the result."""
195    return CAMEL_CASE_PATTERN.sub("_", name).upper()
196
197
198def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
199    """
200    Applies a transformation to a given expression until a fix point is reached.
201
202    Args:
203        expression: The expression to be transformed.
204        func: The transformation to be applied.
205
206    Returns:
207        The transformed expression.
208    """
209    while True:
210        for n, *_ in reversed(tuple(expression.walk())):
211            n._hash = hash(n)
212
213        start = hash(expression)
214        expression = func(expression)
215
216        for n, *_ in expression.walk():
217            n._hash = None
218        if start == hash(expression):
219            break
220
221    return expression
222
223
224def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
225    """
226    Sorts a given directed acyclic graph in topological order.
227
228    Args:
229        dag: The graph to be sorted.
230
231    Returns:
232        A list that contains all of the graph's nodes in topological order.
233    """
234    result = []
235
236    for node, deps in tuple(dag.items()):
237        for dep in deps:
238            if not dep in dag:
239                dag[dep] = set()
240
241    while dag:
242        current = {node for node, deps in dag.items() if not deps}
243
244        if not current:
245            raise ValueError("Cycle error")
246
247        for node in current:
248            dag.pop(node)
249
250        for deps in dag.values():
251            deps -= current
252
253        result.extend(sorted(current))  # type: ignore
254
255    return result
256
257
258def open_file(file_name: str) -> t.TextIO:
259    """Open a file that may be compressed as gzip and return it in universal newline mode."""
260    with open(file_name, "rb") as f:
261        gzipped = f.read(2) == b"\x1f\x8b"
262
263    if gzipped:
264        import gzip
265
266        return gzip.open(file_name, "rt", newline="")
267
268    return open(file_name, encoding="utf-8", newline="")
269
270
271@contextmanager
272def csv_reader(read_csv: exp.ReadCSV) -> t.Any:
273    """
274    Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`.
275
276    Args:
277        read_csv: A `ReadCSV` function call.
278
279    Yields:
280        A python csv reader.
281    """
282    args = read_csv.expressions
283    file = open_file(read_csv.name)
284
285    delimiter = ","
286    args = iter(arg.name for arg in args)
287    for k, v in zip(args, args):
288        if k == "delimiter":
289            delimiter = v
290
291    try:
292        import csv as csv_
293
294        yield csv_.reader(file, delimiter=delimiter)
295    finally:
296        file.close()
297
298
299def find_new_name(taken: t.Collection[str], base: str) -> str:
300    """
301    Searches for a new name.
302
303    Args:
304        taken: A collection of taken names.
305        base: Base name to alter.
306
307    Returns:
308        The new, available name.
309    """
310    if base not in taken:
311        return base
312
313    i = 2
314    new = f"{base}_{i}"
315    while new in taken:
316        i += 1
317        new = f"{base}_{i}"
318
319    return new
320
321
322def name_sequence(prefix: str) -> t.Callable[[], str]:
323    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
324    sequence = count()
325    return lambda: f"{prefix}{next(sequence)}"
326
327
328def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
329    """Returns a dictionary created from an object's attributes."""
330    return {
331        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
332        **kwargs,
333    }
334
335
336def split_num_words(
337    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
338) -> t.List[t.Optional[str]]:
339    """
340    Perform a split on a value and return N words as a result with `None` used for words that don't exist.
341
342    Args:
343        value: The value to be split.
344        sep: The value to use to split on.
345        min_num_words: The minimum number of words that are going to be in the result.
346        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.
347
348    Examples:
349        >>> split_num_words("db.table", ".", 3)
350        [None, 'db', 'table']
351        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
352        ['db', 'table', None]
353        >>> split_num_words("db.table", ".", 1)
354        ['db', 'table']
355
356    Returns:
357        The list of words returned by `split`, possibly augmented by a number of `None` values.
358    """
359    words = value.split(sep)
360    if fill_from_start:
361        return [None] * (min_num_words - len(words)) + words
362    return words + [None] * (min_num_words - len(words))
363
364
365def is_iterable(value: t.Any) -> bool:
366    """
367    Checks if the value is an iterable, excluding the types `str` and `bytes`.
368
369    Examples:
370        >>> is_iterable([1,2])
371        True
372        >>> is_iterable("test")
373        False
374
375    Args:
376        value: The value to check if it is an iterable.
377
378    Returns:
379        A `bool` value indicating if it is an iterable.
380    """
381    from sqlglot import Expression
382
383    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expression))
384
385
386def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
387    """
388    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
389    type `str` and `bytes` are not regarded as iterables.
390
391    Examples:
392        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
393        [1, 2, 3, 4, 5, 'bla']
394        >>> list(flatten([1, 2, 3]))
395        [1, 2, 3]
396
397    Args:
398        values: The value to be flattened.
399
400    Yields:
401        Non-iterable elements in `values`.
402    """
403    for value in values:
404        if is_iterable(value):
405            yield from flatten(value)
406        else:
407            yield value
408
409
410def dict_depth(d: t.Dict) -> int:
411    """
412    Get the nesting depth of a dictionary.
413
414    Example:
415        >>> dict_depth(None)
416        0
417        >>> dict_depth({})
418        1
419        >>> dict_depth({"a": "b"})
420        1
421        >>> dict_depth({"a": {}})
422        2
423        >>> dict_depth({"a": {"b": {}}})
424        3
425    """
426    try:
427        return 1 + dict_depth(next(iter(d.values())))
428    except AttributeError:
429        # d doesn't have attribute "values"
430        return 0
431    except StopIteration:
432        # d.values() returns an empty sequence
433        return 1
434
435
436def first(it: t.Iterable[T]) -> T:
437    """Returns the first element from an iterable (useful for sets)."""
438    return next(i for i in it)
439
440
441def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
442    """
443    Merges a sequence of ranges, represented as tuples (low, high) whose values
444    belong to some totally-ordered set.
445
446    Example:
447        >>> merge_ranges([(1, 3), (2, 6)])
448        [(1, 6)]
449    """
450    if not ranges:
451        return []
452
453    ranges = sorted(ranges)
454
455    merged = [ranges[0]]
456
457    for start, end in ranges[1:]:
458        last_start, last_end = merged[-1]
459
460        if start <= last_end:
461            merged[-1] = (last_start, max(last_end, end))
462        else:
463            merged.append((start, end))
464
465    return merged
CAMEL_CASE_PATTERN = re.compile('(?<!^)(?=[A-Z])')
PYTHON_VERSION = (3, 10)
logger = <Logger sqlglot (WARNING)>
class AutoName(enum.Enum):
26class AutoName(Enum):
27    """
28    This is used for creating Enum classes where `auto()` is the string form
29    of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
30
31    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
32    """
33
34    def _generate_next_value_(name, _start, _count, _last_values):
35        return name

This is used for creating Enum classes where auto() is the string form of the corresponding enum's identifier (e.g. FOO.value results in "FOO").

Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values

Inherited Members
enum.Enum
name
value
class classproperty(builtins.property):
38class classproperty(property):
39    """
40    Similar to a normal property but works for class methods
41    """
42
43    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
44        return classmethod(self.fget).__get__(None, owner)()  # type: ignore

Similar to a normal property but works for class methods

Inherited Members
builtins.property
property
getter
setter
deleter
fget
fset
fdel
def seq_get(seq: Sequence[~T], index: int) -> Optional[~T]:
47def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
48    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
49    try:
50        return seq[index]
51    except IndexError:
52        return None

Returns the value in seq at position index, or None if index is out of bounds.

def ensure_list(value):
65def ensure_list(value):
66    """
67    Ensures that a value is a list, otherwise casts or wraps it into one.
68
69    Args:
70        value: The value of interest.
71
72    Returns:
73        The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
74    """
75    if value is None:
76        return []
77    if isinstance(value, (list, tuple)):
78        return list(value)
79
80    return [value]

Ensures that a value is a list, otherwise casts or wraps it into one.

Arguments:
  • value: The value of interest.
Returns:

The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.

def ensure_collection(value):
 93def ensure_collection(value):
 94    """
 95    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
 96
 97    Args:
 98        value: The value of interest.
 99
100    Returns:
101        The value if it's a collection, or else the value wrapped in a list.
102    """
103    if value is None:
104        return []
105    return (
106        value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value]
107    )

Ensures that a value is a collection (excluding str and bytes), otherwise wraps it into a list.

Arguments:
  • value: The value of interest.
Returns:

The value if it's a collection, or else the value wrapped in a list.

def csv(*args: str, sep: str = ', ') -> str:
110def csv(*args: str, sep: str = ", ") -> str:
111    """
112    Formats any number of string arguments as CSV.
113
114    Args:
115        args: The string arguments to format.
116        sep: The argument separator.
117
118    Returns:
119        The arguments formatted as a CSV string.
120    """
121    return sep.join(arg for arg in args if arg)

Formats any number of string arguments as CSV.

Arguments:
  • args: The string arguments to format.
  • sep: The argument separator.
Returns:

The arguments formatted as a CSV string.

def subclasses( module_name: str, classes: Union[Type, Tuple[Type, ...]], exclude: Union[Type, Tuple[Type, ...]] = ()) -> List[Type]:
124def subclasses(
125    module_name: str,
126    classes: t.Type | t.Tuple[t.Type, ...],
127    exclude: t.Type | t.Tuple[t.Type, ...] = (),
128) -> t.List[t.Type]:
129    """
130    Returns all subclasses for a collection of classes, possibly excluding some of them.
131
132    Args:
133        module_name: The name of the module to search for subclasses in.
134        classes: Class(es) we want to find the subclasses of.
135        exclude: Class(es) we want to exclude from the returned list.
136
137    Returns:
138        The target subclasses.
139    """
140    return [
141        obj
142        for _, obj in inspect.getmembers(
143            sys.modules[module_name],
144            lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude,
145        )
146    ]

Returns all subclasses for a collection of classes, possibly excluding some of them.

Arguments:
  • module_name: The name of the module to search for subclasses in.
  • classes: Class(es) we want to find the subclasses of.
  • exclude: Class(es) we want to exclude from the returned list.
Returns:

The target subclasses.

def apply_index_offset( this: sqlglot.expressions.Expression, expressions: List[~E], offset: int) -> List[~E]:
149def apply_index_offset(
150    this: exp.Expression,
151    expressions: t.List[E],
152    offset: int,
153) -> t.List[E]:
154    """
155    Applies an offset to a given integer literal expression.
156
157    Args:
158        this: The target of the index.
159        expressions: The expression the offset will be applied to, wrapped in a list.
160        offset: The offset that will be applied.
161
162    Returns:
163        The original expression with the offset applied to it, wrapped in a list. If the provided
164        `expressions` argument contains more than one expression, it's returned unaffected.
165    """
166    if not offset or len(expressions) != 1:
167        return expressions
168
169    expression = expressions[0]
170
171    from sqlglot import exp
172    from sqlglot.optimizer.annotate_types import annotate_types
173    from sqlglot.optimizer.simplify import simplify
174
175    if not this.type:
176        annotate_types(this)
177
178    if t.cast(exp.DataType, this.type).this not in (
179        exp.DataType.Type.UNKNOWN,
180        exp.DataType.Type.ARRAY,
181    ):
182        return expressions
183
184    if not expression.type:
185        annotate_types(expression)
186    if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES:
187        logger.warning("Applying array index offset (%s)", offset)
188        expression = simplify(exp.Add(this=expression, expression=exp.Literal.number(offset)))
189        return [expression]
190
191    return expressions

Applies an offset to a given integer literal expression.

Arguments:
  • this: The target of the index.
  • expressions: The expression the offset will be applied to, wrapped in a list.
  • offset: The offset that will be applied.
Returns:

The original expression with the offset applied to it, wrapped in a list. If the provided expressions argument contains more than one expression, it's returned unaffected.

def camel_to_snake_case(name: str) -> str:
194def camel_to_snake_case(name: str) -> str:
195    """Converts `name` from camelCase to snake_case and returns the result."""
196    return CAMEL_CASE_PATTERN.sub("_", name).upper()

Converts name from camelCase to snake_case and returns the result.

def while_changing( expression: sqlglot.expressions.Expression, func: Callable[[sqlglot.expressions.Expression], ~E]) -> ~E:
199def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
200    """
201    Applies a transformation to a given expression until a fix point is reached.
202
203    Args:
204        expression: The expression to be transformed.
205        func: The transformation to be applied.
206
207    Returns:
208        The transformed expression.
209    """
210    while True:
211        for n, *_ in reversed(tuple(expression.walk())):
212            n._hash = hash(n)
213
214        start = hash(expression)
215        expression = func(expression)
216
217        for n, *_ in expression.walk():
218            n._hash = None
219        if start == hash(expression):
220            break
221
222    return expression

Applies a transformation to a given expression until a fix point is reached.

Arguments:
  • expression: The expression to be transformed.
  • func: The transformation to be applied.
Returns:

The transformed expression.

def tsort(dag: Dict[~T, Set[~T]]) -> List[~T]:
225def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
226    """
227    Sorts a given directed acyclic graph in topological order.
228
229    Args:
230        dag: The graph to be sorted.
231
232    Returns:
233        A list that contains all of the graph's nodes in topological order.
234    """
235    result = []
236
237    for node, deps in tuple(dag.items()):
238        for dep in deps:
239            if not dep in dag:
240                dag[dep] = set()
241
242    while dag:
243        current = {node for node, deps in dag.items() if not deps}
244
245        if not current:
246            raise ValueError("Cycle error")
247
248        for node in current:
249            dag.pop(node)
250
251        for deps in dag.values():
252            deps -= current
253
254        result.extend(sorted(current))  # type: ignore
255
256    return result

Sorts a given directed acyclic graph in topological order.

Arguments:
  • dag: The graph to be sorted.
Returns:

A list that contains all of the graph's nodes in topological order.

def open_file(file_name: str) -> <class 'TextIO'>:
259def open_file(file_name: str) -> t.TextIO:
260    """Open a file that may be compressed as gzip and return it in universal newline mode."""
261    with open(file_name, "rb") as f:
262        gzipped = f.read(2) == b"\x1f\x8b"
263
264    if gzipped:
265        import gzip
266
267        return gzip.open(file_name, "rt", newline="")
268
269    return open(file_name, encoding="utf-8", newline="")

Open a file that may be compressed as gzip and return it in universal newline mode.

@contextmanager
def csv_reader(read_csv: sqlglot.expressions.ReadCSV) -> Any:
272@contextmanager
273def csv_reader(read_csv: exp.ReadCSV) -> t.Any:
274    """
275    Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`.
276
277    Args:
278        read_csv: A `ReadCSV` function call.
279
280    Yields:
281        A python csv reader.
282    """
283    args = read_csv.expressions
284    file = open_file(read_csv.name)
285
286    delimiter = ","
287    args = iter(arg.name for arg in args)
288    for k, v in zip(args, args):
289        if k == "delimiter":
290            delimiter = v
291
292    try:
293        import csv as csv_
294
295        yield csv_.reader(file, delimiter=delimiter)
296    finally:
297        file.close()

Returns a csv reader given the expression READ_CSV(name, ['delimiter', '|', ...]).

Arguments:
  • read_csv: A ReadCSV function call.
Yields:

A python csv reader.

def find_new_name(taken: Collection[str], base: str) -> str:
300def find_new_name(taken: t.Collection[str], base: str) -> str:
301    """
302    Searches for a new name.
303
304    Args:
305        taken: A collection of taken names.
306        base: Base name to alter.
307
308    Returns:
309        The new, available name.
310    """
311    if base not in taken:
312        return base
313
314    i = 2
315    new = f"{base}_{i}"
316    while new in taken:
317        i += 1
318        new = f"{base}_{i}"
319
320    return new

Searches for a new name.

Arguments:
  • taken: A collection of taken names.
  • base: Base name to alter.
Returns:

The new, available name.

def name_sequence(prefix: str) -> Callable[[], str]:
323def name_sequence(prefix: str) -> t.Callable[[], str]:
324    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
325    sequence = count()
326    return lambda: f"{prefix}{next(sequence)}"

Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").

def object_to_dict(obj: Any, **kwargs) -> Dict:
329def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
330    """Returns a dictionary created from an object's attributes."""
331    return {
332        **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()},
333        **kwargs,
334    }

Returns a dictionary created from an object's attributes.

def split_num_words( value: str, sep: str, min_num_words: int, fill_from_start: bool = True) -> List[Optional[str]]:
337def split_num_words(
338    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
339) -> t.List[t.Optional[str]]:
340    """
341    Perform a split on a value and return N words as a result with `None` used for words that don't exist.
342
343    Args:
344        value: The value to be split.
345        sep: The value to use to split on.
346        min_num_words: The minimum number of words that are going to be in the result.
347        fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list.
348
349    Examples:
350        >>> split_num_words("db.table", ".", 3)
351        [None, 'db', 'table']
352        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
353        ['db', 'table', None]
354        >>> split_num_words("db.table", ".", 1)
355        ['db', 'table']
356
357    Returns:
358        The list of words returned by `split`, possibly augmented by a number of `None` values.
359    """
360    words = value.split(sep)
361    if fill_from_start:
362        return [None] * (min_num_words - len(words)) + words
363    return words + [None] * (min_num_words - len(words))

Perform a split on a value and return N words as a result with None used for words that don't exist.

Arguments:
  • value: The value to be split.
  • sep: The value to use to split on.
  • min_num_words: The minimum number of words that are going to be in the result.
  • fill_from_start: Indicates that if None values should be inserted at the start or end of the list.
Examples:
>>> split_num_words("db.table", ".", 3)
[None, 'db', 'table']
>>> split_num_words("db.table", ".", 3, fill_from_start=False)
['db', 'table', None]
>>> split_num_words("db.table", ".", 1)
['db', 'table']
Returns:

The list of words returned by split, possibly augmented by a number of None values.

def is_iterable(value: Any) -> bool:
366def is_iterable(value: t.Any) -> bool:
367    """
368    Checks if the value is an iterable, excluding the types `str` and `bytes`.
369
370    Examples:
371        >>> is_iterable([1,2])
372        True
373        >>> is_iterable("test")
374        False
375
376    Args:
377        value: The value to check if it is an iterable.
378
379    Returns:
380        A `bool` value indicating if it is an iterable.
381    """
382    from sqlglot import Expression
383
384    return hasattr(value, "__iter__") and not isinstance(value, (str, bytes, Expression))

Checks if the value is an iterable, excluding the types str and bytes.

Examples:
>>> is_iterable([1,2])
True
>>> is_iterable("test")
False
Arguments:
  • value: The value to check if it is an iterable.
Returns:

A bool value indicating if it is an iterable.

def flatten(values: Iterable[Union[Iterable[Any], Any]]) -> Iterator[Any]:
387def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
388    """
389    Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
390    type `str` and `bytes` are not regarded as iterables.
391
392    Examples:
393        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
394        [1, 2, 3, 4, 5, 'bla']
395        >>> list(flatten([1, 2, 3]))
396        [1, 2, 3]
397
398    Args:
399        values: The value to be flattened.
400
401    Yields:
402        Non-iterable elements in `values`.
403    """
404    for value in values:
405        if is_iterable(value):
406            yield from flatten(value)
407        else:
408            yield value

Flattens an iterable that can contain both iterable and non-iterable elements. Objects of type str and bytes are not regarded as iterables.

Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
[1, 2, 3, 4, 5, 'bla']
>>> list(flatten([1, 2, 3]))
[1, 2, 3]
Arguments:
  • values: The value to be flattened.
Yields:

Non-iterable elements in values.

def dict_depth(d: Dict) -> int:
411def dict_depth(d: t.Dict) -> int:
412    """
413    Get the nesting depth of a dictionary.
414
415    Example:
416        >>> dict_depth(None)
417        0
418        >>> dict_depth({})
419        1
420        >>> dict_depth({"a": "b"})
421        1
422        >>> dict_depth({"a": {}})
423        2
424        >>> dict_depth({"a": {"b": {}}})
425        3
426    """
427    try:
428        return 1 + dict_depth(next(iter(d.values())))
429    except AttributeError:
430        # d doesn't have attribute "values"
431        return 0
432    except StopIteration:
433        # d.values() returns an empty sequence
434        return 1

Get the nesting depth of a dictionary.

Example:
>>> dict_depth(None)
0
>>> dict_depth({})
1
>>> dict_depth({"a": "b"})
1
>>> dict_depth({"a": {}})
2
>>> dict_depth({"a": {"b": {}}})
3
def first(it: Iterable[~T]) -> ~T:
437def first(it: t.Iterable[T]) -> T:
438    """Returns the first element from an iterable (useful for sets)."""
439    return next(i for i in it)

Returns the first element from an iterable (useful for sets).

def merge_ranges(ranges: List[Tuple[~A, ~A]]) -> List[Tuple[~A, ~A]]:
442def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
443    """
444    Merges a sequence of ranges, represented as tuples (low, high) whose values
445    belong to some totally-ordered set.
446
447    Example:
448        >>> merge_ranges([(1, 3), (2, 6)])
449        [(1, 6)]
450    """
451    if not ranges:
452        return []
453
454    ranges = sorted(ranges)
455
456    merged = [ranges[0]]
457
458    for start, end in ranges[1:]:
459        last_start, last_end = merged[-1]
460
461        if start <= last_end:
462            merged[-1] = (last_start, max(last_end, end))
463        else:
464            merged.append((start, end))
465
466    return merged

Merges a sequence of ranges, represented as tuples (low, high) whose values belong to some totally-ordered set.

Example:
>>> merge_ranges([(1, 3), (2, 6)])
[(1, 6)]