Edit on GitHub

sqlglot.jsonpath

  1from __future__ import annotations
  2
  3import typing as t
  4
  5from sqlglot.errors import ParseError
  6from sqlglot.expressions import SAFE_IDENTIFIER_RE
  7from sqlglot.tokens import Token, Tokenizer, TokenType
  8
  9if t.TYPE_CHECKING:
 10    from sqlglot._typing import Lit
 11
 12
 13class JSONPathTokenizer(Tokenizer):
 14    SINGLE_TOKENS = {
 15        "(": TokenType.L_PAREN,
 16        ")": TokenType.R_PAREN,
 17        "[": TokenType.L_BRACKET,
 18        "]": TokenType.R_BRACKET,
 19        ":": TokenType.COLON,
 20        ",": TokenType.COMMA,
 21        "-": TokenType.DASH,
 22        ".": TokenType.DOT,
 23        "?": TokenType.PLACEHOLDER,
 24        "@": TokenType.PARAMETER,
 25        "'": TokenType.QUOTE,
 26        '"': TokenType.QUOTE,
 27        "$": TokenType.DOLLAR,
 28        "*": TokenType.STAR,
 29    }
 30
 31    KEYWORDS = {
 32        "..": TokenType.DOT,
 33    }
 34
 35    IDENTIFIER_ESCAPES = ["\\"]
 36    STRING_ESCAPES = ["\\"]
 37
 38
 39JSONPathNode = t.Dict[str, t.Any]
 40
 41
 42def _node(kind: str, value: t.Any = None, **kwargs: t.Any) -> JSONPathNode:
 43    node = {"kind": kind, **kwargs}
 44
 45    if value is not None:
 46        node["value"] = value
 47
 48    return node
 49
 50
 51def parse(path: str) -> t.List[JSONPathNode]:
 52    """Takes in a JSONPath string and converts into a list of nodes."""
 53    tokens = JSONPathTokenizer().tokenize(path)
 54    size = len(tokens)
 55
 56    i = 0
 57
 58    def _curr() -> t.Optional[TokenType]:
 59        return tokens[i].token_type if i < size else None
 60
 61    def _prev() -> Token:
 62        return tokens[i - 1]
 63
 64    def _advance() -> Token:
 65        nonlocal i
 66        i += 1
 67        return _prev()
 68
 69    def _error(msg: str) -> str:
 70        return f"{msg} at index {i}: {path}"
 71
 72    @t.overload
 73    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 74        pass
 75
 76    @t.overload
 77    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 78        pass
 79
 80    def _match(token_type, raise_unmatched=False):
 81        if _curr() == token_type:
 82            return _advance()
 83        if raise_unmatched:
 84            raise ParseError(_error(f"Expected {token_type}"))
 85        return None
 86
 87    def _parse_literal() -> t.Any:
 88        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 89        if token:
 90            return token.text
 91        if _match(TokenType.STAR):
 92            return _node("wildcard")
 93        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 94            script = _prev().text == "("
 95            start = i
 96
 97            while True:
 98                if _match(TokenType.L_BRACKET):
 99                    _parse_bracket()  # nested call which we can throw away
