Storm SQL 语言参考

Storm SQL 使用 Apache Calcite 来转换和评估 SQL 语句. Storm SQL 还采用了来自 Calcite 的 Rex 编译器,因此 Storm SQL 将处理由 Calcite 的默认 SQL 解析器识别的 SQL 方言。

本文基于 Calcite 官网的 SQL 参考手册, 移除了部分 Storm SQL 不支持的内容, 添加了一些 Storm SQL 支持的内容.

请先阅读 Storm SQL integration 页面, 了解 Storm SQL 支持哪些特性.

语法

Calcite 提供丰富的 SQL 语法. 但是 Storm SQL 并不是一个数据库系统, 它用于处理流式数据, 因此只支持语法的子集. Storm SQL 不会重定义 SQL 语法, 只优化 Calcite 提供的转换器, 因此 SQL 语句仍然基于 Calcite 的 SQL 语法进行转换.

SQL 语法表现为 类BNF 的形式.

statement:
      setStatement
  |   resetStatement
  |   explain
  |   describe
  |   insert
  |   update
  |   merge
  |   delete
  |   query

setStatement:
      [ ALTER ( SYSTEM | SESSION ) ] SET identifier '=' expression

resetStatement:
      [ ALTER ( SYSTEM | SESSION ) ] RESET identifier
  |   [ ALTER ( SYSTEM | SESSION ) ] RESET ALL

explain:
      EXPLAIN PLAN
      [ WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION ]
      [ EXCLUDING ATTRIBUTES | INCLUDING [ ALL ] ATTRIBUTES ]
      FOR ( query | insert | update | merge | delete )

describe:
      DESCRIBE DATABASE databaseName
   |  DESCRIBE CATALOG [ databaseName . ] catalogName
   |  DESCRIBE SCHEMA [ [ databaseName . ] catalogName ] . schemaName
   |  DESCRIBE [ TABLE ] [ [ [ databaseName . ] catalogName . ] schemaName . ] tableName [ columnName ]
   |  DESCRIBE [ STATEMENT ] ( query | insert | update | merge | delete )

insert:
      ( INSERT | UPSERT ) INTO tablePrimary
      [ '(' column [, column ]* ')' ]
      query

update:
      UPDATE tablePrimary
      SET assign [, assign ]*
      [ WHERE booleanExpression ]

assign:
      identifier '=' expression

merge:
      MERGE INTO tablePrimary [ [ AS ] alias ]
      USING tablePrimary
      ON booleanExpression
      [ WHEN MATCHED THEN UPDATE SET assign [, assign ]* ]
      [ WHEN NOT MATCHED THEN INSERT VALUES '(' value [ , value ]* ')' ]

delete:
      DELETE FROM tablePrimary [ [ AS ] alias ]
      [ WHERE booleanExpression ]

query:
      values
  |   WITH withItem [ , withItem ]* query
  |   {
          select
      |   selectWithoutFrom
      |   query UNION [ ALL ] query
      |   query EXCEPT query
      |   query INTERSECT query
      }
      [ ORDER BY orderItem [, orderItem ]* ]
      [ LIMIT { count | ALL } ]
      [ OFFSET start { ROW | ROWS } ]
      [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]

withItem:
      name
      [ '(' column [, column ]* ')' ]
      AS '(' query ')'

orderItem:
      expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]

select:
      SELECT [ STREAM ] [ ALL | DISTINCT ]
          { * | projectItem [, projectItem ]* }
      FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
      SELECT [ ALL | DISTINCT ]
          { * | projectItem [, projectItem ]* }

projectItem:
      expression [ [ AS ] columnAlias ]
  |   tableAlias . *

tableExpression:
      tableReference [, tableReference ]*
  |   tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
      ON booleanExpression
  |   USING '(' column [, column ]* ')'

