Von Dataflow in BigQuery schreiben

In diesem Dokument wird gezeigt, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus Dataflow in BigQuery schreiben.

Der BigQuery-E/A-Connector ist im Apache Beam SDK verfügbar. Wir empfehlen die Verwendung der aktuellsten SDK-Version. Weitere Informationen finden Sie unter Apache Beam 2.x SDKs.

Für Python steht außerdem eine sprachübergreifende Unterstützung zur Verfügung.

Überblick

Der BigQuery-E/A-Connector unterstützt folgende Methoden zum Schreiben in BigQuery:

  • STORAGE_WRITE_API. In diesem Modus führt der Connector direkte Schreibvorgänge in BigQuery-Speicher über die BigQuery Storage Write API aus. Die Storage Write API kombiniert die Streamingaufnahme und das Laden im Batch in einer einzigen Hochleistungs-API. Dieser Modus stellt eine "Genau einmal"-Semantik sicher.
  • STORAGE_API_AT_LEAST_ONCE. Dieser Modus verwendet auch die Storage Write API, stellt aber eine "Mindestens einmal"-Semantik bereit. Dieser Modus führt bei den meisten Pipelines zu einer geringeren Latenz. Doppelte Schreibvorgänge sind jedoch möglich.
  • FILE_LOADS. In diesem Modus schreibt der Connector die Eingabedaten in Staging-Dateien in Cloud Storage. Anschließend wird ein BigQuery-Ladejob ausgeführt, um die Daten in BigQuery zu laden. Der Modus ist die Standardeinstellung für begrenzte PCollections, die am häufigsten in Batchpipelines gefunden werden.
  • STREAMING_INSERTS. In diesem Modus verwendet der Connector die Legacy-Streaming API. Dieser Modus ist die Standardeinstellung für unbegrenzte PCollections, wird jedoch für neue Projekte nicht empfohlen.

Beachten Sie bei der Auswahl einer Schreibmethode folgende Punkte:

  • Für Streamingjobs sollten Sie STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE verwenden, da diese Modi direkt in BigQuery-Speicher schreiben, ohne Zwischen-Staging-Dateien zu verwenden.
  • Wenn Sie die Pipeline im "Mindestens einmal"-Streamingmodus ausführen, legen Sie den Schreibmodus auf STORAGE_API_AT_LEAST_ONCE fest. Diese Einstellung ist effizienter und entspricht der Semantik des "Mindestens einmal"-Streamingmodus.
  • Für Dateiladevorgänge und die Storage Write API gelten unterschiedliche Kontingente und Limits.
  • Ladejobs verwenden entweder den freigegebenen BigQuery-Slot-Pool oder reservierte Slots. Führen Sie den Ladejob in einem Projekt mit einer Reservierungszuweisung vom Typ PIPELINE aus, um reservierte Slots zu verwenden. Ladejobs sind kostenlos, wenn Sie den freigegebenen BigQuery-Slot-Pool verwenden. BigQuery gibt jedoch keine Garantien für verfügbare Kapazitäten des gemeinsamen Pools. Weitere Informationen finden Sie unter Einführung in Reservierungen.

Parallelität

  • Für FILE_LOADS und STORAGE_WRITE_API in Streamingpipelines teilt der Connector die Daten in eine Reihe von Dateien oder Streams. Im Allgemeinen empfehlen wir, withAutoSharding aufzurufen, um die automatische Fragmentierung zu aktivieren.

  • Bei FILE_LOADS in Batchpipelines schreibt der Connector Daten in partitionierte Dateien. Diese werden dann parallel in BigQuery geladen.

  • Bei STORAGE_WRITE_API in Batchpipelines erstellt jeder Worker einen oder mehrere Streams zum Schreiben in BigQuery, die durch die Gesamtzahl der Shards bestimmt werden.

  • Für STORAGE_API_AT_LEAST_ONCE gibt es einen einzelnen Standardschreibstream. An diesen Stream werden mehrere Worker angehängt.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte verschiedener BigQuery E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Sie haben nicht Runner v2 verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben im Speicher 55 Mbit/s 54.000 Elemente pro Sekunde
Avro-Last 78 Mbit/s 77.000 Elemente pro Sekunde
Json-Load 54 Mbit/s 53.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batchpipelines. Sie sollen die Leistung von E/A-Connectors vergleichen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Leistung von Beam IO.

Best Practices

