Escribe datos de Dataflow en Cloud Storage

En este documento, se describe cómo escribir datos de texto de Dataflow en Cloud Storage mediante el conector de E/S TextIO de Apache Beam.

Incluye la dependencia de la biblioteca de Google Cloud

Para usar el conector TextIO con Cloud Storage, debes incluir la siguiente dependencia. Esta biblioteca proporciona un controlador de esquema para los nombres de archivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para obtener más información, consulta Instala el SDK de Apache Beam.

Habilita gRPC en el conector de E/S de Apache Beam en Dataflow

Puedes conectarte a Cloud Storage con gRPC a través del conector de E/S de Apache Beam en Dataflow. gRPC es un framework de llamadas de procedimiento remoto (RPC) de código abierto de alto rendimiento desarrollado por Google que puedes usar para interactuar con Cloud Storage.

Para acelerar las solicitudes de escritura de tu trabajo de Dataflow en Cloud Storage, puedes habilitar el conector de E/S de Apache Beam en Dataflow para usar gRPC.

Línea de comandos

  1. Asegúrate de usar la versión 2.55.0 o posterior del SDK de Apache Beam.
  2. Para ejecutar un trabajo de Dataflow, usa la opción de canalización --additional-experiments=use_grpc_for_gcs. Para obtener información sobre las diferentes opciones de canalización, consulta Marcas opcionales.

SDK de Apache Beam

  1. Asegúrate de usar la versión 2.55.0 o posterior del SDK de Apache Beam.
  2. Para ejecutar un trabajo de Dataflow, usa la opción de canalización --experiments=use_grpc_for_gcs. Para obtener información sobre las diferentes opciones de canalización, consulta Opciones básicas.

Puedes configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC en Cloud Monitoring. Las métricas relacionadas con gRPC pueden ayudarte a hacer lo siguiente:

  • Supervisa y optimiza el rendimiento de las solicitudes de gRPC a Cloud Storage.
  • Solucionar y depurar problemas
  • Obtén estadísticas sobre el uso y el comportamiento de tu aplicación.

Para obtener información sobre cómo configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC, consulta Cómo usar métricas del cliente. Si no es necesario recopilar métricas para tu caso de uso, puedes inhabilitar la recopilación de métricas. Para obtener instrucciones, consulta Cómo inhabilitar las métricas del cliente.

Paralelismo

El paralelismo se determina principalmente por la cantidad de fragmentos. De forma predeterminada, el ejecutor configura este valor automáticamente. Para la mayoría de las canalizaciones, se recomienda usar el comportamiento predeterminado. En este documento, consulta Prácticas recomendadas.

Rendimiento

En la siguiente tabla, se muestran las métricas de rendimiento para escribir en Cloud Storage. Las cargas de trabajo se ejecutaron en un trabajador e2-standard2, con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 millones de registros | 1 KB | 1 columna Capacidad de procesamiento (bytes) Capacidad de procesamiento (elementos)
Escritura 130 MBps 130,000 elementos por segundo

Estas métricas se basan en canalizaciones por lotes simples. Están diseñadas para comparar el rendimiento entre los conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de la canalización de Dataflow es complejo y es una función del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDK de lenguaje. Para obtener más información, consulta Rendimiento de E/S de Beam.

Prácticas recomendadas

  • En general, evita definir una cantidad específica de fragmentos. Esto permite que el ejecutor seleccione un valor apropiado para tu escalamiento. Si ajustas la cantidad de fragmentos, te recomendamos escribir entre 100 MB y 1 GB por fragmento. Sin embargo, el valor óptimo puede depender de la carga de trabajo.

  • Cloud Storage puede escalar a una gran cantidad de solicitudes por segundo. Sin embargo, si la canalización tiene grandes aumentos repentinos en el volumen de escritura, considera escribir en varios buckets para evitar la sobrecarga temporal de cualquier depósito de Cloud Storage.

  • En general, escribir en Cloud Storage es más eficaz cuando cada escritura es más grande (1 kb o más). Escribir registros pequeños en una gran cantidad dearchivos puede provocar un peor rendimiento por byte.

  • Cuando generes nombres de archivos, considera usar nombres de archivos no secuenciales para distribuir la carga. Para obtener más información, consulta Usa una convención de nombres que distribuya las cargas de manera uniforme entre los rangos de claves.

  • Cuando asignes nombres a archivos, no uses el signo “@” seguido de un número o un asterisco ('*'). Para obtener más información, consulta “@*” y “@N” son especificaciones de fragmentación reservadas.

Ejemplo: Escribe archivos de texto en Cloud Storage

En el siguiente ejemplo, se crea una canalización por lotes que escribe archivos de texto mediante la compresión GZIP:

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Si la entrada PCollection no está delimitada, debes definir una ventana o un activador en la colección y, luego, especificar escrituras con ventanas mediante una llamada a TextIO.Write.withWindowedWrites.

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Para la ruta de salida, especifica una ruta de Cloud Storage que incluya el nombre del bucket y un prefijo de nombre de archivo. Por ejemplo, si especificas gs://my_bucket/output/file, el conector TextIO escribe en el bucket de Cloud Storage llamado my_bucket, y los archivos de salida tienen el prefijo output/file*.

De forma predeterminada, el conector TextIO fragmenta los archivos de salida mediante una convención de nombres como la siguiente: <file-prefix>-00000-of-00001. De manera opcional, puedes especificar un sufijo de nombre de archivo y un esquema de compresión, como se muestra en el ejemplo.

Para garantizar escrituras idempotentes, Dataflow escribe en un archivo temporal y, luego, lo copia completo en el archivo final. Para controlar dónde se almacenan estos archivos temporales, usa el método withTempDirectory.

¿Qué sigue?