tableReference:
      tablePrimary
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
      [ [ catalogName . ] schemaName . ] tableName
      '(' TABLE [ [ catalogName . ] schemaName . ] tableName ')'
  |   [ LATERAL ] '(' query ')'
  |   UNNEST '(' expression ')' [ WITH ORDINALITY ]
  |   [ LATERAL ] TABLE '(' [ SPECIFIC ] functionName '(' expression [, expression ]* ')' ')'

values:
      VALUES expression [, expression ]*

groupItem:
      expression
  |   '(' ')'
  |   '(' expression [, expression ]* ')'
  |   CUBE '(' expression [, expression ]* ')'
  |   ROLLUP '(' expression [, expression ]* ')'
  |   GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
      windowName
  |   windowSpec

windowSpec:
      [ windowName ]
      '('
      [ ORDER BY orderItem [, orderItem ]* ]
      [ PARTITION BY expression [, expression ]* ]
      [
          RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING }
      |   ROWS numericExpression { PRECEDING | FOLLOWING }
      ]
      ')'

merge 中, 必须有 WHEN MATCH 或者 WHEN NOT MATCH 从句的其中之一.

orderItem 中, 如果 expression 是一个正整数 n, 他表示 SELECT 从句中的第 n 个项目.

聚合查询是包含 GROUP BY 或 HAVING 的查询, 子句或 SELECT 子句中的聚合函数。 在 SELECT 中,汇总查询的 HAVING 和 ORDER BY 子句,所有表达式必须在当前组内是恒定的(即分组常量由GROUP BY子句定义,或常量)或聚合函数或常量和聚合的组合功能。 聚合和分组功能只能出现在聚合查询,并且仅在 SELECT,HAVING 或 ORDER BY 子句中。

一个标量子查询是像表达式一样使用的子查询. 如果子查询没有返回行, 值为 NULL; 如果返回多行, 就是错误的.

IN, EXISTS 和 标量子查询可以在任何使用 expression 的地方使用(比如 SELECT 从句, WHERE 从句, JOIN 后面的 ON 从句, 或者作为一个聚合函数的参数).

一个 IN, EXISTS 或者标量子查询可能是相关的; 意思是, 可以引用封闭查询的 FROM 从句中的表。

selectWithoutFrom 等价于 VALUES, 但是并非标准 SQL, 只在允许在特定的conformance levels上.

关键词

下列是一个 SQL 的关键词列表. 这个列表页来自于 Calcite SQL 的参考手册.

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE.

标识符

标识符是在 SQL 查询中使用的表名, 列, 和其他元数据元素.

未被引号括起来的标识符, 比如 emp, 必须以字母打头且只能包含字母, 数字, 下划线. 他们会隐式的转换为大写.

被引号引起来的标识符, 例如 "Employee Name", 以双引号开始和结束. 他们可以包含几乎任何字符, 包括空白和其他标点. 如果你想在标识符中包含一个双引号, 使用双引号进行转义, 像这样: "An employee called ""Fred"".".

在 Calcite 中, 与引用的对象的名称匹配的标识符是大小写敏感的. 但是记住, 被引号括起来的标识符会在匹配前隐式转化为大写, 如果它引用的对象的名称是使用未被引号括起来的标识符创建的, 它的名称也会被转换成大写.

数据类型

标量类型

