Scrivere da Dataflow a BigQuery

Questo documento descrive come scrivere i dati da Dataflow a BigQuery.

Panoramica

Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare Managed I/O per scrivere in BigQuery. Managed I/O fornisce funzionalità come upgrade automatici e un'API di configurazione coerente. Quando scrivi in BigQuery, Managed I/O sceglie automaticamente il metodo di scrittura migliore per i job batch o di streaming.

Se hai bisogno di un'ottimizzazione delle prestazioni più avanzata, valuta l'utilizzo del connettore BigQueryIO. Per saperne di più, consulta la sezione Utilizzare il connettore BigQueryIO in questo documento.

Prestazioni

La tabella seguente mostra le metriche di rendimento per vari workload. Questi carichi di lavoro sono stati eseguiti su un worker e2-standard2, utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.

100 milioni di record | 1 kB | 1 colonna Velocità effettiva (byte) Throughput (elementi)
Storage Write 55 MBps 54.000 elementi al secondo
Avro Load 78 MBps 77.000 elementi al secondo
Json Load 54 MBps 53.000 elementi al secondo

Queste metriche si basano su semplici pipeline batch. Sono pensati per confrontare le prestazioni tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline del mondo reale. Il rendimento della pipeline Dataflow è complesso e dipende dal tipo di VM, dai dati in fase di elaborazione, dal rendimento delle origini e dei sink esterni e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche di prestazioni di altri SDK di linguaggio. Per ulteriori informazioni, vedi Prestazioni di Beam IO.

Utilizzare il connettore BigQueryIO

Il connettore BigQuery I/O supporta i seguenti metodi per la scrittura in BigQuery:

  • STORAGE_WRITE_API. In questa modalità, il connettore esegue scritture dirette nell'archiviazione BigQuery utilizzando l'API BigQuery Storage Write. L'API Storage Write combina l'importazione di flussi di dati e il caricamento in batch in un'unica API ad alte prestazioni. Questa modalità garantisce la semantica esattamente una volta.
  • STORAGE_API_AT_LEAST_ONCE. Questa modalità utilizza anche l'API Storage Write, ma fornisce la semantica at-least-once. Questa modalità comporta una latenza inferiore per la maggior parte delle pipeline. Tuttavia, sono possibili scritture duplicate.
  • FILE_LOADS. In questa modalità, il connettore scrive i dati di input nei file di staging in Cloud Storage. Poi esegue un job di caricamento BigQuery per caricare i dati in BigQuery. La modalità è quella predefinita per PCollections delimitati, che si trovano più comunemente nelle pipeline batch.
  • STREAMING_INSERTS. In questa modalità, il connettore utilizza l'API di inserimento di flussi precedente. Questa modalità è quella predefinita per PCollections senza limiti, ma non è consigliata per i nuovi progetti.

Quando scegli un metodo di scrittura, considera i seguenti punti:

  • Per i job di streaming, valuta la possibilità di utilizzare STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE, perché queste modalità scrivono direttamente nello spazio di archiviazione BigQuery, senza utilizzare file di staging intermedi.
  • Se esegui la pipeline utilizzando la modalità flusso di dati Almeno una volta, imposta la modalità di scrittura su STORAGE_API_AT_LEAST_ONCE. Questa impostazione è più efficiente e corrisponde alla semantica della modalità di streaming almeno una volta.
  • I caricamenti di file e l'API Storage Write hanno quote e limiti diversi.
  • I job di caricamento utilizzano il pool di slot BigQuery condiviso o gli slot riservati. Per utilizzare gli slot riservati, esegui il job di caricamento in un progetto con un'assegnazione di prenotazione di tipo PIPELINE. I job di caricamento sono gratuiti se utilizzi il pool di slot BigQuery condiviso. Tuttavia, BigQuery non offre alcuna garanzia sulla capacità disponibile del pool condiviso. Per ulteriori informazioni, consulta Introduzione alle prenotazioni.

