Von Dataflow nach BigQuery schreiben

In diesem Dokument wird beschrieben, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus Dataflow in BigQuery schreiben.

Der BigQuery-E/A-Connector ist im Apache Beam SDK verfügbar. Wir empfehlen die Verwendung der aktuellsten SDK-Version. Weitere Informationen finden Sie unter Apache Beam 2.x SDKs.

Für Python steht außerdem eine sprachübergreifende Unterstützung zur Verfügung.

Übersicht

Der BigQuery-E/A-Connector unterstützt folgende Methoden zum Schreiben in BigQuery:

  • STORAGE_WRITE_API. In diesem Modus führt der Connector direkte Schreibvorgänge in BigQuery-Speicher über die BigQuery Storage Write API aus. Die Storage Write API kombiniert die Streamingaufnahme und das Laden im Batch in einer einzigen Hochleistungs-API. Dieser Modus stellt eine "Genau einmal"-Semantik sicher.
  • STORAGE_API_AT_LEAST_ONCE. Dieser Modus verwendet auch die Storage Write API, stellt aber eine "Mindestens einmal"-Semantik bereit. Dieser Modus führt bei den meisten Pipelines zu einer geringeren Latenz. Doppelte Schreibvorgänge sind jedoch möglich.
  • FILE_LOADS. In diesem Modus schreibt der Connector die Eingabedaten in Stagingdateien in Cloud Storage. Anschließend wird ein Ladejob in BigQuery ausgeführt, um die Daten in BigQuery zu laden. Dieser Modus ist die Standardeinstellung für begrenzte PCollections, die am häufigsten in Batch-Pipelines vorkommen.
  • STREAMING_INSERTS. In diesem Modus verwendet der Connector die Legacy-Streaming-API. Dieser Modus ist der Standard für unbegrenzte PCollections, wird aber für neue Projekte nicht empfohlen.

Beachten Sie bei der Auswahl einer Schreibmethode folgende Punkte:

  • Für Streamingjobs sollten Sie STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE verwenden, da diese Modi direkt in BigQuery-Speicher schreiben, ohne Zwischen-Staging-Dateien zu verwenden.
  • Wenn Sie die Pipeline im "Mindestens einmal"-Streamingmodus ausführen, legen Sie den Schreibmodus auf STORAGE_API_AT_LEAST_ONCE fest. Diese Einstellung ist effizienter und entspricht der Semantik des „Mindestens einmal“-Streamingmodus.
  • Für Dateiladevorgänge und die Storage Write API gelten unterschiedliche Kontingente und Limits.
  • Für Ladejobs wird entweder der freigegebene BigQuery-Slot-Pool oder reservierte Slots verwendet. Wenn Sie reservierte Slots verwenden möchten, führen Sie den Ladejob in einem Projekt mit einer Reservierungszuweisung vom Typ PIPELINE aus. Ladejobs sind kostenlos, wenn Sie den freigegebenen BigQuery-Slot-Pool verwenden. BigQuery gibt jedoch keine Garantien für verfügbare Kapazitäten des gemeinsamen Pools. Weitere Informationen finden Sie unter Einführung in Reservierungen.

Parallelität

  • Bei FILE_LOADS und STORAGE_WRITE_API in Streaming-Pipelines teilt der Connector die Daten in eine Reihe von Dateien oder Streams auf. Im Allgemeinen empfehlen wir, withAutoSharding aufzurufen, um die automatische Fragmentierung zu aktivieren.

  • Bei FILE_LOADS in Batch-Pipelines schreibt der Connector Daten in partitionierte Dateien, die dann parallel in BigQuery geladen werden.

  • Bei STORAGE_WRITE_API in Batch-Pipelines erstellt jeder Worker einen oder mehrere Streams, die in BigQuery geschrieben werden. Die Anzahl der Streams wird durch die Gesamtzahl der Fragmente bestimmt.

  • Für STORAGE_API_AT_LEAST_ONCE gibt es einen einzelnen Standardschreibstream. Mehrere Worker fügen diesem Stream Daten hinzu.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte für verschiedene BigQuery-E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Er hat Runner v2 nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben im Speicher 55 Mbit/s 54.000 Elemente pro Sekunde
Avro Load 78 Mbit/s 77.000 Elemente pro Sekunde
JSON-Lade 54 Mbit/s 53.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

Best Practices

