Edit on GitHub

sqlglot.optimizer.qualify_columns

  1from __future__ import annotations
  2
  3import itertools
  4import typing as t
  5
  6from sqlglot import alias, exp
  7from sqlglot.dialects.dialect import Dialect, DialectType
  8from sqlglot.errors import OptimizeError
  9from sqlglot.helper import seq_get, SingleValuedMapping
 10from sqlglot.optimizer.annotate_types import TypeAnnotator
 11from sqlglot.optimizer.scope import Scope, build_scope, traverse_scope, walk_in_scope
 12from sqlglot.optimizer.simplify import simplify_parens
 13from sqlglot.schema import Schema, ensure_schema
 14
 15if t.TYPE_CHECKING:
 16    from sqlglot._typing import E
 17
 18
def qualify_columns(
    expression: exp.Expression,
    schema: t.Dict | Schema,
    expand_alias_refs: bool = True,
    expand_stars: bool = True,
    infer_schema: t.Optional[bool] = None,
) -> exp.Expression:
    """
    Rewrite sqlglot AST to have fully qualified columns.

    Example:
        >>> import sqlglot
        >>> schema = {"tbl": {"col": "INT"}}
        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
        >>> qualify_columns(expression, schema).sql()
        'SELECT tbl.col AS col FROM tbl'

    Args:
        expression: Expression to qualify.
        schema: Database schema.
        expand_alias_refs: Whether to expand references to aliases.
        expand_stars: Whether to expand star queries. This is a necessary step
            for most of the optimizer's rules to work; do not set to False unless you
            know what you're doing!
        infer_schema: Whether to infer the schema if missing.

    Returns:
        The qualified expression.

    Notes:
        - Currently only handles a single PIVOT or UNPIVOT operator
    """
    schema = ensure_schema(schema)
    annotator = TypeAnnotator(schema)
    # Without a schema there is nothing to validate against, so default to inferring one
    infer_schema = schema.empty if infer_schema is None else infer_schema
    dialect = Dialect.get_or_raise(schema.dialect)
    pseudocolumns = dialect.PSEUDOCOLUMNS

    for scope in traverse_scope(expression):
        resolver = Resolver(scope, schema, infer_schema=infer_schema)
        # Drop table column aliases (e.g. `... AS foo(a, b)`) from CTEs and derived
        # tables so they don't interfere with column resolution below
        _pop_table_column_aliases(scope.ctes)
        _pop_table_column_aliases(scope.derived_tables)
        using_column_tables = _expand_using(scope, resolver)

        # Some dialects (and the schema-less case) need alias references expanded
        # *before* columns are qualified
        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
            _expand_alias_refs(
                scope,
                resolver,
                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
            )

        _convert_columns_to_dots(scope, resolver)
        _qualify_columns(scope, resolver)

        if not schema.empty and expand_alias_refs:
            _expand_alias_refs(scope, resolver)

        if not isinstance(scope.expression, exp.UDTF):
            if expand_stars:
                _expand_stars(
                    scope,
                    resolver,
                    using_column_tables,
                    pseudocolumns,
                    annotator,
                )
            qualify_outputs(scope)

        _expand_group_by(scope, dialect)
        _expand_order_by(scope, resolver)

        if dialect == "bigquery":
            annotator.annotate_scope(scope)

    return expression
 94
 95
 96def validate_qualify_columns(expression: E) -> E:
 97    """Raise an `OptimizeError` if any columns aren't qualified"""
 98    all_unqualified_columns = []
 99    for scope in traverse_scope(expression):
100        if isinstance(scope.expression, exp.Select):
101            unqualified_columns = scope.unqualified_columns
102
103            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
104                column = scope.external_columns[0]
105                for_table = f" for table: '{column.table}'" if column.table else ""
106                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
107
108            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
109                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
110                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
111                # this list here to ensure those in the former category will be excluded.
112                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
113                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
114
115            all_unqualified_columns.extend(unqualified_columns)
116
117    if all_unqualified_columns:
118        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
119
120    return expression
121
122
def _unpivot_columns(unpivot: exp.Pivot) -> t.Iterator[exp.Column]:
    """Yield the name column (if any) followed by every value column of an UNPIVOT."""
    field = unpivot.args.get("field")
    if isinstance(field, exp.In) and isinstance(field.this, exp.Column):
        yield field.this

    for e in unpivot.expressions:
        yield from e.find_all(exp.Column)
131
132
def _pop_table_column_aliases(derived_tables: t.List[exp.CTE | exp.Subquery]) -> None:
    """
    Remove table column aliases.

    For example, `col1` and `col2` will be dropped in SELECT ... FROM (SELECT ...) AS foo(col1, col2)
    """
    for derived_table in derived_tables:
        parent = derived_table.parent
        # Recursive CTEs rely on their column aliases, so keep those intact
        if isinstance(parent, exp.With) and parent.recursive:
            continue

        alias_node = derived_table.args.get("alias")
        if alias_node:
            alias_node.args.pop("columns", None)