Data type Description Range and examples
BOOLEAN Logical values Values: TRUE, FALSE, UNKNOWN
TINYINT 1 byte signed integer Range is -255 to 256
SMALLINT 2 byte signed integer Range is -32768 to 32767
INTEGER, INT 4 byte signed integer Range is -2147483648 to 2147483647
BIGINT 8 byte signed integer Range is -9223372036854775808 to 9223372036854775807
DECIMAL(p, s) Fixed point Example: 123.45 is a DECIMAL(5, 2) value.
NUMERIC Fixed point
REAL, FLOAT 4 byte floating point 6 decimal digits precision
DOUBLE 8 byte floating point 15 decimal digits precision
CHAR(n), CHARACTER(n) Fixed-width character string 'Hello', '' (empty string), _latin1'Hello', n'Hello', _UTF16'Hello', 'Hello' 'there' (literal split into multiple parts)
VARCHAR(n), CHARACTER VARYING(n) Variable-length character string As CHAR(n)
BINARY(n) Fixed-width binary string x'45F0AB', x'' (empty binary string), x'AB' 'CD' (multi-part binary string literal)
VARBINARY(n), BINARY VARYING(n) Variable-length binary string As BINARY(n)
DATE Date Example: DATE '1969-07-20'
TIME Time of day Example: TIME '20:17:40'
TIMESTAMP [ WITHOUT TIME ZONE ] Date and time Example: TIMESTAMP '1969-07-20 20:17:40'
TIMESTAMP WITH TIME ZONE Date and time with time zone Example: TIMESTAMP '1969-07-20 20:17:40 America/Los Angeles'
INTERVAL timeUnit [ TO timeUnit ] Date time interval Examples: INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY
Anchored interval Date time interval Example: (DATE '1969-07-20', DATE '1972-08-29')

Where:

timeUnit:
  MILLENNIUM | CENTURY | DECADE | YEAR | QUARTER | MONTH | WEEK | DOY | DOW | DAY | HOUR | MINUTE | SECOND | EPOCH

注意:

  • DATE, TIME, TIMESTAMP 是不带时区的. 也没有比如UTC(像Java那样)或者本地时区作为默认时区. 它需要用户或者应用提供时区的处理.

非标量类型

类型 描述
ANY 一个类型未知的值
ROW 一列或者多列组成的行
MAP 键值映射的集合
MULTISET 可能包含重复内容的未排序的集合
ARRAY 有序的,可能包含重复的连续集合
CURSOR 查询结果的游标

运算符和函数

运算符优先级

运算符优先级和结合性, 从高到低.

运算符 优先级
.
+ - (unary plus, minus)
* /
+ -
BETWEEN, IN, LIKE, SIMILAR -
< > = <= >= <> !=
IS NULL, IS FALSE, IS NOT TRUE etc. -
NOT
AND
OR

比较运算符

语法 描述
value1 = value2 相等
value1 <> value2 不等
value1 != value2 不等 (only available at some conformance levels)
value1 > value2 大于
value1 >= value2 大于等于
value1 < value2 小于
value1 <= value2 小于等于
value IS NULL value 是否为 null
value IS NOT NULL value 是否不为 null
value1 IS DISTINCT FROM value2 两个值是否不等, null 值认为是相等
value1 IS NOT DISTINCT FROM value2 两个值是否相等, null 值认为是相等
value1 BETWEEN value2 AND value3 Whether value1 is greater than or equal to value2 and less than or equal to value3
value1 NOT BETWEEN value2 AND value3 value1 是否小于 value2 或大于 value3
string1 LIKE string2 [ ESCAPE string3 ] string1 与模式 string2 是否匹配
string1 NOT LIKE string2 [ ESCAPE string3 ] string1 与模式 string2 是否不匹配
string1 SIMILAR TO string2 [ ESCAPE string3 ] string1 与正则 string2 是否匹配
string1 NOT SIMILAR TO string2 [ ESCAPE string3 ] string1 与正则 string2 是否不匹配
value IN (value [, value]* ) value 是否与列表中的每一个值都相等
value NOT IN (value [, value]* ) value 是否与列表中的每一个值都不等

Not supported yet on Storm SQL: 目前 Storm SQL 中不支持的运算符:

语法 描述
value IN (sub-query) 是否 value 等于 sub-query 返回的行
value NOT IN (sub-query) 是否 value 不等于 sub-query 返回的行
EXISTS (sub-query) 是否 sub-query 返回至少一个行

Storm SQL 当前不支持子查询, 因此上面的操作不能正常工作. 这个问题会在不久的将来修复.

