Scrivere da Dataflow a BigQuery

Questo documento descrive come scrivere dati da Dataflow in BigQuery utilizzando il connettore BigQuery I/O di Apache Beam.

Il connettore BigQuery I/O è disponibile nell'SDK Apache Beam. Ti consigliamo di utilizzare la versione più recente dell'SDK. Per ulteriori informazioni, consulta SDK Apache Beam 2.x.

Supporto di più lingue per Python.

Panoramica

Il connettore BigQuery I/O supporta i seguenti metodi per scrivere in BigQuery:

  • STORAGE_WRITE_API. In questa modalità, il connettore esegue scritture dirette nello spazio di archiviazione BigQuery, utilizzando API BigQuery Storage Writer. La L'API StorageWrite combina l'importazione di flussi di dati e il caricamento in batch in un'unica API ad alte prestazioni. Questa modalità garantisce una la semantica.
  • STORAGE_API_AT_LEAST_ONCE. Questa modalità utilizza anche l'API Storage Write, ma fornisce la semantica almeno una volta. Questa modalità comporta una latenza inferiore per la maggior parte delle pipeline. Tuttavia, i duplicati sono possibili.
  • FILE_LOADS. In questa modalità, il connettore scrive i dati di input nei file di staging in Cloud Storage. Poi esegue BigQuery load job per caricare i dati in in BigQuery. La modalità è quella predefinita per PCollections limitato, che si trovano più comunemente nelle pipeline in modalità batch.
  • STREAMING_INSERTS. In questa modalità, il connettore utilizza API di streaming legacy. Questa modalità è l'impostazione predefinita per PCollections illimitato, ma non è consigliata per le nuove in modo programmatico a gestire i progetti.

Quando scegli un metodo di scrittura, considera i seguenti punti:

  • Per i job di streaming, ti consigliamo di utilizzare STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE, perché queste modalità scrivono direttamente nello stoccaggio BigQuery, senza utilizzare file di staging intermedi.
  • Se esegui la pipeline utilizzando la modalità flusso di dati Almeno una volta, imposta la modalità di scrittura su STORAGE_API_AT_LEAST_ONCE. Questa impostazione è più efficiente corrisponde alla semantica della modalità flusso di dati "Almeno una volta".
  • I caricamenti dei file e l'API StorageWrite hanno quote e limiti.
  • I job di caricamento utilizzano il pool di slot BigQuery condiviso o riservato slot machine. Per utilizzare gli slot prenotati, esegui il job di caricamento in un progetto con una prenotazione dell'assegnazione di tipo PIPELINE. I job di caricamento sono gratuiti se utilizzi il pool di slot BigQuery condiviso. Tuttavia, BigQuery non garantisce la capacità disponibile del pool condiviso. Per ulteriori informazioni, consulta la pagina Introduzione alle prenotazioni.

Parallelismo

  • Per FILE_LOADS e STORAGE_WRITE_API nelle pipeline di streaming, il connettore suddivide i dati in una serie di file o stream. In generale, consigliamo chiamata withAutoSharding per abilitare il partizionamento automatico.

  • Per FILE_LOADS nelle pipeline in modalità batch, il connettore scrive i dati in partizionate che vengono poi caricati in BigQuery in parallelo.

  • Per STORAGE_WRITE_API nelle pipeline batch, ogni worker crea uno o più stream da scrivere in BigQuery, in base al numero totale di shard.

  • Per STORAGE_API_AT_LEAST_ONCE, esiste un singolo flusso di scrittura predefinito. Più di uno worker aggiungono a questo flusso.

Prestazioni

La tabella seguente mostra le metriche sulle prestazioni di vari Opzioni di lettura di BigQuery I/O. I carichi di lavoro sono stati eseguiti su un workere2-standard2 utilizzando l'SDK Apache Beam 2.49.0 per Java. L'hanno fatto non utilizzare Runner v2.

100 milioni di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Archiviazione in scrittura 55 Mbps 54.000 elementi al secondo
Carico Avro 78 MBps 77.000 elementi al secondo
Caricamento JSON 54 Mbps 53.000 elementi al secondo

