Lee desde Cloud Storage a Dataflow

Para leer datos de Cloud Storage a Dataflow, usa el conector de E/S TextIO o AvroIO de Apache Beam.

Incluye la dependencia de la biblioteca de Google Cloud

Para usar el conector TextIO o AvroIO con Cloud Storage, debes incluir la siguiente dependencia. Esta biblioteca proporciona un controlador de esquema para los nombres de archivo "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Para obtener más información, consulta Instala el SDK de Apache Beam.

Habilita gRPC en el conector de E/S de Apache Beam en Dataflow

Puedes conectarte a Cloud Storage con gRPC a través del conector de E/S de Apache Beam en Dataflow. gRPC es un framework de llamadas de procedimiento remoto (RPC) de código abierto de alto rendimiento desarrollado por Google que puedes usar para interactuar con Cloud Storage.

Para acelerar las solicitudes de lectura de tu trabajo de Dataflow a Cloud Storage, puedes habilitar el conector de E/S de Apache Beam en Dataflow para usar gRPC.

Línea de comandos

  1. Asegúrate de usar la versión 2.55.0 o posterior del SDK de Apache Beam.
  2. Para ejecutar un trabajo de Dataflow, usa la opción de canalización --additional-experiments=use_grpc_for_gcs. Para obtener información sobre las diferentes opciones de canalización, consulta Marcas opcionales.

SDK de Apache Beam

  1. Asegúrate de usar la versión 2.55.0 o posterior del SDK de Apache Beam.
  2. Para ejecutar un trabajo de Dataflow, usa la opción de canalización --experiments=use_grpc_for_gcs. Para obtener información sobre las diferentes opciones de canalización, consulta Opciones básicas.

Puedes configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC en Cloud Monitoring. Las métricas relacionadas con gRPC pueden ayudarte a hacer lo siguiente:

  • Supervisa y optimiza el rendimiento de las solicitudes de gRPC a Cloud Storage.
  • Solucionar y depurar problemas
  • Obtén estadísticas sobre el uso y el comportamiento de tu aplicación.

Para obtener información sobre cómo configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC, consulta Cómo usar métricas del cliente. Si no es necesario recopilar métricas para tu caso de uso, puedes inhabilitar la recopilación de métricas. Para obtener instrucciones, consulta Cómo inhabilitar las métricas del cliente.

Paralelismo

Los conectores TextIO y AvroIO admiten dos niveles de paralelismo:

  • Los archivos individuales se codifican por separado para que varios trabajadores puedan leerlos.
  • Si los archivos no están comprimidos, el conector puede leer subrangos de cada archivo por separado, lo que genera un nivel muy alto de paralelismo. Esta división solo es posible si cada línea del archivo es un registro significativo. Por ejemplo, no está disponible de forma predeterminada para los archivos JSON.

Rendimiento

En la siguiente tabla, se muestran las métricas de rendimiento para leer desde Cloud Storage. Las cargas de trabajo se ejecutaron en un trabajador e2-standard2, con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 millones de registros | 1 KB | 1 columna Capacidad de procesamiento (bytes) Capacidad de procesamiento (elementos)
Leer 320 MBps 320,000 elementos por segundo

Estas métricas se basan en canalizaciones por lotes simples. Están diseñadas para comparar el rendimiento entre los conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de la canalización de Dataflow es complejo y es una función del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDK de lenguaje. Para obtener más información, consulta Rendimiento de E/S de Beam.

Prácticas recomendadas

  • Evita usar watchForNewFiles con Cloud Storage. Este enfoque no se escala bien para canalizaciones de producción grandes, ya que el conector debe mantener una lista de archivos vistos en la memoria. La lista no se puede vaciar de la memoria, lo que reduce la memoria de trabajo de los trabajadores con el tiempo. Considera usar las notificaciones de Pub/Sub para Cloud Storage. Para obtener más información, consulta Patrones de procesamiento de archivos.

  • Si el nombre del archivo y el contenido del archivo son datos útiles, usa la clase FileIO para leer los nombres de archivo. Por ejemplo, un nombre de archivo puede contener metadatos que son útiles cuando se procesan los datos en el archivo. Para obtener más información, consulta Accede a nombres de archivos. En la documentación de FileIO también se muestra un ejemplo de este patrón.

Ejemplo

En el siguiente ejemplo, se muestra cómo leer desde Cloud Storage.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

¿Qué sigue?