1
0
Fork 0
sqlglot/sqlglot/dialects/athena.py
Daniel Baumann e9f53ab285
Merging upstream version 26.14.0.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-04-16 09:04:43 +02:00

167 lines
6.2 KiB
Python

from __future__ import annotations
import typing as t
from sqlglot import exp
from sqlglot.dialects.trino import Trino
from sqlglot.dialects.hive import Hive
from sqlglot.tokens import TokenType
def _generate_as_hive(expression: exp.Expression) -> bool:
if isinstance(expression, exp.Create):
if expression.kind == "TABLE":
properties: t.Optional[exp.Properties] = expression.args.get("properties")
if properties and properties.find(exp.ExternalProperty):
return True # CREATE EXTERNAL TABLE is Hive
if not isinstance(expression.expression, exp.Query):
return True # any CREATE TABLE other than CREATE TABLE AS SELECT is Hive
else:
return expression.kind != "VIEW" # CREATE VIEW is never Hive but CREATE SCHEMA etc is
# https://docs.aws.amazon.com/athena/latest/ug/ddl-reference.html
elif isinstance(expression, (exp.Alter, exp.Drop, exp.Describe)):
if isinstance(expression, exp.Drop) and expression.kind == "VIEW":
# DROP VIEW is Trino (I guess because CREATE VIEW is)
return False
# Everything else is Hive
return True
return False
def _is_iceberg_table(properties: exp.Properties) -> bool:
table_type_property = next(
(
p
for p in properties.expressions
if isinstance(p, exp.Property) and p.name == "table_type"
),
None,
)
return bool(table_type_property and table_type_property.text("value").lower() == "iceberg")
def _location_property_sql(self: Athena.Generator, e: exp.LocationProperty):
# If table_type='iceberg', the LocationProperty is called 'location'
# Otherwise, it's called 'external_location'
# ref: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html
prop_name = "external_location"
if isinstance(e.parent, exp.Properties):
if _is_iceberg_table(e.parent):
prop_name = "location"
return f"{prop_name}={self.sql(e, 'this')}"
def _partitioned_by_property_sql(self: Athena.Generator, e: exp.PartitionedByProperty) -> str:
# If table_type='iceberg' then the table property for partitioning is called 'partitioning'
# If table_type='hive' it's called 'partitioned_by'
# ref: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties
prop_name = "partitioned_by"
if isinstance(e.parent, exp.Properties):
if _is_iceberg_table(e.parent):
prop_name = "partitioning"
return f"{prop_name}={self.sql(e, 'this')}"
class Athena(Trino):
"""
Over the years, it looks like AWS has taken various execution engines, bolted on AWS-specific modifications and then
built the Athena service around them.
Thus, Athena is not simply hosted Trino, it's more like a router that routes SQL queries to an execution engine depending
on the query type.
As at 2024-09-10, assuming your Athena workgroup is configured to use "Athena engine version 3", the following engines exist:
Hive:
- Accepts mostly the same syntax as Hadoop / Hive
- Uses backticks to quote identifiers
- Has a distinctive DDL syntax (around things like setting table properties, storage locations etc) that is different from Trino
- Used for *most* DDL, with some exceptions that get routed to the Trino engine instead:
- CREATE [EXTERNAL] TABLE (without AS SELECT)
- ALTER
- DROP
Trino:
- Uses double quotes to quote identifiers
- Used for DDL operations that involve SELECT queries, eg:
- CREATE VIEW / DROP VIEW
- CREATE TABLE... AS SELECT
- Used for DML operations
- SELECT, INSERT, UPDATE, DELETE, MERGE
The SQLGlot Athena dialect tries to identify which engine a query would be routed to and then uses the parser / generator for that engine
rather than trying to create a universal syntax that can handle both types.
"""
class Tokenizer(Trino.Tokenizer):
"""
The Tokenizer is flexible enough to tokenize queries across both the Hive and Trino engines
"""
IDENTIFIERS = ['"', "`"]
KEYWORDS = {
**Hive.Tokenizer.KEYWORDS,
**Trino.Tokenizer.KEYWORDS,
"UNLOAD": TokenType.COMMAND,
}
class Parser(Trino.Parser):
"""
Parse queries for the Athena Trino execution engine
"""
STATEMENT_PARSERS = {
**Trino.Parser.STATEMENT_PARSERS,
TokenType.USING: lambda self: self._parse_as_command(self._prev),
}
class _HiveGenerator(Hive.Generator):
def alter_sql(self, expression: exp.Alter) -> str:
# package any ALTER TABLE ADD actions into a Schema object
# so it gets generated as `ALTER TABLE .. ADD COLUMNS(...)`
# instead of `ALTER TABLE ... ADD COLUMN` which is invalid syntax on Athena
if isinstance(expression, exp.Alter) and expression.kind == "TABLE":
if expression.actions and isinstance(expression.actions[0], exp.ColumnDef):
new_actions = exp.Schema(expressions=expression.actions)
expression.set("actions", [new_actions])
return super().alter_sql(expression)
class Generator(Trino.Generator):
"""
Generate queries for the Athena Trino execution engine
"""
PROPERTIES_LOCATION = {
**Trino.Generator.PROPERTIES_LOCATION,
exp.LocationProperty: exp.Properties.Location.POST_WITH,
}
TRANSFORMS = {
**Trino.Generator.TRANSFORMS,
exp.FileFormatProperty: lambda self, e: f"format={self.sql(e, 'this')}",
exp.PartitionedByProperty: _partitioned_by_property_sql,
exp.LocationProperty: _location_property_sql,
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
hive_kwargs = {**kwargs, "dialect": "hive"}
self._hive_generator = Athena._HiveGenerator(*args, **hive_kwargs)
def generate(self, expression: exp.Expression, copy: bool = True) -> str:
if _generate_as_hive(expression):
return self._hive_generator.generate(expression, copy)
return super().generate(expression, copy)