Gravar do Dataflow para o BigQuery

Neste documento, descrevemos como gravar dados do Dataflow para o BigQuery usando o conector de E/S do BigQuery do Apache Beam.

O conector de E/S do BigQuery está disponível no SDK do Apache Beam. Recomendamos usar a versão mais recente do SDK. Para mais informações, consulte SDKs do Apache Beam 2.x.

O suporte a várias linguagens para Python também está disponível.

Informações gerais

O conector de E/S do BigQuery aceita os seguintes métodos de gravação no BigQuery:

  • STORAGE_WRITE_API. Neste modo, o conector realiza gravações diretas no armazenamento do BigQuery usando a API BigQuery Storage Write. A API Storage Write combina ingestão de streaming e carregamento em lote em uma única API de alto desempenho. Esse modo garante semântica exatamente uma vez.
  • STORAGE_API_AT_LEAST_ONCE. Esse modo também usa a API Storage Write, mas fornece semântica de pelo menos uma vez. Esse modo resulta em menor latência para a maioria dos pipelines. No entanto, gravações duplicadas são possíveis.
  • FILE_LOADS. Nesse modo, o conector grava os dados de entrada em arquivos de preparo no Cloud Storage. Em seguida, ele executa um job de carregamento do BigQuery para carregar os dados no BigQuery. O modo é o padrão para PCollections limitado, encontrado com frequência em pipelines de lote.
  • STREAMING_INSERTS. Neste modo, o conector usa a API de streaming legada. Esse modo é o padrão para PCollections ilimitado, mas não é recomendado para novos projetos.

Ao escolher um método de gravação, considere os seguintes pontos:

  • Para jobs de streaming, considere usar STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE, porque esses modos gravam diretamente no armazenamento do BigQuery, sem usar arquivos de preparo intermediários.
  • Se você executar o pipeline usando pelo menos um modo de streaming, defina o modo de gravação como STORAGE_API_AT_LEAST_ONCE. Essa configuração é mais eficiente e corresponde à semântica do modo de streaming pelo menos uma vez.
  • Os carregamentos de arquivos e a API Storage Write têm cotas e limites diferentes.
  • Os jobs de carregamento usam o pool de slots compartilhado ou os slots reservados do BigQuery. Para usar slots reservados, execute o job de carregamento em um projeto com uma atribuição de reserva do tipo PIPELINE. Os jobs de carregamento serão gratuitos se você usar o pool de slots compartilhado do BigQuery. No entanto, o BigQuery não garante a capacidade disponível do pool compartilhado. Para mais informações, consulte Introdução às reservas.

Paralelismo

  • Para FILE_LOADS e STORAGE_WRITE_API em pipelines de streaming, o conector fragmenta os dados em vários arquivos ou streams. Em geral, recomendamos chamar withAutoSharding para ativar a fragmentação automática.

  • Para FILE_LOADS em pipelines de lote, o conector grava dados em arquivos particionados, que são carregados no BigQuery em paralelo.

  • Para STORAGE_WRITE_API em pipelines de lote, cada worker cria um ou mais streams para gravar no BigQuery, determinado pelo número total de fragmentos.

  • Para STORAGE_API_AT_LEAST_ONCE, há um único fluxo de gravação padrão. Vários workers são anexados a esse stream.

Performance

A tabela a seguir mostra as métricas de desempenho para várias opções de leitura de E/S do BigQuery. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Gravação de armazenamento 55 MBps 54.000 elementos por segundo
Carregamento do Avro 78 MBps 77.000 elementos por segundo
Carregamento do JSON 54 MBps 53.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

Esta seção descreve as práticas recomendadas para gravar no BigQuery a partir do Dataflow.

Considerações gerais

  • A API Storage Write tem limites de cota. O conector lida com esses limites na maioria dos pipelines. No entanto, alguns cenários podem esgotar os fluxos disponíveis da API Storage Write. Por exemplo, esse problema pode acontecer em um pipeline que usa fragmentação automática e escalonamento automático com um grande número de destinos, especialmente em jobs de longa duração com cargas de trabalho altamente variáveis. Se esse problema ocorrer, use STORAGE_WRITE_API_AT_LEAST_ONCE para evitá-lo.

  • Use as métricas do Google Cloud para monitorar o uso da cota da API Storage Write.

  • Ao usar o carregamentos de arquivos, o Avro costuma ter um desempenho melhor do que o JSON. Para usar o Avro, chame withAvroFormatFunction.

  • Por padrão, os jobs de carregamento são executados no mesmo projeto que o job do Dataflow. Para especificar um projeto diferente, chame withLoadJobProjectId.

  • Ao usar o SDK do Java, crie uma classe que represente o esquema da tabela do BigQuery. Em seguida, chame useBeamSchema no pipeline para converter automaticamente entre os tipos Row do Apache Beam e TableRow do BigQuery. Para conferir um exemplo de classe de esquema, confira ExampleModel.java.

  • Se você carregar tabelas com esquemas complexos com milhares de campos, considere chamar withMaxBytesPerPartition para definir um tamanho máximo menor para cada job de carregamento.

Pipelines de streaming