逻辑运算符

语法 描述
boolean1 OR boolean2
boolean1 AND boolean2
NOT boolean 是否为True ; boolean 为 UNKNOWN 则返回 UNKNOWN
boolean IS FALSE 是否为False; boolean 为 UNKNOWN 则返回 UNKNOWN
boolean IS NOT FALSE 是否不为False; boolean 为 UNKNOWN 则返回 UNKNOWN
boolean IS TRUE 是否为True; boolean 为 UNKNOWN 则返回 UNKNOWN
boolean IS NOT TRUE 是否不为True; boolean 为 UNKNOWN 则返回 UNKNOWN
boolean IS UNKNOWN 判断是否是 UNKNOWN
boolean IS NOT UNKNOWN 判断是否不是 UNKNOWN

数学运算符和函数

语法 描述
+ numeric 返回 numeric
:- numeric 返回负 numeric
numeric1 + numeric2 返回 numeric1numeric2
numeric1 - numeric2 返回 numeric1numeric2
numeric1 * numeric2 返回 numeric1 乘以 numeric2
numeric1 / numeric2 返回 numeric1 除以 numeric2
POWER(numeric1, numeric2) 返回 numeric1numeric2 次方
ABS(numeric) 返回绝对值 numeric
MOD(numeric, numeric) 取余 numeric1 除以 numeric2. 如果被除数为负, 则余数为负
SQRT(numeric) 返回平方根 numeric
LN(numeric) 返回 numeric 的自然对数 (底数 e)
LOG10(numeric) 返回 numeric 的常用对数 (底数 10)
EXP(numeric) 返回 enumeric 次方
CEIL(numeric) 向上取整 numeric, 返回大于或者等于 numeric 的最小整数
FLOOR(numeric) 向下取整 numeric, 返回小于或者等于 numeric 的最小整数

字符串运算符和函数

语法 描述
string || string 连接2个字符串
CHAR_LENGTH(string) 返回字符串长度
CHARACTER_LENGTH(string) 与 CHAR_LENGTH(string) 等价
UPPER(string) 转换为大写
LOWER(string) 转换为小写
POSITION(string1 IN string2) 返回 string1string2 中首次出现位置
TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2) string2 的 头/尾/两头 移除 string1 的最长匹配
OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ]) string2 替换 string1
SUBSTRING(string FROM integer) 从给定的位置开始返回一个子串
SUBSTRING(string FROM integer FOR integer) 从给定位置返回一个指定长度的子串
INITCAP(string) string 中每个单词首字母大写其他字母小写. 单词是由空白字符分隔开的字符序列

未实现的:

  • SUBSTRING(string FROM regexp FOR regexp)

二进制字符串操作符和函数

语法 描述
binary || binary 连接2个二进制字符串.
POSITION(binary1 IN binary2) 返回二进制串 binary1binary2 中首次出现位置
OVERLAY(binary1 PLACING binary2 FROM integer [ FOR integer2 ]) 替换

已知的 bug:

语法 描述
SUBSTRING(binary FROM integer) 从指定的位置截取
SUBSTRING(binary FROM integer FOR integer) 从指定位置截取指定长度的串

Calcite 1.9.0 有bug, 当编译 SUBSTRING 函数的时候会抛出异常. 这个问题会在后续版本中修复.

Date/time 函数

语法 描述
EXTRACT(timeUnit FROM datetime) 返回时间中的指定字段
FLOOR(datetime TO timeUnit) 根据 timeUnit 向下取整
CEIL(datetime TO timeUnit) 根据 timeUnit 向上取整

未实现的:

  • EXTRACT(timeUnit FROM interval)
  • CEIL(interval)
  • FLOOR(interval)
  • datetime - datetime timeUnit [ TO timeUnit ]
  • interval OVERLAPS interval
  • + interval
  • - interval
  • interval + interval
  • interval - interval
  • interval / interval
  • datetime + interval
  • datetime - interval