145
146
def _expand_using(scope: Scope, resolver: Resolver) -> t.Dict[str, t.Any]:
    """
    Rewrite JOIN ... USING (...) clauses into equivalent ON conditions.

    Unqualified references to USING columns elsewhere in the scope are replaced
    with COALESCE(...) calls over every source that provides them.

    Returns:
        Mapping of each USING column name to an ordered "set" (a dict whose values
        are all None) of the source names that provide it.
    """
    joins = list(scope.find_all(exp.Join))
    names = {join.alias_or_name for join in joins}
    # Selected sources that are not joins themselves, in scope order
    ordered = [key for key in scope.selected_sources if key not in names]

    # Mapping of automatically joined column names to an ordered set of source names (dict).
    column_tables: t.Dict[str, t.Dict[str, t.Any]] = {}

    for i, join in enumerate(joins):
        using = join.args.get("using")

        if not using:
            continue

        join_table = join.alias_or_name

        # First source (in `ordered`) providing each column name seen so far
        columns = {}

        for source_name in scope.selected_sources:
            if source_name in ordered:
                for column_name in resolver.get_source_columns(source_name):
                    if column_name not in columns:
                        columns[column_name] = source_name

        source_table = ordered[-1]
        ordered.append(join_table)
        join_columns = resolver.get_source_columns(join_table)
        conditions = []
        using_identifier_count = len(using)

        for identifier in using:
            identifier = identifier.name
            table = columns.get(identifier)

            if not table or identifier not in join_columns:
                if (columns and "*" not in columns) and join_columns:
                    raise OptimizeError(f"Cannot automatically join: {identifier}")

            table = table or source_table

            if i == 0 or using_identifier_count == 1:
                lhs: exp.Expression = exp.column(identifier, table=table)
            else:
                # In chained joins the USING column may be provided by several of the
                # previously joined sources, so COALESCE over all of them
                coalesce_columns = [
                    exp.column(identifier, table=t)
                    for t in ordered[:-1]
                    if identifier in resolver.get_source_columns(t)
                ]
                if len(coalesce_columns) > 1:
                    lhs = exp.func("coalesce", *coalesce_columns)
                else:
                    lhs = exp.column(identifier, table=table)

            conditions.append(lhs.eq(exp.column(identifier, table=join_table)))

            # Set all values in the dict to None, because we only care about the key ordering
            tables = column_tables.setdefault(identifier, {})
            if table not in tables:
                tables[table] = None
            if join_table not in tables:
                tables[join_table] = None

        # Swap the USING clause for the equivalent ON condition built above
        join.args.pop("using")
        join.set("on", exp.and_(*conditions, copy=False))

    if column_tables:
        for column in scope.columns:
            if not column.table and column.name in column_tables:
                tables = column_tables[column.name]
                coalesce_args = [exp.column(column.name, table=table) for table in tables]
                replacement = exp.func("coalesce", *coalesce_args)

                # Ensure selects keep their output name
                if isinstance(column.parent, exp.Select):
                    replacement = alias(replacement, alias=column.name, copy=False)

                scope.replace(column, replacement)

    return column_tables
226
227
def _expand_alias_refs(scope: Scope, resolver: Resolver, expand_only_groupby: bool = False) -> None:
    """
    Expand references to projection aliases in WHERE / GROUP BY / HAVING / QUALIFY.

    Args:
        scope: Scope whose SELECT is rewritten in place.
        resolver: Used to qualify columns that don't resolve to a projection alias.
        expand_only_groupby: If True, only expand alias references inside GROUP BY nodes.
    """
    expression = scope.expression

    if not isinstance(expression, exp.Select):
        return

    # Maps each projection alias to (aliased expression, 1-based projection position)
    alias_to_expression: t.Dict[str, t.Tuple[exp.Expression, int]] = {}

    def replace_columns(
        node: t.Optional[exp.Expression], resolve_table: bool = False, literal_index: bool = False
    ) -> None:
        if not node or (expand_only_groupby and not isinstance(node, exp.Group)):
            return

        for column in walk_in_scope(node, prune=lambda node: node.is_star):
            if not isinstance(column, exp.Column):
                continue

            table = resolver.get_table(column.name) if resolve_table and not column.table else None
            alias_expr, i = alias_to_expression.get(column.name, (None, 1))
            # True when both the aliased expression and this reference sit inside
            # aggregate functions (and the reference isn't under a window) -- expanding
            # here would nest aggregations, so the column is qualified instead
            double_agg = (
                (
                    alias_expr.find(exp.AggFunc)
                    and (
                        column.find_ancestor(exp.AggFunc)
                        and not isinstance(column.find_ancestor(exp.Window, exp.Select), exp.Window)
                    )
                )
                if alias_expr
                else False
            )

            if table and (not alias_expr or double_agg):
                column.set("table", table)
            elif not column.table and alias_expr and not double_agg:
                if isinstance(alias_expr, exp.Literal) and (literal_index or resolve_table):
                    if literal_index:
                        # e.g. GROUP BY <alias> becomes GROUP BY <projection position>
                        column.replace(exp.Literal.number(i))
                else:
                    # Parenthesize the expansion to preserve precedence, then
                    # simplify the parens away when they're redundant
                    column = column.replace(exp.paren(alias_expr))
                    simplified = simplify_parens(column)
                    if simplified is not column:
                        column.replace(simplified)

    for i, projection in enumerate(scope.expression.selects):
        replace_columns(projection)

        if isinstance(projection, exp.Alias):
            alias_to_expression[projection.alias] = (projection.this, i + 1)

    parent_scope = scope
    while parent_scope.is_union:
        parent_scope = parent_scope.parent

    # We shouldn't expand aliases if they match the recursive CTE's columns
    if parent_scope.is_cte:
        cte = parent_scope.expression.parent
        if cte.find_ancestor(exp.With).recursive:
            for recursive_cte_column in cte.args["alias"].columns or cte.this.selects:
                alias_to_expression.pop(recursive_cte_column.output_name, None)

    replace_columns(expression.args.get("where"))
    replace_columns(expression.args.get("group"), literal_index=True)
    replace_columns(expression.args.get("having"), resolve_table=True)
    replace_columns(expression.args.get("qualify"), resolve_table=True)

    # The rewrites above invalidate the scope's cached column/source info
    scope.clear_cache()
295
296
def _expand_group_by(scope: Scope, dialect: DialectType) -> None:
    """Expand positional references (e.g. GROUP BY 1) in the GROUP BY clause, if present."""
    group = scope.expression.args.get("group")
    if not group:
        return

    expanded = _expand_positional_references(scope, group.expressions, dialect)
    group.set("expressions", expanded)
    scope.expression.set("group", group)
305
306
def _expand_order_by(scope: Scope, resolver: Resolver) -> None:
    """Expand positional references in ORDER BY and qualify columns inside its aggregates."""
    order = scope.expression.args.get("order")
    if not order:
        return

    ordereds = order.expressions
    for ordered, new_expression in zip(
        ordereds,
        _expand_positional_references(
            scope, (o.this for o in ordereds), resolver.schema.dialect, alias=True
        ),
    ):
        # Qualify any unqualified column that appears under an aggregate function
        for agg in ordered.find_all(exp.AggFunc):
            for col in agg.find_all(exp.Column):
                if not col.table:
                    col.set("table", resolver.get_table(col.name))

        ordered.set("this", new_expression)

    if scope.expression.args.get("group"):
        # Map each projected expression to a column referencing its output name,
        # so ORDER BY terms that repeat a projection are replaced by that name
        selects = {s.this: exp.column(s.alias_or_name) for s in scope.expression.selects}

        for ordered in ordereds:
            ordered = ordered.this

            ordered.replace(
                exp.to_identifier(_select_by_pos(scope, ordered).alias)
                if ordered.is_int
                else selects.get(ordered, ordered)
            )
