Ler do BigQuery para o Dataflow

Neste documento, descrevemos como ler dados do BigQuery para o Dataflow usando o conector de E/S do BigQuery do Apache Beam.

Informações gerais

O conector de E/S do BigQuery é compatível com duas opções de leitura do BigQuery:

  • Leituras diretas da tabela. Essa opção é a mais rápida, porque usa a API BigQuery Storage Read.
  • Job de exportação. Com essa opção, o BigQuery executa um job de exportação que grava os dados da tabela no Cloud Storage. Em seguida, o conector lê os dados exportados do Cloud Storage. Essa opção é menos eficiente, porque requer a etapa de exportação.

Jobs de exportação são a opção padrão. Para especificar leituras diretas, chame withMethod(Method.DIRECT_READ).

O conector serializa os dados da tabela em um PCollection. Cada elemento na PCollection representa uma única linha da tabela. O conector é compatível com os seguintes métodos de serialização:

Paralelismo

O paralelismo nesse conector depende do método de leitura:

  • Leituras diretas: o conector de E/S produz um número dinâmico de streams, com base no tamanho da solicitação de exportação. Ele lê esses streams diretamente do BigQuery em paralelo.

  • Jobs de exportação: o BigQuery determina quantos arquivos serão gravados no Cloud Storage. O número de arquivos depende da consulta e do volume de dados. O conector de E/S lê os arquivos exportados em paralelo.

Desempenho

A tabela a seguir mostra as métricas de desempenho para várias opções de leitura de E/S do BigQuery. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Leitura de armazenamento 120 MBps 88.000 elementos por segundo
Exportação de Avro 105 MBps 78.000 elementos por segundo
Exportação para o JSON 110 MBps 81.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

  • Em geral, recomendamos o uso de leituras diretas de tabelas (Method.DIRECT_READ). A API Storage Read é mais adequada para pipelines de dados do que jobs de exportação porque não precisa da etapa intermediária de exportação de dados.

  • Se você ativar leituras diretas, será cobrado pelo uso da API Storage Read. Consulte Preços de extração de dados na página de preços do BigQuery.

  • Não há custo extra para jobs de exportação. No entanto, os jobs de exportação têm limites. Para uma grande movimentação de dados, em que a pontualidade é uma prioridade e o custo é ajustável, recomendam-se leituras diretas.

  • A API Storage Read tem limites de cota. Use as métricas do Google Cloud para monitorar o uso da sua cota.

  • Ao usar a API Storage Read, você poderá ver erros de expiração da alocação e de tempo limite da sessão nos registros, como:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    Esses erros podem ocorrer quando uma operação leva mais tempo do que o tempo limite, geralmente em pipelines executados por mais de seis horas. Para atenuar esse problema, mude para exportações de arquivos.

  • Ao usar o SDK do Java, crie uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame useBeamSchema no pipeline para converter automaticamente entre os tipos Row do Apache Beam e TableRow do BigQuery. Para conferir um exemplo de classe de esquema, confira ExampleModel.java.

Examples

Os exemplos de código nesta seção usam leituras diretas das tabelas.

Para usar um job de exportação, omita a chamada para withMethod ou especifique Method.EXPORT. Em seguida, defina a opção de pipeline --tempLocation para especificar um bucket do Cloud Storage para os arquivos exportados.

Esses exemplos de código presumem que a tabela de origem tenha as seguintes colunas:

  • name (string)
  • age (inteiro)

Especificado como um arquivo de esquema JSON:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Ler registros formatados em Avro

Para ler dados do BigQuery em registros formatados em Avro, use o método read(SerializableFunction). Esse método usa uma função definida pelo aplicativo que analisa objetos SchemaAndRecord e retorna um tipo de dados personalizado. A saída do conector é um PCollection do seu tipo de dados personalizado.

O código a seguir lê um PCollection<MyData> de uma tabela do BigQuery, em que MyData é uma classe definida pelo aplicativo.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

O método read usa uma interface SerializableFunction<SchemaAndRecord, T>, que define uma função para converter de registros Avro em uma classe de dados personalizada. No exemplo de código anterior, o método MyData.apply implementa essa função de conversão. A função de exemplo analisa os campos name e age do registro Avro e retorna uma instância MyData.

Para especificar qual tabela do BigQuery ler, chame o método from, conforme mostrado no exemplo anterior. Para mais informações, consulte Nomes de tabelas na documentação do conector de E/S do BigQuery.

Objetos TableRow lidos

O método readTableRows lê os dados do BigQuery em um PCollection de objetos TableRow. Cada TableRow é um mapa de pares de chave-valor que contém uma única linha de dados de tabela. Especifique a tabela do BigQuery a ser lida chamando o método from.

O código a seguir lê um PCollection<TableRows> de uma tabela do BigQuery.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Este exemplo também mostra como acessar os valores do dicionário TableRow. Valores inteiros são codificados como strings para corresponder ao formato JSON exportado do BigQuery.

Projeção e filtragem de colunas

Ao usar leituras diretas (Method.DIRECT_READ), é possível tornar as operações de leitura mais eficientes, reduzindo a quantidade de dados lidos do BigQuery e enviados pela rede.

  • Projeção de colunas: chame withSelectedFields para ler um subconjunto de colunas da tabela. Isso permite leituras eficientes quando as tabelas contêm muitas colunas.
  • Filtragem de linha: chame withRowRestriction para especificar um predicado que filtre dados no lado do servidor.

Os predicados de filtro precisam ser determinísticos, e a agregação não é compatível.

O exemplo a seguir projeta as colunas "user_name" e "age" e filtra as linhas que não correspondem ao predicado "age > 18".

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Ler a partir do resultado de uma consulta

Os exemplos anteriores mostram como ler linhas de uma tabela. Para ler o resultado de uma consulta SQL, chame fromQuery. Essa abordagem move parte do trabalho computacional para o BigQuery. Você também pode usar esse método para ler uma visualização do BigQuery ou uma visualização materializada, executando uma consulta na visualização.

No exemplo a seguir, uma consulta é executada em um conjunto de dados público do BigQuery e os resultados são lidos. Depois que o pipeline é executado, é possível ver o job de consulta no histórico de jobs do BigQuery.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

A seguir