Dataflow SQL streaming extensions

To process streaming data with Dataflow SQL, write queries with Dataflow SQL streaming extensions.

Streaming extensions syntax

SELECT window_timestamps [, ...]
FROM window_function_expression [table_expression, expression [, ...] ]

window_timestamps:
    { window_start | window_end }

window_function_expression:
    { tumble_function | hop_function | session_function }

For the standard SQL query syntax, see the Dataflow SQL query syntax page.

Window timestamps

Expressions in the SELECT list can refer to columns with the starting and ending timestamps of a window.

window_start

A TIMESTAMP column that holds the starting timestamp of each window.

window_end

A TIMESTAMP column that holds the ending timestamp of each window.

Window function expression

The window_function_expression in the FROM clause must be a call to one of the following windowing functions:

  • TUMBLE
  • HOP
  • SESSION

Windowing functions

Windowing functions divide streaming data into finite windows. When a watermark indicates that a window is complete, windowing functions return the data in the window, the starting timestamp, and the ending timestamp. Late data is dropped.
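The behavior described above can be modeled outside Dataflow SQL. The following Python sketch is a simplified illustration, not how Dataflow implements watermarks. The class WindowedSum, its method names, and the epoch-aligned 15-minute windows are assumptions made for this example.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: windows are aligned to the Unix epoch


class WindowedSum:
    """Toy model: sums amounts per fixed window and drops late data."""

    def __init__(self, duration):
        self.duration = duration
        self.sums = {}          # window_start -> running sum for open windows
        self.watermark = EPOCH  # event time up to which input is considered complete
        self.dropped = 0        # number of late elements that were discarded

    def add(self, event_time, amount):
        index = (event_time - EPOCH) // self.duration
        start = EPOCH + index * self.duration
        if start + self.duration <= self.watermark:
            # The window was already declared complete, so this element is late.
            self.dropped += 1
            return
        self.sums[start] = self.sums.get(start, 0.0) + amount

    def advance_watermark(self, watermark):
        """Move the watermark forward and return (start, end, sum) for completed windows."""
        self.watermark = max(self.watermark, watermark)
        done = sorted(s for s in self.sums if s + self.duration <= self.watermark)
        return [(s, s + self.duration, self.sums.pop(s)) for s in done]


model = WindowedSum(timedelta(minutes=15))
model.add(datetime(2020, 1, 1, 0, 5), 1.0)
model.add(datetime(2020, 1, 1, 0, 10), 1.5)
completed = model.advance_watermark(datetime(2020, 1, 1, 0, 15))
model.add(datetime(2020, 1, 1, 0, 7), 9.0)  # arrives after its window completed
```

In this model, the first window is returned with its start, end, and sum once the watermark reaches 00:15:00, and the element added afterward for the same window is counted in dropped instead of being summed.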

To compute aggregate values of streaming data, aggregate the data in each window.

TUMBLE

TUMBLE(table_expression, DESCRIPTOR(timestamp_column), window_duration)

Description

Divides streaming data into tumbling windows. A tumbling window represents a consistent, disjoint time interval in the data stream.

For more information, see Tumbling windows.
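As a rough illustration of how an element maps to exactly one tumbling window, the following Python sketch computes the window for a given event timestamp. The function name tumble_window is hypothetical, and the sketch assumes windows are aligned to the Unix epoch and are half-open, so an element stamped exactly at a boundary belongs to the later window.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: window boundaries align to the Unix epoch


def tumble_window(event_time, duration):
    """Return (window_start, window_end) of the single window holding event_time."""
    index = (event_time - EPOCH) // duration  # whole windows elapsed since the epoch
    start = EPOCH + index * duration
    return start, start + duration


fifteen = timedelta(minutes=15)
first = tumble_window(datetime(2020, 1, 1, 0, 7), fifteen)
boundary = tumble_window(datetime(2020, 1, 1, 0, 15), fifteen)
```

Here first covers 00:00:00 to 00:15:00, and boundary covers 00:15:00 to 00:30:00, which shows that the windows do not overlap.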

Supported Argument Types

The table_expression is a table with the streaming data to divide into tumbling windows.

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

The window_duration is a string with the format "INTERVAL int64 date_part".

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
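To make the duration format concrete, the following Python sketch parses a string such as "INTERVAL 15 MINUTE" into a fixed length of time. The helper parse_interval is hypothetical and is not part of Dataflow SQL. It deliberately rejects MONTH and YEAR, because those date parts have no fixed length and the source does not say how Dataflow SQL resolves them.

```python
import re
from datetime import timedelta

# date_part values that have a fixed length, mapped to timedelta keyword arguments
_FIXED_PARTS = {
    "MILLISECOND": "milliseconds",
    "SECOND": "seconds",
    "MINUTE": "minutes",
    "HOUR": "hours",
    "DAY": "days",
}


def parse_interval(text):
    """Parse 'INTERVAL <int> <date_part>' into a timedelta (fixed-length parts only)."""
    match = re.fullmatch(r"INTERVAL\s+(\d+)\s+([A-Z]+)", text.strip())
    if match is None:
        raise ValueError(f"not an interval string: {text!r}")
    amount, part = int(match.group(1)), match.group(2)
    if part not in _FIXED_PARTS:
        raise ValueError(f"date_part not handled by this sketch: {part}")
    return timedelta(**{_FIXED_PARTS[part]: amount})


window_duration = parse_interval("INTERVAL 15 MINUTE")
```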

Return Data Types

A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.

Examples

The following query uses tumbling windows to sum transaction amounts every 15 minutes.

SELECT
  window_start as period_start,
  window_end as period_end,
  SUM(tr.amount) as amount
FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 15 MINUTE") as tr
GROUP BY tr.window_start

+---------------------+---------------------+--------+
| period_start        | period_end          | amount |
+---------------------+---------------------+--------+
| 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
+---------------------+---------------------+--------+
| 2020-01-01 00:15:00 | 2020-01-01 00:30:00 | 2.20   |
+---------------------+---------------------+--------+

HOP

HOP(table_expression, DESCRIPTOR(timestamp_column), window_period, window_duration)

Description

Divides streaming data into hopping windows. A hopping window represents a consistent time interval in the data stream. Hopping windows can overlap, whereas tumbling windows are disjoint.

For more information, see Hopping windows.
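Because hopping windows can overlap, one element can belong to several windows. The following Python sketch lists every window that contains a given event timestamp. The function name hop_windows is hypothetical, and the sketch assumes that window starts are aligned to the Unix epoch at multiples of the window period and that windows are half-open.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: window starts align to the Unix epoch


def hop_windows(event_time, period, duration):
    """Return every (window_start, window_end) hopping window containing event_time."""
    # Latest window start at or before the event.
    start = EPOCH + ((event_time - EPOCH) // period) * period
    windows = []
    # Walk backward one period at a time while the window still covers the event.
    while start + duration > event_time:
        windows.append((start, start + duration))
        start -= period
    return sorted(windows)


period = timedelta(minutes=10)
duration = timedelta(minutes=15)
overlapping = hop_windows(datetime(2020, 1, 1, 0, 12), period, duration)
single = hop_windows(datetime(2020, 1, 1, 0, 7), period, duration)
```

With a 10-minute period and a 15-minute duration, an element at 00:12:00 falls in both the 00:00:00 to 00:15:00 window and the 00:10:00 to 00:25:00 window, while an element at 00:07:00 falls only in the first.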

Supported Argument Types

The table_expression is a table with the streaming data to divide into hopping windows.

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

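The window_period and window_duration are strings with the format "INTERVAL int64 date_part". The window_period is how often a new window starts, and the window_duration is the length of each window.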

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return Data Types

A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.

Examples

The following query uses hopping windows to calculate a 15-minute running total of transaction amounts every 10 minutes.

SELECT
  window_start as period_start,
  window_end as period_end,
  SUM(tr.amount) as amount
FROM HOP((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
         DESCRIPTOR(event_timestamp),
         "INTERVAL 10 MINUTE",
         "INTERVAL 15 MINUTE") as tr
GROUP BY tr.window_start

+---------------------+---------------------+--------+
| period_start        | period_end          | amount |
+---------------------+---------------------+--------+
| 2020-01-01 00:00:00 | 2020-01-01 00:15:00 | 2.50   |
+---------------------+---------------------+--------+
| 2020-01-01 00:10:00 | 2020-01-01 00:25:00 | 2.30   |
+---------------------+---------------------+--------+

SESSION

SESSION(table_expression, DESCRIPTOR(timestamp_column), DESCRIPTOR(key1, key2, ...), session_gap_duration)

Description

Divides streaming data into session windows. A session window groups elements whose event timestamps fall within a gap duration of one another. The gap duration is the longest interval allowed between consecutive elements in the same session. If an element's event timestamp is more than the gap duration after the previous element, the element is assigned to a new window.

For more information, see Session windows.
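The grouping rule can be sketched in a few lines of Python. The function name session_windows is hypothetical. The sketch assumes the model used by Apache Beam session windowing: each element initially occupies the interval from its timestamp to its timestamp plus the gap, and overlapping intervals for the same key are merged, so a session ends one gap duration after its last element. Whether Dataflow SQL reports window_end in exactly this way is an assumption.

```python
from datetime import datetime, timedelta


def session_windows(events, gap):
    """Group (key, event_time) pairs into per-key sessions.

    Returns {key: [(window_start, window_end), ...]} with sessions in time order.
    """
    stamps_by_key = {}
    for key, event_time in events:
        stamps_by_key.setdefault(key, []).append(event_time)

    result = {}
    for key, stamps in stamps_by_key.items():
        sessions = []
        for ts in sorted(stamps):
            if sessions and ts < sessions[-1][1]:
                # Within the gap of the current session: extend its end.
                sessions[-1] = (sessions[-1][0], ts + gap)
            else:
                # Gap exceeded (or first element): start a new session.
                sessions.append((ts, ts + gap))
        result[key] = sessions
    return result


events = [
    (1001, datetime(2020, 1, 1, 0, 0)),
    (1001, datetime(2020, 1, 1, 0, 10)),
    (1001, datetime(2020, 1, 1, 1, 0)),
    (1002, datetime(2020, 1, 1, 0, 45)),
]
sessions = session_windows(events, timedelta(minutes=30))
```

For key 1001, the first two elements are 10 minutes apart and share a session, while the third element is 50 minutes after the second and starts a new one. Key 1002 gets its own session regardless of the timing of key 1001.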

Supported Argument Types

The table_expression is a table with the streaming data to divide into session windows.

The timestamp_column is the column in table_expression with the event timestamps of each element.

  • Event timestamps must be of type TIMESTAMP.

For a Pub/Sub source, you must specify the event_timestamp field as the timestamp_column.

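The key columns (key1, key2, ...) are the columns in table_expression that identify which elements belong together. Sessions are tracked separately for each distinct combination of key values.

The session_gap_duration is a string with the format "INTERVAL int64 date_part".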

  • date_part must be MILLISECOND, SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.

Return Data Types

A table with the columns from the input table, a TIMESTAMP column with the start of the windows, and a TIMESTAMP column with the end of the windows.

Examples

The following query uses session windows to sum transaction amounts that occur within 30 minutes of each other.

SELECT
  window_start as period_start,
  window_end as period_end,
  SUM(tr.amount) as amount,
  tr.transaction_id as transaction_id
FROM SESSION((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
             DESCRIPTOR(event_timestamp),
             DESCRIPTOR(transaction_id),
             "INTERVAL 30 MINUTE") as tr
GROUP BY tr.window_start, transaction_id

+---------------------+---------------------+--------+----------------+
| period_start        | period_end          | amount | transaction_id |
+---------------------+---------------------+--------+----------------+
| 2020-01-01 00:00:00 | 2020-01-01 00:10:00 | 2.50   | 1001           |
+---------------------+---------------------+--------+----------------+
| 2020-01-01 00:45:00 | 2020-01-01 01:45:00 | 2.25   | 1002           |
+---------------------+---------------------+--------+----------------+

Window requirements when joining two unbounded sources

To join two unbounded collections, both collections must have the same windows. If the windows are not compatible, Dataflow SQL throws an IllegalArgumentException.
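The rule can be pictured as a check that runs before the join is planned. The following Python sketch is only an analogy. WindowSpec and check_join_windows are hypothetical names, the sketch treats "same windows" as an identical windowing function with identical parameters, and Python's ValueError stands in for the IllegalArgumentException that Dataflow SQL throws. The actual compatibility check inside Dataflow SQL is not described in this page.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical description of how one side of a join is windowed."""
    kind: str                           # "TUMBLE", "HOP", or "SESSION"
    duration: timedelta                 # window duration (or session gap)
    period: Optional[timedelta] = None  # only used for HOP


def check_join_windows(left, right):
    """Raise ValueError unless both sides use identical windowing (assumed rule)."""
    if left != right:
        raise ValueError(f"incompatible windows: {left} vs {right}")


tumble_15 = WindowSpec("TUMBLE", timedelta(minutes=15))
hop_10_15 = WindowSpec("HOP", timedelta(minutes=15), timedelta(minutes=10))

check_join_windows(tumble_15, WindowSpec("TUMBLE", timedelta(minutes=15)))  # passes

try:
    check_join_windows(hop_10_15, tumble_15)
    rejected = False
except ValueError:
    rejected = True
```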

Valid JOIN with windowing

This example joins two unbounded collections with compatible windows. Both collections are divided into tumbling windows with a 15-minute window duration.

SELECT * FROM
  (SELECT
        tr.transaction_id AS transaction_id,
        SUM(tr.amount) AS combined_transaction_amount
  FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
              DESCRIPTOR(event_timestamp),
              "INTERVAL 15 MINUTE") as tr
  GROUP BY
         tr.transaction_id) AS tr
INNER JOIN
  (SELECT
        invalid_tr.transaction_id as transaction_id
  FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
              DESCRIPTOR(event_timestamp),
              "INTERVAL 15 MINUTE") as invalid_tr
  GROUP BY
         invalid_tr.transaction_id) AS invalid_tr
ON tr.transaction_id = invalid_tr.transaction_id

Invalid JOIN with windowing

This example joins two unbounded collections with incompatible windows. One collection is divided into hopping windows that start every 10 minutes and last 15 minutes. The other collection is divided into tumbling windows with a 15-minute window duration.

If you run the following query, Dataflow SQL throws an IllegalArgumentException.

SELECT * FROM
  (SELECT
        tr.transaction_id AS transaction_id,
        SUM(tr.amount) AS combined_transaction_amount
  FROM HOP((SELECT * FROM pubsub.topic.`dataflow-sql`.transaction),
           DESCRIPTOR(event_timestamp),
           "INTERVAL 10 MINUTE", "INTERVAL 15 MINUTE") as tr
  GROUP BY
         tr.transaction_id) AS tr
INNER JOIN
  (SELECT
        invalid_tr.transaction_id as transaction_id
  FROM TUMBLE((SELECT * FROM pubsub.topic.`dataflow-sql`.`invalid-transactions`),
              DESCRIPTOR(event_timestamp),
              "INTERVAL 15 MINUTE") as invalid_tr
  GROUP BY
         invalid_tr.transaction_id) AS invalid_tr
ON tr.transaction_id = invalid_tr.transaction_id