sqlglot.planner
1from __future__ import annotations 2 3import math 4import typing as t 5 6from sqlglot import alias, exp 7from sqlglot.helper import name_sequence 8from sqlglot.optimizer.eliminate_joins import join_condition 9 10 11class Plan: 12 def __init__(self, expression: exp.Expression) -> None: 13 self.expression = expression.copy() 14 self.root = Step.from_expression(self.expression) 15 self._dag: t.Dict[Step, t.Set[Step]] = {} 16 17 @property 18 def dag(self) -> t.Dict[Step, t.Set[Step]]: 19 if not self._dag: 20 dag: t.Dict[Step, t.Set[Step]] = {} 21 nodes = {self.root} 22 23 while nodes: 24 node = nodes.pop() 25 dag[node] = set() 26 for dep in node.dependencies: 27 dag[node].add(dep) 28 nodes.add(dep) 29 self._dag = dag 30 31 return self._dag 32 33 @property 34 def leaves(self) -> t.Iterator[Step]: 35 return (node for node, deps in self.dag.items() if not deps) 36 37 def __repr__(self) -> str: 38 return f"Plan\n----\n{repr(self.root)}" 39 40 41class Step: 42 @classmethod 43 def from_expression( 44 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 45 ) -> Step: 46 """ 47 Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. 48 Note: the expression's tables and subqueries must be aliased for this method to work. For 49 example, given the following expression: 50 51 SELECT 52 x.a, 53 SUM(x.b) 54 FROM x AS x 55 JOIN y AS y 56 ON x.a = y.a 57 GROUP BY x.a 58 59 the following DAG is produced (the expression IDs might differ per execution): 60 61 - Aggregate: x (4347984624) 62 Context: 63 Aggregations: 64 - SUM(x.b) 65 Group: 66 - x.a 67 Projections: 68 - x.a 69 - "x"."" 70 Dependencies: 71 - Join: x (4347985296) 72 Context: 73 y: 74 On: x.a = y.a 75 Projections: 76 Dependencies: 77 - Scan: x (4347983136) 78 Context: 79 Source: x AS x 80 Projections: 81 - Scan: y (4343416624) 82 Context: 83 Source: y AS y 84 Projections: 85 86 Args: 87 expression: the expression to build the DAG from. 
88 ctes: a dictionary that maps CTEs to their corresponding Step DAG by name. 89 90 Returns: 91 A Step DAG corresponding to `expression`. 92 """ 93 ctes = ctes or {} 94 expression = expression.unnest() 95 with_ = expression.args.get("with") 96 97 # CTEs break the mold of scope and introduce themselves to all in the context. 98 if with_: 99 ctes = ctes.copy() 100 for cte in with_.expressions: 101 step = Step.from_expression(cte.this, ctes) 102 step.name = cte.alias 103 ctes[step.name] = step # type: ignore 104 105 from_ = expression.args.get("from") 106 107 if isinstance(expression, exp.Select) and from_: 108 step = Scan.from_expression(from_.this, ctes) 109 elif isinstance(expression, exp.Union): 110 step = SetOperation.from_expression(expression, ctes) 111 else: 112 step = Scan() 113 114 joins = expression.args.get("joins") 115 116 if joins: 117 join = Join.from_joins(joins, ctes) 118 join.name = step.name 119 join.add_dependency(step) 120 step = join 121 122 projections = [] # final selects in this chain of steps representing a select 123 operands = {} # intermediate computations of agg funcs eg x + 1 in SUM(x + 1) 124 aggregations = set() 125 next_operand_name = name_sequence("_a_") 126 127 def extract_agg_operands(expression): 128 agg_funcs = tuple(expression.find_all(exp.AggFunc)) 129 if agg_funcs: 130 aggregations.add(expression) 131 for agg in agg_funcs: 132 for operand in agg.unnest_operands(): 133 if isinstance(operand, exp.Column): 134 continue 135 if operand not in operands: 136 operands[operand] = next_operand_name() 137 operand.replace(exp.column(operands[operand], quoted=True)) 138 return bool(agg_funcs) 139 140 for e in expression.expressions: 141 if e.find(exp.AggFunc): 142 projections.append(exp.column(e.alias_or_name, step.name, quoted=True)) 143 extract_agg_operands(e) 144 else: 145 projections.append(e) 146 147 where = expression.args.get("where") 148 149 if where: 150 step.condition = where.this 151 152 group = expression.args.get("group") 153 
154 if group or aggregations: 155 aggregate = Aggregate() 156 aggregate.source = step.name 157 aggregate.name = step.name 158 159 having = expression.args.get("having") 160 161 if having: 162 if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)): 163 aggregate.condition = exp.column("_h", step.name, quoted=True) 164 else: 165 aggregate.condition = having.this 166 167 aggregate.operands = tuple( 168 alias(operand, alias_) for operand, alias_ in operands.items() 169 ) 170 aggregate.aggregations = list(aggregations) 171 172 # give aggregates names and replace projections with references to them 173 aggregate.group = { 174 f"_g{i}": e for i, e in enumerate(group.expressions if group else []) 175 } 176 177 intermediate: t.Dict[str | exp.Expression, str] = {} 178 for k, v in aggregate.group.items(): 179 intermediate[v] = k 180 if isinstance(v, exp.Column): 181 intermediate[v.alias_or_name] = k 182 183 for projection in projections: 184 for node, *_ in projection.walk(): 185 name = intermediate.get(node) 186 if name: 187 node.replace(exp.column(name, step.name)) 188 if aggregate.condition: 189 for node, *_ in aggregate.condition.walk(): 190 name = intermediate.get(node) or intermediate.get(node.name) 191 if name: 192 node.replace(exp.column(name, step.name)) 193 194 aggregate.add_dependency(step) 195 step = aggregate 196 197 order = expression.args.get("order") 198 199 if order: 200 sort = Sort() 201 sort.name = step.name 202 sort.key = order.expressions 203 sort.add_dependency(step) 204 step = sort 205 206 step.projections = projections 207 208 if isinstance(expression, exp.Select) and expression.args.get("distinct"): 209 distinct = Aggregate() 210 distinct.source = step.name 211 distinct.name = step.name 212 distinct.group = { 213 e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name) 214 for e in projections or expression.expressions 215 } 216 distinct.add_dependency(step) 217 step = distinct 218 219 limit = expression.args.get("limit") 220 221 
if limit: 222 step.limit = int(limit.text("expression")) 223 224 return step 225 226 def __init__(self) -> None: 227 self.name: t.Optional[str] = None 228 self.dependencies: t.Set[Step] = set() 229 self.dependents: t.Set[Step] = set() 230 self.projections: t.Sequence[exp.Expression] = [] 231 self.limit: float = math.inf 232 self.condition: t.Optional[exp.Expression] = None 233 234 def add_dependency(self, dependency: Step) -> None: 235 self.dependencies.add(dependency) 236 dependency.dependents.add(self) 237 238 def __repr__(self) -> str: 239 return self.to_s() 240 241 def to_s(self, level: int = 0) -> str: 242 indent = " " * level 243 nested = f"{indent} " 244 245 context = self._to_s(f"{nested} ") 246 247 if context: 248 context = [f"{nested}Context:"] + context 249 250 lines = [ 251 f"{indent}- {self.id}", 252 *context, 253 f"{nested}Projections:", 254 ] 255 256 for expression in self.projections: 257 lines.append(f"{nested} - {expression.sql()}") 258 259 if self.condition: 260 lines.append(f"{nested}Condition: {self.condition.sql()}") 261 262 if self.limit is not math.inf: 263 lines.append(f"{nested}Limit: {self.limit}") 264 265 if self.dependencies: 266 lines.append(f"{nested}Dependencies:") 267 for dependency in self.dependencies: 268 lines.append(" " + dependency.to_s(level + 1)) 269 270 return "\n".join(lines) 271 272 @property 273 def type_name(self) -> str: 274 return self.__class__.__name__ 275 276 @property 277 def id(self) -> str: 278 name = self.name 279 name = f" {name}" if name else "" 280 return f"{self.type_name}:{name} ({id(self)})" 281 282 def _to_s(self, _indent: str) -> t.List[str]: 283 return [] 284 285 286class Scan(Step): 287 @classmethod 288 def from_expression( 289 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 290 ) -> Step: 291 table = expression 292 alias_ = expression.alias_or_name 293 294 if isinstance(expression, exp.Subquery): 295 table = expression.this 296 step = Step.from_expression(table, ctes) 297 
step.name = alias_ 298 return step 299 300 step = Scan() 301 step.name = alias_ 302 step.source = expression 303 if ctes and table.name in ctes: 304 step.add_dependency(ctes[table.name]) 305 306 return step 307 308 def __init__(self) -> None: 309 super().__init__() 310 self.source: t.Optional[exp.Expression] = None 311 312 def _to_s(self, indent: str) -> t.List[str]: 313 return [f"{indent}Source: {self.source.sql() if self.source else '-static-'}"] # type: ignore 314 315 316class Join(Step): 317 @classmethod 318 def from_joins( 319 cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None 320 ) -> Step: 321 step = Join() 322 323 for join in joins: 324 source_key, join_key, condition = join_condition(join) 325 step.joins[join.alias_or_name] = { 326 "side": join.side, # type: ignore 327 "join_key": join_key, 328 "source_key": source_key, 329 "condition": condition, 330 } 331 332 step.add_dependency(Scan.from_expression(join.this, ctes)) 333 334 return step 335 336 def __init__(self) -> None: 337 super().__init__() 338 self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {} 339 340 def _to_s(self, indent: str) -> t.List[str]: 341 lines = [] 342 for name, join in self.joins.items(): 343 lines.append(f"{indent}{name}: {join['side']}") 344 if join.get("condition"): 345 lines.append(f"{indent}On: {join['condition'].sql()}") # type: ignore 346 return lines 347 348 349class Aggregate(Step): 350 def __init__(self) -> None: 351 super().__init__() 352 self.aggregations: t.List[exp.Expression] = [] 353 self.operands: t.Tuple[exp.Expression, ...] 
= () 354 self.group: t.Dict[str, exp.Expression] = {} 355 self.source: t.Optional[str] = None 356 357 def _to_s(self, indent: str) -> t.List[str]: 358 lines = [f"{indent}Aggregations:"] 359 360 for expression in self.aggregations: 361 lines.append(f"{indent} - {expression.sql()}") 362 363 if self.group: 364 lines.append(f"{indent}Group:") 365 for expression in self.group.values(): 366 lines.append(f"{indent} - {expression.sql()}") 367 if self.condition: 368 lines.append(f"{indent}Having:") 369 lines.append(f"{indent} - {self.condition.sql()}") 370 if self.operands: 371 lines.append(f"{indent}Operands:") 372 for expression in self.operands: 373 lines.append(f"{indent} - {expression.sql()}") 374 375 return lines 376 377 378class Sort(Step): 379 def __init__(self) -> None: 380 super().__init__() 381 self.key = None 382 383 def _to_s(self, indent: str) -> t.List[str]: 384 lines = [f"{indent}Key:"] 385 386 for expression in self.key: # type: ignore 387 lines.append(f"{indent} - {expression.sql()}") 388 389 return lines 390 391 392class SetOperation(Step): 393 def __init__( 394 self, 395 op: t.Type[exp.Expression], 396 left: str | None, 397 right: str | None, 398 distinct: bool = False, 399 ) -> None: 400 super().__init__() 401 self.op = op 402 self.left = left 403 self.right = right 404 self.distinct = distinct 405 406 @classmethod 407 def from_expression( 408 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 409 ) -> Step: 410 assert isinstance(expression, exp.Union) 411 left = Step.from_expression(expression.left, ctes) 412 right = Step.from_expression(expression.right, ctes) 413 step = cls( 414 op=expression.__class__, 415 left=left.name, 416 right=right.name, 417 distinct=bool(expression.args.get("distinct")), 418 ) 419 step.add_dependency(left) 420 step.add_dependency(right) 421 return step 422 423 def _to_s(self, indent: str) -> t.List[str]: 424 lines = [] 425 if self.distinct: 426 lines.append(f"{indent}Distinct: {self.distinct}") 427 
return lines 428 429 @property 430 def type_name(self) -> str: 431 return self.op.__name__
class Plan:
    """A query plan: a DAG of `Step`s built from a copy of a SQL expression."""

    def __init__(self, expression: exp.Expression) -> None:
        self.expression = expression.copy()
        self.root = Step.from_expression(self.expression)
        self._dag: t.Dict[Step, t.Set[Step]] = {}

    @property
    def dag(self) -> t.Dict[Step, t.Set[Step]]:
        """Map every step reachable from `root` to the set of its direct dependencies.

        Computed lazily on first access and cached in `_dag` afterwards.
        """
        if not self._dag:
            dag: t.Dict[Step, t.Set[Step]] = {}
            nodes = {self.root}

            while nodes:
                node = nodes.pop()
                dag[node] = set(node.dependencies)
                # Only queue steps not yet expanded: a step shared by several parents
                # (e.g. a CTE referenced twice) would otherwise be walked repeatedly.
                nodes.update(dep for dep in node.dependencies if dep not in dag)
            self._dag = dag

        return self._dag

    @property
    def leaves(self) -> t.Iterator[Step]:
        """Yield the steps of the DAG that have no dependencies."""
        return (node for node, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"
class Step:
    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
        Note: the expression's tables and subqueries must be aliased for this method to work. For
        example, given the following expression:

        SELECT
          x.a,
          SUM(x.b)
        FROM x AS x
        JOIN y AS y
          ON x.a = y.a
        GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

        - Aggregate: x (4347984624)
            Context:
              Aggregations:
                - SUM(x.b)
              Group:
                - x.a
            Projections:
              - x.a
              - "x".""
            Dependencies:
            - Join: x (4347985296)
              Context:
                y:
                On: x.a = y.a
              Projections:
              Dependencies:
              - Scan: x (4347983136)
                Context:
                  Source: x AS x
                Projections:
              - Scan: y (4343416624)
                Context:
                  Source: y AS y
                Projections:

        Args:
            expression: the expression to build the DAG from.
            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

        Returns:
            A Step DAG corresponding to `expression`.
        """
        ctes = ctes or {}
        expression = expression.unnest()
        with_ = expression.args.get("with")

        # CTEs break the mold of scope and introduce themselves to all in the context.
        if with_:
            # Copy so CTEs declared here don't leak into the caller's mapping.
            ctes = ctes.copy()
            for cte in with_.expressions:
                step = Step.from_expression(cte.this, ctes)
                step.name = cte.alias
                ctes[step.name] = step  # type: ignore

        from_ = expression.args.get("from")

        if isinstance(expression, exp.Select) and from_:
            step = Scan.from_expression(from_.this, ctes)
        elif isinstance(expression, exp.Union):
            step = SetOperation.from_expression(expression, ctes)
        else:
            # No FROM clause: a source-less scan (rendered as "-static-").
            step = Scan()

        joins = expression.args.get("joins")

        if joins:
            join = Join.from_joins(joins, ctes)
            join.name = step.name
            join.add_dependency(step)
            step = join

        projections = []  # final selects in this chain of steps representing a select
        operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
        aggregations = set()
        next_operand_name = name_sequence("_a_")

        def extract_agg_operands(expression):
            """Record `expression` as an aggregation if it contains aggregate functions.

            Non-column operands of those functions are given generated names and
            replaced in place by quoted column references to those names. Returns
            True if any aggregate function was found.
            """
            agg_funcs = tuple(expression.find_all(exp.AggFunc))
            if agg_funcs:
                aggregations.add(expression)

            for agg in agg_funcs:
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    if operand not in operands:
                        operands[operand] = next_operand_name()

                    operand.replace(exp.column(operands[operand], quoted=True))

            return bool(agg_funcs)

        for e in expression.expressions:
            if e.find(exp.AggFunc):
                # The aggregate itself is computed by the Aggregate step; project its output column.
                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
                extract_agg_operands(e)
            else:
                projections.append(e)

        where = expression.args.get("where")

        if where:
            step.condition = where.this

        group = expression.args.get("group")

        if group or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having = expression.args.get("having")

            if having:
                # An aggregating HAVING is computed as an extra "_h" aggregation and filtered on.
                if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)):
                    aggregate.condition = exp.column("_h", step.name, quoted=True)
                else:
                    aggregate.condition = having.this

            aggregate.operands = tuple(
                alias(operand, alias_) for operand, alias_ in operands.items()
            )
            aggregate.aggregations = list(aggregations)

            # give aggregates names and replace projections with references to them
            aggregate.group = {
                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
            }

            intermediate: t.Dict[str | exp.Expression, str] = {}
            for k, v in aggregate.group.items():
                intermediate[v] = k
                if isinstance(v, exp.Column):
                    intermediate[v.alias_or_name] = k

            for projection in projections:
                for node, *_ in projection.walk():
                    name = intermediate.get(node)
                    if name:
                        node.replace(exp.column(name, step.name))
            if aggregate.condition:
                for node, *_ in aggregate.condition.walk():
                    name = intermediate.get(node) or intermediate.get(node.name)
                    if name:
                        node.replace(exp.column(name, step.name))

            aggregate.add_dependency(step)
            step = aggregate

        order = expression.args.get("order")

        if order:
            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is planned as an Aggregate grouping by every projected column.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
                for e in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit = expression.args.get("limit")

        if limit:
            step.limit = int(limit.text("expression"))

        return step

    def __init__(self) -> None:
        self.name: t.Optional[str] = None
        self.dependencies: t.Set[Step] = set()
        self.dependents: t.Set[Step] = set()
        self.projections: t.Sequence[exp.Expression] = []
        # math.inf stands for "no LIMIT"; to_s omits the Limit line in that case.
        self.limit: float = math.inf
        self.condition: t.Optional[exp.Expression] = None

    def add_dependency(self, dependency: Step) -> None:
        """Make this step depend on `dependency`, linking both directions."""
        self.dependencies.add(dependency)
        dependency.dependents.add(self)

    def __repr__(self) -> str:
        return self.to_s()

    def to_s(self, level: int = 0) -> str:
        """Render this step and, recursively, its dependencies as an indented outline.

        Args:
            level: nesting depth; each level indents by two more spaces.
        """
        indent = "  " * level
        nested = f"{indent}    "

        context = self._to_s(f"{nested}  ")

        if context:
            context = [f"{nested}Context:"] + context

        lines = [
            f"{indent}- {self.id}",
            *context,
            f"{nested}Projections:",
        ]

        for expression in self.projections:
            lines.append(f"{nested}  - {expression.sql()}")

        if self.condition:
            lines.append(f"{nested}Condition: {self.condition.sql()}")

        # Compare by value: an `is` check only recognises the math.inf object itself,
        # not an equal float("inf") assigned elsewhere.
        if self.limit != math.inf:
            lines.append(f"{nested}Limit: {self.limit}")

        if self.dependencies:
            lines.append(f"{nested}Dependencies:")
            for dependency in self.dependencies:
                # Only the first line of the dependency's rendering receives this prefix.
                lines.append("  " + dependency.to_s(level + 1))

        return "\n".join(lines)

    @property
    def type_name(self) -> str:
        return self.__class__.__name__

    @property
    def id(self) -> str:
        name = self.name
        name = f" {name}" if name else ""
        return f"{self.type_name}:{name} ({id(self)})"

    def _to_s(self, _indent: str) -> t.List[str]:
        """Return subclass-specific "Context" lines; the base step has none."""
        return []
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """
    Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
    Note: the expression's tables and subqueries must be aliased for this method to work. For
    example, given the following expression:

    SELECT
      x.a,
      SUM(x.b)
    FROM x AS x
    JOIN y AS y
      ON x.a = y.a
    GROUP BY x.a

    the following DAG is produced (the expression IDs might differ per execution):

    - Aggregate: x (4347984624)
        Context:
          Aggregations:
            - SUM(x.b)
          Group:
            - x.a
        Projections:
          - x.a
          - "x".""
        Dependencies:
        - Join: x (4347985296)
          Context:
            y:
            On: x.a = y.a
          Projections:
          Dependencies:
          - Scan: x (4347983136)
            Context:
              Source: x AS x
            Projections:
          - Scan: y (4343416624)
            Context:
              Source: y AS y
            Projections:

    Args:
        expression: the expression to build the DAG from.
        ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

    Returns:
        A Step DAG corresponding to `expression`.
    """
    ctes = ctes or {}
    expression = expression.unnest()
    with_ = expression.args.get("with")

    # CTEs break the mold of scope and introduce themselves to all in the context.
    if with_:
        # Copy so CTEs declared here don't leak into the caller's mapping.
        ctes = ctes.copy()
        for cte in with_.expressions:
            step = Step.from_expression(cte.this, ctes)
            step.name = cte.alias
            ctes[step.name] = step  # type: ignore

    from_ = expression.args.get("from")

    if isinstance(expression, exp.Select) and from_:
        step = Scan.from_expression(from_.this, ctes)
    elif isinstance(expression, exp.Union):
        step = SetOperation.from_expression(expression, ctes)
    else:
        # No FROM clause: a source-less scan (rendered as "-static-").
        step = Scan()

    joins = expression.args.get("joins")

    if joins:
        join = Join.from_joins(joins, ctes)
        join.name = step.name
        join.add_dependency(step)
        step = join

    projections = []  # final selects in this chain of steps representing a select
    operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
    aggregations = set()
    next_operand_name = name_sequence("_a_")

    def extract_agg_operands(expression):
        """Record `expression` as an aggregation if it contains aggregate functions.

        Non-column operands of those functions are given generated names and
        replaced in place by quoted column references to those names. Returns
        True if any aggregate function was found.
        """
        agg_funcs = tuple(expression.find_all(exp.AggFunc))
        if agg_funcs:
            aggregations.add(expression)

        for agg in agg_funcs:
            for operand in agg.unnest_operands():
                if isinstance(operand, exp.Column):
                    continue
                if operand not in operands:
                    operands[operand] = next_operand_name()

                operand.replace(exp.column(operands[operand], quoted=True))

        return bool(agg_funcs)

    for e in expression.expressions:
        if e.find(exp.AggFunc):
            # The aggregate itself is computed by the Aggregate step; project its output column.
            projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
            extract_agg_operands(e)
        else:
            projections.append(e)

    where = expression.args.get("where")

    if where:
        step.condition = where.this

    group = expression.args.get("group")

    if group or aggregations:
        aggregate = Aggregate()
        aggregate.source = step.name
        aggregate.name = step.name

        having = expression.args.get("having")

        if having:
            # An aggregating HAVING is computed as an extra "_h" aggregation and filtered on.
            if extract_agg_operands(exp.alias_(having.this, "_h", quoted=True)):
                aggregate.condition = exp.column("_h", step.name, quoted=True)
            else:
                aggregate.condition = having.this

        aggregate.operands = tuple(
            alias(operand, alias_) for operand, alias_ in operands.items()
        )
        aggregate.aggregations = list(aggregations)

        # give aggregates names and replace projections with references to them
        aggregate.group = {
            f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
        }

        intermediate: t.Dict[str | exp.Expression, str] = {}
        for k, v in aggregate.group.items():
            intermediate[v] = k
            if isinstance(v, exp.Column):
                intermediate[v.alias_or_name] = k

        for projection in projections:
            for node, *_ in projection.walk():
                name = intermediate.get(node)
                if name:
                    node.replace(exp.column(name, step.name))
        if aggregate.condition:
            for node, *_ in aggregate.condition.walk():
                name = intermediate.get(node) or intermediate.get(node.name)
                if name:
                    node.replace(exp.column(name, step.name))

        aggregate.add_dependency(step)
        step = aggregate

    order = expression.args.get("order")

    if order:
        sort = Sort()
        sort.name = step.name
        sort.key = order.expressions
        sort.add_dependency(step)
        step = sort

    step.projections = projections

    if isinstance(expression, exp.Select) and expression.args.get("distinct"):
        # DISTINCT is planned as an Aggregate grouping by every projected column.
        distinct = Aggregate()
        distinct.source = step.name
        distinct.name = step.name
        distinct.group = {
            e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
            for e in projections or expression.expressions
        }
        distinct.add_dependency(step)
        step = distinct

    limit = expression.args.get("limit")

    if limit:
        step.limit = int(limit.text("expression"))

    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296)
Context:
y:
On: x.a = y.a
Projections:
Dependencies:
- Scan: x (4347983136)
Context:
Source: x AS x
Projections:
- Scan: y (4343416624)
Context:
Source: y AS y
Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
def to_s(self, level: int = 0) -> str:
    """Render this step and, recursively, its dependencies as an indented outline.

    Args:
        level: nesting depth; each level indents by two more spaces.
    """
    indent = "  " * level
    nested = f"{indent}    "

    context = self._to_s(f"{nested}  ")

    if context:
        context = [f"{nested}Context:"] + context

    lines = [
        f"{indent}- {self.id}",
        *context,
        f"{nested}Projections:",
    ]

    for expression in self.projections:
        lines.append(f"{nested}  - {expression.sql()}")

    if self.condition:
        lines.append(f"{nested}Condition: {self.condition.sql()}")

    # Compare by value: an `is` check only recognises the math.inf object itself,
    # not an equal float("inf") assigned elsewhere.
    if self.limit != math.inf:
        lines.append(f"{nested}Limit: {self.limit}")

    if self.dependencies:
        lines.append(f"{nested}Dependencies:")
        for dependency in self.dependencies:
            # Only the first line of the dependency's rendering receives this prefix.
            lines.append("  " + dependency.to_s(level + 1))

    return "\n".join(lines)
class Scan(Step):
    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build the step for a FROM/JOIN source: a table, CTE reference or subquery.

        A subquery is planned recursively and renamed to its alias. Any other source
        becomes a Scan; if its name matches a CTE, the CTE's step becomes a dependency.
        """
        table = expression
        alias_ = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            table = expression.this
            step = Step.from_expression(table, ctes)
            step.name = alias_
            return step

        step = Scan()
        step.name = alias_
        step.source = expression
        if ctes and table.name in ctes:
            step.add_dependency(ctes[table.name])

        return step

    def __init__(self) -> None:
        super().__init__()
        self.source: t.Optional[exp.Expression] = None

    def _to_s(self, indent: str) -> t.List[str]:
        return [f"{indent}Source: {self.source.sql() if self.source else '-static-'}"]  # type: ignore
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Build the step for a FROM/JOIN source: a table, CTE reference or subquery.

    A subquery is planned recursively and renamed to its alias. Any other source
    becomes a Scan; if its name matches a CTE, the CTE's step becomes a dependency.
    """
    table = expression
    alias_ = expression.alias_or_name

    if isinstance(expression, exp.Subquery):
        table = expression.this
        step = Step.from_expression(table, ctes)
        step.name = alias_
        return step

    step = Scan()
    step.name = alias_
    step.source = expression
    if ctes and table.name in ctes:
        step.add_dependency(ctes[table.name])

    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296)
Context:
y:
On: x.a = y.a
Projections:
Dependencies:
- Scan: x (4347983136)
Context:
Source: x AS x
Projections:
- Scan: y (4343416624)
Context:
Source: y AS y
Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
Inherited Members
class Join(Step):
    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build one Join step covering all `joins`, keyed by each join's alias or name.

        Each joined source is planned via `Scan.from_expression` and added as a dependency.
        """
        step = Join()

        for join in joins:
            source_key, join_key, condition = join_condition(join)
            step.joins[join.alias_or_name] = {
                "side": join.side,  # type: ignore
                "join_key": join_key,
                "source_key": source_key,
                "condition": condition,
            }

            step.add_dependency(Scan.from_expression(join.this, ctes))

        return step

    def __init__(self) -> None:
        super().__init__()
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    def _to_s(self, indent: str) -> t.List[str]:
        lines = []
        for name, join in self.joins.items():
            lines.append(f"{indent}{name}: {join['side']}")
            if join.get("condition"):
                lines.append(f"{indent}On: {join['condition'].sql()}")  # type: ignore
        return lines
@classmethod
def from_joins(
    cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Build one Join step covering all `joins`, keyed by each join's alias or name.

    Each joined source is planned via `Scan.from_expression` and added as a dependency.
    """
    step = Join()

    for join in joins:
        source_key, join_key, condition = join_condition(join)
        step.joins[join.alias_or_name] = {
            "side": join.side,  # type: ignore
            "join_key": join_key,
            "source_key": source_key,
            "condition": condition,
        }

        step.add_dependency(Scan.from_expression(join.this, ctes))

    return step
Inherited Members
class Aggregate(Step):
    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        # Grouping expressions keyed by generated names (e.g. "_g0") or, for DISTINCT, column names.
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        lines = [f"{indent}Aggregations:"]

        for expression in self.aggregations:
            lines.append(f"{indent}  - {expression.sql()}")

        if self.group:
            lines.append(f"{indent}Group:")
            for expression in self.group.values():
                lines.append(f"{indent}  - {expression.sql()}")
        if self.condition:
            lines.append(f"{indent}Having:")
            lines.append(f"{indent}  - {self.condition.sql()}")
        if self.operands:
            lines.append(f"{indent}Operands:")
            for expression in self.operands:
                lines.append(f"{indent}  - {expression.sql()}")

        return lines
Inherited Members
class Sort(Step):
    def __init__(self) -> None:
        super().__init__()
        # ORDER BY expressions; None until assigned by Step.from_expression.
        self.key: t.Optional[t.List[exp.Expression]] = None

    def _to_s(self, indent: str) -> t.List[str]:
        lines = [f"{indent}Key:"]

        # Tolerate an unassigned key instead of raising TypeError on `None`.
        for expression in self.key or []:
            lines.append(f"{indent}  - {expression.sql()}")

        return lines
Inherited Members
class SetOperation(Step):
    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        super().__init__()
        self.op = op
        self.left = left
        self.right = right
        self.distinct = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan both sides of a set operation and combine them in one step.

        The step records the operation's expression class and the names of the left
        and right steps, and depends on both of them.
        """
        assert isinstance(expression, exp.Union)
        left = Step.from_expression(expression.left, ctes)
        right = Step.from_expression(expression.right, ctes)
        step = cls(
            op=expression.__class__,
            left=left.name,
            right=right.name,
            distinct=bool(expression.args.get("distinct")),
        )
        step.add_dependency(left)
        step.add_dependency(right)
        return step

    def _to_s(self, indent: str) -> t.List[str]:
        lines = []
        if self.distinct:
            lines.append(f"{indent}Distinct: {self.distinct}")
        return lines

    @property
    def type_name(self) -> str:
        return self.op.__name__
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Plan both sides of a set operation and combine them in one step.

    The step records the operation's expression class and the names of the left
    and right steps, and depends on both of them.
    """
    assert isinstance(expression, exp.Union)
    left = Step.from_expression(expression.left, ctes)
    right = Step.from_expression(expression.right, ctes)
    step = cls(
        op=expression.__class__,
        left=left.name,
        right=right.name,
        distinct=bool(expression.args.get("distinct")),
    )
    step.add_dependency(left)
    step.add_dependency(right)
    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296)
Context:
y:
On: x.a = y.a
Projections:
Dependencies:
- Scan: x (4347983136)
Context:
Source: x AS x
Projections:
- Scan: y (4343416624)
Context:
Source: y AS y
Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.