Cloud Dataflow SQL streaming extensions

Use Cloud Dataflow SQL's streaming extensions to create parallel-processing pipelines for streaming data. If you are new to streaming data pipelines, first familiarize yourself with collections, windows, watermarks, and triggers by reading about streaming pipeline basics.

Streaming extensions syntax

SELECT { window_start_expression | window_end_expression | expression } [, …]
FROM from_item
GROUP BY window_expression [, expression [, ...] ]

window_expression: {TUMBLE | HOP | SESSION}
window_start_expression: {TUMBLE_START | HOP_START | SESSION_START}
window_end_expression: {TUMBLE_END | HOP_END}

For the standard SQL query syntax, see the Cloud Dataflow SQL query syntax page.

To learn about how to specify Cloud Dataflow SQL data sources and destinations, see the data sources and destinations page.

Setting the windowing function

Specify the windowing function (window_expression) in the GROUP BY clause. When using Cloud Dataflow SQL, there is a single aggregated output per window grouping when the watermark indicates that the window is complete. Data that arrives later is dropped.
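The emit-once and drop-late behavior described above can be sketched in plain Python (an illustration only, not part of Dataflow SQL). Here a 15-second tumbling window accumulates a running sum, fires a single aggregate when the watermark passes the window's end, and drops any element that arrives behind the watermark:

```python
from datetime import datetime, timedelta, timezone

# Sketch of the semantics: one aggregate per window when the watermark
# passes the window's end; data behind the watermark is dropped.
DURATION = timedelta(seconds=15)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

open_windows = {}  # window start -> running sum of amounts
watermark = EPOCH  # advances as the source reports progress

def window_start(ts):
    """Start of the tumbling window containing ts."""
    return ts - (ts - EPOCH) % DURATION

def process(ts, amount):
    """Accumulate on-time elements; silently drop late ones."""
    if ts >= watermark:  # late data arrives behind the watermark
        start = window_start(ts)
        open_windows[start] = open_windows.get(start, 0) + amount

def advance_watermark(new_watermark):
    """Emit one final aggregate per window the watermark has passed."""
    global watermark
    watermark = new_watermark
    return {s: open_windows.pop(s) for s in sorted(open_windows)
            if s + DURATION <= new_watermark}

process(EPOCH + timedelta(seconds=5), 10)   # lands in [00:00, 00:15)
process(EPOCH + timedelta(seconds=20), 7)   # lands in [00:15, 00:30)
print(advance_watermark(EPOCH + timedelta(seconds=15)))
# the [00:00, 00:15) window fires once with sum 10
```

After the watermark reaches 00:00:15, a new element stamped 00:00:03 would be dropped, because its window has already produced its single output.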

expression is the key used to group the elements.

window_expression can be one of the following windowing functions:

- TUMBLE(timestamp, window_duration)
- HOP(timestamp, window_period, window_duration)
- SESSION(timestamp, session_gap)

Getting window timestamps

After you apply a windowing function in your GROUP BY clause, you can optionally use window_start_expression and window_end_expression in the SELECT list to get the starting and ending timestamps of windows.

expression specifies the columns that the query will return.

window_start_expression can be one of the following functions:

- TUMBLE_START(window_duration)
- HOP_START(window_period, window_duration)
- SESSION_START(session_gap)

window_end_expression can be one of the following functions:

- TUMBLE_END(window_duration)
- HOP_END(window_period, window_duration)

Tumbling window functions

A tumbling window represents a consistent-duration, non-overlapping time interval (the window duration) in the data stream. For more information, see tumbling windows in the streaming pipeline basics page.

TUMBLE

Syntax

TUMBLE(timestamp, window_duration)

Description

Use the TUMBLE function to indicate that you want your pipeline to use tumbling windows. You can optionally also use TUMBLE_START and TUMBLE_END to get the starting timestamp and ending timestamp for each tumbling window.

timestamp is the column to use for the event timestamp for each element. For a Cloud Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part".

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

TUMBLE_START

Syntax

TUMBLE_START(window_duration)

Description

The TUMBLE_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration should be the same as the window duration for TUMBLE. If the durations are not the same, Cloud Dataflow SQL throws an exception.

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return type

TIMESTAMP

TUMBLE_END

Syntax

TUMBLE_END(window_duration)

Description

The TUMBLE_END function returns the ending timestamp of the corresponding window returned by the GROUP BY clause.

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration should be the same as the window duration for TUMBLE. If the durations are not the same, Cloud Dataflow SQL throws an exception.

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return type

TIMESTAMP

Examples

The following query uses tumbling windows to aggregate transactions every 15 seconds.

SELECT
  TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
  SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
  TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
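The tumbling-window grouping above can be sketched outside SQL. This plain-Python helper (an illustration, not part of Dataflow SQL) shows how each event timestamp maps to exactly one window, with window starts aligned to multiples of the duration since the Unix epoch:

```python
from datetime import datetime, timedelta, timezone

def tumble_window(ts, duration):
    """Return the [start, end) tumbling window containing ts.

    Tumbling (fixed) windows tile the timeline with no overlap:
    every timestamp belongs to exactly one window.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    start = ts - (ts - epoch) % duration  # snap down to the window boundary
    return start, start + duration

# An element at 12:00:07 lands in the [12:00:00, 12:00:15) window.
ts = datetime(2020, 1, 1, 12, 0, 7, tzinfo=timezone.utc)
start, end = tumble_window(ts, timedelta(seconds=15))
print(start.time(), end.time())  # 12:00:00 12:00:15
```

In the SQL example, TUMBLE_START("INTERVAL 15 SECOND") returns the `start` value computed here for each group.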

Hopping window functions

A hopping window represents a consistent-duration time interval in the data stream; however, hopping windows can overlap. For example, each window might capture five minutes' worth of data (the window duration), but a new window starts every ten seconds. The frequency with which hopping windows begin is called the window period. This example would therefore have a window duration of five minutes and a window period of ten seconds. For more information, see hopping windows in the streaming pipeline basics page.

HOP

Syntax

HOP(timestamp, window_period, window_duration)

Description

Use the HOP function when you want your pipeline to use hopping windows. You can optionally also use HOP_START and HOP_END to get the starting timestamp and ending timestamp for each hopping window.

timestamp is the column to use for the event timestamp for each element. For a Cloud Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.

window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part".

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

HOP_START

Syntax

HOP_START(window_period, window_duration)

Description

The HOP_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.

window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration should be the same as the window duration for HOP. If the durations are not the same, Cloud Dataflow SQL throws an exception.

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return type

TIMESTAMP

HOP_END

Syntax

HOP_END(window_period, window_duration)

Description

The HOP_END function returns the ending timestamp of the corresponding window returned by the GROUP BY clause.

window_period is how often a new window begins, specified as a string with the format "INTERVAL int64 date_part".

window_duration is the window duration time, specified as a string with the format "INTERVAL int64 date_part". The window duration should be the same as the window duration for HOP. If the durations are not the same, Cloud Dataflow SQL throws an exception.

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return type

TIMESTAMP

Examples

The following query uses hopping windows. Every 10 seconds, the query aggregates transactions that happened in the last 15 seconds.

SELECT
  HOP_START("INTERVAL 10 SECOND", "INTERVAL 15 SECOND") AS period_start,
  SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
  HOP(tr.event_timestamp, "INTERVAL 10 SECOND", "INTERVAL 15 SECOND")
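Because hopping windows can overlap, one element can belong to several windows at once. This plain-Python sketch (an illustration, not part of Dataflow SQL) enumerates every window containing a given timestamp:

```python
from datetime import datetime, timedelta, timezone

def hop_windows(ts, period, duration):
    """Return every [start, end) hopping window that contains ts.

    A new window starts every `period`; each spans `duration`, so
    windows overlap whenever duration > period.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Start of the latest window that begins at or before ts.
    start = ts - (ts - epoch) % period
    windows = []
    while start + duration > ts:  # walk back while the window still covers ts
        windows.append((start, start + duration))
        start -= period
    return list(reversed(windows))

# With a 10-second period and 15-second duration, an element at
# 12:00:12 belongs to two overlapping windows.
ts = datetime(2020, 1, 1, 12, 0, 12, tzinfo=timezone.utc)
for start, end in hop_windows(ts, timedelta(seconds=10), timedelta(seconds=15)):
    print(start.time(), end.time())
# 12:00:00 12:00:15
# 12:00:10 12:00:25
```

This is why the SQL query above emits the same transaction's amount into every window whose interval covers its event timestamp.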

Session window functions

Session windows are windows that contain elements that are within a certain gap duration of another element. For more information about gap durations, see session windows in the streaming pipeline basics page.

SESSION

Syntax

SESSION(timestamp, session_gap)

Description

Use the SESSION function when you want your pipeline to use session windows. You can optionally also use SESSION_START to get the starting timestamp for each session window.

timestamp is the column to use for the event timestamp for each element. For a Cloud Pub/Sub source, you must use the event_timestamp field (for example, pubsub.topic.`my-project`.topicname.event_timestamp) as the event timestamp column.

session_gap is the minimum gap duration, specified as a string with the format "INTERVAL int64 date_part".

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

SESSION_START

Syntax

SESSION_START(session_gap)

Description

The SESSION_START function returns the starting timestamp of the corresponding window returned by the GROUP BY clause.

session_gap is the minimum gap duration, specified as a string with the format "INTERVAL int64 date_part". The gap duration should be the same as the gap duration for SESSION. If the gap durations are not the same, Cloud Dataflow SQL throws an exception.

date_part must be one of MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return type

TIMESTAMP

Examples

The following query uses session windows to aggregate transactions with a 30-minute inactivity gap.

SELECT
  SESSION_START("INTERVAL 30 MINUTE") AS period_start,
  SUM(tr.payload.amount) AS amount
FROM pubsub.topic.`dataflow-sql`.transactions AS tr
GROUP BY
  SESSION(tr.event_timestamp, "INTERVAL 30 MINUTE")
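Unlike tumbling and hopping windows, session windows are data-driven: a session grows as long as elements keep arriving within the gap duration of the previous element. This plain-Python sketch (an illustration, not part of Dataflow SQL) groups sorted timestamps into sessions:

```python
from datetime import datetime, timedelta, timezone

def session_windows(timestamps, gap):
    """Group timestamps into sessions.

    A new session starts whenever an element arrives more than `gap`
    after the previous element; otherwise the session is extended.
    """
    sessions = []
    current = []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > gap:
            sessions.append(current)  # gap exceeded: close this session
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions

t0 = datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc)
stamps = [t0, t0 + timedelta(minutes=10), t0 + timedelta(minutes=50)]
groups = session_windows(stamps, timedelta(minutes=30))
print(len(groups))  # 2: the 40-minute gap exceeds the 30-minute gap duration
```

With a 30-minute gap duration, the first two timestamps (10 minutes apart) form one session, while the third (40 minutes later) starts a new one.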

Window requirements when joining two unbounded sources

When you join two unbounded (streaming) sources, both sources must have the same windowing. If the windows are not compatible, Cloud Dataflow SQL throws an IllegalArgumentException.

Valid JOIN with windowing

This example joins two unbounded sources with compatible windows: both use tumbling windows with a 15-second window duration.

SELECT * FROM
  (SELECT
     tr.payload.transaction_id AS transaction_id,
     SUM(tr.payload.amount) AS combined_transaction_amount
   FROM pubsub.topic.`dataflow-sql`.transactions AS tr
   GROUP BY
     tr.payload.transaction_id,
     TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")) AS transaction_stream
INNER JOIN
  (SELECT
     invalid_tr.payload.transaction_id AS transaction_id
   FROM pubsub.topic.`dataflow-sql`.`invalid-transactions` AS invalid_tr
   GROUP BY
     invalid_tr.payload.transaction_id,
     TUMBLE(invalid_tr.event_timestamp, "INTERVAL 15 SECOND")) AS invalid_transaction_stream
ON
  transaction_stream.transaction_id = invalid_transaction_stream.transaction_id

Invalid JOIN with windowing

This example joins two unbounded sources with incompatible windows. One source uses hopping windows with a 15-second window period and a 10-second window duration. The other source uses tumbling windows with a 15-second window duration. In this case, Cloud Dataflow SQL throws an IllegalArgumentException.

SELECT * FROM
  (SELECT
     tr.payload.transaction_id AS transaction_id,
     SUM(tr.payload.amount) AS combined_transaction_amount
   FROM pubsub.topic.`dataflow-sql`.transactions AS tr
   GROUP BY
     tr.payload.transaction_id,
     HOP(tr.event_timestamp, "INTERVAL 15 SECOND", "INTERVAL 10 SECOND")) AS transaction_stream
INNER JOIN
  (SELECT
     invalid_tr.payload.transaction_id AS transaction_id
   FROM pubsub.topic.`dataflow-sql`.`invalid-transactions` AS invalid_tr
   GROUP BY
     invalid_tr.payload.transaction_id,
     TUMBLE(invalid_tr.event_timestamp, "INTERVAL 15 SECOND")) AS invalid_transaction_stream
ON
  transaction_stream.transaction_id = invalid_transaction_stream.transaction_id