Leia do BigQuery para o Dataflow

Este documento descreve como ler dados do BigQuery para o Dataflow.

Vista geral

Para a maioria dos exemplos de utilização, considere usar a opção Managed I/O para ler a partir do BigQuery. A E/S gerida oferece funcionalidades como atualizações automáticas e uma API de configuração consistente. Quando lê a partir do BigQuery, a E/S gerida executa leituras diretas de tabelas, o que oferece o melhor desempenho de leitura.

Se precisar de uma otimização do desempenho mais avançada, considere usar o conector BigQueryIO. O conector BigQueryIO suporta leituras diretas de tabelas e leituras de tarefas de exportação do BigQuery. Também oferece um controlo mais detalhado sobre a desserialização de registos de tabelas. Para mais informações, consulte a secção Use o conector BigQueryIO neste documento.

Projeção e filtragem de colunas

Para reduzir o volume de dados que o pipeline lê do BigQuery, pode usar as seguintes técnicas:

  • A projeção de colunas especifica um subconjunto de colunas a ler da tabela. Use a projeção de colunas quando a sua tabela tiver um grande número de colunas e só precisar de ler um subconjunto das mesmas.
  • A filtragem de linhas especifica um predicado a aplicar à tabela. A operação de leitura do BigQuery só devolve linhas que correspondem ao filtro, o que pode reduzir a quantidade total de dados carregados pelo pipeline.

O exemplo seguinte lê as colunas "user_name" e "age" de uma tabela e filtra as linhas que não correspondem ao predicado "age > 18". Este exemplo usa a E/S gerida.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Ler a partir de um resultado da consulta

O exemplo seguinte usa a E/S gerida para ler o resultado de uma consulta SQL. Executa uma consulta num conjunto de dados públicos do BigQuery. Também pode usar consultas SQL para ler a partir de uma vista do BigQuery ou de uma vista materializada.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Use o conetor BigQueryIO

O conector BigQueryIO suporta os seguintes métodos de serialização:

O conector suporta duas opções para ler dados:

  • Tarefa de exportação. Por predefinição, o conetor BigQueryIO executa uma tarefa de exportação do BigQuery que escreve os dados da tabela no Cloud Storage. Em seguida, o conetor lê os dados do Cloud Storage.
  • Leituras diretas de tabelas. Esta opção é mais rápida do que as tarefas de exportação, porque usa a API BigQuery Storage Read e ignora a etapa de exportação. Para usar leituras diretas de tabelas, chame withMethod(Method.DIRECT_READ) quando criar o pipeline.

Quando escolher a opção a usar, considere os seguintes pontos:

  • Em geral, recomendamos a utilização de leituras diretas de tabelas. A API Storage Read é mais adequada para pipelines de dados do que as tarefas de exportação, porque não precisa do passo intermédio de exportação de dados.

  • Se usar leituras diretas, é-lhe cobrada a utilização da API Storage Read. Consulte os preços da extração de dados na página de preços do BigQuery.

  • Não existem custos adicionais para tarefas de exportação. No entanto, as tarefas de exportação têm limites. Para a movimentação de grandes quantidades de dados, em que a atualidade é uma prioridade e o custo é ajustável, recomenda-se a leitura direta.

  • A API Storage Read tem limites de quota. Use as métricas da Google Cloud Platform para monitorizar a sua utilização de quotas.

  • Se usar tarefas de exportação, defina a --tempLocation opção de pipeline para especificar um contentor do Cloud Storage para os ficheiros exportados.

  • Quando usa a API Storage Read, pode ver erros de expiração de concessão e de tempo limite da sessão nos registos, como:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    Estes erros podem ocorrer quando uma operação demora mais tempo do que o limite de tempo, normalmente em pipelines que são executados durante mais de 6 horas. Para mitigar este problema, mude para exportações de ficheiros.

  • O grau de paralelismo depende do método de leitura:

    • Leituras diretas: o conector de E/S produz um número dinâmico de streams com base no tamanho do pedido de exportação. Lê estas streams diretamente do BigQuery em paralelo.

    • Tarefas de exportação: o BigQuery determina quantos ficheiros escrever no Cloud Storage. O número de ficheiros depende da consulta e do volume de dados. O conetor de E/S lê os ficheiros exportados em paralelo.

A tabela seguinte mostra as métricas de desempenho para várias opções de leitura do BigQuery I/O. As cargas de trabalho foram executadas num e2-standard2 worker, usando o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.

100 M de registos | 1 kB | 1 coluna Débito (bytes) Tráfego transmitido (elementos)
Leitura de armazenamento 120 MBps 88 000 elementos por segundo
Exportação Avro 105 MBps 78 000 elementos por segundo
Exportação JSON 110 MBps 81 000 elementos por segundo

Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.

Exemplos

Os exemplos de código seguintes usam o conector BigQueryIO com leituras diretas de tabelas. Para usar uma tarefa de exportação, omita a chamada para withMethod.

Leia registos formatados em Avro

Este exemplo mostra como usar o conetor BigQueryIO para ler registos formatados em Avro.

Para ler dados do BigQuery em registos formatados em Avro, use o método read(SerializableFunction). Este método usa uma função definida pela aplicação que analisa objetos SchemaAndRecord e devolve um tipo de dados personalizado. O resultado do conetor é um PCollection do seu tipo de dados personalizado.

O código seguinte lê um PCollection<MyData> de uma tabela do BigQuery, em que MyData é uma classe definida pela aplicação.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

O método read usa uma interface SerializableFunction<SchemaAndRecord, T> que define uma função para converter registos Avro numa classe de dados personalizada. No exemplo de código anterior, o método MyData.apply implementa esta função de conversão. A função de exemplo analisa os campos name e age do registo Avro e devolve uma instância MyData.

Para especificar que tabela do BigQuery ler, chame o método from, conforme mostrado no exemplo anterior. Para mais informações, consulte a secção Nomes das tabelas na documentação do conetor BigQuery I/O.

Ler TableRow objetos

Este exemplo mostra como usar o conector BigQueryIO para ler objetos TableRow.

O método readTableRows lê os dados do BigQuery num PCollection de objetos TableRow. Cada TableRow é um mapa de pares de chave-valor que contém uma única linha de dados da tabela. Especifique a tabela do BigQuery a ler chamando o método from.

O código seguinte lê um PCollection<TableRows> de uma tabela do BigQuery.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Este exemplo também mostra como aceder aos valores do dicionário TableRow. Os valores inteiros são codificados como strings para corresponderem ao formato JSON exportado do BigQuery.

O que se segue?