Gravar do Dataflow para o Cloud Storage

Neste documento, descrevemos como gravar dados de texto do Dataflow para o Cloud Storage usando o conector de E/S do Apache Beam TextIO.

Incluir a dependência da biblioteca do Google Cloud

Para usar o conector TextIO com o Cloud Storage, inclua a dependência a seguir. Essa biblioteca fornece um gerenciador de esquema para os nomes de arquivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte Instalar o SDK do Apache Beam.

Ativar o gRPC no conector de E/S do Apache Beam no Dataflow

É possível se conectar ao Cloud Storage usando o gRPC pelo conector de E/S do Apache Beam no Dataflow. O gRPC é um framework de chamada de procedimento remoto (RPC) de código aberto de alto desempenho desenvolvido pelo Google que pode ser usado para interagir com o Cloud Storage.

Para acelerar as solicitações de gravação do job do Dataflow no Cloud Storage, ative o conector de E/S do Apache Beam no Dataflow para usar o gRPC.

Linha de comando

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Flags opcionais.

SDK do Apache Beam

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Opções básicas.

É possível configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC no Cloud Monitoring. As métricas relacionadas ao gRPC podem ajudar você a fazer o seguinte:

  • Monitore e otimize a performance das solicitações gRPC para o Cloud Storage.
  • Resolver problemas e depurar.
  • Receba insights sobre o uso e o comportamento do seu aplicativo.

Para informações sobre como configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC, consulte Usar métricas do lado do cliente. Se a coleta de métricas não for necessária para seu caso de uso, você pode desativar a coleta de métricas. Para instruções, consulte Desativar as métricas do lado do cliente.

Paralelismo

O paralelismo é determinado principalmente pelo número de fragmentos. Por padrão, o executor define esse valor automaticamente. Para a maioria dos pipelines, é recomendável usar o comportamento padrão. Confira as Práticas recomendadas neste documento.

Desempenho

A tabela a seguir mostra as métricas de desempenho para gravar no Cloud Storage. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Gravar 130 MBps 130.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

  • Em geral, evite definir um número específico de fragmentos. Isso permite que o executor selecione um valor adequado para a escala. Se você ajustar o número de fragmentos, recomendamos uma gravação entre 100 MB e 1 GB por fragmento. No entanto, o valor ideal depende da carga de trabalho.

  • O Cloud Storage pode escalonar para um número muito grande de solicitações por segundo. No entanto, se o pipeline tiver picos grandes no volume de gravação, grave em vários buckets para evitar a sobrecarga temporária de algum bucket do Cloud Storage.

  • Em geral, a gravação no Cloud Storage é mais eficiente quando cada gravação é maior (1 KB ou mais). Gravar pequenos registros em um grande número de arquivos pode resultar em um desempenho ruim por byte.

  • Ao gerar nomes de arquivos, use nomes não sequenciais para distribuir a carga. Para mais informações, confira Usar uma convenção de nomenclatura que distribua a carga uniformemente pelos intervalos de chaves.

  • Ao nomear arquivos, não use arroba ('@') seguido de um número em um asterisco ('*'). Para mais informações, confira "@*" e "@N" são especificações de fragmentação reservadas.

Exemplo: Gravar arquivos de texto para o Cloud Storage

O exemplo a seguir cria um pipeline em lote que grava arquivos de texto usando a compactação GZIP:

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se a entrada PCollection for ilimitada, você precisará definir uma janela ou um gatilho na coleção e especificar gravações em janelas chamando TextIO.Write.withWindowedWrites.

Python

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Para o caminho de saída, especifique um caminho do Cloud Storage que inclua o nome do bucket e um prefixo de nome de arquivo. Por exemplo, se você especificar gs://my_bucket/output/file, o conector TextIO gravará no bucket do Cloud Storage chamado my_bucket e os arquivos de saída terão o prefixo output/file*.

Por padrão, o conector TextIO fragmenta os arquivos de saída usando uma convenção de nomenclatura como esta: <file-prefix>-00000-of-00001. Também é possível especificar um sufixo de nome de arquivo e um esquema de compactação, conforme mostrado no exemplo.

Para garantir gravações idempotentes, o Dataflow grava em um arquivo temporário e copia o arquivo temporário concluído para o arquivo final. Para controlar o local de armazenamento desses arquivos temporários, use o método withTempDirectory.

A seguir