337
338
def _expand_positional_references(
    scope: Scope, expressions: t.Iterable[exp.Expression], dialect: DialectType, alias: bool = False
) -> t.List[exp.Expression]:
    """
    Replace integer literals (e.g. GROUP BY 1) with the projections they point at.

    Args:
        scope: Scope the expressions belong to.
        expressions: The (possibly positional) expressions to expand.
        dialect: Source dialect; BigQuery gets extra ambiguity handling.
        alias: If True, substitute the projection's alias; otherwise substitute a
            copy of the projected expression itself.

    Returns:
        The expanded expressions, in the original order.
    """
    new_nodes: t.List[exp.Expression] = []
    ambiguous_projections = None  # computed lazily, and only for BigQuery

    for node in expressions:
        if node.is_int:
            select = _select_by_pos(scope, t.cast(exp.Literal, node))

            if alias:
                new_nodes.append(exp.column(select.args["alias"].copy()))
            else:
                select = select.this

                if dialect == "bigquery":
                    if ambiguous_projections is None:
                        # When a projection name is also a source name and it is referenced in the
                        # GROUP BY clause, BQ can't understand what the identifier corresponds to
                        ambiguous_projections = {
                            s.alias_or_name
                            for s in scope.expression.selects
                            if s.alias_or_name in scope.selected_sources
                        }

                    ambiguous = any(
                        column.parts[0].name in ambiguous_projections
                        for column in select.find_all(exp.Column)
                    )
                else:
                    ambiguous = False

                # Keep the positional reference when expanding it would be unsafe:
                # constants, EXPLODE/UNNEST projections, or ambiguous BigQuery names
                if (
                    isinstance(select, exp.CONSTANTS)
                    or select.find(exp.Explode, exp.Unnest)
                    or ambiguous
                ):
                    new_nodes.append(node)
                else:
                    new_nodes.append(select.copy())
        else:
            new_nodes.append(node)

    return new_nodes
383
384
def _select_by_pos(scope: Scope, node: exp.Literal) -> exp.Alias:
    """
    Return the aliased projection referenced by the 1-based position literal `node`.

    Args:
        scope: Scope whose SELECT list is indexed into.
        node: Integer literal holding the 1-based projection position.

    Returns:
        The `exp.Alias` projection at that position.

    Raises:
        OptimizeError: If the position is out of range.
    """
    try:
        return scope.expression.selects[int(node.this) - 1].assert_is(exp.Alias)
    except IndexError as e:
        # Chain the IndexError explicitly instead of leaking the implicit
        # "During handling of the above exception..." context
        raise OptimizeError(f"Unknown output column: {node.name}") from e
390
391
def _convert_columns_to_dots(scope: Scope, resolver: Resolver) -> None:
    """
    Converts `Column` instances that represent struct field lookup into chained `Dots`.

    Struct field lookups look like columns (e.g. "struct"."field"), but they need to be
    qualified separately and represented as Dot(Dot(...(<table>.<column>, field1), field2, ...)).
    """
    converted = False
    for column in itertools.chain(scope.columns, scope.stars):
        if isinstance(column, exp.Dot):
            continue

        column_table: t.Optional[str | exp.Identifier] = column.table
        # Only convert when the "table" part doesn't actually name a source (and isn't
        # a correlated reference to a parent source) -- then it must be a struct column
        if (
            column_table
            and column_table not in scope.sources
            and (
                not scope.parent
                or column_table not in scope.parent.sources
                or not scope.is_correlated_subquery
            )
        ):
            root, *parts = column.parts

            if root.name in scope.sources:
                # The struct is already qualified, but we still need to change the AST
                column_table = root
                root, *parts = parts
            else:
                column_table = resolver.get_table(root.name)

            if column_table:
                converted = True
                column.replace(exp.Dot.build([exp.column(root, table=column_table), *parts]))

    if converted:
        # We want to re-aggregate the converted columns, otherwise they'd be skipped in
        # a `for column in scope.columns` iteration, even though they shouldn't be
        scope.clear_cache()
431
432
def _qualify_columns(scope: Scope, resolver: Resolver) -> None:
    """Disambiguate columns, ensuring each column specifies a source"""
    for column in scope.columns:
        name = column.name
        table = column.table

        if table and table in scope.sources:
            known_columns = resolver.get_source_columns(table)
            if known_columns and "*" not in known_columns and name not in known_columns:
                raise OptimizeError(f"Unknown column: {name}")

        if table:
            continue

        if scope.pivots and not column.find_ancestor(exp.Pivot):
            # If the column is under the Pivot expression, we need to qualify it
            # using the name of the pivoted source instead of the pivot's alias
            column.set("table", exp.to_identifier(scope.pivots[0].alias))
            continue

        # The table can be '' because e.g. bigquery unnest has no table alias
        inferred_table = resolver.get_table(name)
        if inferred_table:
            column.set("table", inferred_table)

    for pivot in scope.pivots:
        for pivot_column in pivot.find_all(exp.Column):
            if not pivot_column.table and pivot_column.name in resolver.all_columns:
                source_table = resolver.get_table(pivot_column.name)
                if source_table:
                    pivot_column.set("table", source_table)
462
463
def _expand_struct_stars(
    expression: exp.Dot,
) -> t.List[exp.Alias]:
    """[BigQuery] Expand/Flatten foo.bar.* where bar is a struct column"""

    dot_column = t.cast(exp.Column, expression.find(exp.Column))
    if not dot_column.is_type(exp.DataType.Type.STRUCT):
        # Requires type annotation to have run first; non-structs can't be expanded
        return []

    # All nested struct values are ColumnDefs, so normalize the first exp.Column in one
    dot_column = dot_column.copy()
    starting_struct = exp.ColumnDef(this=dot_column.this, kind=dot_column.type)

    # First part is the table name and last part is the star so they can be dropped
    dot_parts = expression.parts[1:-1]

    # If we're expanding a nested struct eg. t.c.f1.f2.* find the last struct (f2 in this case)
    for part in dot_parts[1:]:
        for field in t.cast(exp.DataType, starting_struct.kind).expressions:
            # Unable to expand star unless all fields are named
            if not isinstance(field.this, exp.Identifier):
                return []

            if field.name == part.name and field.kind.is_type(exp.DataType.Type.STRUCT):
                starting_struct = field
                break
        else:
            # There is no matching field in the struct
            return []

    taken_names = set()
    new_selections = []

    for field in t.cast(exp.DataType, starting_struct.kind).expressions:
        name = field.name

        # Ambiguous or anonymous fields can't be expanded
        if name in taken_names or not isinstance(field.this, exp.Identifier):
            return []

        taken_names.add(name)

        # Build <table>.<root>.<...>.<field> and alias it as the field's own name
        this = field.this.copy()
        root, *parts = [part.copy() for part in itertools.chain(dot_parts, [this])]
        new_column = exp.column(
            t.cast(exp.Identifier, root), table=dot_column.args.get("table"), fields=parts
        )
        new_selections.append(alias(new_column, this, copy=False))

    return new_selections