Storm SQL 特有的:

语法 描述
LOCALTIME 以类型 TIME 返回当前会话时区的当前时间
LOCALTIME(precision) 以类型 TIME 返回当前会话时区的当前时间, 精度precision
LOCALTIMESTAMP 以类型 TIMESTAMP 返回当前会话时区的当前时间
LOCALTIMESTAMP(precision) 以类型 TIMESTAMP 返回当前会话时区的当前时间, 精度precision
CURRENT_TIME 以类型 TIMESTAMP WITH TIME ZONE 返回当前会话时区的当前时间
CURRENT_DATE 以类型 DATE 返回当前会话时区的当前时间
CURRENT_TIMESTAMP 以类型 TIMESTAMP WITH TIME ZONE 返回当前会话时区的当前时间

SQL标准规定,上述运算符在评估查询时应返回相同的值。 Storm SQL将每个查询转换为Trident拓扑并运行,因此在评估SQL语句时,技术上当前的日期/时间应该是固定的。 由于这个限制,当创建Trident拓扑时,当前日期/时间将被修复,并且这些运算符应该在拓扑生命周期中返回相同的值。

系统函数

Storm SQL 当前不支持的函数:

语法 描述
USER 等价于CURRENT_USER
CURRENT_USER 当前执行上下文的用户名
SESSION_USER 会话的用户名
SYSTEM_USER 返回系统标识的当前数据存储的用户 user
CURRENT_PATH 返回表示当前查找范围的字符串,用于引用用户定义的例程和类型
CURRENT_ROLE 返回当前活动 role

这些操作符并不代表 Storm SQL 的运行时,所以除非我们找到正确的语义,否则这些操作可能永远不会被支持。

条件函数和操作符

语法 描述
CASE value
WHEN value1 [, value11 ]* THEN result1
[ WHEN valueN [, valueN1 ]* THEN resultN ]*
[ ELSE resultZ ]
END
简单 case 语句
CASE
WHEN condition1 THEN result1
[ WHEN conditionN THEN resultN ]*
[ ELSE resultZ ]
END
搜索 case 语句
NULLIF(value, value) 值相同返回.

例如, NULLIF(5, 5) 返回 NULL; NULLIF(5, 0) 返回 5.
COALESCE(value, value [, value ]* ) 前一个值为 null 则返回后一个值.

例如, COALESCE(NULL, 5) 返回 5.

类型转换

语法 描述
CAST(value AS type) 把值转换为给定的类型.

值构造器

语法 描述
ROW (value [, value]* ) 从值列表中创建一个行.
map '[' key ']' 根据 key 返回 value.
array '[' index ']' 返回某个位置的 array 的元素值.
ARRAY '[' value [, value ]* ']' 从值列表中创建一个 array.
MAP '[' key, value [, key, value ]* ']' 从 key-value 列表中创建一个 map.

集合函数

语法 描述
ELEMENT(value) 返回 array 或 multiset 的唯一元素; 如果集合为空,则为null; 如果它有多个元素,则抛出异常。
CARDINALITY(value) 返回 array 或 multiset 的元素数.

另请参考: UNNEST关系运算符将集合转换为关系.

JDBC 函数转义

数字

语法 描述
{fn ABS(numeric)} 返回 numeric 的绝对值
{fn EXP(numeric)} 返回 enumeric 次方
{fn LOG(numeric)} 返回 numeric 的自然对数 (底数为 e)
{fn LOG10(numeric)} 返回 numeric 的以10为底的对数
{fn MOD(numeric1, numeric2)} 返回 numeric1numeric2 除的余数. 被除数 numeric1 为负数的时候余数为负
{fn POWER(numeric1, numeric2)} 返回 numeric1numeric2 次方

