Neste documento, descrevemos como gravar dados de texto do Dataflow para o
Cloud Storage usando o conector de E/S
do Apache Beam TextIO
.
Incluir a dependência da biblioteca do Google Cloud
Para usar o conector TextIO
com o Cloud Storage, inclua a dependência
a seguir. Essa biblioteca fornece um gerenciador de esquema para os nomes de arquivo "gs://"
.
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Python
apache-beam[gcp]==VERSION
Go
import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
Para mais informações, consulte Instalar o SDK do Apache Beam.
Ativar o gRPC no conector de E/S do Apache Beam no Dataflow
É possível se conectar ao Cloud Storage usando o gRPC pelo conector de E/S do Apache Beam no Dataflow. O gRPC é um framework de chamada de procedimento remoto (RPC) de código aberto de alto desempenho desenvolvido pelo Google que pode ser usado para interagir com o Cloud Storage.
Para acelerar as solicitações de gravação do job do Dataflow no Cloud Storage, ative o conector de E/S do Apache Beam no Dataflow para usar o gRPC.
Linha de comando
- Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
- Para executar um job do Dataflow, use a opção de pipeline
--additional-experiments=use_grpc_for_gcs
. Para informações sobre as diferentes opções de pipeline, consulte Flags opcionais.
SDK do Apache Beam
- Use a versão 2.55.0 ou mais recente do SDK do Apache Beam.
-
Para executar um job do Dataflow, use a opção de pipeline
--experiments=use_grpc_for_gcs
. Para informações sobre as diferentes opções de pipeline, consulte Opções básicas.
É possível configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC no Cloud Monitoring. As métricas relacionadas ao gRPC podem ajudar você a fazer o seguinte:
- Monitore e otimize a performance das solicitações gRPC para o Cloud Storage.
- Resolver problemas e depurar.
- Receba insights sobre o uso e o comportamento do seu aplicativo.
Para informações sobre como configurar o conector de E/S do Apache Beam no Dataflow para gerar métricas relacionadas ao gRPC, consulte Usar métricas do lado do cliente. Se a coleta de métricas não for necessária para seu caso de uso, você pode desativar a coleta de métricas. Para instruções, consulte Desativar as métricas do lado do cliente.
Paralelismo
O paralelismo é determinado principalmente pelo número de fragmentos. Por padrão, o executor define esse valor automaticamente. Para a maioria dos pipelines, é recomendável usar o comportamento padrão. Confira as Práticas recomendadas neste documento.
Desempenho
A tabela a seguir mostra as métricas de desempenho para gravar no
Cloud Storage. As cargas de trabalho foram executadas em um worker e2-standard2
usando o SDK do Apache Beam 2.49.0 para Java. Eles não usaram o Runner v2.
100 milhões de registros | 1 KB | 1 coluna | Capacidade de processamento (bytes) | Capacidade de processamento (elementos) |
---|---|---|
Gravar | 130 MBps | 130.000 elementos por segundo |
Essas métricas são baseadas em pipelines de lote simples. Elas servem para comparar o desempenho entre conectores de E/S e não representam necessariamente pipelines reais. O desempenho do pipeline do Dataflow é complexo e depende do tipo de VM, dos dados processados, do desempenho de origens e coletores externos e do código do usuário. As métricas se baseiam na execução do SDK do Java e não representam as características de desempenho de outros SDKs da linguagem. Para mais informações, confira Desempenho do E/S do Beam.
Práticas recomendadas
Em geral, evite definir um número específico de fragmentos. Isso permite que o executor selecione um valor adequado para a escala. Se você ajustar o número de fragmentos, recomendamos uma gravação entre 100 MB e 1 GB por fragmento. No entanto, o valor ideal depende da carga de trabalho.
O Cloud Storage pode escalonar para um número muito grande de solicitações por segundo. No entanto, se o pipeline tiver picos grandes no volume de gravação, grave em vários buckets para evitar a sobrecarga temporária de algum bucket do Cloud Storage.
Em geral, a gravação no Cloud Storage é mais eficiente quando cada gravação é maior (1 KB ou mais). Gravar pequenos registros em um grande número de arquivos pode resultar em um desempenho ruim por byte.
Ao gerar nomes de arquivos, use nomes não sequenciais para distribuir a carga. Para mais informações, confira Usar uma convenção de nomenclatura que distribua a carga uniformemente pelos intervalos de chaves.
Ao nomear arquivos, não use arroba ('@') seguido de um número em um asterisco ('*'). Para mais informações, confira "@*" e "@N" são especificações de fragmentação reservadas.
Exemplo: Gravar arquivos de texto para o Cloud Storage
O exemplo a seguir cria um pipeline em lote que grava arquivos de texto usando a compactação GZIP:
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Se a entrada PCollection
for ilimitada, você precisará definir uma janela ou um
gatilho na coleção e especificar gravações em janelas chamando
TextIO.Write.withWindowedWrites
.
Python
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Para o caminho de saída, especifique um caminho do Cloud Storage que inclua o nome do bucket e um prefixo de nome de arquivo. Por exemplo, se você especificar
gs://my_bucket/output/file
, o conector TextIO
gravará no
bucket do Cloud Storage chamado my_bucket
e os arquivos de saída terão o prefixo
output/file*
.
Por padrão, o conector TextIO
fragmenta os arquivos de saída usando uma convenção
de nomenclatura como esta: <file-prefix>-00000-of-00001
. Também é possível
especificar um sufixo de nome de arquivo e um esquema de compactação, conforme mostrado no exemplo.
Para garantir gravações idempotentes, o Dataflow grava em um arquivo temporário
e copia o arquivo temporário concluído para o arquivo final.
Para controlar o local de armazenamento desses arquivos temporários,
use o método
withTempDirectory
.
A seguir
- Leia a documentação da API
TextIO
. - Confira a lista de modelos fornecidos pelo Google.