Queste metriche si basano su pipeline batch semplici. Sono progettati per confrontare il rendimento tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e sono una funzione del tipo di VM, in fase di elaborazione, le prestazioni di origini e sink esterni e il codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentativi delle caratteristiche prestazionali di altri SDK linguistici di grandi dimensioni. Per ulteriori informazioni, vedi Beam IO Prestazioni.

Best practice

Questa sezione descrive le best practice per la scrittura in BigQuery da Dataflow.

Considerazioni generali

  • L'API StorageWrite ha limiti di quota. Il connettore gestisce questi limiti per la maggior parte delle pipeline. Tuttavia, in alcuni scenari è possibile esaurire gli stream dell'API Storage Write disponibili. Ad esempio, questo problema potrebbe verificarsi in una pipeline che utilizza il sharding automatico e la scalabilità automatica con un e destinazioni, soprattutto nei job a lunga esecuzione con carichi di lavoro molto variabili. Se si verifica questo problema, ti consigliamo di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE, che evita il problema.

  • Utilizza le metriche di Google Cloud per monitorare l'utilizzo della quota dell'API Storage Write.

  • Quando si utilizza il caricamento di file, Avro solitamente ha prestazioni superiori a quelle di JSON. Per utilizzare Avro, chiama withAvroFormatFunction.

  • Per impostazione predefinita, i job di caricamento vengono eseguiti nello stesso progetto del job Dataflow. Per specificare un progetto diverso, richiama withLoadJobProjectId

  • Quando utilizzi l'SDK Java, ti consigliamo di creare una classe che rappresenti lo schema della tabella BigQuery. Quindi chiama useBeamSchema nella tua pipeline per effettuare una conversione automatica tra i tipi Apache Beam Row e TableRow di BigQuery. Per un esempio di classe schema, vedi ExampleModel.java.

  • Se carichi tabelle con schemi complessi contenenti migliaia di campi, valuta la possibilità di chiamare withMaxBytesPerPartition per impostare una dimensione massima più piccola per ogni job di caricamento.

Pipeline in modalità flusso

I consigli riportati di seguito si applicano alle pipeline di streaming.

  • Per le pipeline in modalità flusso, consigliamo di utilizzare l'API StorageWrite (STORAGE_WRITE_API o STORAGE_API_AT_LEAST_ONCE).

  • Una pipeline di streaming può utilizzare i caricamenti di file, ma questo approccio presenta degli svantaggi:

    • Richiede il windowing per scrivere i file. Non puoi utilizzare la finestra globale.
    • BigQuery carica i file secondo il criterio del "best effort" quando utilizzi pool di slot condiviso. Può esserci un ritardo significativo tra la scrittura di un record e la sua disponibilità in BigQuery.
    • Se un job di caricamento non riesce, ad esempio a causa di dati non validi o di uno schema mancata corrispondenza: l'intera pipeline ha esito negativo.
  • Se possibile, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Ciò può comportare la scrittura di record duplicati in BigQuery, ma è meno costoso e più scalabile di STORAGE_WRITE_API.

  • In generale, evita di utilizzare STREAMING_INSERTS. Gli inserimenti di flussi di dati è costoso dell'API StorageWrite e non è altrettanto costoso.

  • Lo sharding dei dati può migliorare le prestazioni nelle pipeline in streaming. Per la maggior parte di grandi dimensioni, il partizionamento automatico è un buon punto di partenza. Tuttavia, puoi ottimizzare sharding come segue:

  • Se usi l'inserimento di flussi di dati, consigliamo di impostare retryTransientErrors come il riprova criterio.

Pipeline batch

