Scrivere da Dataflow a BigQuery

Questo documento descrive come scrivere dati da Dataflow a BigQuery utilizzando il connettore BigQuery I/O di Apache Beam.

Il connettore BigQuery I/O è disponibile nell'SDK Apache Beam. Ti consigliamo di utilizzare l'ultima versione dell'SDK. Per maggiori informazioni, consulta SDK Apache Beam 2.x.

È disponibile anche il supporto di più linguaggi per Python.

Panoramica

Il connettore I/O di BigQuery supporta i seguenti metodi per la scrittura in BigQuery:

  • STORAGE_WRITE_API. In questa modalità, il connettore esegue scritture dirette nello spazio di archiviazione di BigQuery utilizzando l'API BigQuery Storage Write. L'API Storage Write combina l'importazione di flussi di dati e il caricamento batch in un'unica API ad alte prestazioni. Questa modalità garantisce la semantica "esattamente una volta".
  • STORAGE_API_AT_LEAST_ONCE. Anche questa modalità utilizza l'API Storage Write, ma fornisce una semantica "at almeno una volta". Questa modalità riduce la latenza per la maggior parte delle pipeline. Tuttavia, sono possibili scritture duplicate.
  • FILE_LOADS. In questa modalità, il connettore scrive i dati di input nei file di gestione temporanea in Cloud Storage. Quindi esegue un job di caricamento BigQuery per caricare i dati in BigQuery. La modalità è quella predefinita per PCollections vincolato, che si trova più comunemente nelle pipeline in modalità batch.
  • STREAMING_INSERTS. In questa modalità, il connettore utilizza l'API di inserimento di flussi legacy. Questa modalità è l'impostazione predefinita per il campo PCollections illimitato, ma non è consigliata per i nuovi progetti.

Quando scegli un metodo di scrittura, considera i seguenti punti:

  • Per i job di inserimento di flussi, valuta la possibilità di utilizzare STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE, perché queste modalità scrivono direttamente nell'archiviazione BigQuery, senza utilizzare file di gestione temporanea intermedi.
  • Se esegui la pipeline utilizzando la modalità di flusso at-least-once, imposta la modalità di scrittura su STORAGE_API_AT_LEAST_ONCE. Questa impostazione è più efficiente e corrisponde alla semantica della modalità di streaming "atleast-once".
  • I caricamenti di file e l'API Storage Write hanno quote e limiti diversi.
  • I job di caricamento utilizzano il pool di slot BigQuery condiviso o gli slot riservati. Per utilizzare gli slot riservati, esegui il job di caricamento in un progetto con un'assegnazione di prenotazione di tipo PIPELINE. I job di caricamento sono gratuiti se utilizzi il pool di slot BigQuery condiviso. Tuttavia, BigQuery non offre garanzie sulla capacità disponibile del pool condiviso. Per scoprire di più, consulta la pagina Introduzione alle prenotazioni.

Parallelismo

  • Per FILE_LOADS e STORAGE_WRITE_API nelle pipeline in modalità flusso, il connettore esegue il clustering dei dati su una serie di file o flussi. In generale, ti consigliamo di chiamare withAutoSharding per attivare la suddivisione automatica.

  • Per FILE_LOADS in pipeline in modalità batch, il connettore scrive i dati in file partizionati, che vengono poi caricati in BigQuery in parallelo.

  • Per STORAGE_WRITE_API nelle pipeline in modalità batch, ogni worker crea uno o più flussi per scrivere in BigQuery, determinato dal numero totale di shard.

  • Per STORAGE_API_AT_LEAST_ONCE, è presente un singolo stream di scrittura predefinito. Più worker vengono aggiunti a questo flusso.

Prestazioni

La seguente tabella mostra le metriche delle prestazioni per varie opzioni di lettura I/O di BigQuery. I carichi di lavoro sono stati eseguiti su un worker e2-standard2 utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.

100 M di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Scrittura spazio di archiviazione 55 Mbps 54.000 elementi al secondo
Carico Avro 78 Mbps 77.000 elementi al secondo
Json Load 54 Mbps 53.000 elementi al secondo

