Use Dataflow SQL's streaming extensions to create parallel-processing pipelines over streaming data. If you are new to streaming data pipelines, first read about streaming pipeline basics to familiarize yourself with collections, windows, watermarks, and triggers.
Streaming extensions syntax
SELECT { window_start_expression | window_end_expression | expression } [, …]
FROM from_item
GROUP BY window_expression [, expression [, ...] ]
window_expression: {TUMBLE | HOP | SESSION}
window_start_expression: {TUMBLE_START | HOP_START | SESSION_START}
window_end_expression: {TUMBLE_END | HOP_END}
For the standard SQL query syntax, see the Dataflow SQL syntax query page.
To learn about how to specify Dataflow SQL data sources and destinations, see the data sources and destinations page.
Setting the windowing function
Specify the windowing function (window_expression) in the GROUP BY clause. When using Dataflow SQL, there is a single aggregated output per window grouping when the watermark indicates that the window is complete. Data that arrives later is dropped.
expression is the key to use to group the elements.
window_expression can be one of the following windowing functions: TUMBLE, HOP, or SESSION.
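For example, the following sketch, which assumes the example transactions Pub/Sub topic used in the examples later on this page, groups elements by a key column and a 15-second tumbling window:
SELECT
tr.payload.transaction_id AS transaction_id,
SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
tr.payload.transaction_id,
TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")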
Getting window timestamps
After you apply a windowing function in your GROUP BY clause, you can optionally use window_start_expression and window_end_expression in the SELECT list to get the starting and ending timestamps of windows.
expression specifies the columns that the query returns.
window_start_expression can be one of the following functions: TUMBLE_START, HOP_START, or SESSION_START.
window_end_expression can be one of the following functions: TUMBLE_END or HOP_END.
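For example, a sketch based on the same example transactions topic can return both window boundaries alongside the aggregate by selecting TUMBLE_START and TUMBLE_END with the same duration that is passed to TUMBLE:
SELECT
TUMBLE_START("INTERVAL 15 SECOND") AS window_start,
TUMBLE_END("INTERVAL 15 SECOND") AS window_end,
SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")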
Tumbling window functions
A tumbling window represents a non-overlapping time interval of consistent duration (the window duration) in the data stream. For more information, see tumbling windows in the streaming pipeline basics page.
TUMBLE
Syntax
TUMBLE(timestamp, window_duration)
Description
Use the TUMBLE function to indicate that you want your pipeline to use tumbling windows. You can optionally also use TUMBLE_START and TUMBLE_END to get the starting timestamp and ending timestamp for each tumbling window.
timestamp is the column to use for the event timestamp for each element. For a Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part".
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
TUMBLE_START
Syntax
TUMBLE_START(window_duration)
Description
The TUMBLE_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration must be the same as the window duration for TUMBLE. If the durations are not the same, Dataflow SQL throws an exception.
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
Return type
TIMESTAMP
TUMBLE_END
Syntax
TUMBLE_END(window_duration)
Description
The TUMBLE_END function returns the ending timestamp of the corresponding window returned by the GROUP BY clause.
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration must be the same as the window duration for TUMBLE. If the durations are not the same, Dataflow SQL throws an exception.
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
Return type
TIMESTAMP
Examples
The following query uses tumbling windows to aggregate transactions every 15 seconds.
SELECT
TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
SUM(tr.payload.amount) as amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
Hopping window functions
A hopping window represents a consistent-duration time interval in the data stream; however, hopping windows can overlap. For example, each window might capture five minutes' worth of data (the window duration), but a new window starts every ten seconds. The frequency with which hopping windows begin is called the window period. In this example, the window duration is five minutes and the window period is ten seconds. For more information, see hopping windows in the streaming pipeline basics page.
HOP
Syntax
HOP(timestamp, window_period, window_duration)
Description
Use the HOP function when you want your pipeline to use hopping windows. You can optionally also use HOP_START and HOP_END to get the starting timestamp and ending timestamp for each hopping window.
timestamp is the column to use for the event timestamp for each element. For a Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.
window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part".
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
HOP_START
Syntax
HOP_START(window_period, window_duration)
Description
The HOP_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.
window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration must be the same as the window duration for HOP. If the durations are not the same, Dataflow SQL throws an exception.
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
Return type
TIMESTAMP
HOP_END
Syntax
HOP_END(window_period, window_duration)
Description
The HOP_END function returns the ending timestamp of the corresponding window returned by the GROUP BY clause.
window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".
window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration must be the same as the window duration for HOP. If the durations are not the same, Dataflow SQL throws an exception.
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
Return type
TIMESTAMP
Examples
The following query uses hopping windows. Every 10 seconds, the query aggregates transactions that happened in the last 15 seconds.
SELECT
HOP_START("INTERVAL 10 SECOND", "INTERVAL 15 SECOND") AS period_start,
SUM(tr.payload.amount) as amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
HOP(tr.event_timestamp, "INTERVAL 10 SECOND", "INTERVAL 15 SECOND")
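If you also want each window's ending timestamp, a variation of this query (a sketch against the same transactions topic) can additionally select HOP_END with the same window period and duration:
SELECT
HOP_START("INTERVAL 10 SECOND", "INTERVAL 15 SECOND") AS period_start,
HOP_END("INTERVAL 10 SECOND", "INTERVAL 15 SECOND") AS period_end,
SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
HOP(tr.event_timestamp, "INTERVAL 10 SECOND", "INTERVAL 15 SECOND")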
Session window functions
Session windows are windows that contain elements that are within a certain gap duration of another element. For more information about gap durations, see session windows in the streaming pipeline basics page.
SESSION
Syntax
SESSION(timestamp, session_gap)
Description
Use the SESSION function when you want your pipeline to use session windows. You can optionally also use SESSION_START to get the starting timestamp for each session window.
timestamp is the column to use for the event timestamp for each element. For a Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.
session_gap is the minimum gap duration, specified as a string with the format "INTERVAL int64 date_part".
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
SESSION_START
Syntax
SESSION_START(session_gap)
Description
The SESSION_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.
session_gap is the minimum gap duration, specified as a string with the format "INTERVAL int64 date_part". The gap duration must be the same as the gap duration for SESSION. If the gap durations are not the same, Dataflow SQL throws an exception.
date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
Return type
TIMESTAMP
Examples
The following query uses session windows to aggregate transactions with a 30-minute inactivity gap duration.
SELECT
SESSION_START("INTERVAL 30 MINUTE") AS period_start,
SUM(tr.payload.amount) as amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
SESSION(tr.event_timestamp, "INTERVAL 30 MINUTE")
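Session windows are often tracked per key. The following sketch, which reuses the transaction_id field that appears in the join examples below, groups by transaction ID so that each key gets its own session windows:
SELECT
tr.payload.transaction_id AS transaction_id,
SESSION_START("INTERVAL 30 MINUTE") AS period_start,
SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
tr.payload.transaction_id,
SESSION(tr.event_timestamp, "INTERVAL 30 MINUTE")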
Window requirements when joining two unbounded sources
When you join two unbounded (streaming) sources, both sources must have the same windowing. If the windows are not compatible, Dataflow SQL throws an IllegalArgumentException.
Valid JOIN with windowing
This example joins two unbounded sources with compatible windows. Both sources use tumbling windows with a 15 second window duration.
SELECT * FROM
(SELECT
tr.payload.transaction_id AS transaction_id,
SUM(tr.payload.amount) AS combined_transaction_amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
tr.payload.transaction_id,
TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")) AS transaction_stream
INNER JOIN
(SELECT
invalid_tr.payload.transaction_id as transaction_id
FROM pubsub.topic.`dataflow-sql`.`invalid-transactions` AS invalid_tr
GROUP BY
invalid_tr.payload.transaction_id,
TUMBLE(invalid_tr.event_timestamp, "INTERVAL 15 SECOND")) as invalid_transaction_stream
ON
transaction_stream.transaction_id = invalid_transaction_stream.transaction_id
Invalid JOIN with windowing
This example joins two unbounded sources with incompatible windows. One source
uses hopping windows with a window duration of 10 seconds. The other source
uses tumbling windows with a window duration of 15 seconds. In this case,
Dataflow SQL will throw an IllegalArgumentException
.
SELECT * FROM
(SELECT
tr.payload.transaction_id AS transaction_id,
SUM(tr.payload.amount) AS combined_transaction_amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
tr.payload.transaction_id,
HOP(tr.event_timestamp, "INTERVAL 15 SECOND", "INTERVAL 10 SECOND")) AS transaction_stream
INNER JOIN
(SELECT
invalid_tr.payload.transaction_id as transaction_id
FROM pubsub.topic.`dataflow-sql`.`invalid-transactions` AS invalid_tr
GROUP BY
invalid_tr.payload.transaction_id,
TUMBLE(invalid_tr.event_timestamp, "INTERVAL 15 SECOND")) as invalid_transaction_stream
ON
transaction_stream.transaction_id = invalid_transaction_stream.transaction_id