In diesem Abschnitt werden Best Practices für das Schreiben von Daten in BigQuery aus Dataflow beschrieben.

Allgemeines

  • Für die Storage Write API gelten Kontingentlimits. Der Connector verarbeitet diese Limits für die meisten Pipelines. Einige Szenarien können jedoch die verfügbaren Storage Write API-Streams ausschöpfen. Dieses Problem kann beispielsweise in einer Pipeline auftreten, die automatische Fragmentierung und Autoscaling mit einer großen Anzahl von Zielen verwendet, insbesondere bei lang andauernden Jobs mit sehr variablen Arbeitslasten. Wenn dieses Problem auftritt, sollten Sie STORAGE_WRITE_API_AT_LEAST_ONCE verwenden. Dadurch wird das Problem vermieden.

  • Verwenden Sie Google Cloud-Messwerte, um die Kontingentnutzung der Storage Write API zu überwachen.

  • Bei Verwendung von Dateiladevorgängen übertrifft Avro normalerweise JSON. Rufen Sie withAvroFormatFunction auf, um Avro zu verwenden.

  • Standardmäßig werden Ladejobs im selben Projekt wie der Dataflow-Job ausgeführt. Wenn Sie ein anderes Projekt angeben möchten, rufen Sie withLoadJobProjectId auf.

  • Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann useBeamSchema in der Pipeline auf, um automatisch zwischen Apache Beam-Row- und BigQuery-TableRow-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unter ExampleModel.java.

  • Wenn Sie Tabellen mit komplexen Schemas laden, die Tausende von Feldern enthalten, sollten Sie vielleicht withMaxBytesPerPartition aufrufen, um für jeden Ladejob eine kleinere maximale Größe festzulegen.

Streamingpipelines

Die folgenden Empfehlungen gelten für Streamingpipelines.

  • Für Streamingpipelines empfehlen wir die Verwendung der Storage Write API (STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE).

  • Eine Streaming-Pipeline kann Dateiladevorgänge verwenden. Dieser Ansatz hat jedoch Nachteile:

    • Es ist Windowing erforderlich, um die Dateien zu schreiben. Das globale Fenster kann nicht verwendet werden.
    • BigQuery lädt Dateien auf Best-Effort-Basis, wenn der freigegebene Slot-Pool verwendet wird. Zwischen dem Schreiben eines Eintrags und dessen Verfügbarkeit in BigQuery kann es zu einer erheblichen Verzögerung kommen.
    • Wenn ein Ladejob fehlschlägt, z. B. aufgrund fehlerhafter Daten oder Schemaabweichungen, schlägt die gesamte Pipeline fehl.
  • Verwenden Sie nach Möglichkeit STORAGE_WRITE_API_AT_LEAST_ONCE. Dies kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden. Dieser Vorgang ist jedoch kostengünstiger und skalierbarer als STORAGE_WRITE_API.

  • Vermeiden Sie im Allgemeinen die Verwendung von STREAMING_INSERTS. Streaming-Insert-Anweisungen sind teurer als die Storage Write API und bieten auch keine gute Leistung.

  • Die Datenfragmentierung kann die Leistung in Streamingpipelines verbessern. Bei den meisten Pipelines ist die automatische Fragmentierung ein guter Ausgangspunkt. Sie können die Fragmentierung jedoch so optimieren:

  • Wenn Sie Streaming-Insert-Anweisungen verwenden, empfehlen wir, retryTransientErrors als Wiederholungsrichtlinie festzulegen.

Batchpipelines