Queste metriche si basano su semplici pipeline in modalità batch. Hanno lo scopo di confrontare le prestazioni dei connettori I/O e non sono necessariamente rappresentativi di pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e dipendono dal tipo di VM, dai dati elaborati, dalle prestazioni dei sink e delle origini esterne e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche delle prestazioni degli SDK di altri linguaggi. Per maggiori informazioni, consulta Prestazioni degli IO di Beam.

best practice

Questa sezione descrive le best practice per la scrittura in BigQuery da Dataflow.

Considerazioni generali

  • L'API Storage Write ha limiti di quota. Il connettore gestisce questi limiti per la maggior parte delle pipeline. Tuttavia, alcuni scenari possono esaurire i flussi dell'API Storage Write disponibili. Ad esempio, questo problema potrebbe verificarsi in una pipeline che utilizza la suddivisione automatica e la scalabilità automatica con un numero elevato di destinazioni, soprattutto in job a lunga esecuzione con carichi di lavoro altamente variabili. Se si verifica questo problema, puoi utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE, per evitarlo.

  • Utilizza le metriche di Google Cloud per monitorare l'utilizzo della quota dell'API Storage Write.

  • Durante il caricamento di file, Avro in genere ottiene prestazioni migliori di JSON. Per utilizzare Avro, chiama withAvroFormatFunction.

  • Per impostazione predefinita, i job di caricamento vengono eseguiti nello stesso progetto del job Dataflow. Per specificare un progetto diverso, chiama withLoadJobProjectId.

  • Quando utilizzi l'SDK Java, valuta la possibilità di creare una classe che rappresenti lo schema della tabella BigQuery. Quindi chiama useBeamSchema nella tua pipeline per convertire automaticamente i tipi Row di Apache Beam e quelli di TableRow BigQuery. Per un esempio di classe schema, consulta ExampleModel.java.

  • Se carichi tabelle con schemi complessi contenenti migliaia di campi, valuta la possibilità di chiamare withMaxBytesPerPartition per impostare una dimensione massima inferiore per ogni job di caricamento.

Pipeline in modalità flusso

I suggerimenti seguenti si applicano alle pipeline in modalità flusso.

  • Per le pipeline in modalità flusso, consigliamo di utilizzare l'API Storage Write (STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE).

  • Una pipeline in modalità flusso può utilizzare il caricamento di file, ma questo approccio presenta degli svantaggi:

    • Richiede il windowing per scrivere i file. Non puoi utilizzare la finestra globale.
    • BigQuery carica i file secondo il criterio del "best effort" quando viene utilizzato il pool di slot condiviso. Può verificarsi un ritardo significativo tra la scrittura di un record e il momento in cui diventa disponibile in BigQuery.
    • Se un job di caricamento non riesce, ad esempio a causa di dati errati o di una mancata corrispondenza dello schema, l'intera pipeline ha esito negativo.
  • Se possibile, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Può comportare la scrittura di record duplicati in BigQuery, ma è meno costoso e più scalabile rispetto a STORAGE_WRITE_API.

  • In generale, evita di utilizzare STREAMING_INSERTS. Gli inserti di flussi di dati sono più costosi dell'API Storage Write e hanno scarse prestazioni.

  • Lo sharding dei dati può migliorare le prestazioni nelle pipeline in modalità flusso. Per la maggior parte delle pipeline, la suddivisione automatica è un buon punto di partenza. Tuttavia, puoi ottimizzare la suddivisione in segmenti nel seguente modo:

  • Se utilizzi l'inserimento di flussi di dati, ti consigliamo di impostare retryTransientErrors come criterio di nuovo tentativo.

Pipeline batch