As recomendações a seguir se aplicam a pipelines de streaming.

  • Para pipelines de streaming, recomendamos o uso da API Storage Write (STORAGE_WRITE_API ou STORAGE_API_AT_LEAST_ONCE).

  • Um pipeline de streaming pode usar carregamentos de arquivos, mas essa abordagem tem desvantagens:

    • É necessário janelamento para gravar os arquivos. Não é possível usar a janela global.
    • O BigQuery carrega arquivos com base no melhor esforço ao usar o pool de slots compartilhado. Pode haver um atraso significativo entre o momento em que um registro é gravado e quando ele fica disponível no BigQuery.
    • Se um job de carregamento falhar, por exemplo, devido a dados incorretos ou incompatibilidade de esquema, todo o pipeline falha.
  • Considere usar STORAGE_WRITE_API_AT_LEAST_ONCE sempre que possível. Isso pode resultar na gravação de registros duplicados no BigQuery, mas é mais barato e mais escalonável que STORAGE_WRITE_API.

  • Em geral, evite usar STREAMING_INSERTS. As inserções por streaming são mais caras do que a API Storage Write e não têm um desempenho tão bom.

  • A fragmentação de dados pode melhorar o desempenho em pipelines de streaming. Para a maioria dos pipelines, a fragmentação automática é um bom ponto de partida. No entanto, é possível ajustar a fragmentação da seguinte maneira:

  • Se você usa inserções de streaming, recomendamos definir retryTransientErrors como a política de nova tentativa.

Pipelines em lote

As recomendações a seguir se aplicam a pipelines em lote.

  • Para a maioria dos pipelines em lote grandes, recomendamos primeiro testar FILE_LOADS. Um pipeline em lote pode usar STORAGE_WRITE_API, mas é provável que exceda os limites de cota em grande escala (mais de 1.000 vCPUs) ou se pipelines simultâneos estiverem em execução. O Apache Beam não limita o número máximo de fluxos de gravação para jobs STORAGE_WRITE_API em lote, de modo que o job acaba atingindo os limites da API BigQuery Storage.

  • Ao usar FILE_LOADS, é possível esgotar o pool de slots compartilhado do BigQuery ou o conjunto de slots reservados. Se você encontrar esse tipo de falha, tente as seguintes abordagens:

    • Reduza o número máximo ou o tamanho de workers para o job.
    • Compre mais slots reservados.
    • Considere usar STORAGE_WRITE_API.
  • Pipelines pequenos e médios (menos de 1.000 vCPUs) podem se beneficiar do uso de STORAGE_WRITE_API. Para esses jobs menores, considere usar STORAGE_WRITE_API se quiser uma fila de mensagens inativas ou quando o pool de slots compartilhado FILE_LOADS não for suficiente.

  • Se for possível tolerar dados duplicados, use STORAGE_WRITE_API_AT_LEAST_ONCE. Esse modo pode resultar na gravação de registros duplicados no BigQuery, mas pode ser mais barato do que a opção STORAGE_WRITE_API.

  • Diferentes modos de gravação podem ter um desempenho diferente com base nas características do pipeline. Experimente encontrar o melhor modo de gravação para sua carga de trabalho.

Solucionar erros no nível da linha

Nesta seção, descrevemos como lidar com erros que podem acontecer na linha, por exemplo, devido a dados de entrada malformados ou incompatibilidades de esquema.

Para a API Storage Write, todas as linhas que não podem ser gravadas são colocadas em um PCollection separado. Para acessar essa coleção, chame getFailedStorageApiInserts no objeto WriteResult. Para ver um exemplo dessa abordagem, consulte Fazer streaming de dados para o BigQuery.

É uma prática recomendada enviar os erros para uma fila ou tabela de mensagens inativas para processamento posterior. Para mais informações sobre esse padrão, consulte BigQueryIO padrão de mensagens inativas.

Para FILE_LOADS, se ocorrer um erro ao carregar os dados, o job de carregamento falhará e o pipeline gerará uma exceção de tempo de execução. É possível ver o erro nos registros do Dataflow ou analisar o histórico de jobs do BigQuery. O conector de E/S não retorna informações sobre linhas individuais com falha.

Para mais informações sobre como resolver erros, consulte Erros do conector do BigQuery.

Exemplos

Os exemplos a seguir mostram como usar o Dataflow para gravar no BigQuery.

Gravar em uma tabela existente

O exemplo a seguir cria um pipeline em lote que grava um PCollection<MyData> no BigQuery, em que MyData é um tipo de dados personalizado.

O método BigQueryIO.write() retorna um tipo BigQueryIO.Write<T>, que é usado para configurar a operação de gravação. Para mais informações, consulte Como gravar em uma tabela na documentação do Apache Beam. Este exemplo de código grava em uma tabela atual (CREATE_NEVER) e anexa as novas linhas à tabela (WRITE_APPEND).

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Gravar em uma tabela nova ou atual

O exemplo a seguir cria uma nova tabela caso a tabela de destino não exista, definindo a disposição de criação como CREATE_IF_NEEDED. Ao usar essa opção, você precisa fornecer um esquema de tabela. O conector usará esse esquema se criar uma nova tabela.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Transmitir dados para o BigQuery

O exemplo a seguir mostra como fazer streaming de dados usando semântica de exatamente uma vez, definindo o modo de gravação como STORAGE_WRITE_API.

Nem todos os pipelines de streaming exigem semântica exatamente uma vez. Por exemplo, talvez seja possível remover cópias duplicadas manualmente da tabela de destino. Se a possibilidade de registros duplicados for aceitável no cenário, use a semântica pelo menos uma vez definindo o método de gravação como STORAGE_API_AT_LEAST_ONCE. Esse método geralmente é mais eficiente e resulta em menor latência para a maioria dos pipelines.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

A seguir