Ler dados do Cloud Storage para Dataflow

Para ler dados do Cloud Storage para Dataflow, use o conector de E/S TextIO ou AvroIO do Apache Beam.

Incluir a dependência da biblioteca do Google Cloud

Para usar o conector TextIO ou AvroIOcom o Cloud Storage, inclua a dependência a seguir. Essa biblioteca fornece um gerenciador de esquema para os nomes de arquivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para mais informações, consulte Instalar o SDK do Apache Beam.

Ativar o gRPC no conector de E/S do Apache Beam no Dataflow

É possível se conectar ao Cloud Storage usando o gRPC pelo conector de E/S do Apache Beam no Dataflow. O gRPC é um framework de chamada de procedimento remoto (RPC) de código aberto de alto desempenho desenvolvido pelo Google que pode ser usado para interagir com o Cloud Storage.

Para acelerar as solicitações de leitura do job do Dataflow para o Cloud Storage, ative o conector de E/S do Apache Beam no Dataflow para usar o gRPC.

Linha de comando

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --additional-experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Flags opcionais.

SDK do Apache Beam

  1. Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
  2. Para executar um job do Dataflow, use a opção de pipeline --experiments=use_grpc_for_gcs. Para informações sobre as diferentes opções de pipeline, consulte Opções básicas.

É possível configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC no Cloud Monitoring. As métricas relacionadas ao gRPC podem ajudar você a fazer o seguinte:

  • Monitore e otimize a performance das solicitações gRPC para o Cloud Storage.
  • Resolver problemas e depurar.
  • Receba insights sobre o uso e o comportamento do seu aplicativo.

Para informações sobre como configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC, consulte Usar métricas do lado do cliente. Se a coleta de métricas não for necessária para seu caso de uso, você pode desativar a coleta de métricas. Para instruções, consulte Desativar as métricas do lado do cliente.

Paralelismo

Os conectores TextIO e AvroIO são compatíveis com dois níveis de paralelismo:

  • Os arquivos individuais são codificados separadamente para que vários workers possam lê-los.
  • Se os arquivos não estiverem compactados, o conector consegue ler os subintervalos de cada arquivo separadamente, levando a um nível muito alto de paralelismo. Essa divisão só é possível se cada linha do arquivo for um registro significativo. Por exemplo, por padrão, ele está indisponível para arquivos JSON.

Desempenho

A tabela a seguir mostra as métricas de desempenho de leitura do Cloud Storage. As cargas de trabalho foram executadas em um worker e2-standard2 usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.

100 milhões de registros | 1 KB | 1 coluna Capacidade de processamento (bytes) Capacidade de processamento (elementos)
Ler 320 MBps 320.000 elementos por segundo

Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.

Práticas recomendadas

Exemplo

O exemplo a seguir mostra como ler do Cloud Storage.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

A seguir