I seguenti suggerimenti si applicano alle pipeline in modalità batch.

  • Per la maggior parte delle pipeline batch di grandi dimensioni, ti consigliamo di provare prima FILE_LOADS. Una pipeline batch può utilizzare STORAGE_WRITE_API, ma è probabile che superi i limiti di quota su larga scala (oltre 1000 vCPU) o se sono in esecuzione pipeline in parallelo. Apache Beam non limita il numero massimo di flussi di scrittura per i job batch STORAGE_WRITE_API, per cui il job alla fine raggiunge i limiti dell'API BigQuery Storage.

  • Quando utilizzi FILE_LOADS, potresti esaurire il pool di slot BigQuery condiviso o il pool di slot prenotati. Se riscontri questo tipo di errore, prova i seguenti approcci:

    • Riduci il numero massimo di worker o le dimensioni dei worker per il job.
    • Acquistare altri slot prenotati.
    • Valuta la possibilità di utilizzare STORAGE_WRITE_API.
  • Le pipeline piccole e medie (< 1000 vCPU) potrebbero trarre vantaggio dall'utilizzo di STORAGE_WRITE_API. Per questi job più piccoli, valuta la possibilità di utilizzare STORAGE_WRITE_API se vuoi una coda di messaggi non recapitabili o quando il pool di slot condivisi FILE_LOADS non è sufficiente.

  • Se puoi tollerare dati duplicati, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Questa modalità può comportare la scrittura di record duplicati in BigQuery, ma potrebbe essere meno costosa dell'opzione STORAGE_WRITE_API.

  • Modalità di scrittura diverse potrebbero funzionare in modo diverso in base alle caratteristiche della pipeline. Fai delle prove per trovare la modalità di scrittura migliore per il tuo carico di lavoro.

Gestire gli errori a livello di riga

Questa sezione descrive come gestire gli errori che potrebbero verificarsi a livello di riga, ad esempio a causa di dati di input formattati in modo errato o mancate corrispondenze dello schema.

Per l'API Storage Write, tutte le righe che non possono essere scritte vengono inserite in un PCollection separato. Per ottenere questa raccolta, chiama getFailedStorageApiInserts sull'oggetto WriteResult. Per un esempio di questo approccio, consulta Trasmettere dati in BigQuery.

È buona norma inviare gli errori a una tabella o a una coda di messaggi non recapitabili per elaborarli in un secondo momento. Per ulteriori informazioni su questo pattern, consulta BigQueryIO pattern messaggi non recapitabili.

Per FILE_LOADS, se si verifica un errore durante il caricamento dei dati, il job di caricamento non riesce e la pipeline genera un'eccezione di runtime. Puoi visualizzare l'errore nei log di Dataflow o nella cronologia dei job BigQuery. Il connettore I/O non restituisce informazioni sulle singole righe con errori.

Per saperne di più sulla risoluzione degli errori, consulta Errori del connettore BigQuery.

Esempi

I seguenti esempi mostrano come utilizzare Dataflow per scrivere in BigQuery.

Scrivere in una tabella esistente

L'esempio seguente crea una pipeline batch che scrive una PCollection<MyData> in BigQuery, dove MyData è un tipo di dati personalizzato.

Il metodo BigQueryIO.write() restituisce un tipo BigQueryIO.Write<T>, che viene utilizzato per configurare l'operazione di scrittura. Per ulteriori informazioni, consulta Scrittura in una tabella nella documentazione di Apache Beam. Questo esempio di codice scrive in una tabella esistente (CREATE_NEVER) e aggiunge le nuove righe alla tabella (WRITE_APPEND).

Java

Per eseguire l'autenticazione in Dataflow, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Scrivere in una tabella nuova o esistente

Nell'esempio seguente viene creata una nuova tabella se la tabella di destinazione non esiste, impostando Crea disposizione su CREATE_IF_NEEDED. Quando utilizzi questa opzione, devi fornire uno schema di tabella. Il connettore utilizza questo schema se crea una nuova tabella.

Java

Per eseguire l'autenticazione in Dataflow, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Trasmetti il flusso di dati in BigQuery

L'esempio seguente mostra come trasmettere dati in flussi utilizzando la semantica "exactly-once", impostando la modalità di scrittura su STORAGE_WRITE_API

Non tutte le pipeline in modalità flusso richiedono una semantica "exactly-once". Ad esempio, potresti essere in grado di rimuovere manualmente i duplicati dalla tabella di destinazione. Se la possibilità di record duplicati è accettabile per il tuo scenario, valuta la possibilità di utilizzare la semantica "at-least-once" impostando il metodo di scrittura su STORAGE_API_AT_LEAST_ONCE. Questo metodo è generalmente più efficiente e comporta una latenza inferiore per la maggior parte delle pipeline.

Java

Per eseguire l'autenticazione in Dataflow, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Passaggi successivi