Ler do BigQuery para o Dataflow

Neste documento, descrevemos como ler dados do BigQuery para o Dataflow.

Visão geral

Para a maioria dos casos de uso, use a E/S gerenciada para ler do BigQuery. A E/S gerenciada oferece recursos como upgrades automáticos e uma API de configuração consistente. Ao ler do BigQuery, a E/S gerenciada realiza leituras diretas de tabela, que oferecem o melhor desempenho de leitura.

Se você precisar de um ajuste de desempenho mais avançado, use o conector BigQueryIO. O conector BigQueryIO oferece suporte a leituras de tabelas diretas e leitura de jobs de exportação do BigQuery. Ele também oferece um controle mais detalhado sobre a desserialização de registros de tabela. Para mais informações, consulte Usar o conector BigQueryIO neste documento.

Projeção e filtragem de colunas

Para reduzir o volume de dados que o pipeline lê do BigQuery, use estas técnicas:

  • A projeção de colunas especifica um subconjunto de colunas a serem lidas na tabela. Use a projeção de colunas quando a tabela tiver um grande número de colunas e você precisar ler apenas um subconjunto delas.
  • A filtragem de linha especifica um predicado a ser aplicado à tabela. A operação de leitura do BigQuery só retorna linhas que correspondem ao filtro, o que pode reduzir a quantidade total de dados ingeridos pelo pipeline.

O exemplo a seguir lê as colunas "user_name" e "age" de uma tabela e filtra as linhas que não correspondem ao predicado "age > 18". Este exemplo usa a E/S gerenciada.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Ler a partir do resultado de uma consulta

O exemplo a seguir usa E/S gerenciada para ler o resultado de uma consulta SQL. Ele executa uma consulta em um conjunto de dados público do BigQuery. Também é possível usar consultas SQL para ler uma visualização do BigQuery ou uma visualização materializada.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Usar o conector BigQueryIO

O conector BigQueryIO é compatível com os seguintes métodos de serialização:

O conector oferece duas opções para ler dados:

  • Export job. Por padrão, o conector BigQueryIO executa um job de exportação do BigQuery que grava os dados da tabela no Cloud Storage. Em seguida, o conector lê os dados do Cloud Storage.
  • Leituras diretas de tabelas. Essa opção é mais rápida do que os jobs de exportação porque usa a API BigQuery Storage Read e pula a etapa de exportação. Para usar leituras diretas de tabelas, chame withMethod(Method.DIRECT_READ) ao criar o pipeline.

Ao escolher qual opção usar, considere os seguintes pontos:

  • Em geral, recomendamos usar leituras diretas de tabelas. A API Storage Read é mais adequada para pipelines de dados do que jobs de exportação porque não precisa da etapa intermediária de exportação de dados.

  • Se você ativar leituras diretas, será cobrado pelo uso da API Storage Read. Consulte Preços de extração de dados na página de preços do BigQuery.

  • Não há custo extra para jobs de exportação. No entanto, os jobs de exportação têm limites. Para uma grande movimentação de dados, em que a pontualidade é uma prioridade e o custo é ajustável, recomendam-se leituras diretas.

  • A API Storage Read tem limites de cota. Use as métricasGoogle Cloud para monitorar o uso da cota.

  • Se você usa jobs de exportação, defina a opção de pipeline --tempLocation para especificar um bucket do Cloud Storage para os arquivos exportados.

  • Ao usar a API Storage Read, você poderá ver erros de expiração da alocação e de tempo limite da sessão nos registros, como:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    Esses erros podem ocorrer quando uma operação leva mais tempo do que o tempo limite, geralmente em pipelines executados por mais de seis horas. Para atenuar esse problema, mude para exportações de arquivos.

  • O grau de paralelismo depende do método de leitura:

    • Leituras diretas: o conector de E/S produz um número dinâmico de streams, com base no tamanho da solicitação de exportação. Ele lê esses streams diretamente do BigQuery em paralelo.

    • Jobs de exportação: o BigQuery determina quantos arquivos serão gravados no Cloud Storage. O número de arquivos depende da consulta e do volume de dados. O conector de E/S lê os arquivos exportados em paralelo.

A tabela a seguir mostra as métricas de desempenho para várias opções de leitura de E/S do BigQuery. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Leitura de armazenamento 120 MBps 88.000 elementos por segundo
Exportação de Avro 105 MBps 78.000 elementos por segundo
Exportação para o JSON 110 MBps 81.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Exemplos

Os exemplos de código a seguir usam o conector BigQueryIO com leituras diretas de tabelas. Para usar um job de exportação, omita a chamada para withMethod.

Ler registros formatados em Avro

Este exemplo mostra como usar o conector BigQueryIO para ler registros formatados em Avro.

Para ler dados do BigQuery em registros formatados em Avro, use o método read(SerializableFunction). Esse método usa uma função definida pelo aplicativo que analisa objetos SchemaAndRecord e retorna um tipo de dados personalizado. A saída do conector é um PCollection do seu tipo de dados personalizado.

O código a seguir lê um PCollection<MyData> de uma tabela do BigQuery, em que MyData é uma classe definida pelo aplicativo.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

O método read usa uma interface SerializableFunction<SchemaAndRecord, T>, que define uma função para converter de registros Avro em uma classe de dados personalizada. No exemplo de código anterior, o método MyData.apply implementa essa função de conversão. A função de exemplo analisa os campos name e age do registro Avro e retorna uma instância MyData.

Para especificar qual tabela do BigQuery ler, chame o método from, conforme mostrado no exemplo anterior. Para mais informações, consulte Nomes de tabelas na documentação do conector de E/S do BigQuery.

Objetos TableRow lidos

Este exemplo mostra como usar o conector BigQueryIO para ler objetos TableRow.

O método readTableRows lê os dados do BigQuery em um PCollection de objetos TableRow. Cada TableRow é um mapa de pares de chave-valor que contém uma única linha de dados de tabela. Especifique a tabela do BigQuery a ser lida chamando o método from.

O código a seguir lê um PCollection<TableRows> de uma tabela do BigQuery.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Este exemplo também mostra como acessar os valores do dicionário TableRow. Valores inteiros são codificados como strings para corresponder ao formato JSON exportado do BigQuery.

A seguir