514
515
def _expand_stars(
    scope: Scope,
    resolver: Resolver,
    using_column_tables: t.Dict[str, t.Any],
    pseudocolumns: t.Set[str],
    annotator: TypeAnnotator,
) -> None:
    """
    Expand stars to lists of column selections.

    Args:
        scope: Scope whose SELECT list is rewritten in place.
        resolver: Used to resolve each source's columns.
        using_column_tables: Output of `_expand_using` -- USING column names mapped
            to the sources providing them; these are emitted once as COALESCE calls.
        pseudocolumns: Names to drop from expansions (matched against upper-cased
            column names).
        annotator: Used to type-annotate the scope ahead of BigQuery struct expansion.
    """

    new_selections = []
    except_columns: t.Dict[int, t.Set[str]] = {}  # table id -> EXCEPT(...) names
    replace_columns: t.Dict[int, t.Dict[str, exp.Alias]] = {}  # table id -> REPLACE(...) aliases
    rename_columns: t.Dict[int, t.Dict[str, str]] = {}  # table id -> RENAME(...) mapping

    coalesced_columns = set()
    dialect = resolver.schema.dialect

    pivot_output_columns = None
    pivot_exclude_columns = None

    # Note: only the first PIVOT/UNPIVOT operator is handled
    pivot = t.cast(t.Optional[exp.Pivot], seq_get(scope.pivots, 0))
    if isinstance(pivot, exp.Pivot) and not pivot.alias_column_names:
        if pivot.unpivot:
            pivot_output_columns = [c.output_name for c in _unpivot_columns(pivot)]

            field = pivot.args.get("field")
            if isinstance(field, exp.In):
                pivot_exclude_columns = {
                    c.output_name for e in field.expressions for c in e.find_all(exp.Column)
                }
        else:
            pivot_exclude_columns = set(c.output_name for c in pivot.find_all(exp.Column))

            pivot_output_columns = [c.output_name for c in pivot.args.get("columns", [])]
            if not pivot_output_columns:
                pivot_output_columns = [c.alias_or_name for c in pivot.expressions]

    is_bigquery = dialect == "bigquery"
    if is_bigquery and any(isinstance(col, exp.Dot) for col in scope.stars):
        # Found struct expansion, annotate scope ahead of time
        annotator.annotate_scope(scope)

    for expression in scope.expression.selects:
        tables = []
        if isinstance(expression, exp.Star):
            # Bare `*`: expand over every selected source
            tables.extend(scope.selected_sources)
            _add_except_columns(expression, tables, except_columns)
            _add_replace_columns(expression, tables, replace_columns)
            _add_rename_columns(expression, tables, rename_columns)
        elif expression.is_star:
            if not isinstance(expression, exp.Dot):
                # Qualified `tbl.*`: expand over that single table
                tables.append(expression.table)
                _add_except_columns(expression.this, tables, except_columns)
                _add_replace_columns(expression.this, tables, replace_columns)
                _add_rename_columns(expression.this, tables, rename_columns)
            elif is_bigquery:
                # `tbl.struct_col.*`: flatten the struct's fields
                struct_fields = _expand_struct_stars(expression)
                if struct_fields:
                    new_selections.extend(struct_fields)
                    continue

        if not tables:
            # Not a star selection; keep as-is
            new_selections.append(expression)
            continue

        for table in tables:
            if table not in scope.sources:
                raise OptimizeError(f"Unknown table: {table}")

            columns = resolver.get_source_columns(table, only_visible=True)
            columns = columns or scope.outer_columns

            if pseudocolumns:
                columns = [name for name in columns if name.upper() not in pseudocolumns]

            if not columns or "*" in columns:
                # Can't determine the full column list; abort without rewriting
                return

            table_id = id(table)
            columns_to_exclude = except_columns.get(table_id) or set()
            renamed_columns = rename_columns.get(table_id, {})
            replaced_columns = replace_columns.get(table_id, {})

            if pivot:
                if pivot_output_columns and pivot_exclude_columns:
                    pivot_columns = [c for c in columns if c not in pivot_exclude_columns]
                    pivot_columns.extend(pivot_output_columns)
                else:
                    pivot_columns = pivot.alias_column_names

                if pivot_columns:
                    new_selections.extend(
                        alias(exp.column(name, table=pivot.alias), name, copy=False)
                        for name in pivot_columns
                        if name not in columns_to_exclude
                    )
                    continue

            for name in columns:
                if name in columns_to_exclude or name in coalesced_columns:
                    continue
                if name in using_column_tables and table in using_column_tables[name]:
                    # USING columns are selected once, as a COALESCE over all providers
                    coalesced_columns.add(name)
                    tables = using_column_tables[name]
                    coalesce_args = [exp.column(name, table=table) for table in tables]

                    new_selections.append(
                        alias(exp.func("coalesce", *coalesce_args), alias=name, copy=False)
                    )
                else:
                    alias_ = renamed_columns.get(name, name)
                    selection_expr = replaced_columns.get(name) or exp.column(name, table=table)
                    new_selections.append(
                        alias(selection_expr, alias_, copy=False)
                        if alias_ != name
                        else selection_expr
                    )

    # Ensures we don't overwrite the initial selections with an empty list
    if new_selections and isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", new_selections)
637
638
def _add_except_columns(
    expression: exp.Expression, tables, except_columns: t.Dict[int, t.Set[str]]
) -> None:
    """Record the star's EXCEPT(...) column names for each given table, keyed by table id."""
    except_ = expression.args.get("except")
    if not except_:
        return

    names = {e.name for e in except_}

    for table in tables:
        except_columns[id(table)] = names
651
652
def _add_rename_columns(
    expression: exp.Expression, tables, rename_columns: t.Dict[int, t.Dict[str, str]]
) -> None:
    """Record the star's RENAME(...) old-name -> new-name mapping for each table, keyed by table id."""
    rename = expression.args.get("rename")
    if not rename:
        return

    mapping = {e.this.name: e.alias for e in rename}

    for table in tables:
        rename_columns[id(table)] = mapping