Parallelismo

  • Per FILE_LOADS e STORAGE_WRITE_API nelle pipeline di streaming, il connettore suddivide i dati in un numero di file o flussi. In generale, consigliamo di chiamare withAutoSharding per abilitare lo sharding automatico.

  • Per FILE_LOADS nelle pipeline batch, il connettore scrive i dati in file partizionati, che vengono poi caricati in parallelo in BigQuery.

  • Per STORAGE_WRITE_API nelle pipeline batch, ogni worker crea uno o più stream per scrivere in BigQuery, in base al numero totale di shard.

  • Per STORAGE_API_AT_LEAST_ONCE, esiste un singolo stream di scrittura predefinito. Più worker vengono aggiunti a questo stream.

Best practice

  • L'API Storage Write ha limiti di quota. Il connettore gestisce questi limiti per la maggior parte delle pipeline. Tuttavia, alcuni scenari possono esaurire i flussi dell'API Storage Write. Ad esempio, questo problema potrebbe verificarsi in una pipeline che utilizza lo sharding automatico e la scalabilità automatica con un numero elevato di destinazioni, soprattutto in job a lunga esecuzione con carichi di lavoro molto variabili. Se si verifica questo problema, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE, che evita il problema.

  • Utilizza le metricheGoogle Cloud per monitorare l'utilizzo della quota dell'API Storage Write.

  • Quando si utilizzano i caricamenti di file, Avro in genere supera JSON. Per utilizzare Avro, chiama withAvroFormatFunction.

  • Per impostazione predefinita, i job di caricamento vengono eseguiti nello stesso progetto del job Dataflow. Per specificare un altro progetto, chiama withLoadJobProjectId.

  • Quando utilizzi l'SDK Java, valuta la possibilità di creare una classe che rappresenti lo schema della tabella BigQuery. Poi chiama useBeamSchema nella pipeline per convertire automaticamente i tipi Apache Beam Row e BigQuery TableRow. Per un esempio di classe schema, vedi ExampleModel.java.

  • Se carichi tabelle con schemi complessi contenenti migliaia di campi, valuta la possibilità di chiamare withMaxBytesPerPartition per impostare una dimensione massima inferiore per ogni job di caricamento.

  • Per impostazione predefinita, BigQueryIO utilizza le impostazioni dell'API Storage Write ragionevoli per la maggior parte delle pipeline. Tuttavia, se riscontri problemi di prestazioni, puoi impostare le opzioni della pipeline per ottimizzare queste impostazioni. Per saperne di più, consulta Ottimizzare l'API Storage Write nella documentazione di Apache Beam.

Pipeline in modalità flusso

I seguenti consigli si applicano alle pipeline di streaming.

  • Per le pipeline di streaming, consigliamo di utilizzare l'API Storage Write (STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE).

  • Una pipeline di streaming può utilizzare i caricamenti di file, ma questo approccio presenta degli svantaggi:

    • Richiede il windowing per scrivere i file. Non puoi utilizzare la finestra globale.
    • BigQuery carica i file in base al principio del "best effort" quando utilizza il pool di slot condiviso. Potrebbe esserci un ritardo significativo tra il momento in cui viene scritto un record e quello in cui è disponibile in BigQuery.
    • Se un job di caricamento non va a buon fine, ad esempio a causa di dati errati o di una mancata corrispondenza dello schema, l'intera pipeline non va a buon fine.
  • Se possibile, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Può comportare la scrittura di record duplicati in BigQuery, ma è meno costoso e più scalabile di STORAGE_WRITE_API.

  • In generale, evita di utilizzare STREAMING_INSERTS. Gli inserimenti in streaming sono più costosi dell'API Storage Write e non hanno lo stesso rendimento.

  • Lo sharding dei dati può migliorare le prestazioni nelle pipeline di streaming. Per la maggior parte delle pipeline, lo sharding automatico è un buon punto di partenza. Tuttavia, puoi ottimizzare lo sharding nel seguente modo:

  • Se utilizzi gli inserti di streaming, ti consigliamo di impostare retryTransientErrors come norme di ripetizione.

Pipeline batch

