Leer de Cloud Storage a Dataflow

Para leer datos de Cloud Storage en Dataflow, usa el TextIO o el AvroIO conector de entrada/salida de Apache Beam.

Incluir la dependencia de la biblioteca de Google Cloud Platform

Para usar el conector TextIO o AvroIO con Cloud Storage, incluye la siguiente dependencia. Esta biblioteca proporciona un controlador de esquemas para nombres de archivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para obtener más información, consulta el artículo Instalar el SDK de Apache Beam.

Habilitar gRPC en el conector de E/S de Apache Beam en Dataflow

Puedes conectarte a Cloud Storage mediante gRPC a través del conector de E/S de Apache Beam en Dataflow. gRPC es un framework de llamadas a procedimientos remotos (RPC) de código abierto de alto rendimiento desarrollado por Google que puedes usar para interactuar con Cloud Storage.

Para acelerar las solicitudes de lectura de tu tarea de Dataflow en Cloud Storage, puedes habilitar el conector de E/S de Apache Beam en Dataflow para usar gRPC.

Línea de comandos

  1. Asegúrate de usar la versión 2.55.0 o una posterior del SDK de Apache Beam.
  2. Para ejecutar una tarea de Dataflow, usa la opción de --additional-experiments=use_grpc_for_gcs. Para obtener información sobre las diferentes opciones de canalización, consulta Indicadores opcionales.

SDK de Apache Beam

  1. Asegúrate de usar la versión 2.55.0 o una posterior del SDK de Apache Beam.
  2. Para ejecutar una tarea de Dataflow, usa la opción de --experiments=use_grpc_for_gcs pipeline. Para obtener información sobre las diferentes opciones de canalización, consulta las opciones básicas.

Puedes configurar el conector de entrada/salida de Apache Beam en Dataflow para generar métricas relacionadas con gRPC en Cloud Monitoring. Las métricas relacionadas con gRPC pueden ayudarte a hacer lo siguiente:

  • Monitoriza y optimiza el rendimiento de las solicitudes gRPC a Cloud Storage.
  • Solucionar y depurar problemas.
  • Obtén estadísticas sobre el uso y el comportamiento de tu aplicación.

Para obtener información sobre cómo configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC, consulta Usar métricas del lado del cliente. Si no necesitas recoger métricas para tu caso práctico, puedes inhabilitar la recogida de métricas. Para obtener instrucciones, consulta Inhabilitar las métricas del lado del cliente.

Paralelismo

Los conectores TextIO y AvroIO admiten dos niveles de paralelismo:

  • Los archivos individuales se identifican por separado para que varios trabajadores puedan leerlos.
  • Si los archivos no están comprimidos, el conector puede leer subintervalos de cada archivo por separado, lo que da lugar a un nivel de paralelismo muy alto. Esta división solo es posible si cada línea del archivo es un registro significativo. Por ejemplo, no está disponible de forma predeterminada para los archivos JSON.

Rendimiento

En la siguiente tabla se muestran las métricas de rendimiento de la lectura de Cloud Storage. Las cargas de trabajo se ejecutaron en un e2-standard2 trabajador con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 M de registros | 1 KB | 1 columna Rendimiento (bytes) Rendimiento (elementos)
Leer 320 MBps 320.000 elementos por segundo

Estas métricas se basan en sencillas canalizaciones por lotes. Su objetivo es comparar el rendimiento entre conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de las canalizaciones de Dataflow es complejo y depende del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos, y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDKs de lenguaje. Para obtener más información, consulta Rendimiento de Beam IO.

Prácticas recomendadas

  • No uses watchForNewFiles con Cloud Storage. Este enfoque no se adapta bien a las grandes cadenas de producción, ya que el conector debe mantener una lista de archivos vistos en la memoria. La lista no se puede vaciar de la memoria, lo que reduce la memoria de trabajo de los trabajadores con el tiempo. Te recomendamos que utilices las notificaciones de Pub/Sub para Cloud Storage. Para obtener más información, consulta Patrones de procesamiento de archivos.

  • Si tanto el nombre del archivo como su contenido son datos útiles, usa la clase FileIO para leer los nombres de los archivos. Por ejemplo, un nombre de archivo puede contener metadatos que sean útiles al procesar los datos del archivo. Para obtener más información, consulta Acceder a nombres de archivo. En la FileIOdocumentación también se muestra un ejemplo de este patrón.

Ejemplo

En el siguiente ejemplo se muestra cómo leer datos de Cloud Storage.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

Siguientes pasos