665
666
def _add_replace_columns(
    expression: exp.Expression, tables, replace_columns: t.Dict[int, t.Dict[str, exp.Alias]]
) -> None:
    """Record the star's REPLACE(...) alias -> expression mapping for each table, keyed by table id."""
    replace = expression.args.get("replace")
    if not replace:
        return

    mapping = {e.alias: e for e in replace}

    for table in tables:
        replace_columns[id(table)] = mapping
679
680
def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
    """Ensure all output columns are aliased"""
    if isinstance(scope_or_expression, exp.Expression):
        scope = build_scope(scope_or_expression)
        if not isinstance(scope, Scope):
            return
    else:
        scope = scope_or_expression

    new_selections = []
    # outer_columns presumably carry aliases imposed from the enclosing scope
    # (e.g. a CTE column list) -- zip_longest so selections without one still run
    for i, (selection, aliased_column) in enumerate(
        itertools.zip_longest(scope.expression.selects, scope.outer_columns)
    ):
        if selection is None:
            # More outer columns than selections; nothing left to alias
            break

        if isinstance(selection, exp.Subquery):
            if not selection.output_name:
                selection.set("alias", exp.TableAlias(this=exp.to_identifier(f"_col_{i}")))
        elif not isinstance(selection, exp.Alias) and not selection.is_star:
            # Fall back to the expression's own output name, else a positional one
            selection = alias(
                selection,
                alias=selection.output_name or f"_col_{i}",
                copy=False,
            )
        if aliased_column:
            # An imposed outer alias always wins
            selection.set("alias", exp.to_identifier(aliased_column))

        new_selections.append(selection)

    if isinstance(scope.expression, exp.Select):
        scope.expression.set("expressions", new_selections)
713
714
def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
    """Makes sure all identifiers that need to be quoted are quoted."""
    quote_identifier = Dialect.get_or_raise(dialect).quote_identifier
    return expression.transform(quote_identifier, identify=identify, copy=False)  # type: ignore
720
721
def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
    """
    Pushes down the CTE alias columns into the projection,

    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

    Example:
        >>> import sqlglot
        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
        >>> pushdown_cte_alias_columns(expression).sql()
        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'

    Args:
        expression: Expression to pushdown.

    Returns:
        The expression with the CTE aliases pushed down into the projection.
    """
    for cte in expression.find_all(exp.CTE):
        if not cte.alias_column_names:
            continue

        pushed_down = []
        for column_name, projection in zip(cte.alias_column_names, cte.this.expressions):
            # Re-alias existing aliases in place; wrap everything else
            if isinstance(projection, exp.Alias):
                projection.set("alias", column_name)
            else:
                projection = alias(projection, alias=column_name)

            pushed_down.append(projection)

        cte.this.set("expressions", pushed_down)

    return expression
752
753
754class Resolver:
755    """
756    Helper for resolving columns.
757
758    This is a class so we can lazily load some things and easily share them across functions.
759    """
760
    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
        """
        Args:
            scope: Scope whose columns are being resolved.
            schema: Database schema used to look up source columns.
            infer_schema: Whether a lone schema-less source may be assumed to own
                otherwise-unresolvable columns (see `get_table`).
        """
        self.scope = scope
        self.schema = schema
        # Lazily populated caches
        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
        self._all_columns: t.Optional[t.Set[str]] = None
        self._infer_schema = infer_schema
        # Cache for get_source_columns, keyed by (source name, only_visible)
        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
769
    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
        """
        Get the table for a column name.

        Args:
            column_name: The column name to find the table for.
        Returns:
            The table name if it can be found/inferred.
        """
        if self._unambiguous_columns is None:
            self._unambiguous_columns = self._get_unambiguous_columns(
                self._get_all_source_columns()
            )

        table_name = self._unambiguous_columns.get(column_name)

        if not table_name and self._infer_schema:
            # If exactly one source has no known columns, assume the column is its
            sources_without_schema = tuple(
                source
                for source, columns in self._get_all_source_columns().items()
                if not columns or "*" in columns
            )
            if len(sources_without_schema) == 1:
                table_name = sources_without_schema[0]

        if table_name not in self.scope.selected_sources:
            # Note: to_identifier(None) passes the None through, i.e. "not found"
            return exp.to_identifier(table_name)

        node, _ = self.scope.selected_sources.get(table_name)

        if isinstance(node, exp.Query):
            # Climb to the ancestor node that actually carries this alias
            while node and node.alias != table_name:
                node = node.parent

        # Prefer the node's own alias over the resolved source name, if it has one
        node_alias = node.args.get("alias")
        if node_alias:
            return exp.to_identifier(node_alias.this)

        return exp.to_identifier(table_name)
