Escreva do Dataflow para o BigQuery

Este documento descreve como escrever dados do Dataflow para o BigQuery.

Vista geral

Para a maioria dos exemplos de utilização, considere usar a opção Managed I/O para escrever no BigQuery. A E/S gerida oferece funcionalidades como atualizações automáticas e uma API de configuração consistente. Quando escreve no BigQuery, a E/S gerida escolhe automaticamente o melhor método de escrita para tarefas em lote ou de streaming.

Se precisar de uma otimização do desempenho mais avançada, considere usar o conector BigQueryIO. Para mais informações, consulte a secção Use o conector BigQueryIO neste documento.

Desempenho

A tabela seguinte mostra as métricas de desempenho para várias cargas de trabalho. Estas cargas de trabalho foram executadas num e2-standard2 worker, usando o SDK do Apache Beam 2.49.0 para Java. Não usaram o Runner v2.

100 M de registos | 1 kB | 1 coluna Débito (bytes) Tráfego transmitido (elementos)
Escrita de armazenamento 55 MBps 54 000 elementos por segundo
Carregamento de Avro 78 MBps 77 000 elementos por segundo
Json Load 54 MBps 53 000 elementos por segundo

Estas métricas baseiam-se em pipelines de processamento em lote simples. Destinam-se a comparar o desempenho entre conectores de E/S e não são necessariamente representativos de pipelines do mundo real. O desempenho do pipeline do Dataflow é complexo e é uma função do tipo de VM, dos dados que estão a ser processados, do desempenho das origens e dos destinos externos, e do código do utilizador. As métricas baseiam-se na execução do SDK Java e não são representativas das características de desempenho de outros SDKs de idiomas. Para mais informações, consulte o artigo Desempenho do Beam IO.

Use o conetor BigQueryIO

O conetor BigQuery I/O suporta os seguintes métodos para escrever no BigQuery:

  • STORAGE_WRITE_API. Neste modo, o conector faz gravações diretas no armazenamento do BigQuery, usando a API BigQuery Storage Write. A API Storage Write combina o carregamento de streaming e o carregamento em lote numa única API de alto desempenho. Este modo garante a semântica exatamente uma vez.
  • STORAGE_API_AT_LEAST_ONCE. Este modo também usa a API Storage Write, mas oferece semântica pelo menos uma vez. Este modo resulta numa latência mais baixa para a maioria dos pipelines. No entanto, é possível fazer escritas duplicadas.
  • FILE_LOADS. Neste modo, o conetor escreve os dados de entrada em ficheiros de preparação no Cloud Storage. Em seguida, executa uma tarefa de carregamento do BigQuery para carregar os dados para o BigQuery. O modo é a predefinição para os conjuntos de dados delimitados PCollections, que são mais comuns em pipelines de processamento em lote.
  • STREAMING_INSERTS. Neste modo, o conetor usa a API Legacy Streaming. Este modo é o predefinido para PCollections ilimitados, mas não é recomendado para novos projetos.

Ao escolher um método de escrita, considere os seguintes pontos:

  • Para tarefas de streaming, considere usar STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE, porque estes modos escrevem diretamente no armazenamento do BigQuery, sem usar ficheiros de preparação intermédios.
  • Se executar o pipeline usando o modo de streaming at-least-once, defina o modo de escrita como STORAGE_API_AT_LEAST_ONCE. Esta definição é mais eficiente e corresponde à semântica do modo de streaming, pelo menos, uma vez.
  • O carregamento de ficheiros e a API Storage Write têm quotas e limites diferentes.
  • As tarefas de carregamento usam o conjunto de slots do BigQuery partilhado ou slots reservados. Para usar slots reservados, execute a tarefa de carregamento num projeto com uma atribuição de reserva do tipo PIPELINE. As tarefas de carregamento são gratuitas se usar o conjunto de slots do BigQuery partilhado. No entanto, o BigQuery não faz garantias sobre a capacidade disponível do conjunto partilhado. Para mais informações, consulte o artigo Introdução às reservas.