未实现的:

  • {fn ACOS(numeric)} - Returns the arc cosine of numeric
  • {fn ASIN(numeric)} - Returns the arc sine of numeric
  • {fn ATAN(numeric)} - Returns the arc tangent of numeric
  • {fn ATAN2(numeric, numeric)}
  • {fn CEILING(numeric)} - Rounds numeric up, and returns the smallest number that is greater than or equal to numeric
  • {fn COS(numeric)} - Returns the cosine of numeric
  • {fn COT(numeric)}
  • {fn DEGREES(numeric)} - Converts numeric from radians to degrees
  • {fn FLOOR(numeric)} - Rounds numeric down, and returns the largest number that is less than or equal to numeric
  • {fn PI()} - Returns a value that is closer than any other value to pi
  • {fn RADIANS(numeric)} - Converts numeric from degrees to radians
  • {fn RAND(numeric)}
  • {fn ROUND(numeric, numeric)}
  • {fn SIGN(numeric)}
  • {fn SIN(numeric)} - Returns the sine of numeric
  • {fn SQRT(numeric)} - Returns the square root of numeric
  • {fn TAN(numeric)} - Returns the tangent of numeric
  • {fn TRUNCATE(numeric, numeric)}

字符串

字符串 描述
{fn CONCAT(character, character)} 连接字符串
{fn LOCATE(string1, string2)} 返回 string2string1 中首次出现的位置. 如果指定了 integer , 则从 integer 为起点开搜索.
{fn INSERT(string1, start, length, string2)} string2 插入 string1
{fn LCASE(string)} 返回小写
{fn LENGTH(string)} 返回字符数
{fn SUBSTRING(string, offset, length)} 字符串截取, 从 offset 的位置开始截取,长度为 length
{fn UCASE(string)} 返回大写

已知的bug:

语法 描述
{fn LOCATE(string1, string2 [, integer])} 返回 string2string1 中首次出现的位置. 如果指定了 integer , 则从 integer 为起点开搜索.
{fn LTRIM(string)} 移除 string 头部的空白字符
{fn RTRIM(string)} 移除 string 尾部的空白字符

Calcite 1.9.0 在函数使用位置参数的时候会抛出异常, {fn LTRIM} 和 {fn RTRIM} 在编译 SQL 语句的时候. 这个能在将来的版本中修复.

未实现的:

  • {fn ASCII(string)} - 转换单个字符的字符串为 ASCII 码, 为 0 - 255 之间的整数
  • {fn CHAR(string)}
  • {fn DIFFERENCE(string, string)}
  • {fn LEFT(string, integer)}
  • {fn REPEAT(string, integer)}
  • {fn REPLACE(string, string, string)}
  • {fn RIGHT(string, integer)}
  • {fn SOUNDEX(string)}
  • {fn SPACE(integer)}

Date/time

语法 描述
{fn CURDATE()} 等价于 CURRENT_DATE
{fn CURTIME()} 等价于 LOCALTIME
{fn NOW()} 等价于 LOCALTIMESTAMP
{fn QUARTER(date)} 等价于 EXTRACT(QUARTER FROM date). 返回 1 和 4 之间的整数.
{fn TIMESTAMPADD(timeUnit, count, timestamp)} 添加 counttimeUnit 间隔到 timestamp
{fn TIMESTAMPDIFF(timeUnit, timestamp1, timestamp2)} timestamp1 减去 timestamp2 结果存在 timeUnit

未实现的:

  • {fn DAYNAME(date)}
  • {fn DAYOFMONTH(date)}
  • {fn DAYOFWEEK(date)}
  • {fn DAYOFYEAR(date)}
  • {fn HOUR(time)}
  • {fn MINUTE(time)}
  • {fn MONTH(date)}
  • {fn MONTHNAME(date)}
  • {fn SECOND(time)}
  • {fn WEEK(date)}
  • {fn YEAR(date)}

系统

未实现的:

  • {fn DATABASE()}
  • {fn IFNULL(value, value)}
  • {fn USER(value, value)}
  • {fn CONVERT(value, type)}