I seguenti consigli si applicano alle pipeline batch.

  • Per la maggior parte delle pipeline di grandi batch, ti consigliamo di provare prima FILE_LOADS. Una pipeline batch può utilizzare STORAGE_WRITE_API, ma è probabile che superi i limiti di quota su larga scala (più di 1000 vCPU) o se sono in esecuzione pipeline simultanee. Apache Beam non limita il numero massimo di flussi di scrittura per i job batch STORAGE_WRITE_API, quindi il job alla fine raggiunge i limiti dell'API BigQuery Storage.

  • Quando utilizzi FILE_LOADS, potresti esaurire il pool di slot BigQuery condiviso o il pool di slot riservati. Se riscontri questo tipo di errore, prova i seguenti approcci:

    • Riduci il numero massimo di worker o le dimensioni dei worker per il job.
    • Acquista altri slot riservati.
    • Valuta la possibilità di utilizzare STORAGE_WRITE_API.
  • Le pipeline di dimensioni piccole e medie (< 1000 vCPU) potrebbero trarre vantaggio dall'utilizzo di STORAGE_WRITE_API. Per questi job più piccoli, valuta la possibilità di utilizzare STORAGE_WRITE_API se vuoi una coda di messaggi non recapitabili o quando il pool di slot condiviso FILE_LOADS non è sufficiente.

  • Se puoi tollerare i dati duplicati, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Questa modalità può comportare la scrittura di record duplicati in BigQuery, ma potrebbe essere meno costosa dell'opzione STORAGE_WRITE_API.

  • Le diverse modalità di scrittura potrebbero avere prestazioni diverse in base alle caratteristiche della pipeline. Esegui esperimenti per trovare la modalità di scrittura migliore per il tuo carico di lavoro.

Gestire gli errori a livello di riga

Questa sezione descrive come gestire gli errori che potrebbero verificarsi a livello di riga, ad esempio a causa di dati di input malformati o mancata corrispondenza dello schema.

Per l'API Storage Write, le righe che non possono essere scritte vengono inserite in un PCollection separato. Per ottenere questa raccolta, chiama getFailedStorageApiInserts sull'oggetto WriteResult. Per un esempio di questo approccio, consulta Trasmetti flussi di dati a BigQuery.

È buona prassi inviare gli errori a una coda o tabella di messaggi non recapitabili per l'elaborazione successiva. Per ulteriori informazioni su questo pattern, consulta BigQueryIOpattern di coda dei messaggi non recapitabili.

Per FILE_LOADS, se si verifica un errore durante il caricamento dei dati, il job di caricamento non va a buon fine e la pipeline genera un'eccezione di runtime. Puoi visualizzare l'errore nei log di Dataflow o esaminare la cronologia dei job BigQuery. Il connettore I/O non restituisce informazioni sulle singole righe non riuscite.

Per ulteriori informazioni sulla risoluzione dei problemi relativi agli errori, consulta Errori del connettore BigQuery.

Esempi

Gli esempi riportati di seguito mostrano come utilizzare Dataflow per scrivere in BigQuery. Questi esempi utilizzano il connettore BigQueryIO.

Scrivere in una tabella esistente

L'esempio seguente crea una pipeline batch che scrive un PCollection<MyData> in BigQuery, dove MyData è un tipo di dati personalizzato.

Il metodo BigQueryIO.write() restituisce un tipo BigQueryIO.Write<T>, che viene utilizzato per configurare l'operazione di scrittura. Per saperne di più, consulta la sezione Scrittura in una tabella nella documentazione di Apache Beam. Questo esempio di codice scrive in una tabella esistente (CREATE_NEVER) e aggiunge le nuove righe alla tabella (WRITE_APPEND).

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Scrivi in una tabella nuova o esistente

L'esempio seguente crea una nuova tabella se la tabella di destinazione non esiste, impostando create disposition su CREATE_IF_NEEDED. Quando utilizzi questa opzione, devi fornire uno schema della tabella. Il connettore utilizza questo schema se crea una nuova tabella.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Trasmetti flussi di dati a BigQuery

Il seguente esempio mostra come trasmettere dati in streaming utilizzando la semantica exactly-once impostando la modalità di scrittura su STORAGE_WRITE_API.

Non tutte le pipeline di streaming richiedono la semantica exactly-once. Ad esempio, potresti essere in grado di rimuovere manualmente i duplicati dalla tabella di destinazione. Se la possibilità di record duplicati è accettabile per il tuo scenario, valuta la possibilità di utilizzare la semantica at-least-once impostando il metodo di scrittura su STORAGE_API_AT_LEAST_ONCE. Questo metodo è generalmente più efficiente e comporta una latenza inferiore per la maggior parte delle pipeline.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Passaggi successivi