sqlglot.jsonpath
1from __future__ import annotations 2 3import typing as t 4 5import sqlglot.expressions as exp 6from sqlglot.errors import ParseError 7from sqlglot.tokens import Token, Tokenizer, TokenType 8 9if t.TYPE_CHECKING: 10 from sqlglot._typing import Lit 11 12 13class JSONPathTokenizer(Tokenizer): 14 SINGLE_TOKENS = { 15 "(": TokenType.L_PAREN, 16 ")": TokenType.R_PAREN, 17 "[": TokenType.L_BRACKET, 18 "]": TokenType.R_BRACKET, 19 ":": TokenType.COLON, 20 ",": TokenType.COMMA, 21 "-": TokenType.DASH, 22 ".": TokenType.DOT, 23 "?": TokenType.PLACEHOLDER, 24 "@": TokenType.PARAMETER, 25 "'": TokenType.QUOTE, 26 '"': TokenType.QUOTE, 27 "$": TokenType.DOLLAR, 28 "*": TokenType.STAR, 29 } 30 31 KEYWORDS = { 32 "..": TokenType.DOT, 33 } 34 35 IDENTIFIER_ESCAPES = ["\\"] 36 STRING_ESCAPES = ["\\"] 37 38 39def parse(path: str) -> exp.JSONPath: 40 """Takes in a JSON path string and parses it into a JSONPath expression.""" 41 tokens = JSONPathTokenizer().tokenize(path) 42 size = len(tokens) 43 44 i = 0 45 46 def _curr() -> t.Optional[TokenType]: 47 return tokens[i].token_type if i < size else None 48 49 def _prev() -> Token: 50 return tokens[i - 1] 51 52 def _advance() -> Token: 53 nonlocal i 54 i += 1 55 return _prev() 56 57 def _error(msg: str) -> str: 58 return f"{msg} at index {i}: {path}" 59 60 @t.overload 61 def _match(token_type: TokenType, raise_unmatched: Lit[True] = True) -> Token: 62 pass 63 64 @t.overload 65 def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]: 66 pass 67 68 def _match(token_type, raise_unmatched=False): 69 if _curr() == token_type: 70 return _advance() 71 if raise_unmatched: 72 raise ParseError(_error(f"Expected {token_type}")) 73 return None 74 75 def _parse_literal() -> t.Any: 76 token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER) 77 if token: 78 return token.text 79 if _match(TokenType.STAR): 80 return exp.JSONPathWildcard() 81 if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN): 82 
script = _prev().text == "(" 83 start = i 84 85 while True: 86 if _match(TokenType.L_BRACKET): 87 _parse_bracket() # nested call which we can throw away 88 if _curr() in (TokenType.R_BRACKET, None): 89 break 90 _advance() 91 92 expr_type = exp.JSONPathScript if script else exp.JSONPathFilter 93 return expr_type(this=path[tokens[start].start : tokens[i].end]) 94 95 number = "-" if _match(TokenType.DASH) else "" 96 97 token = _match(TokenType.NUMBER) 98 if token: 99 number += token.text 100 101 if number: 102 return int(number) 103 104 return False 105 106 def _parse_slice() -> t.Any: 107 start = _parse_literal() 108 end = _parse_literal() if _match(TokenType.COLON) else None 109 step = _parse_literal() if _match(TokenType.COLON) else None 110 111 if end is None and step is None: 112 return start 113 114 return exp.JSONPathSlice(start=start, end=end, step=step) 115 116 def _parse_bracket() -> exp.JSONPathPart: 117 literal = _parse_slice() 118 119 if isinstance(literal, str) or literal is not False: 120 indexes = [literal] 121 while _match(TokenType.COMMA): 122 literal = _parse_slice() 123 124 if literal: 125 indexes.append(literal) 126 127 if len(indexes) == 1: 128 if isinstance(literal, str): 129 node: exp.JSONPathPart = exp.JSONPathKey(this=indexes[0]) 130 elif isinstance(literal, exp.JSONPathPart) and isinstance( 131 literal, (exp.JSONPathScript, exp.JSONPathFilter) 132 ): 133 node = exp.JSONPathSelector(this=indexes[0]) 134 else: 135 node = exp.JSONPathSubscript(this=indexes[0]) 136 else: 137 node = exp.JSONPathUnion(expressions=indexes) 138 else: 139 raise ParseError(_error("Cannot have empty segment")) 140 141 _match(TokenType.R_BRACKET, raise_unmatched=True) 142 143 return node 144 145 # We canonicalize the JSON path AST so that it always starts with a 146 # "root" element, so paths like "field" will be generated as "$.field" 147 _match(TokenType.DOLLAR) 148 expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()] 149 150 while _curr(): 151 if 
_match(TokenType.DOT) or _match(TokenType.COLON): 152 recursive = _prev().text == ".." 153 154 if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER): 155 value: t.Optional[str | exp.JSONPathWildcard] = _prev().text 156 elif _match(TokenType.STAR): 157 value = exp.JSONPathWildcard() 158 else: 159 value = None 160 161 if recursive: 162 expressions.append(exp.JSONPathRecursive(this=value)) 163 elif value: 164 expressions.append(exp.JSONPathKey(this=value)) 165 else: 166 raise ParseError(_error("Expected key name or * after DOT")) 167 elif _match(TokenType.L_BRACKET): 168 expressions.append(_parse_bracket()) 169 elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER): 170 expressions.append(exp.JSONPathKey(this=_prev().text)) 171 elif _match(TokenType.STAR): 172 expressions.append(exp.JSONPathWildcard()) 173 else: 174 raise ParseError(_error(f"Unexpected {tokens[i].token_type}")) 175 176 return exp.JSONPath(expressions=expressions) 177 178 179JSON_PATH_PART_TRANSFORMS: t.Dict[t.Type[exp.Expression], t.Callable[..., str]] = { 180 exp.JSONPathFilter: lambda _, e: f"?{e.this}", 181 exp.JSONPathKey: lambda self, e: self._jsonpathkey_sql(e), 182 exp.JSONPathRecursive: lambda _, e: f"..{e.this or ''}", 183 exp.JSONPathRoot: lambda *_: "$", 184 exp.JSONPathScript: lambda _, e: f"({e.this}", 185 exp.JSONPathSelector: lambda self, e: f"[{self.json_path_part(e.this)}]", 186 exp.JSONPathSlice: lambda self, e: ":".join( 187 "" if p is False else self.json_path_part(p) 188 for p in [e.args.get("start"), e.args.get("end"), e.args.get("step")] 189 if p is not None 190 ), 191 exp.JSONPathSubscript: lambda self, e: self._jsonpathsubscript_sql(e), 192 exp.JSONPathUnion: lambda self, 193 e: f"[{','.join(self.json_path_part(p) for p in e.expressions)}]", 194 exp.JSONPathWildcard: lambda *_: "*", 195} 196 197ALL_JSON_PATH_PARTS = set(JSON_PATH_PART_TRANSFORMS)
class JSONPathTokenizer(Tokenizer):
    """Tokenizer subclass whose token tables are set up for JSON path strings."""

    # Maps each single character to the token type it is tokenized as.
    SINGLE_TOKENS = {
        # grouping and bracket notation
        "(": TokenType.L_PAREN,
        ")": TokenType.R_PAREN,
        "[": TokenType.L_BRACKET,
        "]": TokenType.R_BRACKET,
        # separators and sign
        ":": TokenType.COLON,
        ",": TokenType.COMMA,
        "-": TokenType.DASH,
        ".": TokenType.DOT,
        # filter marker and current-node marker
        "?": TokenType.PLACEHOLDER,
        "@": TokenType.PARAMETER,
        # both quote characters map to the same token type
        "'": TokenType.QUOTE,
        '"': TokenType.QUOTE,
        # root marker and wildcard
        "$": TokenType.DOLLAR,
        "*": TokenType.STAR,
    }

    # The two-character ".." is mapped to the same token type as ".".
    KEYWORDS = {"..": TokenType.DOT}

    # A backslash is the only escape character for identifiers and strings.
    IDENTIFIER_ESCAPES = ["\\"]
    STRING_ESCAPES = ["\\"]
