1
0
Fork 0

Merging upstream version 23.7.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-13 21:30:28 +01:00
parent ebba7c6a18
commit d26905e4af
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
187 changed files with 86502 additions and 71397 deletions

View file

@ -28,10 +28,7 @@ class Node:
yield self
for d in self.downstream:
if isinstance(d, Node):
yield from d.walk()
else:
yield d
yield from d.walk()
def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
nodes = {}
@ -71,8 +68,10 @@ def lineage(
column: str | exp.Column,
sql: str | exp.Expression,
schema: t.Optional[t.Dict | Schema] = None,
sources: t.Optional[t.Dict[str, str | exp.Query]] = None,
sources: t.Optional[t.Mapping[str, str | exp.Query]] = None,
dialect: DialectType = None,
scope: t.Optional[Scope] = None,
trim_selects: bool = True,
**kwargs,
) -> Node:
"""Build the lineage graph for a column of a SQL query.
@ -83,6 +82,8 @@ def lineage(
schema: The schema of tables.
sources: A mapping of queries which will be used to continue building lineage.
dialect: The dialect of input SQL.
scope: A pre-created scope to use instead.
trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
**kwargs: Qualification optimizer kwargs.
Returns:
@ -99,14 +100,15 @@ def lineage(
dialect=dialect,
)
qualified = qualify.qualify(
expression,
dialect=dialect,
schema=schema,
**{"validate_qualify_columns": False, "identify": False, **kwargs}, # type: ignore
)
if not scope:
expression = qualify.qualify(
expression,
dialect=dialect,
schema=schema,
**{"validate_qualify_columns": False, "identify": False, **kwargs}, # type: ignore
)
scope = build_scope(qualified)
scope = build_scope(expression)
if not scope:
raise SqlglotError("Cannot build lineage, sql must be SELECT")
@ -114,7 +116,7 @@ def lineage(
if not any(select.alias_or_name == column for select in scope.expression.selects):
raise SqlglotError(f"Cannot find column '{column}' in query.")
return to_node(column, scope, dialect)
return to_node(column, scope, dialect, trim_selects=trim_selects)
def to_node(
@ -125,6 +127,7 @@ def to_node(
upstream: t.Optional[Node] = None,
source_name: t.Optional[str] = None,
reference_node_name: t.Optional[str] = None,
trim_selects: bool = True,
) -> Node:
source_names = {
dt.alias: dt.comments[0].split()[1]
@ -143,6 +146,17 @@ def to_node(
)
)
if isinstance(scope.expression, exp.Subquery):
for source in scope.subquery_scopes:
return to_node(
column,
scope=source,
dialect=dialect,
upstream=upstream,
source_name=source_name,
reference_node_name=reference_node_name,
trim_selects=trim_selects,
)
if isinstance(scope.expression, exp.Union):
upstream = upstream or Node(name="UNION", source=scope.expression, expression=select)
@ -170,11 +184,12 @@ def to_node(
upstream=upstream,
source_name=source_name,
reference_node_name=reference_node_name,
trim_selects=trim_selects,
)
return upstream
if isinstance(scope.expression, exp.Select):
if trim_selects and isinstance(scope.expression, exp.Select):
# For better ergonomics in our node labels, replace the full select with
# a version that has only the column we care about.
# "x", SELECT x, y FROM foo
@ -206,7 +221,13 @@ def to_node(
continue
for name in subquery.named_selects:
to_node(name, scope=subquery_scope, dialect=dialect, upstream=node)
to_node(
name,
scope=subquery_scope,
dialect=dialect,
upstream=node,
trim_selects=trim_selects,
)
# if the select is a star add all scope sources as downstreams
if select.is_star:
@ -237,6 +258,7 @@ def to_node(
upstream=node,
source_name=source_names.get(table) or source_name,
reference_node_name=selected_node.name if selected_node else None,
trim_selects=trim_selects,
)
else:
# The source is not a scope - we've reached the end of the line. At this point, if a source is not found