聚合函数(aggregrate functions)

Storm SQL 当前不支持聚合函数.

窗口函数(windowing functions)

Storm SQL 当前不支持窗口函数

分组函数(grouping functions)

Storm SQL 不支持分组函数

用户定义函数(User-defined functions)

用户可以使用 CREATE FUNCTION 语句定义 user defined function(标量). 例如, 下面的语句使用 org.apache.storm.sql.TestUtils$MyPlus 类定义了一个 MYPLUS 函数.

CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'

Storm SQL 通过检验定义的个方法来确定函数是标量还是聚合. 如果类中定义了一个 evaluate 方法, Storm SQL 把这个函数作为 标量 对待.

标量函数的类的例子:

  public class MyPlus {
    public static Integer evaluate(Integer x, Integer y) {
      return x + y;
    }
  }

请注意, 用户在运行 Storm SQL runner 时候应当使用 --jars 或者 --artifacts, 来确保 UDFs 在 classpath 路径下可见.

外部数据源

指定外部数据源

在 StormSQL 中数据表现为一个外部表. 用户可以使用 CREATE EXTERNAL TABLE 语句指定数据源. CREATE EXTERNAL TABLE 的语法与 Hive 数据定义语言 定义的语法紧密相关.

CREATE EXTERNAL TABLE table_name field_list
    [ STORED AS
      INPUTFORMAT input_format_classname
      OUTPUTFORMAT output_format_classname
    ]
    LOCATION location
    [ TBLPROPERTIES tbl_properties ]
    [ AS select_stmt ]

默认输入格式和输出格式都是 JSON. 我们会在将来的章节介绍 supported formats.

例如, 下面的语句指定了一个 Kafka spout 和 sink:

CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'

请注意, 用户在运行 Storm SQL runner 时候应当使用 --jars 或者 --artifacts, 来确保 UDFs 在 classpath 可见.

植入外部数据源

用户通过实现 ISqlTridentDataSource 接口并使用 Java 的 service loader 机制进行注册,以植入外部数据源. 基于表的 URI 的模式来选择外部数据源. 更多细节请参考 storm-sql-kafka 的实现.

支持的格式

格式 输入格式类 输出格式类 需要属性
JSON org.apache.storm.sql.runtime.serde.json.JsonScheme org.apache.storm.sql.runtime.serde.json.JsonSerializer No
Avro org.apache.storm.sql.runtime.serde.avro.AvroScheme org.apache.storm.sql.runtime.serde.avro.AvroSerializer Yes
CSV org.apache.storm.sql.runtime.serde.csv.CsvScheme org.apache.storm.sql.runtime.serde.csv.CsvSerializer No
TSV org.apache.storm.sql.runtime.serde.tsv.TsvScheme org.apache.storm.sql.runtime.serde.tsv.TsvSerializer No

Avro

Avro 需要用户描述记录的模式(输入和输出). 模式应该在 TBLPROPERTIES 上描述. 输入格式需要描述给 input.avro.schema, 输出格式需要描述给 output.avro.schema. 模式字符串应当是一个转义后的 JSON, 因此 TBLPROPERTIES 是有效的 JSON.

示例 Schema 定义:

"input.avro.schema": "{\"type\": \"record\", \"name\": \"large_orders\", \"fields\" : [ {\"name\": \"ID\", \"type\": \"int\"}, {\"name\": \"TOTAL\", \"type\": \"int\"} ]}"

"output.avro.schema": "{\"type\": \"record\", \"name\": \"large_orders\", \"fields\" : [ {\"name\": \"ID\", \"type\": \"int\"}, {\"name\": \"TOTAL\", \"type\": \"int\"} ]}"

CSV

使用 Standard RFC4180 CSV Parser, 不需要任何其他的属性.

TSV

默认情况下, TSV 使用 \t 作为分隔符, 但是用户可以通过 input.tsv.delimiter 或者 output.tsv.delimiter 设置其他的分隔符.

