Edit on GitHub

sqlglot.lineage

  1from __future__ import annotations
  2
  3import json
  4import logging
  5import typing as t
  6from dataclasses import dataclass, field
  7
  8from sqlglot import Schema, exp, maybe_parse
  9from sqlglot.errors import SqlglotError
 10from sqlglot.optimizer import Scope, build_scope, find_all_in_scope, normalize_identifiers, qualify
 11
 12if t.TYPE_CHECKING:
 13    from sqlglot.dialects.dialect import DialectType
 14
 15logger = logging.getLogger("sqlglot")
 16
 17
 18@dataclass(frozen=True)
 19class Node:
 20    name: str
 21    expression: exp.Expression
 22    source: exp.Expression
 23    downstream: t.List[Node] = field(default_factory=list)
 24    source_name: str = ""
 25    reference_node_name: str = ""
 26
 27    def walk(self) -> t.Iterator[Node]:
 28        yield self
 29
 30        for d in self.downstream:
 31            yield from d.walk()
 32
 33    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
 34        nodes = {}
 35        edges = []
 36
 37        for node in self.walk():
 38            if isinstance(node.expression, exp.Table):
 39                label = f"FROM {node.expression.this}"
 40                title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>"
 41                group = 1
 42            else:
 43                label = node.expression.sql(pretty=True, dialect=dialect)
 44                source = node.source.transform(
 45                    lambda n: (
 46                        exp.Tag(this=n, prefix="<b>", postfix="</b>") if n is node.expression else n
 47                    ),
 48                    copy=False,
 49                ).sql(pretty=True, dialect=dialect)
 50                title = f"<pre>{source}</pre>"
 51                group = 0
 52
 53            node_id = id(node)
 54
 55            nodes[node_id] = {
 56                "id": node_id,
 57                "label": label,
 58                "title": title,
 59                "group": group,
 60            }
 61
 62            for d in node.downstream:
 63                edges.append({"from": node_id, "to": id(d)})
 64        return GraphHTML(nodes, edges, **opts)
 65
 66
 67def lineage(
 68    column: str | exp.Column,
 69    sql: str | exp.Expression,
 70    schema: t.Optional[t.Dict | Schema] = None,
 71    sources: t.Optional[t.Mapping[str, str | exp.Query]] = None,
 72    dialect: DialectType = None,
 73    scope: t.Optional[Scope] = None,
 74    trim_selects: bool = True,
 75    **kwargs,
 76) -> Node:
 77    """Build the lineage graph for a column of a SQL query.
 78
 79    Args:
 80        column: The column to build the lineage for.
 81        sql: The SQL string or expression.
 82        schema: The schema of tables.
 83        sources: A mapping of queries which will be used to continue building lineage.
 84        dialect: The dialect of input SQL.
 85        scope: A pre-created scope to use instead.
 86        trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
 87        **kwargs: Qualification optimizer kwargs.
 88
 89    Returns:
 90        A lineage node.
 91    """
 92
 93    expression = maybe_parse(sql, dialect=dialect)
 94    column = normalize_identifiers.normalize_identifiers(column, dialect=dialect).name
 95
 96    if sources:
 97        expression = exp.expand(
 98            expression,
 99            {k: t.cast(exp.Query, maybe_parse(v, dialect=dialect)) for k, v in sources.items()},
100            dialect=dialect,
101        )
102
103    if not scope:
104        expression = qualify.qualify(
105            expression,
106            dialect=dialect,
107            schema=schema,
108            **{"validate_qualify_columns": False, "identify": False, **kwargs},  # type: ignore
109        )
110
111        scope = build_scope(expression)
112
113    if not scope:
114        raise SqlglotError("Cannot build lineage, sql must be SELECT")
115
116    if not any(select.alias_or_name == column for select in scope.expression.selects):
117        raise SqlglotError(f"Cannot find column '{column}' in query.")
118
119    return to_node(column, scope, dialect, trim_selects=trim_selects)
120
121
def to_node(
    column: str | int,
    scope: Scope,
    dialect: DialectType,
    scope_name: t.Optional[str] = None,
    upstream: t.Optional[Node] = None,
    source_name: t.Optional[str] = None,
    reference_node_name: t.Optional[str] = None,
    trim_selects: bool = True,
) -> Node:
    """Build the lineage node for `column` in `scope`, recursing into the scopes it depends on.

    Args:
        column: Name of the select to trace, or its positional index in the scope's selects.
        scope: The scope whose expression produces the column.
        dialect: Dialect used when rendering SQL for log messages; passed on to recursive calls.
        scope_name: When set, the node is named "<scope_name>.<column>".
        upstream: Node that the newly created node is appended to as a downstream.
        source_name: Stored on the node; nested scopes inherit it unless their derived
            table carries its own "source: " comment.
        reference_node_name: Stored on the node.
        trim_selects: Whether a SELECT source is reduced to only the traced select.

    Returns:
        The node created for this scope. For a UNION scope this is `upstream` (or a new
        "UNION" node when none was given), with the union branches attached to it.

    Raises:
        ValueError: If a named column cannot be located among the selects of a UNION scope.
    """
    # Map derived-table aliases to the name found in their leading "source: <name>" comment.
    source_names = {}
    for derived_table in scope.derived_tables:
        comments = derived_table.comments
        if comments and comments[0].startswith("source: "):
            source_names[derived_table.alias] = comments[0].split()[1]

    # Find the specific select clause that is the source of the column we want.
    # This can either be a specific, named select or a generic `*` clause.
    selects = scope.expression.selects
    if isinstance(column, int):
        select = selects[column]
    else:
        select = next(
            (candidate for candidate in selects if candidate.alias_or_name == column),
            exp.Star() if scope.expression.is_star else scope.expression,
        )

    if isinstance(scope.expression, exp.Subquery):
        # Only the first nested scope is followed; with none, execution falls through.
        for inner_scope in scope.subquery_scopes:
            return to_node(
                column,
                scope=inner_scope,
                dialect=dialect,
                upstream=upstream,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

    if isinstance(scope.expression, exp.Union):
        upstream = upstream or Node(name="UNION", source=scope.expression, expression=select)

        # Union branches are traced by position, so resolve a named column to its index.
        if isinstance(column, int):
            index = column
        else:
            index = -1
            for position, candidate in enumerate(scope.expression.selects):
                if candidate.alias_or_name == column or candidate.is_star:
                    index = position
                    break

        if index == -1:
            raise ValueError(f"Could not find {column} in {scope.expression}")

        for branch_scope in scope.union_scopes:
            to_node(
                index,
                scope=branch_scope,
                dialect=dialect,
                upstream=upstream,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

        return upstream

    if not trim_selects or not isinstance(scope.expression, exp.Select):
        source = scope.expression
    else:
        # For better ergonomics in our node labels, replace the full select with
        # a version that has only the column we care about.
        #   "x", SELECT x, y FROM foo
        #     => "x", SELECT x FROM foo
        source = t.cast(exp.Expression, scope.expression.select(select, append=False))

    # Create the node for this step in the lineage chain, and attach it to the previous one.
    node_name = f"{scope_name}.{column}" if scope_name else str(column)
    node = Node(
        name=node_name,
        source=source,
        expression=select,
        source_name=source_name or "",
        reference_node_name=reference_node_name or "",
    )

    if upstream:
        upstream.downstream.append(node)

    # Index the nested scopes by the identity of their expression for the lookup below.
    scopes_by_expression_id = {
        id(nested.expression): nested for nested in scope.subquery_scopes
    }

    for subquery in find_all_in_scope(select, exp.UNWRAPPED_QUERIES):
        subquery_scope = scopes_by_expression_id.get(id(subquery))
        if subquery_scope:
            for name in subquery.named_selects:
                to_node(
                    name,
                    scope=subquery_scope,
                    dialect=dialect,
                    upstream=node,
                    trim_selects=trim_selects,
                )
        else:
            logger.warning(f"Unknown subquery scope: {subquery.sql(dialect=dialect)}")

    # If the select is a star, add all scope sources as downstreams.
    if select.is_star:
        for scope_source in scope.sources.values():
            # NOTE(review): this rebinds `source`, so the UDTF check below inspects the
            # last scope source whenever the select is a star - confirm this is intended.
            source = scope_source.expression if isinstance(scope_source, Scope) else scope_source
            node.downstream.append(Node(name=select.sql(), source=source, expression=source))

    # Find all columns that went into creating this one to list their lineage nodes.
    source_columns = set(find_all_in_scope(select, exp.Column))

    # If the source is a UDTF, find columns used in the UDTF to generate the table.
    if isinstance(source, exp.UDTF):
        source_columns |= set(source.find_all(exp.Column))

    for source_column in source_columns:
        table = source_column.table
        column_source = scope.sources.get(table)

        if not isinstance(column_source, Scope):
            # The source is not a scope - we've reached the end of the line. At this point, if a source is not found
            # it means this column's lineage is unknown. This can happen if the definition of a source used in a query
            # is not passed into the `sources` map.
            terminal = column_source or exp.Placeholder()
            node.downstream.append(
                Node(name=source_column.sql(), source=terminal, expression=terminal)
            )
            continue

        selected_node, _ = scope.selected_sources.get(table, (None, None))
        # The table itself came from a more specific scope. Recurse into that one using the unaliased column name.
        to_node(
            source_column.name,
            scope=column_source,
            dialect=dialect,
            scope_name=table,
            upstream=node,
            source_name=source_names.get(table) or source_name,
            reference_node_name=selected_node.name if selected_node else None,
            trim_selects=trim_selects,
        )

    return node
271
272
class GraphHTML:
    """Node to HTML generator using vis.js.

    https://visjs.github.io/vis-network/docs/network/
    """

    def __init__(
        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
    ):
        """Store the graph data and the vis.js network options.

        Args:
            nodes: Mapping of node id to the vis.js node dictionary.
            edges: List of vis.js edge dictionaries.
            imports: Whether the generated HTML includes the vis.js script and stylesheet tags.
            options: Top-level vis.js options; each key given here replaces the default
                of the same name (shallow merge).
        """
        self.imports = imports

        self.options = {
            "height": "500px",
            "width": "100%",
            "layout": {
                "hierarchical": {
                    "enabled": True,
                    "nodeSpacing": 200,
                    "sortMethod": "directed",
                },
            },
            "interaction": {
                "dragNodes": False,
                "selectable": False,
            },
            "physics": {
                "enabled": False,
            },
            "edges": {
                "arrows": "to",
            },
            "nodes": {
                "font": "20px monaco",
                "shape": "box",
                "widthConstraint": {
                    "maximum": 300,
                },
            },
            **(options or {}),
        }

        self.nodes = nodes
        self.edges = edges

    @staticmethod
    def _script_json(value: t.Any) -> str:
        """Serialize `value` to JSON that can be embedded in an inline <script> element.

        In JSON text "<" can only occur inside string values, so replacing it with the
        equivalent escape "\\u003c" keeps the decoded data identical while preventing
        sequences such as "</script>" or "<!--" from being interpreted by the HTML parser.
        """
        return json.dumps(value).replace("<", "\\u003c")

    def __str__(self) -> str:
        """Return the HTML snippet that renders the graph with vis.js."""
        nodes = self._script_json(list(self.nodes.values()))
        edges = self._script_json(self.edges)
        options = self._script_json(self.options)
        imports = (
            """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script>
  <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script>
  <link rel="stylesheet" type="text/css" href="https://unpkg.com/vis-network/styles/vis-network.min.css" />"""
            if self.imports
            else ""
        )

        return f"""<div>
  <div id="sqlglot-lineage"></div>
  {imports}
  <script type="text/javascript">
    var nodes = new vis.DataSet({nodes})
    nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0])

    new vis.Network(
        document.getElementById("sqlglot-lineage"),
        {{
            nodes: nodes,
            edges: new vis.DataSet({edges})
        }},
        {options},
    )
  </script>
</div>"""

    def _repr_html_(self) -> str:
        """Return the same HTML as `__str__` (hook used by rich notebook displays)."""
        return self.__str__()
logger = <Logger sqlglot (WARNING)>
@dataclass(frozen=True)
class Node:
@dataclass(frozen=True)
class Node:
    """One step in the lineage graph of a column.

    The dataclass is frozen, but `downstream` is a list that is appended to in place
    while the graph is being built.
    """

    # Display name of the node, e.g. "<scope_name>.<column>" or a bare column name.
    name: str
    # The expression this node stands for (a select, a table, a placeholder, ...).
    expression: exp.Expression
    # The query or source expression that `expression` belongs to.
    source: exp.Expression
    # Nodes this node depends on.
    downstream: t.List[Node] = field(default_factory=list)
    source_name: str = ""
    reference_node_name: str = ""

    def walk(self) -> t.Iterator[Node]:
        """Yield this node first, then every descendant in depth-first order."""
        yield self

        for child in self.downstream:
            yield from child.walk()

    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
        """Render the graph rooted at this node as a `GraphHTML` object.

        Args:
            dialect: The dialect used to generate the SQL shown in labels and titles.
            **opts: Forwarded to the `GraphHTML` constructor.

        Returns:
            A `GraphHTML` holding one entry per walked node and one edge per
            node -> downstream relationship.
        """
        nodes = {}
        edges = []

        for current in self.walk():
            expression = current.expression
            node_id = id(current)

            if isinstance(expression, exp.Table):
                label = f"FROM {expression.this}"
                title = f"<pre>SELECT {current.name} FROM {expression.this}</pre>"
                group = 1
            else:
                label = expression.sql(pretty=True, dialect=dialect)

                def emphasize(candidate):
                    # Matching is by identity, which is why the transform runs with copy=False.
                    if candidate is expression:
                        return exp.Tag(this=candidate, prefix="<b>", postfix="</b>")
                    return candidate

                highlighted = current.source.transform(emphasize, copy=False)
                title = f"<pre>{highlighted.sql(pretty=True, dialect=dialect)}</pre>"
                group = 0

            nodes[node_id] = {
                "id": node_id,
                "label": label,
                "title": title,
                "group": group,
            }

            edges.extend({"from": node_id, "to": id(child)} for child in current.downstream)

        return GraphHTML(nodes, edges, **opts)
Node( name: str, expression: sqlglot.expressions.Expression, source: sqlglot.expressions.Expression, downstream: List[Node] = <factory>, source_name: str = '', reference_node_name: str = '')
name: str
downstream: List[Node]
source_name: str = ''
reference_node_name: str = ''
def walk(self) -> Iterator[Node]:
28    def walk(self) -> t.Iterator[Node]:
29        yield self
30
31        for d in self.downstream:
32            yield from d.walk()
def to_html( self, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, **opts) -> GraphHTML:
34    def to_html(self, dialect: DialectType = None, **opts) -> GraphHTML:
35        nodes = {}
36        edges = []
37
38        for node in self.walk():
39            if isinstance(node.expression, exp.Table):
40                label = f"FROM {node.expression.this}"
41                title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>"
42                group = 1
43            else:
44                label = node.expression.sql(pretty=True, dialect=dialect)
45                source = node.source.transform(
46                    lambda n: (
47                        exp.Tag(this=n, prefix="<b>", postfix="</b>") if n is node.expression else n
48                    ),
49                    copy=False,
50                ).sql(pretty=True, dialect=dialect)
51                title = f"<pre>{source}</pre>"
52                group = 0
53
54            node_id = id(node)
55
56            nodes[node_id] = {
57                "id": node_id,
58                "label": label,
59                "title": title,
60                "group": group,
61            }
62
63            for d in node.downstream:
64                edges.append({"from": node_id, "to": id(d)})
65        return GraphHTML(nodes, edges, **opts)
def lineage( column: str | sqlglot.expressions.Column, sql: str | sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema, NoneType] = None, sources: Optional[Mapping[str, str | sqlglot.expressions.Query]] = None, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, scope: Optional[sqlglot.optimizer.scope.Scope] = None, trim_selects: bool = True, **kwargs) -> Node:
 68def lineage(
 69    column: str | exp.Column,
 70    sql: str | exp.Expression,
 71    schema: t.Optional[t.Dict | Schema] = None,
 72    sources: t.Optional[t.Mapping[str, str | exp.Query]] = None,
 73    dialect: DialectType = None,
 74    scope: t.Optional[Scope] = None,
 75    trim_selects: bool = True,
 76    **kwargs,
 77) -> Node:
 78    """Build the lineage graph for a column of a SQL query.
 79
 80    Args:
 81        column: The column to build the lineage for.
 82        sql: The SQL string or expression.
 83        schema: The schema of tables.
 84        sources: A mapping of queries which will be used to continue building lineage.
 85        dialect: The dialect of input SQL.
 86        scope: A pre-created scope to use instead.
 87        trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
 88        **kwargs: Qualification optimizer kwargs.
 89
 90    Returns:
 91        A lineage node.
 92    """
 93
 94    expression = maybe_parse(sql, dialect=dialect)
 95    column = normalize_identifiers.normalize_identifiers(column, dialect=dialect).name
 96
 97    if sources:
 98        expression = exp.expand(
 99            expression,
100            {k: t.cast(exp.Query, maybe_parse(v, dialect=dialect)) for k, v in sources.items()},
101            dialect=dialect,
102        )
103
104    if not scope:
105        expression = qualify.qualify(
106            expression,
107            dialect=dialect,
108            schema=schema,
109            **{"validate_qualify_columns": False, "identify": False, **kwargs},  # type: ignore
110        )
111
112        scope = build_scope(expression)
113
114    if not scope:
115        raise SqlglotError("Cannot build lineage, sql must be SELECT")
116
117    if not any(select.alias_or_name == column for select in scope.expression.selects):
118        raise SqlglotError(f"Cannot find column '{column}' in query.")
119
120    return to_node(column, scope, dialect, trim_selects=trim_selects)

Build the lineage graph for a column of a SQL query.

Arguments:
  • column: The column to build the lineage for.
  • sql: The SQL string or expression.
  • schema: The schema of tables.
  • sources: A mapping of queries which will be used to continue building lineage.
  • dialect: The dialect of input SQL.
  • scope: A pre-created scope to use instead.
  • trim_selects: Whether or not to clean up selects by trimming to only relevant columns.
  • **kwargs: Qualification optimizer kwargs.
Returns:

A lineage node.

def to_node( column: str | int, scope: sqlglot.optimizer.scope.Scope, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], scope_name: Optional[str] = None, upstream: Optional[Node] = None, source_name: Optional[str] = None, reference_node_name: Optional[str] = None, trim_selects: bool = True) -> Node:
def to_node(
    column: str | int,
    scope: Scope,
    dialect: DialectType,
    scope_name: t.Optional[str] = None,
    upstream: t.Optional[Node] = None,
    source_name: t.Optional[str] = None,
    reference_node_name: t.Optional[str] = None,
    trim_selects: bool = True,
) -> Node:
    """Build the lineage node for `column` in `scope`, recursing into the scopes it depends on.

    Args:
        column: Name of the select to trace, or its positional index in the scope's selects.
        scope: The scope whose expression produces the column.
        dialect: Dialect used when rendering SQL for log messages; passed on to recursive calls.
        scope_name: When set, the node is named "<scope_name>.<column>".
        upstream: Node that the newly created node is appended to as a downstream.
        source_name: Stored on the node; nested scopes inherit it unless their derived
            table carries its own "source: " comment.
        reference_node_name: Stored on the node.
        trim_selects: Whether a SELECT source is reduced to only the traced select.

    Returns:
        The node created for this scope. For a UNION scope this is `upstream` (or a new
        "UNION" node when none was given), with the union branches attached to it.

    Raises:
        ValueError: If a named column cannot be located among the selects of a UNION scope.
    """
    # Map derived-table aliases to the name found in their leading "source: <name>" comment.
    source_names = {}
    for derived_table in scope.derived_tables:
        comments = derived_table.comments
        if comments and comments[0].startswith("source: "):
            source_names[derived_table.alias] = comments[0].split()[1]

    # Find the specific select clause that is the source of the column we want.
    # This can either be a specific, named select or a generic `*` clause.
    selects = scope.expression.selects
    if isinstance(column, int):
        select = selects[column]
    else:
        select = next(
            (candidate for candidate in selects if candidate.alias_or_name == column),
            exp.Star() if scope.expression.is_star else scope.expression,
        )

    if isinstance(scope.expression, exp.Subquery):
        # Only the first nested scope is followed; with none, execution falls through.
        for inner_scope in scope.subquery_scopes:
            return to_node(
                column,
                scope=inner_scope,
                dialect=dialect,
                upstream=upstream,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

    if isinstance(scope.expression, exp.Union):
        upstream = upstream or Node(name="UNION", source=scope.expression, expression=select)

        # Union branches are traced by position, so resolve a named column to its index.
        if isinstance(column, int):
            index = column
        else:
            index = -1
            for position, candidate in enumerate(scope.expression.selects):
                if candidate.alias_or_name == column or candidate.is_star:
                    index = position
                    break

        if index == -1:
            raise ValueError(f"Could not find {column} in {scope.expression}")

        for branch_scope in scope.union_scopes:
            to_node(
                index,
                scope=branch_scope,
                dialect=dialect,
                upstream=upstream,
                source_name=source_name,
                reference_node_name=reference_node_name,
                trim_selects=trim_selects,
            )

        return upstream

    if not trim_selects or not isinstance(scope.expression, exp.Select):
        source = scope.expression
    else:
        # For better ergonomics in our node labels, replace the full select with
        # a version that has only the column we care about.
        #   "x", SELECT x, y FROM foo
        #     => "x", SELECT x FROM foo
        source = t.cast(exp.Expression, scope.expression.select(select, append=False))

    # Create the node for this step in the lineage chain, and attach it to the previous one.
    node_name = f"{scope_name}.{column}" if scope_name else str(column)
    node = Node(
        name=node_name,
        source=source,
        expression=select,
        source_name=source_name or "",
        reference_node_name=reference_node_name or "",
    )

    if upstream:
        upstream.downstream.append(node)

    # Index the nested scopes by the identity of their expression for the lookup below.
    scopes_by_expression_id = {
        id(nested.expression): nested for nested in scope.subquery_scopes
    }

    for subquery in find_all_in_scope(select, exp.UNWRAPPED_QUERIES):
        subquery_scope = scopes_by_expression_id.get(id(subquery))
        if subquery_scope:
            for name in subquery.named_selects:
                to_node(
                    name,
                    scope=subquery_scope,
                    dialect=dialect,
                    upstream=node,
                    trim_selects=trim_selects,
                )
        else:
            logger.warning(f"Unknown subquery scope: {subquery.sql(dialect=dialect)}")

    # If the select is a star, add all scope sources as downstreams.
    if select.is_star:
        for scope_source in scope.sources.values():
            # NOTE(review): this rebinds `source`, so the UDTF check below inspects the
            # last scope source whenever the select is a star - confirm this is intended.
            source = scope_source.expression if isinstance(scope_source, Scope) else scope_source
            node.downstream.append(Node(name=select.sql(), source=source, expression=source))

    # Find all columns that went into creating this one to list their lineage nodes.
    source_columns = set(find_all_in_scope(select, exp.Column))

    # If the source is a UDTF, find columns used in the UDTF to generate the table.
    if isinstance(source, exp.UDTF):
        source_columns |= set(source.find_all(exp.Column))

    for source_column in source_columns:
        table = source_column.table
        column_source = scope.sources.get(table)

        if not isinstance(column_source, Scope):
            # The source is not a scope - we've reached the end of the line. At this point, if a source is not found
            # it means this column's lineage is unknown. This can happen if the definition of a source used in a query
            # is not passed into the `sources` map.
            terminal = column_source or exp.Placeholder()
            node.downstream.append(
                Node(name=source_column.sql(), source=terminal, expression=terminal)
            )
            continue

        selected_node, _ = scope.selected_sources.get(table, (None, None))
        # The table itself came from a more specific scope. Recurse into that one using the unaliased column name.
        to_node(
            source_column.name,
            scope=column_source,
            dialect=dialect,
            scope_name=table,
            upstream=node,
            source_name=source_names.get(table) or source_name,
            reference_node_name=selected_node.name if selected_node else None,
            trim_selects=trim_selects,
        )

    return node
class GraphHTML:
class GraphHTML:
    """Node to HTML generator using vis.js.

    https://visjs.github.io/vis-network/docs/network/
    """

    def __init__(
        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
    ):
        """Store the graph data and the vis.js network options.

        Args:
            nodes: Mapping of node id to the vis.js node dictionary.
            edges: List of vis.js edge dictionaries.
            imports: Whether the generated HTML includes the vis.js script and stylesheet tags.
            options: Top-level vis.js options; each key given here replaces the default
                of the same name (shallow merge).
        """
        self.imports = imports

        self.options = {
            "height": "500px",
            "width": "100%",
            "layout": {
                "hierarchical": {
                    "enabled": True,
                    "nodeSpacing": 200,
                    "sortMethod": "directed",
                },
            },
            "interaction": {
                "dragNodes": False,
                "selectable": False,
            },
            "physics": {
                "enabled": False,
            },
            "edges": {
                "arrows": "to",
            },
            "nodes": {
                "font": "20px monaco",
                "shape": "box",
                "widthConstraint": {
                    "maximum": 300,
                },
            },
            **(options or {}),
        }

        self.nodes = nodes
        self.edges = edges

    @staticmethod
    def _script_json(value: t.Any) -> str:
        """Serialize `value` to JSON that can be embedded in an inline <script> element.

        In JSON text "<" can only occur inside string values, so replacing it with the
        equivalent escape "\\u003c" keeps the decoded data identical while preventing
        sequences such as "</script>" or "<!--" from being interpreted by the HTML parser.
        """
        return json.dumps(value).replace("<", "\\u003c")

    def __str__(self) -> str:
        """Return the HTML snippet that renders the graph with vis.js."""
        nodes = self._script_json(list(self.nodes.values()))
        edges = self._script_json(self.edges)
        options = self._script_json(self.options)
        imports = (
            """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script>
  <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script>
  <link rel="stylesheet" type="text/css" href="https://unpkg.com/vis-network/styles/vis-network.min.css" />"""
            if self.imports
            else ""
        )

        return f"""<div>
  <div id="sqlglot-lineage"></div>
  {imports}
  <script type="text/javascript">
    var nodes = new vis.DataSet({nodes})
    nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0])

    new vis.Network(
        document.getElementById("sqlglot-lineage"),
        {{
            nodes: nodes,
            edges: new vis.DataSet({edges})
        }},
        {options},
    )
  </script>
</div>"""

    def _repr_html_(self) -> str:
        """Return the same HTML as `__str__` (hook used by rich notebook displays)."""
        return self.__str__()

Node to HTML generator using vis.js.

https://visjs.github.io/vis-network/docs/network/

GraphHTML( nodes: Dict, edges: List, imports: bool = True, options: Optional[Dict] = None)
280    def __init__(
281        self, nodes: t.Dict, edges: t.List, imports: bool = True, options: t.Optional[t.Dict] = None
282    ):
283        self.imports = imports
284
285        self.options = {
286            "height": "500px",
287            "width": "100%",
288            "layout": {
289                "hierarchical": {
290                    "enabled": True,
291                    "nodeSpacing": 200,
292                    "sortMethod": "directed",
293                },
294            },
295            "interaction": {
296                "dragNodes": False,
297                "selectable": False,
298            },
299            "physics": {
300                "enabled": False,
301            },
302            "edges": {
303                "arrows": "to",
304            },
305            "nodes": {
306                "font": "20px monaco",
307                "shape": "box",
308                "widthConstraint": {
309                    "maximum": 300,
310                },
311            },
312            **(options or {}),
313        }
314
315        self.nodes = nodes
316        self.edges = edges
imports
options
nodes
edges