I suggerimenti seguenti si applicano alle pipeline in modalità batch.

  • Per la maggior parte delle pipeline batch di grandi dimensioni, consigliamo di provare prima FILE_LOADS. Una pipeline batch può utilizzare STORAGE_WRITE_API, ma è probabile che superi i limiti di quota su larga scala (più di 1000 vCPU) o se sono in esecuzione pipeline concorrenti. Apache Beam non limita il numero massimo di flussi di scrittura per i dati batch STORAGE_WRITE_API di job, quindi alla fine il job raggiunge l'API BigQuery Storage limiti.

  • Quando utilizzi FILE_LOADS, potresti esaurire le risorse condivise Pool di slot BigQuery o pool di slot prenotati. Se riscontri questo tipo di errore, prova i seguenti approcci:

    • Riduci il numero massimo di worker o le dimensioni dei worker per il job.
    • Acquista altri slot riservati.
    • Valuta la possibilità di utilizzare STORAGE_WRITE_API.
  • Le pipeline piccole e medie (<1000 vCPU) potrebbero trarre vantaggio dall'utilizzo STORAGE_WRITE_API. Per questi job più piccoli, ti consigliamo di utilizzare STORAGE_WRITE_API se vuoi una coda delle email non recapitate o quando il pool di slot condivisi FILE_LOADS non è sufficiente.

  • Se puoi tollerare dati duplicati, valuta la possibilità di utilizzare STORAGE_WRITE_API_AT_LEAST_ONCE. Questa modalità può generare duplicati record scritti in BigQuery, ma potrebbe essere meno costoso rispetto all'opzione STORAGE_WRITE_API.

  • Modalità di scrittura diverse potrebbero funzionare in modo diverso in base alle caratteristiche della tua pipeline. Prova a trovare la modalità di scrittura migliore per il tuo carico di lavoro.

Gestire gli errori a livello di riga

Questa sezione descrive come gestire gli errori che potrebbero verificarsi a livello di riga, ad esempio a causa di dati di input con formattazione errata o di mancata corrispondenza dello schema.

Per l'API StorageWrite, tutte le righe che non possono essere scritte vengono inserite in un oggetto PCollection separato. Per ottenere questa raccolta, chiama getFailedStorageApiInserts sull'oggetto WriteResult. Per un esempio di questo approccio, consulta Eseguire lo streaming di dati in BigQuery.

È buona prassi inviare gli errori a una coda o a una tabella di messaggi non recapitabili per elaborarli in un secondo momento. Per maggiori informazioni informazioni su questo pattern, vedi Pattern messaggi non recapitabili BigQueryIO.

Per FILE_LOADS, se si verifica un errore durante il caricamento dei dati, il job di caricamento non va a buon fine e la pipeline genera un'eccezione di runtime. Puoi visualizzare l'errore nel i log di Dataflow o la cronologia dei job BigQuery. Il connettore I/O non restituisce informazioni sulle singole righe con errori.

Per ulteriori informazioni sulla risoluzione degli errori, consulta Errori del connettore BigQuery.

Esempi

I seguenti esempi mostrano come utilizzare Dataflow per scrivere in BigQuery.

Scrivere in una tabella esistente

L'esempio seguente crea una pipeline batch che scrive un valore PCollection<MyData> in BigQuery, dove MyData è un tipo di dato personalizzato.

Il metodo BigQueryIO.write() restituisce un tipo BigQueryIO.Write<T>, che viene utilizzato per configurare l'operazione di scrittura. Per ulteriori informazioni, consulta la sezione Scrittura in una tabella della documentazione di Apache Beam. Questo esempio di codice scrive in una tabella esistente (CREATE_NEVER) e aggiunge le nuove righe alla tabella (WRITE_APPEND).

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Scrivere in una tabella nuova o esistente

L'esempio seguente crea una nuova tabella se la tabella di destinazione non esistenti, impostando crea disposizione a CREATE_IF_NEEDED. Quando utilizzi questa opzione, devi fornire una tabella . Il connettore utilizza questo schema se crea una nuova tabella.

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Trasmissione di flussi di dati a BigQuery

L'esempio seguente mostra come eseguire lo streaming dei dati utilizzando la semantica esattamente una volta impostando la modalità di scrittura su STORAGE_WRITE_API

Non tutte le pipeline di streaming richiedono la semantica esattamente una volta. Ad esempio, potrebbe riuscire a rimuovere manualmente i duplicati dalla tabella di destinazione. Se esiste la possibilità che vengano creati record duplicati accettabile per il tuo scenario, valuta l'utilizzo della semantica "at-least-once" impostando il metodo write in STORAGE_API_AT_LEAST_ONCE. Questo metodo è generalmente più efficiente e comporta una latenza inferiore per la maggior parte delle pipeline.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Passaggi successivi