809
810    @property
811    def all_columns(self) -> t.Set[str]:
812        """All available columns of all sources in this scope"""
813        if self._all_columns is None:
814            self._all_columns = {
815                column for columns in self._get_all_source_columns().values() for column in columns
816            }
817        return self._all_columns
818
819    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
820        """Resolve the source columns for a given source `name`."""
821        cache_key = (name, only_visible)
822        if cache_key not in self._get_source_columns_cache:
823            if name not in self.scope.sources:
824                raise OptimizeError(f"Unknown table: {name}")
825
826            source = self.scope.sources[name]
827
828            if isinstance(source, exp.Table):
829                columns = self.schema.column_names(source, only_visible)
830            elif isinstance(source, Scope) and isinstance(
831                source.expression, (exp.Values, exp.Unnest)
832            ):
833                columns = source.expression.named_selects
834
835                # in bigquery, unnest structs are automatically scoped as tables, so you can
836                # directly select a struct field in a query.
837                # this handles the case where the unnest is statically defined.
838                if self.schema.dialect == "bigquery":
839                    if source.expression.is_type(exp.DataType.Type.STRUCT):
840                        for k in source.expression.type.expressions:  # type: ignore
841                            columns.append(k.name)
842            else:
843                columns = source.expression.named_selects
844
845            node, _ = self.scope.selected_sources.get(name) or (None, None)
846            if isinstance(node, Scope):
847                column_aliases = node.expression.alias_column_names
848            elif isinstance(node, exp.Expression):
849                column_aliases = node.alias_column_names
850            else:
851                column_aliases = []
852
853            if column_aliases:
854                # If the source's columns are aliased, their aliases shadow the corresponding column names.
855                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
856                columns = [
857                    alias or name
858                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
859                ]
860
861            self._get_source_columns_cache[cache_key] = columns
862
863        return self._get_source_columns_cache[cache_key]
864
865    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
866        if self._source_columns is None:
867            self._source_columns = {
868                source_name: self.get_source_columns(source_name)
869                for source_name, source in itertools.chain(
870                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
871                )
872            }
873        return self._source_columns
874
875    def _get_unambiguous_columns(
876        self, source_columns: t.Dict[str, t.Sequence[str]]
877    ) -> t.Mapping[str, str]:
878        """
879        Find all the unambiguous columns in sources.
880
881        Args:
882            source_columns: Mapping of names to source columns.
883
884        Returns:
885            Mapping of column name to source name.
886        """
887        if not source_columns:
888            return {}
889
890        source_columns_pairs = list(source_columns.items())
891
892        first_table, first_columns = source_columns_pairs[0]
893
894        if len(source_columns_pairs) == 1:
895            # Performance optimization - avoid copying first_columns if there is only one table.
896            return SingleValuedMapping(first_columns, first_table)
897
898        unambiguous_columns = {col: first_table for col in first_columns}
899        all_columns = set(unambiguous_columns)
900
901        for table, columns in source_columns_pairs[1:]:
902            unique = set(columns)
903            ambiguous = all_columns.intersection(unique)
904            all_columns.update(columns)
905
906            for column in ambiguous:
907                unambiguous_columns.pop(column, None)
908            for column in unique.difference(ambiguous):
909                unambiguous_columns[column] = table
910
911        return unambiguous_columns
def qualify_columns( expression: sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema], expand_alias_refs: bool = True, expand_stars: bool = True, infer_schema: Optional[bool] = None) -> sqlglot.expressions.Expression:
20def qualify_columns(
21    expression: exp.Expression,
22    schema: t.Dict | Schema,
23    expand_alias_refs: bool = True,
24    expand_stars: bool = True,
25    infer_schema: t.Optional[bool] = None,
26) -> exp.Expression:
27    """
28    Rewrite sqlglot AST to have fully qualified columns.
29
30    Example:
31        >>> import sqlglot
32        >>> schema = {"tbl": {"col": "INT"}}
33        >>> expression = sqlglot.parse_one("SELECT col FROM tbl")
34        >>> qualify_columns(expression, schema).sql()
35        'SELECT tbl.col AS col FROM tbl'
36
37    Args:
38        expression: Expression to qualify.
39        schema: Database schema.
40        expand_alias_refs: Whether to expand references to aliases.
41        expand_stars: Whether to expand star queries. This is a necessary step
42            for most of the optimizer's rules to work; do not set to False unless you
43            know what you're doing!
44        infer_schema: Whether to infer the schema if missing.
45
46    Returns:
47        The qualified expression.
48
49    Notes:
50        - Currently only handles a single PIVOT or UNPIVOT operator
51    """
52    schema = ensure_schema(schema)
53    annotator = TypeAnnotator(schema)
54    infer_schema = schema.empty if infer_schema is None else infer_schema
55    dialect = Dialect.get_or_raise(schema.dialect)
56    pseudocolumns = dialect.PSEUDOCOLUMNS
57
58    for scope in traverse_scope(expression):
59        resolver = Resolver(scope, schema, infer_schema=infer_schema)
60        _pop_table_column_aliases(scope.ctes)
61        _pop_table_column_aliases(scope.derived_tables)
62        using_column_tables = _expand_using(scope, resolver)
63
64        if (schema.empty or dialect.FORCE_EARLY_ALIAS_REF_EXPANSION) and expand_alias_refs:
65            _expand_alias_refs(
66                scope,
67                resolver,
68                expand_only_groupby=dialect.EXPAND_ALIAS_REFS_EARLY_ONLY_IN_GROUP_BY,
69            )
70
71        _convert_columns_to_dots(scope, resolver)
72        _qualify_columns(scope, resolver)
73
74        if not schema.empty and expand_alias_refs:
75            _expand_alias_refs(scope, resolver)
76
77        if not isinstance(scope.expression, exp.UDTF):
78            if expand_stars:
79                _expand_stars(
80                    scope,
81                    resolver,
82                    using_column_tables,
83                    pseudocolumns,
84                    annotator,
85                )
86            qualify_outputs(scope)
87
88        _expand_group_by(scope, dialect)
89        _expand_order_by(scope, resolver)
90
91        if dialect == "bigquery":
92            annotator.annotate_scope(scope)
93
94    return expression

Rewrite sqlglot AST to have fully qualified columns.

Example:
>>> import sqlglot
>>> schema = {"tbl": {"col": "INT"}}
>>> expression = sqlglot.parse_one("SELECT col FROM tbl")
>>> qualify_columns(expression, schema).sql()
'SELECT tbl.col AS col FROM tbl'
Arguments:
  • expression: Expression to qualify.
  • schema: Database schema.
  • expand_alias_refs: Whether to expand references to aliases.
  • expand_stars: Whether to expand star queries. This is a necessary step for most of the optimizer's rules to work; do not set to False unless you know what you're doing!
  • infer_schema: Whether to infer the schema if missing.
Returns:

The qualified expression.

Notes:
  • Currently only handles a single PIVOT or UNPIVOT operator
def validate_qualify_columns(expression: ~E) -> ~E:
 97def validate_qualify_columns(expression: E) -> E:
 98    """Raise an `OptimizeError` if any columns aren't qualified"""
 99    all_unqualified_columns = []
100    for scope in traverse_scope(expression):
101        if isinstance(scope.expression, exp.Select):
102            unqualified_columns = scope.unqualified_columns
103
104            if scope.external_columns and not scope.is_correlated_subquery and not scope.pivots:
105                column = scope.external_columns[0]
106                for_table = f" for table: '{column.table}'" if column.table else ""
107                raise OptimizeError(f"Column '{column}' could not be resolved{for_table}")
108
109            if unqualified_columns and scope.pivots and scope.pivots[0].unpivot:
110                # New columns produced by the UNPIVOT can't be qualified, but there may be columns
111                # under the UNPIVOT's IN clause that can and should be qualified. We recompute
112                # this list here to ensure those in the former category will be excluded.
113                unpivot_columns = set(_unpivot_columns(scope.pivots[0]))
114                unqualified_columns = [c for c in unqualified_columns if c not in unpivot_columns]
115
116            all_unqualified_columns.extend(unqualified_columns)
117
118    if all_unqualified_columns:
119        raise OptimizeError(f"Ambiguous columns: {all_unqualified_columns}")
120
121    return expression

