To process streaming data with Dataflow SQL, write queries with Dataflow SQL streaming extensions.
Streaming extensions syntax
SELECT
    window_timestamps [, ...]
FROM
    window_function_expression
    [ table_expression, expression [, ...] ]

window_timestamps:
    { window_start | window_end }

window_function_expression:
    { tumble_function | hop_function | session_function }
For the standard SQL query syntax, see the Dataflow SQL query syntax page.
Window timestamps
Expressions in the SELECT list can refer to columns that contain the starting and ending timestamps of a window.
window_start
A column with the starting timestamp of each window.
window_end
A column with the ending timestamp of each window.
Window function expression
The window_function_expression in the FROM clause is a windowing function. It can be one of the following windowing functions: TUMBLE, HOP, or SESSION.
Windowing functions
Windowing functions divide streaming data into finite windows. When a watermark indicates that a window is complete, windowing functions return the data in the window, the starting timestamp, and the ending timestamp. Late data, which arrives after the watermark has passed the end of its window, is dropped.
To compute aggregate values of streaming data, aggregate the data in each window.
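The watermark behavior described above can be modeled with a small Python simulation. This is a sketch under stated assumptions, not how Dataflow implements it: it assumes 15-minute tumbling windows aligned to the Unix epoch, a watermark that only advances when `advance_watermark` is called, and that a window counts as complete once the watermark reaches its end. `WindowedSum`, `add`, and `advance_watermark` are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
SIZE = timedelta(minutes=15)  # assumption: 15-minute tumbling windows


def window_start(ts):
    """Start of the epoch-aligned window that contains ts."""
    return ts - (ts - EPOCH) % SIZE


class WindowedSum:
    """Hypothetical per-window SUM driven by an externally supplied watermark."""

    def __init__(self):
        self.open = defaultdict(float)  # window start -> running sum
        self.watermark = EPOCH
        self.emitted = []               # (start, end, sum) tuples, in emission order
        self.dropped = []               # late (timestamp, amount) elements

    def add(self, ts, amount):
        start = window_start(ts)
        if start + SIZE <= self.watermark:
            # The watermark already passed this window's end: the element is late.
            self.dropped.append((ts, amount))
        else:
            self.open[start] += amount

    def advance_watermark(self, watermark):
        self.watermark = watermark
        for start in sorted(self.open):  # sorted() copies the keys, so popping is safe
            if start + SIZE <= watermark:
                # Window complete: emit its aggregate with start and end timestamps.
                self.emitted.append((start, start + SIZE, self.open.pop(start)))
```

In this model, calling `add` with a timestamp from a window that was already emitted appends the element to `dropped` instead of changing any sum, which mirrors the rule that late data is dropped.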
TUMBLE
TUMBLE(table_expression, DESCRIPTOR(timestamp_column), window_duration)
Description
Divides streaming data into tumbling windows. A tumbling window represents a fixed-duration, non-overlapping time interval in the data stream.
For more information, see Tumbling windows.
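As a rough model of how TUMBLE assigns elements to windows, the hypothetical Python generator `tumble` below appends `window_start` and `window_end` to each input row. It assumes windows are aligned to the Unix epoch and that event timestamps are timezone-aware `datetime` values; Dataflow's actual alignment and types may differ.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble(rows, timestamp_column, window_duration):
    """Yield each row with window_start and window_end columns appended.

    Windows are assumed to be aligned to the Unix epoch, so each row
    falls into exactly one [window_start, window_end) interval.
    """
    for row in rows:
        ts = row[timestamp_column]
        # Round the event timestamp down to the nearest window boundary.
        start = ts - (ts - EPOCH) % window_duration
        # Keep every input column and add the two window columns.
        yield {**row, "window_start": start, "window_end": start + window_duration}
```

Because the windows are disjoint, every input row produces exactly one output row.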
Supported Argument Types
The table_expression is a table with the streaming data to divide into tumbling windows.
The timestamp_column is the column in table_expression with the event timestamp of each element.
- Event timestamps must be of type TIMESTAMP.
- For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.
The window_duration is a string with the format "INTERVAL int64 date_part".
date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
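The duration format can be checked with a small validator, sketched here as a hypothetical helper `parse_interval`. It assumes the keywords are case-insensitive and that the count is a non-negative integer. It returns the count and unit rather than a fixed-length duration, because MONTH and YEAR do not have a fixed length.

```python
import re
from typing import NamedTuple

DATE_PARTS = ("MILLISECOND", "SECOND", "MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
INT64_MAX = 2**63 - 1
_INTERVAL = re.compile(r"INTERVAL\s+(\d+)\s+([A-Za-z]+)", re.IGNORECASE)


class Interval(NamedTuple):
    count: int
    date_part: str


def parse_interval(text):
    """Validate a duration string of the form "INTERVAL int64 date_part"."""
    match = _INTERVAL.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"expected 'INTERVAL <int64> <date_part>', got {text!r}")
    count = int(match.group(1))
    date_part = match.group(2).upper()
    if count > INT64_MAX:
        raise ValueError(f"{count} does not fit in an int64")
    if date_part not in DATE_PARTS:
        raise ValueError(f"unsupported date_part {date_part!r}")
    return Interval(count, date_part)
```

For example, `parse_interval("INTERVAL 15 MINUTE")` returns `Interval(count=15, date_part='MINUTE')`, while a string such as "INTERVAL 15 WEEK" is rejected.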
Return Data Types
A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.
Examples
The following query uses tumbling windows to sum transaction amounts every 15 minutes.
SELECT
  tr.window_start AS period_start,
  tr.window_end AS period_end,
  SUM(tr.amount) AS amount
FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 15 MINUTE") AS tr
GROUP BY tr.window_start, tr.window_end
+---------------------+---------------------+--------+
| period_start        | period_end          | amount |
+---------------------+---------------------+--------+
| 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
+---------------------+---------------------+--------+
| 2020-01-01 00:15:00 | 2020-01-01 00:30:00 | 2.20   |
+---------------------+---------------------+--------+
HOP
HOP(table_expression, DESCRIPTOR(timestamp_column), window_period, window_duration)
Description
Divides streaming data into hopping windows. A hopping window represents a fixed-duration time interval in the data stream. Hopping windows can overlap, whereas tumbling windows are disjoint.
For more information, see Hopping windows.
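To see why hopping windows overlap, the hypothetical `hop` generator below emits one copy of a row for every window that contains it. It assumes that `window_period` is how often a new window starts, that `window_duration` is the length of each window, and that windows are aligned to the Unix epoch; these assumptions match the 10-minute/15-minute example in this section, but the code is not Dataflow's implementation. Both intervals are assumed to be positive.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hop(rows, timestamp_column, window_period, window_duration):
    """Yield a copy of each row for every hopping window that contains it.

    Windows are assumed to start every window_period (aligned to the Unix
    epoch) and to last window_duration, so when the duration is longer than
    the period a row is emitted once per overlapping window.
    """
    for row in rows:
        ts = row[timestamp_column]
        # Latest window start at or before ts.
        start = ts - (ts - EPOCH) % window_period
        starts = []
        # Step back one period at a time while [start, start + duration) still covers ts.
        while start + window_duration > ts:
            starts.append(start)
            start -= window_period
        for s in reversed(starts):  # earliest window first
            yield {**row, "window_start": s, "window_end": s + window_duration}
```

With a 10-minute period and a 15-minute duration, an element at 00:12 lands in both the 00:00 to 00:15 window and the 00:10 to 00:25 window, so it contributes to two sums.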
Supported Argument Types
The table_expression is a table with the streaming data to divide into hopping windows.
The timestamp_column is the column in table_expression with the event timestamp of each element.
- Event timestamps must be of type TIMESTAMP.
- For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.
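The window_period and window_duration are strings with the format "INTERVAL int64 date_part". The window_period specifies how often a new window starts, and the window_duration specifies how long each window lasts.
date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.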
Return Data Types
A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.
Examples
The following query uses hopping windows to sum transaction amounts over 15-minute windows that start every 10 minutes.
SELECT
  tr.window_start AS period_start,
  tr.window_end AS period_end,
  SUM(tr.amount) AS amount
FROM HOP((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
         DESCRIPTOR(event_timestamp),
         "INTERVAL 10 MINUTE",
         "INTERVAL 15 MINUTE") AS tr
GROUP BY tr.window_start, tr.window_end
+---------------------+---------------------+--------+
| period_start        | period_end          | amount |
+---------------------+---------------------+--------+
| 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
+---------------------+---------------------+--------+
| 2020-01-01 00:10:00 | 2020-01-01 00:25:00 | 2.30   |
+---------------------+---------------------+--------+
SESSION
SESSION(table_expression, DESCRIPTOR(timestamp_column), DESCRIPTOR(key1, key2, ...), session_gap_duration)
Description
Divides streaming data into session windows. A session window contains elements whose timestamps are within the gap duration of another element in the same window. The gap duration is the period of inactivity that separates one session from the next: if new data arrives after the gap duration has elapsed, the data is assigned to a new window.
For more information, see Session windows.
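The merging behavior can be sketched in Python with a hypothetical `session_windows` function. It assumes Apache Beam-style session semantics, in which each element opens a window [t, t + gap) and overlapping windows for the same key merge into one session. It also assumes that sessions are computed separately for each key, as the key DESCRIPTOR in the SESSION signature suggests; this is an illustration, not Dataflow's code.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def session_windows(events, gap):
    """Assign (key, event_timestamp) pairs to per-key session windows.

    Assumes Apache Beam-style sessions: each element opens [ts, ts + gap),
    and overlapping windows for the same key merge into one session.
    Returns {key: [(window_start, window_end), ...]} ordered by start.
    """
    stamps_by_key = defaultdict(list)
    for key, ts in events:
        stamps_by_key[key].append(ts)

    result = {}
    for key, stamps in stamps_by_key.items():
        windows = []
        for ts in sorted(stamps):
            if windows and ts < windows[-1][1]:
                # ts arrives before the current session's end: extend the session.
                windows[-1] = (windows[-1][0], ts + gap)
            else:
                # Gap exceeded (or first element for this key): start a new session.
                windows.append((ts, ts + gap))
        result[key] = windows
    return result
```

In this model a session ends one gap after its last element, so an element arriving exactly at that end starts a new session.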
Supported Argument Types
The table_expression is a table with the streaming data to divide into session windows.
The timestamp_column is the column in table_expression with the event timestamp of each element.
- Event timestamps must be of type TIMESTAMP.
- For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.
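The key columns listed in the second DESCRIPTOR (key1, key2, ...) group the data by key, so sessions are formed separately for each key.
The session_gap_duration is a string with the format "INTERVAL int64 date_part".
date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.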
Return Data Types
A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.
Examples
The following query uses session windows to sum transaction amounts that occur within 30 minutes of each other.
SELECT
  tr.window_start AS period_start,
  tr.window_end AS period_end,
  SUM(tr.amount) AS amount,
  tr.transaction_id AS transaction_id
FROM SESSION((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
             DESCRIPTOR(event_timestamp),
             DESCRIPTOR(transaction_id),
             "INTERVAL 30 MINUTE") AS tr
GROUP BY tr.window_start, tr.window_end, tr.transaction_id
+---------------------+---------------------+--------+----------------+
| period_start        | period_end          | amount | transaction_id |
+---------------------+---------------------+--------+----------------+
| 2020-01-01 00:00:00 | 2020-01-01 00:10:00 | 2.50   | 1001           |
+---------------------+---------------------+--------+----------------+
| 2020-01-01 00:45:00 | 2020-01-01 01:45:00 | 2.25   | 1002           |
+---------------------+---------------------+--------+----------------+
Window requirements when joining two unbounded sources
To join two unbounded collections, both collections must have the same windows. If the windows are not compatible, Dataflow SQL throws an IllegalArgumentException.
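The rule can be modeled as an equality check between the window specifications of the two inputs. In this simplified sketch, "the same windows" is assumed to mean the same windowing function with identical parameters; Dataflow's actual compatibility check may be more nuanced. `TumbleSpec`, `HopSpec`, and `check_join_windows` are hypothetical, and Python's `ValueError` stands in for `IllegalArgumentException`.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class TumbleSpec:
    window_duration: timedelta


@dataclass(frozen=True)
class HopSpec:
    window_period: timedelta
    window_duration: timedelta


def check_join_windows(left, right):
    """Reject a join whose two unbounded inputs are windowed differently.

    Dataclass equality compares both the class and every field, so a
    TUMBLE spec never equals a HOP spec, even with the same duration.
    """
    if left != right:
        raise ValueError(f"incompatible windows for JOIN: {left!r} vs {right!r}")
```

Under this model, the valid example that follows passes a 15-minute TumbleSpec on both sides, while the invalid example compares a HopSpec(10 minutes, 15 minutes) with a 15-minute TumbleSpec and fails.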
Valid JOIN with windowing
This example joins two unbounded collections with compatible windows. Both collections are divided into tumbling windows with a 15-minute window duration.
SELECT * FROM
  (SELECT
     tr.transaction_id AS transaction_id,
     SUM(tr.amount) AS combined_transaction_amount
   FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
               DESCRIPTOR(event_timestamp),
               "INTERVAL 15 MINUTE") AS tr
   GROUP BY tr.transaction_id) AS totals
INNER JOIN
  (SELECT
     invalid_tr.transaction_id AS transaction_id
   FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
               DESCRIPTOR(event_timestamp),
               "INTERVAL 15 MINUTE") AS invalid_tr
   GROUP BY invalid_tr.transaction_id) AS invalid_ids
ON totals.transaction_id = invalid_ids.transaction_id
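Conceptually, the valid query joins the two inputs window by window. The Python sketch below models that with a hypothetical `windowed_join` function: it assumes 15-minute epoch-aligned tumbling windows on both sides and matches rows only when both the window and the transaction_id agree. It illustrates the semantics only and is not Dataflow's execution plan.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(minutes=15)  # assumption: both inputs use 15-minute tumbling windows


def window_start(ts):
    """Start of the epoch-aligned tumbling window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


def windowed_join(transactions, invalid_transactions):
    """Inner-join per-window totals with invalid IDs from the same window.

    transactions: iterable of (event_timestamp, transaction_id, amount)
    invalid_transactions: iterable of (event_timestamp, transaction_id)
    Returns sorted (window_start, transaction_id, combined_amount) tuples.
    """
    # Left side: SUM(amount) grouped by window and transaction_id.
    totals = defaultdict(float)
    for ts, txn_id, amount in transactions:
        totals[(window_start(ts), txn_id)] += amount
    # Right side: distinct transaction_ids per window.
    invalid = {(window_start(ts), txn_id) for ts, txn_id in invalid_transactions}
    # Rows match only when both the window and the transaction_id agree.
    return sorted(
        (win, txn_id, total)
        for (win, txn_id), total in totals.items()
        if (win, txn_id) in invalid
    )
```

In this model, an invalid transaction ID that arrives in a different 15-minute window from the matching transactions produces no joined row.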
Invalid JOIN with windowing
This example joins two unbounded sources with incompatible windows. One collection is divided into hopping windows that start every 10 minutes and last 15 minutes. The other collection is divided into tumbling windows with a 15-minute window duration.
If you run the following query, Dataflow SQL throws an IllegalArgumentException.
SELECT * FROM
  (SELECT
     tr.transaction_id AS transaction_id,
     SUM(tr.amount) AS combined_transaction_amount
   FROM HOP((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 10 MINUTE", "INTERVAL 15 MINUTE") AS tr
   GROUP BY tr.transaction_id) AS totals
INNER JOIN
  (SELECT
     invalid_tr.transaction_id AS transaction_id
   FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
               DESCRIPTOR(event_timestamp),
               "INTERVAL 15 MINUTE") AS invalid_tr
   GROUP BY invalid_tr.transaction_id) AS invalid_ids
ON totals.transaction_id = invalid_ids.transaction_id