In diesem Abschnitt werden Best Practices für das Schreiben von Daten in BigQuery aus Dataflow beschrieben.

Allgemeines

  • Für die Storage Write API gelten Kontingentlimits. Der Connector verarbeitet diese Limits für die meisten Pipelines. In einigen Fällen können jedoch die verfügbaren Storage Write API-Streams erschöpft werden. Dieses Problem kann beispielsweise in einer Pipeline auftreten, die automatische Fragmentierung und Autoskalierung mit einer großen Anzahl von Zielen verwendet, insbesondere bei lang laufenden Jobs mit stark variablen Arbeitslasten. Wenn dieses Problem auftritt, können Sie stattdessen STORAGE_WRITE_API_AT_LEAST_ONCE verwenden.

  • Mit den Google Cloud-Messwerten können Sie die Nutzung Ihres Storage Write API-Kontingents im Blick behalten.

  • Bei Dateiuploads ist Avro in der Regel schneller als JSON. Wenn Sie Avro verwenden möchten, rufen Sie withAvroFormatFunction auf.

  • Standardmäßig werden Ladejobs im selben Projekt wie der Dataflow-Job ausgeführt. Wenn Sie ein anderes Projekt angeben möchten, drücken Sie die Taste withLoadJobProjectId.

  • Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann useBeamSchema in Ihrer Pipeline auf, um automatisch zwischen Apache Beam-Row- und BigQuery-TableRow-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unter ExampleModel.java.

  • Wenn Sie Tabellen mit komplexen Schemas laden, die Tausende von Feldern enthalten, sollten Sie vielleicht withMaxBytesPerPartition aufrufen, um für jeden Ladejob eine kleinere maximale Größe festzulegen.

Streamingpipelines

Die folgenden Empfehlungen gelten für Streamingpipelines.

  • Für Streamingpipelines empfehlen wir die Verwendung der Storage Write API (STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE).

  • Eine Streamingpipeline kann Dateiladungen verwenden, dieser Ansatz hat jedoch Nachteile:

    • Zum Schreiben der Dateien ist Windowing erforderlich. Sie können das globale Fenster nicht verwenden.
    • BigQuery lädt Dateien nach dem Best-Effort-Prinzip, wenn der freigegebene Slot-Pool verwendet wird. Zwischen dem Schreiben eines Datensatzes und seiner Verfügbarkeit in BigQuery kann es zu einer erheblichen Verzögerung kommen.
    • Wenn ein Ladejob fehlschlägt, z. B. aufgrund fehlerhafter Daten oder Schemaabweichungen, schlägt die gesamte Pipeline fehl.
  • Verwenden Sie nach Möglichkeit STORAGE_WRITE_API_AT_LEAST_ONCE. Dies kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch kostengünstiger und skalierbarer als STORAGE_WRITE_API.

  • Vermeiden Sie im Allgemeinen die Verwendung von STREAMING_INSERTS. Streaming-Insert-Anweisungen sind teurer als die Storage Write API und bieten auch keine gute Leistung.

  • Durch die Fragmentierung von Daten kann die Leistung in Streaming-Pipelines verbessert werden. Für die meisten Pipelines ist die automatische Fragmentierung ein guter Ausgangspunkt. Sie können die Fragmentierung jedoch so optimieren:

  • Wenn Sie Streaming-Insert-Anweisungen verwenden, empfehlen wir, retryTransientErrors als Wiederholungsrichtlinie festzulegen.

Batchpipelines