Raise an OptimizeError if any columns aren't qualified

def qualify_outputs( scope_or_expression: sqlglot.optimizer.scope.Scope | sqlglot.expressions.Expression) -> None:
682def qualify_outputs(scope_or_expression: Scope | exp.Expression) -> None:
683    """Ensure all output columns are aliased"""
684    if isinstance(scope_or_expression, exp.Expression):
685        scope = build_scope(scope_or_expression)
686        if not isinstance(scope, Scope):
687            return
688    else:
689        scope = scope_or_expression
690
691    new_selections = []
692    for i, (selection, aliased_column) in enumerate(
693        itertools.zip_longest(scope.expression.selects, scope.outer_columns)
694    ):
695        if selection is None:
696            break
697
698        if isinstance(selection, exp.Subquery):
699            if not selection.output_name:
700                selection.set("alias", exp.TableAlias(this=exp.to_identifier(f"_col_{i}")))
701        elif not isinstance(selection, exp.Alias) and not selection.is_star:
702            selection = alias(
703                selection,
704                alias=selection.output_name or f"_col_{i}",
705                copy=False,
706            )
707        if aliased_column:
708            selection.set("alias", exp.to_identifier(aliased_column))
709
710        new_selections.append(selection)
711
712    if isinstance(scope.expression, exp.Select):
713        scope.expression.set("expressions", new_selections)

Ensure all output columns are aliased

def quote_identifiers( expression: ~E, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, identify: bool = True) -> ~E:
716def quote_identifiers(expression: E, dialect: DialectType = None, identify: bool = True) -> E:
717    """Makes sure all identifiers that need to be quoted are quoted."""
718    return expression.transform(
719        Dialect.get_or_raise(dialect).quote_identifier, identify=identify, copy=False
720    )  # type: ignore

Makes sure all identifiers that need to be quoted are quoted.

def pushdown_cte_alias_columns( expression: sqlglot.expressions.Expression) -> sqlglot.expressions.Expression:
723def pushdown_cte_alias_columns(expression: exp.Expression) -> exp.Expression:
724    """
725    Pushes down the CTE alias columns into the projection,
726
727    This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.
728
729    Example:
730        >>> import sqlglot
731        >>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
732        >>> pushdown_cte_alias_columns(expression).sql()
733        'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
734
735    Args:
736        expression: Expression to pushdown.
737
738    Returns:
739        The expression with the CTE aliases pushed down into the projection.
740    """
741    for cte in expression.find_all(exp.CTE):
742        if cte.alias_column_names:
743            new_expressions = []
744            for _alias, projection in zip(cte.alias_column_names, cte.this.expressions):
745                if isinstance(projection, exp.Alias):
746                    projection.set("alias", _alias)
747                else:
748                    projection = alias(projection, alias=_alias)
749                new_expressions.append(projection)
750            cte.this.set("expressions", new_expressions)
751
752    return expression

Pushes down the CTE alias columns into the projection.

This step is useful in Snowflake where the CTE alias columns can be referenced in the HAVING.

Example:
>>> import sqlglot
>>> expression = sqlglot.parse_one("WITH y (c) AS (SELECT SUM(a) FROM ( SELECT 1 a ) AS x HAVING c > 0) SELECT c FROM y")
>>> pushdown_cte_alias_columns(expression).sql()
'WITH y(c) AS (SELECT SUM(a) AS c FROM (SELECT 1 AS a) AS x HAVING c > 0) SELECT c FROM y'
Arguments:
  • expression: Expression to pushdown.
Returns:

The expression with the CTE aliases pushed down into the projection.

class Resolver:
755class Resolver:
756    """
757    Helper for resolving columns.
758
759    This is a class so we can lazily load some things and easily share them across functions.
760    """
761
762    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
763        self.scope = scope
764        self.schema = schema
765        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
766        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
767        self._all_columns: t.Optional[t.Set[str]] = None
768        self._infer_schema = infer_schema
769        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
770
771    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
772        """
773        Get the table for a column name.
774
775        Args:
776            column_name: The column name to find the table for.
777        Returns:
778            The table name if it can be found/inferred.
779        """
780        if self._unambiguous_columns is None:
781            self._unambiguous_columns = self._get_unambiguous_columns(
782                self._get_all_source_columns()
783            )
784
785        table_name = self._unambiguous_columns.get(column_name)
786
787        if not table_name and self._infer_schema:
788            sources_without_schema = tuple(
789                source
790                for source, columns in self._get_all_source_columns().items()
791                if not columns or "*" in columns
792            )
793            if len(sources_without_schema) == 1:
794                table_name = sources_without_schema[0]
795
796        if table_name not in self.scope.selected_sources:
797            return exp.to_identifier(table_name)
798
799        node, _ = self.scope.selected_sources.get(table_name)
800
801        if isinstance(node, exp.Query):
802            while node and node.alias != table_name:
803                node = node.parent
804
805        node_alias = node.args.get("alias")
806        if node_alias:
807            return exp.to_identifier(node_alias.this)
808
809        return exp.to_identifier(table_name)
810
811    @property
812    def all_columns(self) -> t.Set[str]:
813        """All available columns of all sources in this scope"""
814        if self._all_columns is None:
815            self._all_columns = {
816                column for columns in self._get_all_source_columns().values() for column in columns
817            }
818        return self._all_columns
819
820    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
821        """Resolve the source columns for a given source `name`."""
822        cache_key = (name, only_visible)
823        if cache_key not in self._get_source_columns_cache:
824            if name not in self.scope.sources:
825                raise OptimizeError(f"Unknown table: {name}")
826
827            source = self.scope.sources[name]
828
829            if isinstance(source, exp.Table):
830                columns = self.schema.column_names(source, only_visible)
831            elif isinstance(source, Scope) and isinstance(
832                source.expression, (exp.Values, exp.Unnest)
833            ):
834                columns = source.expression.named_selects
835
836                # in bigquery, unnest structs are automatically scoped as tables, so you can
837                # directly select a struct field in a query.
838                # this handles the case where the unnest is statically defined.
839                if self.schema.dialect == "bigquery":
840                    if source.expression.is_type(exp.DataType.Type.STRUCT):
841                        for k in source.expression.type.expressions:  # type: ignore
842                            columns.append(k.name)
843            else:
844                columns = source.expression.named_selects
845
846            node, _ = self.scope.selected_sources.get(name) or (None, None)
847            if isinstance(node, Scope):
848                column_aliases = node.expression.alias_column_names
849            elif isinstance(node, exp.Expression):
850                column_aliases = node.alias_column_names
851            else:
852                column_aliases = []
853
854            if column_aliases:
855                # If the source's columns are aliased, their aliases shadow the corresponding column names.
856                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
857                columns = [
858                    alias or name
859                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
860                ]
861
862            self._get_source_columns_cache[cache_key] = columns
863
864        return self._get_source_columns_cache[cache_key]
865
866    def _get_all_source_columns(self) -> t.Dict[str, t.Sequence[str]]:
867        if self._source_columns is None:
868            self._source_columns = {
869                source_name: self.get_source_columns(source_name)
870                for source_name, source in itertools.chain(
871                    self.scope.selected_sources.items(), self.scope.lateral_sources.items()
872                )
873            }
874        return self._source_columns
875
876    def _get_unambiguous_columns(
877        self, source_columns: t.Dict[str, t.Sequence[str]]
878    ) -> t.Mapping[str, str]:
879        """
880        Find all the unambiguous columns in sources.
881
882        Args:
883            source_columns: Mapping of names to source columns.
884
885        Returns:
886            Mapping of column name to source name.
887        """
888        if not source_columns:
889            return {}
890
891        source_columns_pairs = list(source_columns.items())
892
893        first_table, first_columns = source_columns_pairs[0]
894
895        if len(source_columns_pairs) == 1:
896            # Performance optimization - avoid copying first_columns if there is only one table.
897            return SingleValuedMapping(first_columns, first_table)
898
899        unambiguous_columns = {col: first_table for col in first_columns}
900        all_columns = set(unambiguous_columns)
901
902        for table, columns in source_columns_pairs[1:]:
903            unique = set(columns)
904            ambiguous = all_columns.intersection(unique)
905            all_columns.update(columns)
906
907            for column in ambiguous:
908                unambiguous_columns.pop(column, None)
909            for column in unique.difference(ambiguous):
910                unambiguous_columns[column] = table
911
912        return unambiguous_columns

