Query plan and timeline

BigQuery embeds diagnostic query plan and timing information within query jobs. This information is similar to that provided by statements such as EXPLAIN in other database and analytical systems, and you can retrieve it from the API responses of methods such as jobs.get.

For long-running queries, BigQuery periodically updates these statistics. These updates happen independently of the rate at which the job status is polled, but typically no more often than every 30 seconds. Additionally, query jobs that do not use execution resources, such as dry run requests or queries whose results can be served from cache, do not include the additional diagnostic information, though other statistics may be present.
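
As a sketch of how you might retrieve this information programmatically, the following example uses the google-cloud-bigquery Python client, which parses the jobs.get response into a QueryJob object; the job ID and location here are hypothetical placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical job ID and location, for illustration only.
job = client.get_job("my_job_id", location="US")

# Dry runs and cache-served queries carry no plan, so guard before using it.
if job.query_plan:
    for stage in job.query_plan:
        print(stage.name, stage.status)
else:
    print("No query plan statistics are available for this job.")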

Background

When BigQuery executes a query job, it converts the declarative SQL statement into a graph of execution, broken up into a series of query stages, which themselves are composed of more granular sets of execution steps. BigQuery leverages a heavily distributed parallel architecture to run these queries. Stages model the units of work that many potential workers may execute in parallel. Stages communicate with one another by using a fast distributed shuffle architecture, which you can read about on the Google Cloud blog.

Within the query plan, the terms work units and workers convey information specifically about parallelism. Elsewhere within BigQuery, you may encounter the term slot, which is an abstracted representation of multiple facets of query execution, including compute, memory, and I/O resources. Top-level job statistics estimate the cost of an individual query as the totalSlotMs the query consumed under this abstracted accounting.
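
For intuition, dividing totalSlotMs by the elapsed wall-clock time gives a rough average of how many slots the query held at once. A small worked example using the figures from the sample response later in this document:

# Figures from the sample jobs.get response shown later in this document:
# the job reports totalSlotMs of 50 and a timeline sample at 304 ms elapsed.
total_slot_ms = 50
elapsed_ms = 304

average_slots = total_slot_ms / elapsed_ms
print(f"Average concurrent slots: {average_slots:.2f}")  # roughly 0.16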

Another important property of the query execution architecture is that it is dynamic, which means the query plan can be modified while a query is running. Stages that are introduced while a query is running are often used to improve data distribution throughout query workers. In query plans where this occurs, these stages are typically labeled as Repartition stages.

In addition to the query plan, query jobs also expose a timeline of execution, which provides an accounting of units of work completed, pending, and active within query workers. A query can have multiple stages with active workers simultaneously, and the timeline is intended to show overall progress of the query.

Viewing information with the Google Cloud console

In the Google Cloud console, you can see details of the query plan for a completed query by clicking the Execution Details button (near the Results button).

The query plan.

Query plan information

Within the API response, query plans are represented as a list of query stages. Each item in the list shows per-stage overview statistics, detailed step information, and stage timing classifications. Not all details are rendered within the Google Cloud console, but they can all be present within the API responses.

Stage overview

The overview fields for each stage can include the following:

API field Description
id Unique numeric ID for the stage.
name Simple summary name for the stage. The steps within the stage provide additional details about execution steps.
status Execution status of the stage. Possible states include PENDING, RUNNING, COMPLETE, FAILED, and CANCELLED.
inputStages A list of the IDs that form the dependency graph of the stage. For example, a JOIN stage often needs two dependent stages that prepare the data on the left and right side of the JOIN relationship.
startMs Timestamp, in epoch milliseconds, that represents when the first worker within the stage began execution.
endMs Timestamp, in epoch milliseconds, that represents when the last worker completed execution.
steps More detailed list of execution steps within the stage. See next section for more information.
recordsRead Input size of the stage as number of records, across all stage workers.
recordsWritten Output size of the stage as number of records, across all stage workers.
parallelInputs Number of parallelizable units of work for the stage. Depending on the stage and query, this may represent the number of columnar segments within a table, or the number of partitions within an intermediate shuffle.
completedParallelInputs Number of units of work within the stage that were completed. For some queries, not all inputs within a stage need to be completed for the stage to complete.
shuffleOutputBytes Represents the total bytes written across all workers within a query stage.
shuffleOutputBytesSpilled Queries that transmit significant data between stages may need to fall back to disk-based transmission. The spilled bytes statistic communicates how much data was spilled to disk. Spilling depends on an optimization algorithm, so it is not deterministic for any given query.
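
As a sketch, the following loop prints these overview statistics with the Python client, whose QueryPlanEntry properties mirror the API fields above, and flags any shuffle spill:

# `job` is the completed QueryJob from the earlier example.
for stage in job.query_plan:
    print(f"{stage.entry_id} {stage.name}: {stage.status}")
    print(f"  records read/written: {stage.records_read}/{stage.records_written}")
    print(f"  inputs completed: "
          f"{stage.completed_parallel_inputs}/{stage.parallel_inputs}")
    # A nonzero spill means shuffle data fell back to disk-based transmission.
    if stage.shuffle_output_bytes_spilled:
        print(f"  spilled {stage.shuffle_output_bytes_spilled} bytes to disk")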

Per-stage step information

Steps represent the more granular operations that each worker within a stage must execute, presented as an ordered list of operations. Steps are categorized, with some operations providing more detailed information. The operation categories present in the query plan include the following:

Step Description
READ A read of one or more columns from an input table or intermediate shuffle. Only the first sixteen columns that are read are returned in the step details.
WRITE A write of one or more columns to an output table or intermediate result. For HASH partitioned outputs from a stage, this also includes the columns used as the partition key.
COMPUTE Operations such as expression evaluation and SQL functions.
FILTER Operator implementing the WHERE, OMIT IF, and HAVING clauses.
SORT Sort or ORDER BY operation; includes the sort keys and the sort direction.
AGGREGATE An aggregation operation, such as GROUP BY or COUNT.
LIMIT Operator implementing the LIMIT clause.
JOIN A JOIN operation, which includes the join type and the columns used.
ANALYTIC_FUNCTION An invocation of a window function (also known as an "analytic function").
USER_DEFINED_FUNCTION A call to a user-defined function.
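
A minimal sketch of walking these steps with the Python client, where each stage's steps list holds entries with a kind and a list of substeps:

# `job` is the completed QueryJob from the earlier example.
for stage in job.query_plan:
    print(stage.name)
    for step in stage.steps:
        print(f"  {step.kind}")
        for substep in step.substeps:
            print(f"    {substep}")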

Per-stage timing classification

The query stages also provide stage timing classifications, in both relative and absolute form. Because each stage of execution represents work undertaken by one or more independent workers, timings are reported as both an average and a worst case: the average performance across all workers in a stage, and the long-tail slowest worker for a given classification. Each average and maximum is reported in both absolute milliseconds and as a ratio, where the ratio is expressed as a fraction of the longest time spent by any worker in any segment.

The Google Cloud console presents stage timing using the relative timing representations.

The stage timing information is reported as follows:

Relative timing / Absolute timing: Ratio numerator
waitRatioAvg / waitMsAvg: Time the average worker spent waiting to be scheduled.
waitRatioMax / waitMsMax: Time the slowest worker spent waiting to be scheduled.
readRatioAvg / readMsAvg: Time the average worker spent reading input data.
readRatioMax / readMsMax: Time the slowest worker spent reading input data.
computeRatioAvg / computeMsAvg: Time the average worker spent CPU bound.
computeRatioMax / computeMsMax: Time the slowest worker spent CPU bound.
writeRatioAvg / writeMsAvg: Time the average worker spent writing output data.
writeRatioMax / writeMsMax: Time the slowest worker spent writing output data.
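
One practical use of these fields is spotting skew: a large gap between a stage's average and worst-case compute time suggests that a few slow workers dominated the stage. A sketch using the Python client's corresponding properties (the 2x threshold is an arbitrary illustration, not a BigQuery heuristic):

# `job` is the completed QueryJob from the earlier example.
for stage in job.query_plan:
    avg_ms = stage.compute_ms_avg or 0
    max_ms = stage.compute_ms_max or 0
    # Flag stages whose slowest worker took far longer than the average one.
    if avg_ms and max_ms / avg_ms > 2:
        print(f"{stage.name}: slowest worker {max_ms} ms vs {avg_ms} ms average")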

Explanation for federated queries

Federated queries let you send a query statement to an external data source by using the EXTERNAL_QUERY function. Federated queries are subject to the optimization technique known as SQL pushdown, and the query plan shows operations pushed down to the external data source, if any. For example, if you run the following query:

SELECT id, name
FROM EXTERNAL_QUERY("<connection>", "SELECT * FROM company")
WHERE country_code IN ('ee', 'hu') AND name like '%TV%'

The query plan will show the following stage steps:

$1:id, $2:name, $3:country_code
FROM table_for_external_query_$_0(
  SELECT id, name, country_code
  FROM (
    /*native_query*/
    SELECT * FROM company
  )
  WHERE in(country_code, 'ee', 'hu')
)
WHERE and(in($3, 'ee', 'hu'), like($2, '%TV%'))
$1, $2
TO __stage00_output

In this plan, table_for_external_query_$_0(...) represents the EXTERNAL_QUERY function. In the parentheses you can see the query that the external data source executes. From this, you can see that:

  • The external data source returns only the three selected columns.
  • The external data source returns only rows for which country_code is either 'ee' or 'hu'.
  • The LIKE operator is not pushed down and is evaluated by BigQuery.

For comparison, if there are no pushdowns, the query plan will show the following stage steps:

$1:id, $2:name, $3:country_code
FROM table_for_external_query_$_0(
  SELECT id, name, description, country_code, primary_address, secondary_address
  FROM (
    /*native_query*/
    SELECT * FROM company
  )
)
WHERE and(in($3, 'ee', 'hu'), like($2, '%TV%'))
$1, $2
TO __stage00_output

This time the external data source returns all the columns and all the rows from the company table, and BigQuery performs the filtering.

Timeline metadata

The query timeline reports progress at specific points in time, providing snapshot views of overall query progress. The timeline is represented as a series of samples that report the following details:

Field Description
elapsedMs Milliseconds elapsed since the start of query execution.
totalSlotMs A cumulative representation of the slot milliseconds used by the query.
pendingUnits Total units of work scheduled and waiting for execution.
activeUnits Total active units of work currently being processed by workers.
completedUnits Total units of work that have been completed while executing this query.
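
A sketch of reading these samples with the Python client, whose TimelineEntry objects expose the same fields (the slot milliseconds appear as slot_millis):

# `job` is the completed QueryJob from the earlier example.
for sample in job.timeline:
    print(f"t={sample.elapsed_ms} ms: "
          f"{sample.pending_units} pending, "
          f"{sample.active_units} active, "
          f"{sample.completed_units} completed, "
          f"{sample.slot_millis} slot-ms consumed")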

An example query

The following query counts the number of rows in the Shakespeare public dataset and has a second conditional count that restricts results to rows that reference 'hamlet':

SELECT
  COUNT(1) as rowcount,
  COUNTIF(corpus = 'hamlet') as rowcount_hamlet
FROM `publicdata.samples.shakespeare`

This example uses a very small sample table with a simple query, so there are only two units of work in total. All work is completed almost immediately.

Click Execution details to see the query plan:

The hamlet query plan.

The color indicators show the relative timings for all steps across all stages.

To learn more about the steps of the execution stages, click to expand the details for the stage:

The hamlet query plan step details.

In this example, the longest time in any segment was the time the single worker in Stage 01 spent waiting for Stage 00 to complete. This is because Stage 01 depended on Stage 00's output and could not start until the first stage wrote its output into intermediate shuffle.

Error reporting

It is possible for query jobs to fail mid-execution. Because plan information is updated periodically, you can observe where within the execution graph the failure occurred. Within the Google Cloud console, successful or failed stages are labeled with a checkmark or exclamation point next to the stage name.
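
As a sketch, you can make the same determination programmatically by checking the job's top-level error together with each stage's status:

# `job` is a QueryJob that has finished, successfully or not.
if job.error_result:
    print(f"Job failed: {job.error_result['message']}")
for stage in job.query_plan or []:
    if stage.status != "COMPLETE":
        print(f"Stage {stage.entry_id} ({stage.name}) ended as {stage.status}")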

For more information about interpreting and addressing errors, see the troubleshooting guide.

API sample representation

Query plan information is embedded in the job response, and you can retrieve it by calling jobs.get. For example, the following excerpt of the JSON response for a job running the sample hamlet query shows both the query plan and timeline information.

"statistics": {
  "creationTime": "1576544129234",
  "startTime": "1576544129348",
  "endTime": "1576544129681",
  "totalBytesProcessed": "2464625",
  "query": {
    "queryPlan": [
      {
        "name": "S00: Input",
        "id": "0",
        "startMs": "1576544129436",
        "endMs": "1576544129465",
        "waitRatioAvg": 0.04,
        "waitMsAvg": "1",
        "waitRatioMax": 0.04,
        "waitMsMax": "1",
        "readRatioAvg": 0.32,
        "readMsAvg": "8",
        "readRatioMax": 0.32,
        "readMsMax": "8",
        "computeRatioAvg": 1,
        "computeMsAvg": "25",
        "computeRatioMax": 1,
        "computeMsMax": "25",
        "writeRatioAvg": 0.08,
        "writeMsAvg": "2",
        "writeRatioMax": 0.08,
        "writeMsMax": "2",
        "shuffleOutputBytes": "18",
        "shuffleOutputBytesSpilled": "0",
        "recordsRead": "164656",
        "recordsWritten": "1",
        "parallelInputs": "1",
        "completedParallelInputs": "1",
        "status": "COMPLETE",
        "steps": [
          {
            "kind": "READ",
            "substeps": [
              "$1:corpus",
              "FROM publicdata.samples.shakespeare"
            ]
          },
          {
            "kind": "AGGREGATE",
            "substeps": [
              "$20 := COUNT($30)",
              "$21 := COUNTIF($31)"
            ]
          },
          {
            "kind": "COMPUTE",
            "substeps": [
              "$30 := 1",
              "$31 := equal($1, 'hamlet')"
            ]
          },
          {
            "kind": "WRITE",
            "substeps": [
              "$20, $21",
              "TO __stage00_output"
            ]
          }
        ]
      },
      {
        "name": "S01: Output",
        "id": "1",
        "startMs": "1576544129465",
        "endMs": "1576544129480",
        "inputStages": [
          "0"
        ],
        "waitRatioAvg": 0.44,
        "waitMsAvg": "11",
        "waitRatioMax": 0.44,
        "waitMsMax": "11",
        "readRatioAvg": 0,
        "readMsAvg": "0",
        "readRatioMax": 0,
        "readMsMax": "0",
        "computeRatioAvg": 0.2,
        "computeMsAvg": "5",
        "computeRatioMax": 0.2,
        "computeMsMax": "5",
        "writeRatioAvg": 0.16,
        "writeMsAvg": "4",
        "writeRatioMax": 0.16,
        "writeMsMax": "4",
        "shuffleOutputBytes": "17",
        "shuffleOutputBytesSpilled": "0",
        "recordsRead": "1",
        "recordsWritten": "1",
        "parallelInputs": "1",
        "completedParallelInputs": "1",
        "status": "COMPLETE",
        "steps": [
          {
            "kind": "READ",
            "substeps": [
              "$20, $21",
              "FROM __stage00_output"
            ]
          },
          {
            "kind": "AGGREGATE",
            "substeps": [
              "$10 := SUM_OF_COUNTS($20)",
              "$11 := SUM_OF_COUNTS($21)"
            ]
          },
          {
            "kind": "WRITE",
            "substeps": [
              "$10, $11",
              "TO __stage01_output"
            ]
          }
        ]
      }
    ],
    "estimatedBytesProcessed": "2464625",
    "timeline": [
      {
        "elapsedMs": "304",
        "totalSlotMs": "50",
        "pendingUnits": "0",
        "completedUnits": "2"
      }
    ],
    "totalPartitionsProcessed": "0",
    "totalBytesProcessed": "2464625",
    "totalBytesBilled": "10485760",
    "billingTier": 1,
    "totalSlotMs": "50",
    "cacheHit": false,
    "referencedTables": [
      {
        "projectId": "publicdata",
        "datasetId": "samples",
        "tableId": "shakespeare"
      }
    ],
    "statementType": "SELECT"
  },
  "totalSlotMs": "50"
},
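
If you work with the raw jobs.get response instead of a client library, the same information is reachable by walking the JSON. The sketch below assumes the response body has already been fetched into the hypothetical variable response_text:

import json

# response_text is a stand-in for the raw jobs.get response body.
job_resource = json.loads(response_text)

query_stats = job_resource["statistics"]["query"]
for stage in query_stats["queryPlan"]:
    print(stage["name"], stage["status"], stage["recordsRead"])
for sample in query_stats["timeline"]:
    print(sample["elapsedMs"], sample["completedUnits"])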

Using execution information

BigQuery query plans provide information about how the service executes queries, but the managed nature of the service limits whether some details are directly actionable. Many optimizations happen automatically within the service, which differs from other environments where tuning, provisioning, and monitoring can require dedicated, knowledgeable staff.

For specific techniques that can improve query execution and performance, see the best practices documentation. The query plan and timeline statistics can help you understand whether certain stages dominate resource utilization. For example, a JOIN stage that generates far more output rows than input rows can indicate an opportunity to filter earlier in the query, as sketched below.
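
As a sketch of that check, the following loop flags stages that wrote many more records than they read; the 10x threshold is an arbitrary illustration, not a BigQuery heuristic:

# `job` is the completed QueryJob from the earlier examples.
for stage in job.query_plan:
    rows_in = stage.records_read or 0
    rows_out = stage.records_written or 0
    if rows_in and rows_out / rows_in > 10:
        print(f"{stage.name}: {rows_in} rows in, {rows_out} rows out")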

Additionally, timeline information can help you determine whether a query is slow in isolation or slow because of contention with other queries for the same resources. If the number of active units remains limited throughout the lifetime of the query while the number of pending units of work remains high, that can indicate a case where reducing the number of concurrent queries significantly improves overall execution time.
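
A sketch of that heuristic over the timeline samples; the thresholds are arbitrary illustrations, not BigQuery guidance:

# `job` is the completed QueryJob from the earlier examples.
starved = [
    s for s in job.timeline
    if (s.pending_units or 0) > 10 * max(s.active_units or 0, 1)
]
# If most samples show deep queues with few active workers, the query was
# likely waiting on slots rather than slow in its own right.
if len(starved) > len(job.timeline) // 2:
    print("Work stayed queued for most of the query; check for slot contention.")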