SINGLE_TOKENS =
{'(': <TokenType.L_PAREN: 'L_PAREN'>, ')': <TokenType.R_PAREN: 'R_PAREN'>, '[': <TokenType.L_BRACKET: 'L_BRACKET'>, ']': <TokenType.R_BRACKET: 'R_BRACKET'>, ':': <TokenType.COLON: 'COLON'>, ',': <TokenType.COMMA: 'COMMA'>, '-': <TokenType.DASH: 'DASH'>, '.': <TokenType.DOT: 'DOT'>, '?': <TokenType.PLACEHOLDER: 'PLACEHOLDER'>, '@': <TokenType.PARAMETER: 'PARAMETER'>, "'": <TokenType.QUOTE: 'QUOTE'>, '"': <TokenType.QUOTE: 'QUOTE'>, '$': <TokenType.DOLLAR: 'DOLLAR'>, '*': <TokenType.STAR: 'STAR'>}
Inherited Members
- sqlglot.tokens.Tokenizer
- Tokenizer
- BIT_STRINGS
- BYTE_STRINGS
- HEX_STRINGS
- RAW_STRINGS
- HEREDOC_STRINGS
- UNICODE_STRINGS
- IDENTIFIERS
- QUOTES
- VAR_SINGLE_TOKENS
- HEREDOC_TAG_IS_IDENTIFIER
- HEREDOC_STRING_ALTERNATIVE
- WHITE_SPACE
- COMMANDS
- COMMAND_PREFIX_TOKENS
- NUMERIC_LITERALS
- COMMENTS
- dialect
- reset
- tokenize
- tokenize_rs
- size
- sql
- tokens
def parse(path: str) -> exp.JSONPath:
    """Takes in a JSON path string and parses it into a JSONPath expression.

    Args:
        path: The JSON path text to parse. A leading `$` is optional.

    Returns:
        A JSONPath expression whose first part is always a JSONPathRoot.

    Raises:
        ParseError: If the path has an empty or unterminated bracket segment, an
            invalid numeric index, a separator that is not followed by a key or `*`,
            or an unexpected token.
    """
    tokens = JSONPathTokenizer().tokenize(path)
    size = len(tokens)

    # Position of the next token to consume; shared by the helpers below.
    i = 0

    def _curr() -> t.Optional[TokenType]:
        """Return the type of the current token, or None once the input is exhausted."""
        return tokens[i].token_type if i < size else None

    def _prev() -> Token:
        """Return the most recently consumed token."""
        return tokens[i - 1]

    def _advance() -> Token:
        """Consume the current token and return it."""
        nonlocal i
        i += 1
        return _prev()

    def _error(msg: str) -> str:
        """Format an error message; the reported index is a token index, not a character offset."""
        return f"{msg} at index {i}: {path}"

    # The overload defaults mirror the runtime default (False), so a bare
    # `_match(x)` is typed as possibly returning None.
    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[True]) -> Token:
        pass

    @t.overload
    def _match(token_type: TokenType, raise_unmatched: Lit[False] = False) -> t.Optional[Token]:
        pass

    def _match(token_type, raise_unmatched=False):
        """Consume and return the current token if it has the given type.

        Otherwise return None, or raise ParseError when `raise_unmatched` is set.
        """
        if _curr() == token_type:
            return _advance()
        if raise_unmatched:
            raise ParseError(_error(f"Expected {token_type}"))
        return None

    def _parse_literal() -> t.Any:
        """Parse one bracket item.

        Returns a str key, a wildcard, a filter/script node, an int index, or
        `False` when nothing could be parsed at the current position.
        """
        token = _match(TokenType.STRING) or _match(TokenType.IDENTIFIER)
        if token:
            return token.text
        if _match(TokenType.STAR):
            return exp.JSONPathWildcard()
        if _match(TokenType.PLACEHOLDER) or _match(TokenType.L_PAREN):
            script = _prev().text == "("
            start = i

            while True:
                if _match(TokenType.L_BRACKET):
                    _parse_bracket()  # nested call which we can throw away
                if _curr() in (TokenType.R_BRACKET, None):
                    break
                _advance()

            # Running out of tokens means the segment was never closed; without
            # this check tokens[i] below would raise an IndexError.
            if i >= size:
                raise ParseError(_error(f"Expected {TokenType.R_BRACKET}"))

            expr_type = exp.JSONPathScript if script else exp.JSONPathFilter
            # The raw text after the opening "?" / "(" is kept verbatim.
            # NOTE(review): presumably Token.end is the offset of the bracket itself,
            # so the slice stops just before "]" - confirm against Token.
            return expr_type(this=path[tokens[start].start : tokens[i].end])

        number = "-" if _match(TokenType.DASH) else ""

        token = _match(TokenType.NUMBER)
        if token:
            number += token.text

        if number:
            try:
                return int(number)
            except ValueError:
                # A lone "-" or a non-integer number is a malformed path, so report
                # it as a ParseError like every other syntax problem.
                raise ParseError(_error(f"Invalid index {number}")) from None

        return False

    def _parse_slice() -> t.Any:
        """Parse `start[:end[:step]]`; return the bare item when no colon follows it."""
        start = _parse_literal()
        end = _parse_literal() if _match(TokenType.COLON) else None
        step = _parse_literal() if _match(TokenType.COLON) else None

        if end is None and step is None:
            return start

        return exp.JSONPathSlice(start=start, end=end, step=step)

    def _parse_bracket() -> exp.JSONPathPart:
        """Parse the contents of a bracket segment, including the closing bracket."""
        literal = _parse_slice()

        if literal is False:
            raise ParseError(_error("Cannot have empty segment"))

        indexes = [literal]
        while _match(TokenType.COMMA):
            literal = _parse_slice()

            # `False` is the "nothing parsed" sentinel. Index 0 and the empty
            # string key are falsy but valid, so compare by identity.
            if literal is not False:
                indexes.append(literal)

        if len(indexes) == 1:
            # Classify by the stored item, not by whatever was parsed last.
            first = indexes[0]
            if isinstance(first, str):
                node: exp.JSONPathPart = exp.JSONPathKey(this=first)
            elif isinstance(first, exp.JSONPathPart) and isinstance(
                first, (exp.JSONPathScript, exp.JSONPathFilter)
            ):
                node = exp.JSONPathSelector(this=first)
            else:
                node = exp.JSONPathSubscript(this=first)
        else:
            node = exp.JSONPathUnion(expressions=indexes)

        _match(TokenType.R_BRACKET, raise_unmatched=True)

        return node

    # We canonicalize the JSON path AST so that it always starts with a
    # "root" element, so paths like "field" will be generated as "$.field"
    _match(TokenType.DOLLAR)
    expressions: t.List[exp.JSONPathPart] = [exp.JSONPathRoot()]

    while _curr():
        if _match(TokenType.DOT) or _match(TokenType.COLON):
            # "." and ".." share a token type, so the text decides whether this is
            # a recursive descent.
            recursive = _prev().text == ".."

            if _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
                value: t.Optional[str | exp.JSONPathWildcard] = _prev().text
            elif _match(TokenType.STAR):
                value = exp.JSONPathWildcard()
            else:
                value = None

            if recursive:
                expressions.append(exp.JSONPathRecursive(this=value))
            elif value:
                expressions.append(exp.JSONPathKey(this=value))
            else:
                raise ParseError(_error("Expected key name or * after DOT"))
        elif _match(TokenType.L_BRACKET):
            expressions.append(_parse_bracket())
        elif _match(TokenType.VAR) or _match(TokenType.IDENTIFIER):
            expressions.append(exp.JSONPathKey(this=_prev().text))
        elif _match(TokenType.STAR):
            expressions.append(exp.JSONPathWildcard())
        else:
            raise ParseError(_error(f"Unexpected {tokens[i].token_type}"))

    return exp.JSONPath(expressions=expressions)
Takes in a JSON path string and parses it into a JSONPath expression.
JSON_PATH_PART_TRANSFORMS: Dict[Type[sqlglot.expressions.Expression], Callable[..., str]] =
{<class 'sqlglot.expressions.JSONPathFilter'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathKey'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRecursive'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathRoot'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathScript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSelector'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSlice'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathSubscript'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathUnion'>: <function <lambda>>, <class 'sqlglot.expressions.JSONPathWildcard'>: <function <lambda>>}
ALL_JSON_PATH_PARTS =
{<class 'sqlglot.expressions.JSONPathKey'>, <class 'sqlglot.expressions.JSONPathWildcard'>, <class 'sqlglot.expressions.JSONPathFilter'>, <class 'sqlglot.expressions.JSONPathUnion'>, <class 'sqlglot.expressions.JSONPathSubscript'>, <class 'sqlglot.expressions.JSONPathSelector'>, <class 'sqlglot.expressions.JSONPathSlice'>, <class 'sqlglot.expressions.JSONPathScript'>, <class 'sqlglot.expressions.JSONPathRoot'>, <class 'sqlglot.expressions.JSONPathRecursive'>}