Die folgenden Empfehlungen gelten für Batchpipelines.

  • Für die meisten großen Batchpipelines empfehlen wir, es zuerst mit FILE_LOADS zu versuchen. Eine Batchpipeline kann STORAGE_WRITE_API verwenden. Allerdings wird sie wahrscheinlich bei großen Datenmengen (mehr als 1.000 vCPUs) oder bei gleichzeitig ausgeführten Pipelines Kontingentlimits überschreiten. Apache Beam drosselt die maximale Anzahl von Schreibstreams für STORAGE_WRITE_API-Batchjobs nicht. Dadurch erreicht der Job schließlich die BigQuery Storage API-Limits.

  • Wenn Sie FILE_LOADS verwenden, können Sie entweder den freigegebenen BigQuery-Slot-Pool oder Ihren Pool reservierter Slots erschöpfen. Wenn diese Art von Fehlern auftritt, versuchen Sie die folgenden Ansätze:

    • Reduzieren Sie die maximale Anzahl von Workern oder die Worker-Größe für den Job.
    • Erwerben Sie mehr reservierte Slots.
    • Geeignete Methoden: STORAGE_WRITE_API
  • Kleine bis mittelgroße Pipelines (< 1.000 vCPUs) können von STORAGE_WRITE_API profitieren. Bei diesen kleineren Jobs sollten Sie STORAGE_WRITE_API verwenden, wenn Sie eine Warteschlange für unzustellbare Nachrichten benötigen oder wenn der freigegebene Slot-Pool FILE_LOADS nicht ausreicht.

  • Wenn Sie doppelte Daten tolerieren können, sollten Sie die Verwendung von STORAGE_WRITE_API_AT_LEAST_ONCE in Betracht ziehen. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch möglicherweise günstiger als die Option STORAGE_WRITE_API.

  • Unterschiedliche Schreibmodi können je nach den Eigenschaften Ihrer Pipeline unterschiedlich funktionieren. Experimentieren Sie, um den besten Schreibmodus für Ihre Arbeitslast zu ermitteln.

Fehler auf Zeilenebene behandeln

In diesem Abschnitt wird beschrieben, wie Sie mögliche Fehler auf Zeilenebene behandeln können, z. B. aufgrund falsch formatierter Eingabedaten oder wegen Schemaabweichungen.

Für die Storage Write API werden alle Zeilen, die nicht geschrieben werden können, in einen separaten PCollection geschrieben. Rufen Sie getFailedStorageApiInserts für das Objekt WriteResult auf, um diese Sammlung abzurufen. Ein Beispiel für diesen Ansatz finden Sie unter Daten in BigQuery streamen.

Es wird empfohlen, die Fehler zur späteren Verarbeitung an eine Warteschlange oder Tabelle für unzustellbare Nachrichten zu senden. Weitere Informationen zu diesem Muster finden Sie unter BigQueryIO-Dead-Letter-Muster.

Wenn für FILE_LOADS ein Fehler beim Laden der Daten auftritt, schlägt der Ladejob fehl und die Pipeline löst eine Laufzeitausnahme aus. Sie können den Fehler in den Dataflow-Logs oder im BigQuery-Jobverlauf aufrufen. Der E/A-Connector gibt keine Informationen zu einzelnen fehlgeschlagenen Zeilen zurück.

Weitere Informationen zur Fehlerbehebung finden Sie unter BigQuery-Connector-Fehler.

Beispiele

Die folgenden Beispiele zeigen, wie Sie mit Dataflow in BigQuery schreiben.

In vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die einen PCollection<MyData> in BigQuery schreibt, wobei MyData ein benutzerdefinierter Datentyp ist.

Die Methode BigQueryIO.write() gibt einen BigQueryIO.Write<T>-Typ zurück, der zum Konfigurieren des Schreibvorgangs verwendet wird. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter In Tabellen schreiben. In diesem Codebeispiel wird eine vorhandene Tabelle (CREATE_NEVER) geschrieben. Die neuen Zeilen werden an die Tabelle angehängt (WRITE_APPEND).

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

In neue oder vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine neue Tabelle erstellt, wenn die Zieltabelle nicht vorhanden ist. Dazu wird die Erstellungsanordnung auf CREATE_IF_NEEDED gesetzt. Wenn Sie diese Option verwenden, müssen Sie ein Tabellenschema angeben. Der Connector verwendet dieses Schema, wenn neue Tabellen erstellt werden.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Daten zu BigQuery streamen

Im folgenden Beispiel wird gezeigt, wie Sie Daten mit einer "Genau einmal"-Semantik streamen. Dazu setzen Sie den Schreibmodus auf STORAGE_WRITE_API.

Nicht alle Streamingpipelines erfordern eine "Genau einmal"-Semantik. Beispielsweise können Sie Duplikate eventuell aus der Zieltabelle manuell entfernen. Wenn für Ihr Szenario doppelt vorhandene Datensätze akzeptabel sind, sollten Sie die "Mindestens einmal"-Semantik verwenden. Dazu setzen Sie die Schreibmethode auf STORAGE_API_AT_LEAST_ONCE. Diese Methode ist im Allgemeinen effizienter und führt bei den meisten Pipelines zu einer geringeren Latenz.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Nächste Schritte