BigQuery I/O

The Dataflow SDKs have built-in Read and Write transforms that can read data from, and write data to, a Google BigQuery table. You can read an entire table that you specify by name, or you can read partial data by using a query string.

Specifying a BigQuery Table Name

To read from or write to a BigQuery table, you must provide a fully-qualified BigQuery table name, which consists of three parts:

  • A Project ID: The ID for your Google Cloud Project. The default value comes from your pipeline options object.
  • A Dataset ID: The BigQuery dataset ID, which is unique within a given Cloud Project.
  • A Table ID: A table ID, which is unique within a given dataset.

Java

Note that you can use BigQueryIO without supplying a project name; if omitted, BigQueryIO uses the default project from your PipelineOptions object.

The BigQuery Java Client API takes an object of type TableReference to identify the target BigQuery table. The BigQueryIO package in the Dataflow SDK for Java contains a helper method, BigQueryIO.parseTableSpec, that you can use to construct a TableReference from a String containing the three parts of your BigQuery table name.

Most of the time, you won't need to use a TableReference object explicitly; the static factory methods for a BigQueryIO transform take the table name as a String; they then use parseTableSpec internally to construct a TableReference object from the provided String.
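
If you do need a TableReference explicitly, a minimal sketch (using the sample table from the read examples below) looks like this:

  TableReference table =
      BigQueryIO.parseTableSpec("clouddataflow-readonly:samples.weather_stations");
  // The three parts of the table name are now available individually.
  String project = table.getProjectId();  // "clouddataflow-readonly"
  String dataset = table.getDatasetId();  // "samples"
  String tableId = table.getTableId();    // "weather_stations"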

Table Name String Formats

You can specify the target BigQuery table using a string containing one of the following formats:

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

You can also omit project_id; if you do, Cloud Dataflow uses the default project ID from your pipeline options object. In Java, that ID is accessible via GcpOptions.getProject, as sketched below.

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

BigQuery Table Rows and Schemas

Java

BigQueryIO read and write transforms produce and consume data as PCollections of BigQuery TableRow objects. TableRow is part of the BigQuery Java Client API, in the package com.google.api.services.bigquery.model.

In addition, when writing to BigQuery, you'll need to supply a TableSchema object describing the fields you want to write to the target table. You'll use both the BigQuery TableSchema and TableFieldSchema classes, which are defined in the package com.google.api.services.bigquery.model.
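
For illustration, a TableRow maps field names to values and is built with chained set calls; the field names and values here are hypothetical:

  TableRow row = new TableRow()
      .set("source", "Shakespeare")           // a STRING field
      .set("quote", "To be, or not to be");   // a STRING field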

Reading from BigQuery

Java

To read from a BigQuery table, you apply a BigQueryIO.Read transform. BigQueryIO.Read returns a PCollection of BigQuery TableRow objects, where each element in the PCollection represents a single row in the table.

You can read an entire BigQuery table by supplying the BigQuery table name to BigQueryIO.Read by using the .from operation. The following example code shows how to apply the BigQueryIO.Read transform to read an entire BigQuery table:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

If you don't want to read the entire table, you can supply a query string to BigQueryIO.Read by using the .fromQuery operation. The following example code shows how to read specific fields from a BigQuery table by using a query string:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

Alternatively, you can use BigQuery's standard SQL dialect, as shown in the following example:

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
         .usingStandardSql());

Note that when reading from BigQuery, the integer values in the TableRow objects are encoded as Strings to match BigQuery's exported JSON format.
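
For example, a downstream transform over the weatherData collection from the query above would parse the year field rather than cast it; this is a minimal sketch:

  PCollection<Integer> years = weatherData.apply(
    ParDo.named("ParseYear").of(new DoFn<TableRow, Integer>() {
      @Override
      public void processElement(ProcessContext c) {
        // BigQuery integer values arrive as Strings in the TableRow.
        String year = (String) c.element().get("year");
        c.output(Integer.parseInt(year));
      }
    }));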

When you apply a BigQueryIO.Read transform in batch mode, Dataflow invokes a BigQuery export request. Note that Dataflow's use of this API is subject to BigQuery's Quota and Pricing policies.

Writing to BigQuery

Java

To write to a BigQuery table, you apply a BigQueryIO.Write transform. You'll need to apply the transform to a PCollection<TableRow>.

In general, you'll need to use another transform, such as ParDo, to format your output data into a collection of BigQuery TableRow objects.
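
For instance, a minimal sketch of such a DoFn follows; the input format and field names are hypothetical and should match the schema you plan to write:

  PCollection<String> lines = ...;

  PCollection<TableRow> quotes = lines.apply(
    ParDo.named("FormatAsTableRow").of(new DoFn<String, TableRow>() {
      @Override
      public void processElement(ProcessContext c) {
        // Hypothetical input format: "source|quote".
        String[] parts = c.element().split("\\|", 2);
        c.output(new TableRow()
            .set("source", parts[0])
            .set("quote", parts[1]));
      }
    }));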

When you construct a BigQueryIO.Write transform, you'll need to provide some additional information based on the target table. In addition to the table name, you'll need to provide the following:

  • The target table's CreateDisposition. CreateDisposition specifies whether the target table must exist or can be created by the write operation.
  • The target table's WriteDisposition. WriteDisposition specifies whether the data you write will replace an existing table, append rows to an existing table, or write only to an empty table.

In addition, if your write operation creates a new BigQuery table, you must supply schema information about the target table. In this case, you will need to include a TableSchema object with your write operation.

CreateDisposition

The CreateDisposition controls whether or not your BigQuery write operation should create a table if the target table does not exist. You specify the CreateDisposition when constructing your BigQueryIO.Write transform by invoking the method .withCreateDisposition.

CreateDisposition is an enum with the following valid values:

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: Specifies that a table should never be created. If the target table does not exist, the write operation fails.
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you'll also need to supply a table schema using the .withSchema operation. CREATE_IF_NEEDED is the default behavior.

Note that if you specify CREATE_IF_NEEDED as the CreateDisposition and you don't supply a TableSchema, the transform may fail at runtime with a java.lang.IllegalArgumentException if the target table does not exist.

WriteDisposition

The WriteDisposition controls how your BigQuery write operation applies to an existing table. You specify the WriteDisposition when constructing your BigQueryIO.Write transform by invoking the method .withWriteDisposition.

WriteDisposition is an enum with the following valid values:

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the target table are removed, and the new rows are added to the table.
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the target table is not empty. WRITE_EMPTY is the default behavior.

When using WRITE_EMPTY for the WriteDisposition, note that the check for whether the target table is empty may occur long before the actual write operation. This check doesn't guarantee that your pipeline has exclusive access to the table: if two programs running concurrently attempt to write to the same output table with a WriteDisposition of WRITE_EMPTY, both may succeed.
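
As a sketch that complements the WRITE_TRUNCATE example in the final section below, the following write appends rows to a table that must already exist; the table name is hypothetical:

  quotes.apply(BigQueryIO.Write
      .named("AppendQuotes")
      .to("my-project:output.output_table")
      // CREATE_NEVER requires the table (and thus its schema) to exist,
      // so no .withSchema call is needed here.
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));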

Creating a TableSchema for Writing to a New Table

If your BigQuery write operation creates a new table, you'll need to provide schema information. You provide the schema information by creating a TableSchema object. You pass the TableSchema using the .withSchema operation when you construct your BigQueryIO.Write transform.

A TableSchema object contains information about each field in the table, using objects of type TableFieldSchema. You construct a TableSchema by first building a List of the fields in the table. Then you pass the list using the .setFields operation when you construct the TableSchema.

The following example code shows how to construct a TableSchema for a table with two fields of type String:

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

Applying a BigQueryIO.Write Transform

The following example code shows how to apply a BigQueryIO.Write transform to write a PCollection<TableRow> to a BigQuery table. The write operation creates a table if needed; if the table already exists, it will be replaced.

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));