Vorlage „BigQuery-Export für Parquet (über Storage API)“

Die Vorlage „BigQuery für Parquet“ ist eine Batchpipeline, die Daten aus einer BigQuery-Tabelle liest und im Parquet-Format in einen Cloud Storage-Bucket schreibt. Diese Vorlage verwendet die BigQuery Storage API zum Exportieren der Daten.

Pipelineanforderungen

  • Die BigQuery-Eingabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Erforderliche Parameter

  • tableRef: Der Speicherort der BigQuery-Eingabetabelle. Beispiel: your-project:your-dataset.your-table-name.
  • bucket: Der Cloud Storage-Ordner, in den die Parquet-Dateien geschrieben werden sollen. Beispiel: gs://your-bucket/export/.

Optionale Parameter

  • numShards: Die Anzahl der Shards der Ausgabedatei. Der Standardwert ist 1.
  • Felder: Eine durch Kommas getrennte Liste von Feldern, die aus der BigQuery-Eingabetabelle ausgewählt werden sollen.
  • rowRestriction: Nur lesbare Zeilen, die dem angegebenen Filter entsprechen. Dieser muss ein mit Google Standard-SQL kompatibler SQL-Ausdruck sein (https://cloud.google.com/bigquery/docs/reference/standard-sql). Wenn kein Wert angegeben ist, werden alle Zeilen zurückgegeben.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery export to Parquet (via Storage API) templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Parquet \
    --region=REGION_NAME \
    --parameters \
tableRef=BIGQUERY_TABLE,\
bucket=OUTPUT_DIRECTORY,\
numShards=NUM_SHARDS,\
fields=FIELDS

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • OUTPUT_DIRECTORY: Ihr Cloud Storage-Ordner für Ausgabedateien
  • NUM_SHARDS: Die gewünschte Anzahl von Ausgabedateien-Shards
  • FIELDS: Eine durch Kommas getrennte Liste von Feldern, die aus der BigQuery-Eingabetabelle ausgewählt werden

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "tableRef": "BIGQUERY_TABLE",
          "bucket": "OUTPUT_DIRECTORY",
          "numShards": "NUM_SHARDS",
          "fields": "FIELDS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Parquet",
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • OUTPUT_DIRECTORY: Ihr Cloud Storage-Ordner für Ausgabedateien
  • NUM_SHARDS: Die gewünschte Anzahl von Ausgabedateien-Shards
  • FIELDS: Eine durch Kommas getrennte Liste von Feldern, die aus der BigQuery-Eingabetabelle ausgewählt werden
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToParquet.BigQueryToParquetOptions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link BigQueryToParquet} pipeline exports data from a BigQuery table to Parquet file(s) in a
 * Google Cloud Storage bucket.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-parquet/README_BigQuery_to_Parquet.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Parquet",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery export to Parquet (via Storage API)",
    description =
        "The BigQuery export to Parquet template is a batch pipeline that reads data from a BigQuery table and writes it to a Cloud Storage bucket in Parquet format. "
            + "This template utilizes the <a href=\"https://cloud.google.com/bigquery/docs/reference/storage\">BigQuery Storage API</a> to export the data.",
    optionsClass = BigQueryToParquetOptions.class,
    flexContainerName = "bigquery-to-parquet",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-parquet",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The input BigQuery table must exist before running the pipeline.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigQueryToParquet {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(BigQueryToParquet.class);

  /** File suffix for file to be written. */
  private static final String FILE_SUFFIX = ".parquet";

  /** Factory to create BigQueryStorageClients. */
  static class BigQueryStorageClientFactory {

    /**
     * Creates BigQueryStorage client for use in extracting table schema.
     *
     * @return BigQueryStorageClient
     */
    static BigQueryStorageClient create() {
      try {
        return BigQueryStorageClient.create();
      } catch (IOException e) {
        LOG.error("Error connecting to BigQueryStorage API: " + e.getMessage());
        throw new RuntimeException(e);
      }
    }
  }

  /** Factory to create ReadSessions. */
  static class ReadSessionFactory {

    /**
     * Creates ReadSession for schema extraction.
     *
     * @param client BigQueryStorage client used to create ReadSession.
     * @param tableString String that represents table to export from.
     * @param tableReadOptions TableReadOptions that specify any fields in the table to filter on.
     * @return session ReadSession object that contains the schema for the export.
     */
    static ReadSession create(
        BigQueryStorageClient client, String tableString, TableReadOptions tableReadOptions) {
      TableReference tableReference = BigQueryHelpers.parseTableSpec(tableString);
      String parentProjectId = "projects/" + tableReference.getProjectId();

      TableReferenceProto.TableReference storageTableRef =
          TableReferenceProto.TableReference.newBuilder()
              .setProjectId(tableReference.getProjectId())
              .setDatasetId(tableReference.getDatasetId())
              .setTableId(tableReference.getTableId())
              .build();

      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parentProjectId)
              .setReadOptions(tableReadOptions)
              .setTableReference(storageTableRef);
      try {
        return client.createReadSession(builder.build());
      } catch (InvalidArgumentException iae) {
        LOG.error("Error creating ReadSession: " + iae.getMessage());
        throw new RuntimeException(iae);
      }
    }
  }

  /**
   * The {@link BigQueryToParquetOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToParquetOptions extends PipelineOptions {
    @TemplateParameter.BigQueryTable(
        order = 1,
        description = "BigQuery table to export",
        groupName = "Source",
        helpText = "The BigQuery input table location.",
        example = "your-project:your-dataset.your-table-name")
    @Required
    String getTableRef();

    void setTableRef(String tableRef);

    @TemplateParameter.GcsWriteFile(
        order = 2,
        description = "Output Cloud Storage file(s)",
        groupName = "Target",
        helpText = "The Cloud Storage folder to write the Parquet files to.",
        example = "gs://your-bucket/export/")
    @Required
    String getBucket();

    void setBucket(String bucket);

    @TemplateParameter.Integer(
        order = 3,
        optional = true,
        description = "Maximum output shards",
        helpText = "The number of output file shards. The default value is `1`.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);

    @TemplateParameter.Text(
        order = 4,
        optional = true,
        description = "List of field names",
        helpText = "A comma-separated list of fields to select from the input BigQuery table.")
    String getFields();

    void setFields(String fields);

    @TemplateParameter.Text(
        order = 5,
        optional = true,
        description = "Row restrictions/filter.",
        helpText =
            "Read only rows which match the specified filter, which must be a SQL expression"
                + " compatible with Google standard SQL"
                + " (https://cloud.google.com/bigquery/docs/reference/standard-sql). If no value is"
                + " specified, then all rows are returned.")
    String getRowRestriction();

    void setRowRestriction(String restriction);
  }

  /**
   * The {@link BigQueryToParquet#getTableSchema(ReadSession)} method gets Avro schema for table
   * using from the {@link ReadSession} object.
   *
   * @param session ReadSession that contains schema for table, filtered by fields if any.
   * @return avroSchema Avro schema for table. If fields are provided then schema will only contain
   *     those fields.
   */
  private static Schema getTableSchema(ReadSession session) {
    Schema avroSchema;

    avroSchema = new Schema.Parser().parse(session.getAvroSchema().getSchema());
    LOG.info("Schema for export is: " + avroSchema.toString());

    return avroSchema;
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToParquetOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToParquetOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  private static PipelineResult run(BigQueryToParquetOptions options) {

    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    TableReadOptions.Builder builder = TableReadOptions.newBuilder();

    /* Add fields to filter export on, if any. */
    if (options.getFields() != null) {
      builder.addAllSelectedFields(Arrays.asList(options.getFields().split(",\\s*")));
    }

    TableReadOptions tableReadOptions = builder.build();
    BigQueryStorageClient client = BigQueryStorageClientFactory.create();
    ReadSession session =
        ReadSessionFactory.create(client, options.getTableRef(), tableReadOptions);

    // Extract schema from ReadSession
    Schema schema = getTableSchema(session);
    client.close();

    TypedRead<GenericRecord> readFromBQ =
        BigQueryIO.read(SchemaAndRecord::getRecord)
            .from(options.getTableRef())
            .withTemplateCompatibility()
            .withMethod(Method.DIRECT_READ)
            .withCoder(AvroCoder.of(schema));

    if (options.getFields() != null) {
      List<String> selectedFields = Splitter.on(",").splitToList(options.getFields());
      readFromBQ =
          selectedFields.isEmpty() ? readFromBQ : readFromBQ.withSelectedFields(selectedFields);
    }

    // Add row restrictions/filter if any.
    if (!Strings.isNullOrEmpty(options.getRowRestriction())) {
      readFromBQ = readFromBQ.withRowRestriction(options.getRowRestriction());
    }

    /*
     * Steps: 1) Read records from BigQuery via BigQueryIO.
     *        2) Write records to Google Cloud Storage in Parquet format.
     */
    pipeline
        /*
         * Step 1: Read records via BigQueryIO using supplied schema as a PCollection of
         *         {@link GenericRecord}.
         */
        .apply("ReadFromBigQuery", readFromBQ)
        /*
         * Step 2: Write records to Google Cloud Storage as one or more Parquet files
         *         via {@link ParquetIO}.
         */
        .apply(
            "WriteToParquet",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(options.getBucket())
                .withNumShards(options.getNumShards())
                .withSuffix(FILE_SUFFIX));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

Nächste Schritte