Lettura da BigQuery a Dataflow

Questo documento descrive come leggere i dati da BigQuery in Dataflow utilizzando il connettore BigQuery I/O di Apache Beam.

Panoramica

Il connettore BigQuery I/O supporta due opzioni per la lettura BigQuery:

  • Letture dirette delle tabelle. Questa è l'opzione più veloce, perché utilizza API BigQuery Storage Read.
  • Job di esportazione. Con questa opzione, BigQuery esegue un job di esportazione che scrive i dati della tabella in Cloud Storage. Il connettore legge quindi i dati esportati da Cloud Storage. Questa opzione è meno efficiente perché richiede .

I job di esportazione sono l'opzione predefinita. Per specificare letture dirette, chiama withMethod(Method.DIRECT_READ).

Il connettore esegue la serializzazione dei dati della tabella in un PCollection. Ogni elemento in PCollection rappresenta una singola riga della tabella. Il connettore supporta i seguenti metodi di serializzazione:

Parallelismo

Il parallelismo in questo connettore dipende dal metodo di lettura:

  • Letture dirette: il connettore di I/O produce un numero dinamico di flussi, basato su le dimensioni della richiesta di esportazione. Legge questi flussi direttamente BigQuery in parallelo.

  • Job di esportazione: BigQuery determina il numero di file in cui scrivere di archiviazione ideale in Cloud Storage. Il numero di file dipende dalla query e volume di dati. Il connettore I/O legge i file esportati in parallelo.

Prestazioni

La tabella seguente mostra le metriche relative alle prestazioni per varie opzioni di lettura I/O di BigQuery. I carichi di lavoro sono stati eseguiti su un workere2-standard2 utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.

100 mln di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Lettura dello spazio di archiviazione 120 MB/s 88.000 elementi al secondo
Esportazione di Avro 105 Mbps 78.000 elementi al secondo
Esportazione JSON 110 MB/s 81.000 elementi al secondo

Queste metriche si basano su semplici pipeline batch. Sono studiati per confrontare il rendimento tra connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e sono una funzione del tipo di VM, in fase di elaborazione, le prestazioni di origini e sink esterni e il codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentativi delle caratteristiche prestazionali di altri SDK linguistici di grandi dimensioni. Per ulteriori informazioni, consulta Rendimento IO di Beam.

Best practice

  • In generale, consigliamo di utilizzare le letture di tabelle dirette (Method.DIRECT_READ). L'API Storage Read è più adatta alle pipeline di dati rispetto ai job di esportazione, in quanto non richiede il passaggio intermedio dell'esportazione dei dati.

  • Se utilizzi le letture dirette, ti viene addebitata l'API Storage Read all'utilizzo delle risorse. Consulta: Prezzi dell'estrazione dati nel pagina dei prezzi di BigQuery.

  • Non sono previsti costi aggiuntivi per i job di esportazione. Tuttavia, i job di esportazione hanno limiti previsti. Per il trasferimento di grandi quantità di dati, dove la tempestività è una priorità e il costo è regolabile, sono consigliate le letture dirette.

  • L'API Storage di lettura ha limiti di quota. Utilizza le funzionalità di metriche di Google Cloud di monitorare l'utilizzo della quota.

  • Quando utilizzi l'API Storage Read, potresti vedere la scadenza del lease e di timeout della sessione nei log, ad esempio:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Questi errori possono verificarsi quando un'operazione richiede più tempo del timeout, solitamente nelle pipeline che vengono eseguite per più di 6 ore. Per risolvere il problema, passa alle esportazioni di file.

  • Quando utilizzi l'SDK Java, ti consigliamo di creare una classe che rappresenti lo schema della tabella BigQuery. Quindi chiama useBeamSchema nella tua pipeline per effettuare una conversione automatica tra i tipi Apache Beam Row e TableRow di BigQuery. Per un esempio di classe dello schema, consulta ExampleModel.java.

Esempi

Gli esempi di codice in questa sezione utilizzano letture dirette delle tabelle.

Per utilizzare un job di esportazione, ometti la chiamata a withMethod o specifica Method.EXPORT. Imposta --tempLocation l'opzione pipeline di e specificare un bucket Cloud Storage per i file esportati.

Questi esempi di codice presuppongono che la tabella di origine abbia le seguenti colonne:

  • name (stringa)
  • age (numero intero)

Specificato come File di schema JSON:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Leggi i record in formato Avro

Per leggere i dati BigQuery in record in formato Avro, utilizza read(SerializableFunction). Questo metodo accetta una funzione definita dall'applicazione che analizza gli oggetti SchemaAndRecord e restituisce un tipo di dati personalizzato. L'output del connettore è un PCollection dei tuoi personalizzato.

Il seguente codice legge un PCollection<MyData> da un account BigQuery in cui MyData è una classe definita dall'applicazione.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Il metodo read utilizza un'interfaccia SerializableFunction<SchemaAndRecord, T>, che definisce una funzione per convertire da record Avro a una classe di dati personalizzata. Nella nell'esempio di codice precedente, il metodo MyData.apply implementa questa conversione personalizzata. La funzione di esempio analizza i campi name e age dall'elenco Avro il record e restituisce un'istanza MyData.

Per specificare la tabella BigQuery da leggere, chiama il metodo from, come mostrato nell'esempio precedente. Per ulteriori informazioni, consulta Nomi delle tabelle nella documentazione del connettore BigQuery I/O.

Leggi oggetti TableRow

Il metodo readTableRows legge i dati di BigQuery in un PCollection di oggetti TableRow. Ogni TableRow è una mappa di coppie chiave-valore che contiene una singola riga di dati della tabella. Specifica la tabella BigQuery da leggere chiamando il metodo from.

Il seguente codice legge un PCollection<TableRows> da un Tabella BigQuery.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Questo esempio mostra anche come accedere ai valori dal dizionario TableRow. I valori interi sono codificati come stringhe per corrispondere a BigQuery nel formato JSON esportato.

Proiezione e filtri delle colonne

Quando utilizzi le letture dirette (Method.DIRECT_READ), puoi rendere più efficienti le operazioni di lettura riducendo la quantità di dati letti da BigQuery e inviati tramite la rete.

  • Proiezione colonne: chiama withSelectedFields per leggere un sottoinsieme di colonne da nella tabella. Ciò consente letture efficienti quando le tabelle contengono molte colonne.
  • Filtro riga: chiama withRowRestriction per specificare un predicato che applica il filtro sul lato server.

I predicati di filtro devono essere deterministici e l'aggregazione non è supportata.

L'esempio seguente mostra le colonne "user_name" e "age" e filtra le righe che non corrispondono al predicato "age > 18".

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Leggi dal risultato di una query

Gli esempi precedenti mostrano come leggere le righe di una tabella. Puoi anche leggere dal risultato di una query SQL, chiamando fromQuery. Questo approccio cambia parte del lavoro di calcolo in BigQuery. Puoi anche utilizzare questo metodo per leggere da una vista BigQuery o da una vista materializzata, eseguire una query sulla vista.

L'esempio seguente esegue una query su un server BigQuery e legge i risultati. Dopo l'esecuzione della pipeline, puoi vedere il job di query nella cronologia dei job di BigQuery.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Passaggi successivi