The INSERT INTO statement must be used in conjunction with SELECT subqueries.
INSERT INTO Sink SELECT Clause
Insert the result of a SELECT query into the sink named KafkaSink1:
INSERT INTO KafkaSink1 SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_ FROM KafkaSource1 AS s1, KafkaSource2 AS s2 WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;
SELECT Comma-separated fields to be selected FROM Source or view WHERE Filter condition Other subqueries
SELECT s1.time_, s1.client_ip, s1.uri, s1.protocol_version, s2.status_code, s2.date_ FROM KafkaSource1 AS s1, KafkaSource2 AS s2 WHERE s1.time_ = s2.time_ AND s1.client_ip = s2.client_ip;
SELECT cannot be used alone. It must be used with CREATE VIEW … AS or INSERT INTO, otherwise a prompt saying that no suitable operator exists will appear.
WHERE is used to filter query conditions (predicates). Multiple parallel conditions can be joined by AND/OR.
To JOIN with a table of an external CDB, only AND is used to join conditions. To use OR, see UNION ALL to achieve the same purpose.
HAVING is used to filter the results after GROUP BY. Note that WHERE is used before GROUP BY, and HAVING is used after GROUP BY.
SELECT SUM(amount) FROM Orders WHERE price > 10 GROUP BY users HAVING SUM(amount) > 50
In SCS, GROUP BY is used to group and aggregate results, including time window-based GROUP BY, and non-window-based GROUP BY (also known as persistent query). Since the former will not update the previous results, a data stream of Append type is generated, which can only be written into the Stream Connector sink of Tuple type or CKafka. However, the latter will update the previous records, so a data stream of Upsert type is generated, which can only be written into the Stream Connector sink of Upsert type.
This example defines the GROUP BY query statement containing a time window.
SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
In the Event Time mode (where WATERMARK FOR BOUNDED is used to define the timestamp field), the first parameter of the TUMBLE window function must be this field. In the Processing Time mode, the first parameter of the TUMBLE window function must be PROCTIME (uppercase). This applies to both HOP and SESSION.
This example defines the GROUP BY query statement that does not contain a time window, which is called persistent query. Because it calculates and determines whether to update the results issued previously based on each piece of new data, an Upsert stream is generated.
SELECT a, SUM(b) as d FROM Orders GROUP BY a
This method may cause memory overflow due to too many keys or too much data. So, be careful when setting the object timeout. Do not set this value too large.
SCS only supports Equi-JOIN (where the JOIN condition contains at least a filter condition that makes a field in the left table equivalent to that in the right table) and Inner JOIN. Outer JOIN will be available in future versions.
There are two types of stream-stream JOIN: those with and without a time range. The former generates streams of Append type, while the latter generates streams of UP SERT type.
The WEHERE condition of a JOIN with a time range contains at least an Equi-JOIN condition and a specified time range. The time range can be represented by <, <=, >=, >, or BETWEEN ... AND.
Example of a time range:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
It only needs at least one Equi-JOIN, and the time range is optional. That is, it calculates all active data in the history (inactive elements can be removed by specifying a timeout).
It may take up too much memory and should be used with caution. Generally, an appropriate object timeout should be set to remove inactive objects in a timely manner.
This query will generate an Upsert stream, and only a Stream Connector sink of Upsert type can be used to receive data.
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
SCS also supports JOIN between a stream and a data table of CDB for MySQL. The syntax is the same as described above, but the CDB table must be the right table in a JOIN condition.
Note that a JOIN query condition should include all the defined keys of the table, otherwise the task will fail due to excessive query results and memory usage.
SELECT d.client_agent AS time_, d.client_ip, d.numbers AS request_body_length FROM StreamSource AS s, DimSource AS d WHERE s.client_ip = d.client_ip AND d.`month` LIKE '20180%' AND ABS(d.numbers) BETWEEN 0 AND 2000
Joining a defined array object (the value constructor in the section 4.10.4 can be used to construct an array object ARRAY) is also allowed by SCS.
Example: (if tags is a defined array)
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
UNION ALL is used to merge the results of two queries. Besides, since joining query conditions with OR is not supported in case of a JOIN between a stream and a CDB table, UNION ALL can also be used to achieve the same query result.
SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION ALL (SELECT user FROM Orders WHERE b = 0) )
SCS only supports UNION ALL instead of UNION, which means de-duplication is not implemented in a row. To perform de-duplication to implement UNION, use it with DISTINCT. Note that DISTINCT will change the result from Append stream to Upsert stream, so only a Stream Connector sink of Upsert type can be used.
Use OVER to perform sliding-window aggregation (instead of GROUP BY aggregation) on data streams. You can specify PARTITION, ORDER, window range and other parameters in OVER.
The following example defines a sliding-window aggregation query to calculate the amount of a sliding window with a size of 3. Perform PRECEDING on the previous rows, but FOLLOWING is not supported.
In addition, only one timestamp filed can be placed after ORDER BY. In this example, the PROCTIME column is automatically added by the system in the Processing Time mode.
SELECT SUM(amount) OVER ( PARTITION BY user ORDER BY PROCTIME ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders
ORDER BY is used to sort the results of a query. The default value is ASC (ascending order), or you can also explicitly specify DESC (descending order).
The first sorting item must be the time-based column (Event Time timestamp, or Processing Time timestamp, i.e. PROCTIME) in ascending order, and other sorting items can be freely specified.
SELECT * FROM Orders ORDER BY `orderTime`, `username` DESC, `userId` ASC
DISTINCT is used to de-duplicate query results, which must be placed after SELECT.
SELECT DISTINCT users FROM Orders
DISTINCT will generate an Upsert stream, so only a sink of Upsert type can receive its results. Moreover, queries that take a long time may lead to excessive memory usage. Please use it with caution.
Set an appropriate object timeout to remove inactive objects in time to save memory.
The following SQL syntax structures are not supported:
GROUPING SETS, ROLLUP, CUBE, IN, UNION (which can be implement using UNION ALL and DISTINCK), LIMIT, etc. More syntaxes will be available in future versions, such as real-time query of Top data.