Helper for resolving columns.

This is a class so we can lazily load some things and easily share them across functions.

Resolver( scope: sqlglot.optimizer.scope.Scope, schema: sqlglot.schema.Schema, infer_schema: bool = True)
762    def __init__(self, scope: Scope, schema: Schema, infer_schema: bool = True):
763        self.scope = scope
764        self.schema = schema
765        self._source_columns: t.Optional[t.Dict[str, t.Sequence[str]]] = None
766        self._unambiguous_columns: t.Optional[t.Mapping[str, str]] = None
767        self._all_columns: t.Optional[t.Set[str]] = None
768        self._infer_schema = infer_schema
769        self._get_source_columns_cache: t.Dict[t.Tuple[str, bool], t.Sequence[str]] = {}
scope
schema
def get_table(self, column_name: str) -> Optional[sqlglot.expressions.Identifier]:
771    def get_table(self, column_name: str) -> t.Optional[exp.Identifier]:
772        """
773        Get the table for a column name.
774
775        Args:
776            column_name: The column name to find the table for.
777        Returns:
778            The table name if it can be found/inferred.
779        """
780        if self._unambiguous_columns is None:
781            self._unambiguous_columns = self._get_unambiguous_columns(
782                self._get_all_source_columns()
783            )
784
785        table_name = self._unambiguous_columns.get(column_name)
786
787        if not table_name and self._infer_schema:
788            sources_without_schema = tuple(
789                source
790                for source, columns in self._get_all_source_columns().items()
791                if not columns or "*" in columns
792            )
793            if len(sources_without_schema) == 1:
794                table_name = sources_without_schema[0]
795
796        if table_name not in self.scope.selected_sources:
797            return exp.to_identifier(table_name)
798
799        node, _ = self.scope.selected_sources.get(table_name)
800
801        if isinstance(node, exp.Query):
802            while node and node.alias != table_name:
803                node = node.parent
804
805        node_alias = node.args.get("alias")
806        if node_alias:
807            return exp.to_identifier(node_alias.this)
808
809        return exp.to_identifier(table_name)

Get the table for a column name.

Arguments:
  • column_name: The column name to find the table for.
Returns:

The table name if it can be found/inferred.

all_columns: Set[str]
811    @property
812    def all_columns(self) -> t.Set[str]:
813        """All available columns of all sources in this scope"""
814        if self._all_columns is None:
815            self._all_columns = {
816                column for columns in self._get_all_source_columns().values() for column in columns
817            }
818        return self._all_columns

All available columns of all sources in this scope

def get_source_columns(self, name: str, only_visible: bool = False) -> Sequence[str]:
820    def get_source_columns(self, name: str, only_visible: bool = False) -> t.Sequence[str]:
821        """Resolve the source columns for a given source `name`."""
822        cache_key = (name, only_visible)
823        if cache_key not in self._get_source_columns_cache:
824            if name not in self.scope.sources:
825                raise OptimizeError(f"Unknown table: {name}")
826
827            source = self.scope.sources[name]
828
829            if isinstance(source, exp.Table):
830                columns = self.schema.column_names(source, only_visible)
831            elif isinstance(source, Scope) and isinstance(
832                source.expression, (exp.Values, exp.Unnest)
833            ):
834                columns = source.expression.named_selects
835
836                # in bigquery, unnest structs are automatically scoped as tables, so you can
837                # directly select a struct field in a query.
838                # this handles the case where the unnest is statically defined.
839                if self.schema.dialect == "bigquery":
840                    if source.expression.is_type(exp.DataType.Type.STRUCT):
841                        for k in source.expression.type.expressions:  # type: ignore
842                            columns.append(k.name)
843            else:
844                columns = source.expression.named_selects
845
846            node, _ = self.scope.selected_sources.get(name) or (None, None)
847            if isinstance(node, Scope):
848                column_aliases = node.expression.alias_column_names
849            elif isinstance(node, exp.Expression):
850                column_aliases = node.alias_column_names
851            else:
852                column_aliases = []
853
854            if column_aliases:
855                # If the source's columns are aliased, their aliases shadow the corresponding column names.
856                # This can be expensive if there are lots of columns, so only do this if column_aliases exist.
857                columns = [
858                    alias or name
859                    for (name, alias) in itertools.zip_longest(columns, column_aliases)
860                ]
861
862            self._get_source_columns_cache[cache_key] = columns
863
864        return self._get_source_columns_cache[cache_key]

Resolve the source columns for a given source name.