可支持的数据源

数据源 Artifact Name 位置前缀 支持输入数据源 支持输出数据源 需要属性
Socket socket://host:port Yes Yes No
Kafka org.apache.storm:storm-sql-kafka kafka://zkhost:port/broker_path?topic=topic Yes Yes Yes
Redis org.apache.storm:storm-sql-redis redis://:[password]@host:port/[dbIdx] No Yes Yes
MongoDB org.apache.stormg:storm-sql-mongodb mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] No Yes Yes
HDFS org.apache.storm:storm-sql-hdfs hdfs://host:port/path-to-file No Yes Yes

Socket

Socket 数据源是一个内置的特性, 因此用户无需在 --artifacts 选项中添加任何依赖.

请注意, Socket 数据源只是用于测试: 不能保证 exactly-once 和 at-least-once.

贴士: netcat 是一个 Socket 便捷工具: 用户可以使用 netcat 连接 Socket 数据源, 既可以作为输入也可以用作输出.

Kafka

Kafka 仅当用作输出数据源的时候需要定义下列属性:

  • producer: 指定 Kafka Producer 配置 - 更多细节请参考 Kafka producer configs.
    • bootstrap.servers 必须在 producer 中定义这个值

请注意, storm-sql-kafka 需要用户提供 storm-kafka 依赖, storm-kafka 又依赖于 kafka , kafka-clients. 你可以在 --artifacts 选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本

org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2

Redis

Redis 数据源需要设置下列属性:

  • data.type: 用于存储的数据类型 - 仅支持 "STRING""HASH"
  • data.additional.key: key, 当数据类型同时需要 key 和 field 时设置 (field 作为字段使用)
  • redis.timeout: 超时时间, 毫秒 (ex. "3000")
  • use.redis.cluster: 如果 Redis 是集群环境为 "true", 否则为 "false".

请注意, storm-sql-redis 需要用户提供 storm-redis 依赖. 你可以在 --artifacts 选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本

org.apache.storm:storm-sql-redis:2.0.0-SNAPSHOT,org.apache.storm:storm-redis:2.0.0-SNAPSHOT

MongoDB

MongoDB 数据源需要设置以下属性

{"collection.name": "storm_sql_mongo", "trident.ser.field": "serfield"}

  • trident.ser.field: 存储字段 - 记录会序列化并以 BSON 存储在字段中
  • collection.name: 集合名称

请注意, storm-sql-mongodb 需要用户提供 storm-mongodb 依赖. 你可以在 --artifacts 选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本

org.apache.storm:storm-sql-mongodb:2.0.0-SNAPSHOT,org.apache.storm:storm-mongodb:2.0.0-SNAPSHOT

当前不支持使用保留字段存储.

HDFS

HDFS 数据源需要设置下列属性

  • hdfs.file.path: HDFS 文件路径
  • hdfs.file.name: HDFS 文件名 - 参考 SimpleFileNameFormat
  • hdfs.rotation.size.kb: HDFS FileSizeRotationPolicy 单位 KB
  • hdfs.rotation.time.seconds: HDFS TimedRotationPolicy 单位 seconds

请注意 hdfs.rotation.size.kbhdfs.rotation.time.seconds 只能采用其中一种来实现文件滚动.

还要注意 storm-sql-hdfs 需要用户提供 storm-hdfs 依赖. 你可以在 --artifacts 选项中使用下列的工作引用, 并且在需要的时候修改依赖的版本

org.apache.storm:storm-sql-hdfs:2.0.0-SNAPSHOT,org.apache.storm:storm-hdfs:2.0.0-SNAPSHOT

还有, 需要提供 hdfs 配置文件. 可以将 core-site.xmlhdfs-site.xml 文件放到 Storm 安装目录的 conf 目录下面.


我们一直在努力

apachecn/storm-doc-zh

云计算之嫣然伊笑