Paralelismo

  • Para FILE_LOADS e STORAGE_WRITE_API em pipelines de streaming, o conetor fragmenta os dados num número de ficheiros ou streams. Em geral, recomendamos que chame withAutoSharding para ativar a divisão automática.

  • Para FILE_LOADS em pipelines em lote, o conetor escreve dados em ficheiros particionados, que são, em seguida, carregados em paralelo para o BigQuery.

  • Para STORAGE_WRITE_API em pipelines de processamento em lote, cada trabalhador cria um ou mais streams para escrever no BigQuery, determinado pelo número total de fragmentos.

  • Para STORAGE_API_AT_LEAST_ONCE, existe uma única stream de gravação predefinida. Vários trabalhadores são anexados a esta stream.

Práticas recomendadas

  • A API Storage Write tem limites de quota. O conector processa estes limites para a maioria dos pipelines. No entanto, alguns cenários podem esgotar as streams da API Storage Write disponíveis. Por exemplo, este problema pode ocorrer num pipeline que usa a divisão automática e a escala automática com um grande número de destinos, especialmente em tarefas de longa duração com cargas de trabalho altamente variáveis. Se este problema ocorrer, pondere usar STORAGE_WRITE_API_AT_LEAST_ONCE, o que evita o problema.

  • Use as métricas da Google Cloud Platform para monitorizar a utilização da quota da API Storage Write.

  • Quando usa carregamentos de ficheiros, o Avro tem normalmente um desempenho superior ao JSON. Para usar o Avro, ligue para withAvroFormatFunction.

  • Por predefinição, as tarefas de carregamento são executadas no mesmo projeto que a tarefa do Dataflow. Para especificar um projeto diferente, chame withLoadJobProjectId.

  • Quando usar o SDK Java, considere criar uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame useBeamSchema no seu pipeline para converter automaticamente entre os tipos Row do Apache Beam e TableRow do BigQuery. Para ver um exemplo de uma classe de esquema, consulte ExampleModel.java.

  • Se carregar tabelas com esquemas complexos que contenham milhares de campos, considere chamar withMaxBytesPerPartition para definir um tamanho máximo mais pequeno para cada tarefa de carregamento.

  • Por predefinição, o BigQueryIO usa definições da API Storage Write que são razoáveis para a maioria dos pipelines. No entanto, se tiver problemas de desempenho, pode definir opções de pipeline para ajustar estas definições. Para mais informações, consulte o artigo Ajuste a API Storage Write na documentação do Apache Beam.

Pipelines de streaming

As seguintes recomendações aplicam-se a pipelines de streaming.

  • Para pipelines de streaming, recomendamos que use a API Storage Write (STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE).

  • Um pipeline de streaming pode usar carregamentos de ficheiros, mas esta abordagem tem desvantagens:

    • Requer janelas para escrever os ficheiros. Não pode usar a janela global.
    • O BigQuery carrega ficheiros com base no melhor esforço quando usa o conjunto de slots partilhado. Pode haver um atraso significativo entre o momento em que um registo é escrito e o momento em que fica disponível no BigQuery.
    • Se uma tarefa de carregamento falhar, por exemplo, devido a dados incorretos ou a uma incompatibilidade de esquema, toda a pipeline falha.
  • Considere usar STORAGE_WRITE_API_AT_LEAST_ONCE sempre que possível. Pode resultar na escrita de registos duplicados no BigQuery, mas é menos dispendioso e mais escalável do que STORAGE_WRITE_API.

  • Em geral, evite usar STREAMING_INSERTS. As inserções de streaming são mais caras do que a API Storage Write e não têm um desempenho tão bom.

  • A divisão de dados pode melhorar o desempenho nos pipelines de streaming. Para a maioria dos pipelines, a divisão automática é um bom ponto de partida. No entanto, pode ajustar a divisão em partições da seguinte forma:

  • Se usar inserções de streaming, recomendamos que defina retryTransientErrors como a política de repetição.

Pipelines em lote