100                if _curr() in (TokenType.R_BRACKET, None):
101                    break
102                _advance()
103            return _node(
104                "script" if script else "filter", path[tokens[start].start : tokens[i].end]
105            )
106
107        number = "-" if _match(TokenType.DASH) else ""
108
109        token = _match(TokenType.NUMBER)
110        if token:
111            number += token.text
112
113        if number:
114            return int(number)
115        return False
116
117    def _parse_slice() -> t.Any:
118        start = _parse_literal()
119        end = _parse_literal() if _match(TokenType.COLON) else None
120        step = _parse_literal() if _match(TokenType.COLON) else None
121
122        if end is None and step is None:
123            return start
124        return _node("slice", start=start, end=end, step=step)
125
126    def _parse_bracket() -> JSONPathNode:
127        literal = _parse_slice()
128
129        if isinstance(literal, str) or literal is not False:
130            indexes = [literal]
131            while _match(TokenType.COMMA):
132                literal = _parse_slice()
133
134                if literal:
135                    indexes.append(literal)
136
137            if len(indexes) == 1:
138                if isinstance(literal, str):
139                    node = _node("key", indexes[0])
140                elif isinstance(literal, dict) and literal["kind"] in ("script", "filter"):
141                    node = _node("selector", indexes[0])
142                else:
143                    node = _node("subscript", indexes[0])
144            else:
145                node = _node("union", indexes)
146        else:
147            raise ParseError(_error("Cannot have empty segment"))
148
149        _match(TokenType.R_BRACKET, raise_unmatched=True)
150
151        return node
152
153    nodes = []
154
155    while _curr():
156        if _match(TokenType.DOLLAR):
157            nodes.append(_node("root"))
158        elif _match(TokenType.DOT):
159            recursive = _prev().text == ".."
160            value = _match(TokenType.VAR) or _match(TokenType.STAR)
161            nodes.append(
162                _node("recursive" if recursive else "child", value=value.text if value else None)
163            )
164        elif _match(TokenType.L_BRACKET):
165            nodes.append(_parse_bracket())
166        elif _match(TokenType.VAR):
167            nodes.append(_node("key", _prev().text))
168        elif _match(TokenType.STAR):
169            nodes.append(_node("wildcard"))
170        elif _match(TokenType.PARAMETER):
171            nodes.append(_node("current"))
172        else:
173            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
174
175    return nodes
176
177
178MAPPING = {
179    "child": lambda n: f".{n['value']}" if n.get("value") is not None else "",
180    "filter": lambda n: f"?{n['value']}",
181    "key": lambda n: f".{n['value']}"
182    if SAFE_IDENTIFIER_RE.match(n["value"])
183    else f'[{generate([n["value"]])}]',
184    "recursive": lambda n: f"..{n['value']}" if n.get("value") is not None else "..",
185    "root": lambda _: "$",
186    "script": lambda n: f"({n['value']}",
187    "slice": lambda n: ":".join(
188        "" if p is False else generate([p])
189        for p in [n["start"], n["end"], n["step"]]
190        if p is not None
191    ),
192    "selector": lambda n: f"[{generate([n['value']])}]",
193    "subscript": lambda n: f"[{generate([n['value']])}]",
194    "union": lambda n: f"[{','.join(generate([p]) for p in n['value'])}]",
195    "wildcard": lambda _: "*",
196}
197
198
199def generate(
200    nodes: t.List[JSONPathNode],
201    mapping: t.Optional[t.Dict[str, t.Callable[[JSONPathNode], str]]] = None,
202) -> str:
203    mapping = MAPPING if mapping is None else mapping
204    path = []
205
206    for node in nodes:
207        if isinstance(node, dict):
208            path.append(mapping[node["kind"]](node))
209        elif isinstance(node, str):
210            escaped = node.replace('"', '\\"')
211            path.append(f'"{escaped}"')
212        else:
213            path.append(str(node))
214
215    return "".join(path)
class JSONPathTokenizer(sqlglot.tokens.Tokenizer):
14class JSONPathTokenizer(Tokenizer):
15    SINGLE_TOKENS = {
16        "(": TokenType.L_PAREN,
17        ")": TokenType.R_PAREN,
18        "[": TokenType.L_BRACKET,
19        "]": TokenType.R_BRACKET,
20        ":": TokenType.COLON,
21        ",": TokenType.COMMA,
22        "-": TokenType.DASH,
23        ".": TokenType.DOT,
24        "?": TokenType.PLACEHOLDER,
25        "@": TokenType.PARAMETER,
26        "'": TokenType.QUOTE,
27        '"': TokenType.QUOTE,
28        "$": TokenType.DOLLAR,
29        "*": TokenType.STAR,
30    }
31
32    KEYWORDS = {
33        "..": TokenType.DOT,
34    }
35
36    IDENTIFIER_ESCAPES = ["\\"]
37    STRING_ESCAPES = ["\\"]
SINGLE_TOKENS = {'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
KEYWORDS = {'..': <TokenType.DOT: 'DOT'>}
IDENTIFIER_ESCAPES = ['\\']
STRING_ESCAPES = ['\\']
JSONPathNode = typing.Dict[str, typing.Any]
def parse(path: str) -> List[Dict[str, Any]]:
 52def parse(path: str) -> t.List[JSONPathNode]:
 53    """Takes in a JSONPath string and converts into a list of nodes."""
 54    tokens = JSONPathTokenizer().tokenize(path)
 55    size = len(tokens)
 56
 57    i = 0
 58
 59    def _curr() -> t.Optional[TokenType]:
 60        return tokens[i].token_type if i < size else None
 61
 62    def _prev() -> Token:
 63        return tokens[i - 1]
 64
 65    def _advance() -> Token:
 66        nonlocal i
 67        i += 1
 68        return _prev()
 69
 70    def _error(msg: str) -> str:
 71        return f"{msg} at index {i}: {path}"
 72
 73    @t.overload
 74    def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token:
 75        pass
 76
 77    @t.overload
 78    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
 79        pass
 80
 81    def _match(token_type, raise_unmatched=False):
 82        if _curr() == token_type:
 83            return _advance()
 84        if raise_unmatched:
 85            raise ParseError(_error(f"Expected {token_type}"))
 86        return None
 87
 88    def _parse_literal() -> t.Any:
 89        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
 90        if token:
 91            return token.text
 92        if _match(TokenType.STAR):
 93            return _node("wildcard")
 94        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
 95            script = _prev().text == "("
 96            start = i
 97
 98            while True:
 99                if _match(TokenType.L_BRACKET):
100                    _parse_bracket()  # nested call which we can throw away
101                if _curr() in (TokenType.R_BRACKET, None):
102                    break
103                _advance()
104            return _node(
105                "script" if script else "filter", path[tokens[start].start : tokens[i].end]
106            )
107
108        number = "-" if _match(TokenType.DASH) else ""
109
110        token = _match(TokenType.NUMBER)
111        if token:
112            number += token.text
113
114        if number:
115            return int(number)
116        return False
117
118    def _parse_slice() -> t.Any:
119        start = _parse_literal()
120        end = _parse_literal() if _match(TokenType.COLON) else None
121        step = _parse_literal() if _match(TokenType.COLON) else None
122
123        if end is None and step is None:
124            return start
125        return _node("slice", start=start, end=end, step=step)
126
127    def _parse_bracket() -> JSONPathNode:
128        literal = _parse_slice()
129
130        if isinstance(literal, str) or literal is not False:
131            indexes = [literal]
132            while _match(TokenType.COMMA):
133                literal = _parse_slice()
134
135                if literal:
136                    indexes.append(literal)
137
138            if len(indexes) == 1:
139                if isinstance(literal, str):
140                    node = _node("key", indexes[0])
141                elif isinstance(literal, dict) and literal["kind"] in ("script", "filter"):
142                    node = _node("selector", indexes[0])
143                else:
144                    node = _node("subscript", indexes[0])
145            else:
146                node = _node("union", indexes)
147        else:
148            raise ParseError(_error("Cannot have empty segment"))
149
150        _match(TokenType.R_BRACKET, raise_unmatched=True)
151
152        return node
153
154    nodes = []
155
156    while _curr():
157        if _match(TokenType.DOLLAR):
158            nodes.append(_node("root"))
159        elif _match(TokenType.DOT):
160            recursive = _prev().text == ".."
161            value = _match(TokenType.VAR) or _match(TokenType.STAR)
162            nodes.append(
163                _node("recursive" if recursive else "child", value=value.text if value else None)
164            )
165        elif _match(TokenType.L_BRACKET):
166            nodes.append(_parse_bracket())
167        elif _match(TokenType.VAR):
168            nodes.append(_node("key", _prev().text))
169        elif _match(TokenType.STAR):
170            nodes.append(_node("wildcard"))
171        elif _match(TokenType.PARAMETER):
172            nodes.append(_node("current"))
173        else:
174            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))
175
176    return nodes

Takes in a JSONPath string and converts into a list of nodes.

MAPPING = {'child': <function <lambda>>, 'filter': <function <lambda>>, 'key': <function <lambda>>, 'recursive': <function <lambda>>, 'root': <function <lambda>>, 'script': <function <lambda>>, 'slice': <function <lambda>>, 'selector': <function <lambda>>, 'subscript': <function <lambda>>, 'union': <function <lambda>>, 'wildcard': <function <lambda>>}
def generate( nodes: List[Dict[str, Any]], mapping: Optional[Dict[str, Callable[[Dict[str, Any]], str]]] = None) -> str:
200def generate(
201    nodes: t.List[JSONPathNode],
202    mapping: t.Optional[t.Dict[str, t.Callable[[JSONPathNode], str]]] = None,
203) -> str:
204    mapping = MAPPING if mapping is None else mapping
205    path = []
206
207    for node in nodes:
208        if isinstance(node, dict):
209            path.append(mapping[node["kind"]](node))
210        elif isinstance(node, str):
211            escaped = node.replace('"', '\\"')
212            path.append(f'"{escaped}"')
213        else:
214            path.append(str(node))
215
216    return "".join(path)