Die folgenden Empfehlungen gelten für Batchpipelines.

  • Für die meisten großen Batchpipelines empfehlen wir, es zuerst mit FILE_LOADS zu versuchen. Eine Batchpipeline kann STORAGE_WRITE_API verwenden. Allerdings wird sie wahrscheinlich bei großen Datenmengen (mehr als 1.000 vCPUs) oder bei gleichzeitig ausgeführten Pipelines Kontingentlimits überschreiten. Apache Beam drosselt die maximale Anzahl von Schreibstreams für STORAGE_WRITE_API-Batchjobs nicht. Dadurch erreicht der Job schließlich die BigQuery Storage API-Limits.

  • Wenn Sie FILE_LOADS verwenden, können Sie entweder den freigegebenen BigQuery-Slot-Pool oder Ihren Pool reservierter Slots erschöpfen. Wenn diese Art von Fehlern auftritt, versuchen Sie die folgenden Ansätze:

    • Reduzieren Sie die maximale Anzahl von Workern oder die Worker-Größe für den Job.
    • Erwerben Sie mehr reservierte Slots.
    • Geeignete Methoden: STORAGE_WRITE_API
  • Kleine bis mittelgroße Pipelines (< 1.000 vCPUs) können von STORAGE_WRITE_API profitieren. Bei diesen kleineren Jobs sollten Sie STORAGE_WRITE_API verwenden, wenn Sie eine Warteschlange für unzustellbare Nachrichten benötigen oder wenn der freigegebene Slot-Pool FILE_LOADS nicht ausreicht.

  • Wenn Sie doppelte Daten tolerieren können, sollten Sie die Verwendung von STORAGE_WRITE_API_AT_LEAST_ONCE in Betracht ziehen. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch möglicherweise günstiger als die Option STORAGE_WRITE_API.

  • Unterschiedliche Schreibmodi können je nach den Eigenschaften Ihrer Pipeline unterschiedlich funktionieren. Experimentieren Sie, um den besten Schreibmodus für Ihre Arbeitslast zu ermitteln.

Fehler auf Zeilenebene behandeln

In diesem Abschnitt wird beschrieben, wie Sie mögliche Fehler auf Zeilenebene behandeln können, z. B. aufgrund falsch formatierter Eingabedaten oder wegen Schemaabweichungen.

Bei der Storage Write API werden alle Zeilen, die nicht geschrieben werden können, in eine separate PCollection verschoben. Rufen Sie zum Abrufen dieser Sammlung Folgendes auf: getFailedStorageApiInserts auf dem WriteResult-Objekt. Ein Beispiel für diesen Ansatz finden Sie unter Daten in BigQuery streamen.

Es empfiehlt sich, die Fehler zur späteren Verarbeitung an eine Warteschlange oder Tabelle für unzustellbare Nachrichten zu senden. Weitere Informationen zu diesem Muster finden Sie unter BigQueryIO-Muster für unzustellbare Nachrichten.

Wenn bei FILE_LOADS beim Laden der Daten ein Fehler auftritt, schlägt der Ladejob fehl und die Pipeline löst eine Laufzeitausnahme aus. Sie können den Fehler in den Dataflow-Logs oder im BigQuery-Jobverlauf sehen. Der E/O-Connector gibt keine Informationen zu einzelnen fehlgeschlagenen Zeilen zurück.

Weitere Informationen zur Fehlerbehebung finden Sie unter BigQuery-Connector-Fehler.

Beispiele

Die folgenden Beispiele zeigen, wie Sie mit Dataflow in BigQuery schreiben.

In vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die einen PCollection<MyData> in BigQuery schreibt, wobei MyData ein benutzerdefinierter Datentyp ist.

Die Methode BigQueryIO.write() gibt einen BigQueryIO.Write<T>-Typ zurück, der zum Konfigurieren des Schreibvorgangs verwendet wird. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter In Tabellen schreiben. In diesem Codebeispiel wird in eine vorhandene Tabelle (CREATE_NEVER) geschrieben und die neuen Zeilen werden an die Tabelle angehängt (WRITE_APPEND).

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

In neue oder vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine neue Tabelle erstellt, wenn die Zieltabelle nicht vorhanden ist. Dazu wird die Erstellungsanordnung auf CREATE_IF_NEEDED gesetzt. Wenn Sie diese Option verwenden, müssen Sie ein Tabellenschema angeben. Der Connector verwendet dieses Schema, wenn neue Tabellen erstellt werden.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Daten nach BigQuery streamen

Im folgenden Beispiel wird gezeigt, wie Sie Daten mit einer "Genau einmal"-Semantik streamen. Dazu setzen Sie den Schreibmodus auf STORAGE_WRITE_API.

Nicht alle Streamingpipelines erfordern eine "Genau einmal"-Semantik. Beispielsweise können Sie Duplikate eventuell aus der Zieltabelle manuell entfernen. Wenn für Ihr Szenario doppelt vorhandene Datensätze akzeptabel sind, sollten Sie die "Mindestens einmal"-Semantik verwenden. Dazu setzen Sie die Schreibmethode auf STORAGE_API_AT_LEAST_ONCE. Diese Methode ist in der Regel effizienter und führt bei den meisten Pipelines zu einer geringeren Latenz.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Nächste Schritte