As seguintes recomendações aplicam-se a pipelines em lote.

  • Para a maioria dos pipelines de processamento em lote grandes, recomendamos que experimente primeiro a opção FILE_LOADS. Um pipeline de lotes pode usar STORAGE_WRITE_API, mas é provável que exceda os limites de quota em grande escala (mais de 1000 vCPUs) ou se estiverem a ser executados pipelines concorrentes. O Apache Beam não limita o número máximo de streams de escrita para tarefas em lote, pelo que a tarefa acaba por atingir os limites da API BigQuery Storage.STORAGE_WRITE_API

  • Quando usa o FILE_LOADS, pode esgotar o conjunto de slots do BigQuery partilhado ou o seu conjunto de slots reservados. Se encontrar este tipo de falha, experimente as seguintes abordagens:

    • Reduza o número máximo de trabalhadores ou o tamanho dos trabalhadores para a tarefa.
    • Compre mais horários reservados.
    • Considere usar STORAGE_WRITE_API.
  • Os pipelines pequenos a médios (<1000 vCPUs) podem beneficiar da utilização de STORAGE_WRITE_API. Para estas tarefas mais pequenas, pondere usar o STORAGE_WRITE_API se quiser uma fila de mensagens rejeitadas ou quando o conjunto de ranuras partilhadas FILE_LOADS não for suficiente.

  • Se puder tolerar dados duplicados, considere usar o STORAGE_WRITE_API_AT_LEAST_ONCE. Este modo pode resultar na gravação de registos duplicados no BigQuery, mas pode ser menos dispendioso do que a opção STORAGE_WRITE_API.

  • Os diferentes modos de escrita podem ter um desempenho diferente com base nas características do seu pipeline. Faça experiências para encontrar o melhor modo de escrita para a sua carga de trabalho.

Resolva erros ao nível da linha

Esta secção descreve como processar erros que podem ocorrer ao nível da linha, por exemplo, devido a dados de entrada mal formados ou incompatibilidades de esquemas.

Para a API Storage Write, todas as linhas que não podem ser escritas são colocadas numa tabela PCollection separada. Para obter esta coleção, chame getFailedStorageApiInserts no objeto WriteResult. Para ver um exemplo desta abordagem, consulte o artigo Fazer stream de dados para o BigQuery.

É uma boa prática enviar os erros para uma fila ou uma tabela de mensagens não entregues para processamento posterior. Para mais informações sobre este padrão, consulte o BigQueryIO padrão de mensagens não entregues.

Para o FILE_LOADS, se ocorrer um erro ao carregar os dados, a tarefa de carregamento falha e o pipeline gera uma exceção de tempo de execução. Pode ver o erro nos registos do Dataflow ou consultar o histórico de tarefas do BigQuery. O conector de E/S não devolve informações sobre linhas individuais com falhas.

Para mais informações sobre a resolução de problemas de erros, consulte o artigo Erros do conetor do BigQuery.

Exemplos

Os exemplos seguintes mostram como usar o Dataflow para escrever no BigQuery. Estes exemplos usam o conetor BigQueryIO.

Escrever numa tabela existente

O exemplo seguinte cria um pipeline em lote que escreve um PCollection<MyData> no BigQuery, onde MyData é um tipo de dados personalizado.

O método BigQueryIO.write() devolve um tipo BigQueryIO.Write<T>, que é usado para configurar a operação de escrita. Para mais informações, consulte o artigo Escrever numa tabela na documentação do Apache Beam. Este exemplo de código escreve numa tabela existente (CREATE_NEVER) e anexa as novas linhas à tabela (WRITE_APPEND).

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Escrever numa tabela nova ou existente

O exemplo seguinte cria uma nova tabela se a tabela de destino não existir, definindo a disposição de criação como CREATE_IF_NEEDED. Quando usa esta opção, tem de fornecer um esquema da tabela. O conetor usa este esquema se criar uma nova tabela.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Transmita dados para o BigQuery

O exemplo seguinte mostra como fazer streaming de dados usando a semântica exatamente uma vez, através da definição do modo de escrita como STORAGE_WRITE_API

Nem todos os pipelines de streaming requerem semântica exatamente uma vez. Por exemplo, pode remover manualmente duplicados da tabela de destino. Se a possibilidade de registos duplicados for aceitável para o seu cenário, considere usar a semântica pelo menos uma vez definindo o método de escrita como STORAGE_API_AT_LEAST_ONCE. Geralmente, este método é mais eficiente e resulta numa latência inferior para a maioria dos pipelines.

Java

Para se autenticar no Dataflow, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

O que se segue?