En este documento se describe cómo escribir datos de texto de Dataflow en Cloud Storage mediante el TextIO
conector de E/S de Apache Beam.
Incluir la dependencia de la biblioteca de Google Cloud Platform
Para usar el conector TextIO
con Cloud Storage, incluye la siguiente dependencia. Esta biblioteca proporciona un controlador de esquemas para nombres de archivo "gs://"
.
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Python
apache-beam[gcp]==VERSION
Go
import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
Para obtener más información, consulta el artículo Instalar el SDK de Apache Beam.
Habilitar gRPC en el conector de E/S de Apache Beam en Dataflow
Puedes conectarte a Cloud Storage mediante gRPC a través del conector de E/S de Apache Beam en Dataflow. gRPC es un framework de llamadas a procedimientos remotos (RPC) de código abierto de alto rendimiento desarrollado por Google que puedes usar para interactuar con Cloud Storage.
Para acelerar las solicitudes de escritura de tu tarea de Dataflow en Cloud Storage, puedes habilitar el conector de E/S de Apache Beam en Dataflow para usar gRPC.
Línea de comandos
- Asegúrate de usar la versión 2.55.0 o una posterior del SDK de Apache Beam.
- Para ejecutar una tarea de Dataflow, usa la opción de
--additional-experiments=use_grpc_for_gcs
. Para obtener información sobre las diferentes opciones de canalización, consulta Indicadores opcionales.
SDK de Apache Beam
- Asegúrate de usar la versión 2.55.0 o una posterior del SDK de Apache Beam.
-
Para ejecutar una tarea de Dataflow, usa la opción de
--experiments=use_grpc_for_gcs
pipeline. Para obtener información sobre las diferentes opciones de canalización, consulta las opciones básicas.
Puedes configurar el conector de entrada/salida de Apache Beam en Dataflow para generar métricas relacionadas con gRPC en Cloud Monitoring. Las métricas relacionadas con gRPC pueden ayudarte a hacer lo siguiente:
- Monitoriza y optimiza el rendimiento de las solicitudes gRPC a Cloud Storage.
- Solucionar y depurar problemas.
- Obtén estadísticas sobre el uso y el comportamiento de tu aplicación.
Para obtener información sobre cómo configurar el conector de E/S de Apache Beam en Dataflow para generar métricas relacionadas con gRPC, consulta Usar métricas del lado del cliente. Si no necesitas recoger métricas para tu caso práctico, puedes inhabilitar la recogida de métricas. Para obtener instrucciones, consulta Inhabilitar las métricas del lado del cliente.
Paralelismo
El paralelismo se determina principalmente por el número de fragmentos. De forma predeterminada, el ejecutor asigna este valor automáticamente. En la mayoría de las canalizaciones, se recomienda usar el comportamiento predeterminado. En este documento, consulta las prácticas recomendadas.
Rendimiento
En la siguiente tabla se muestran las métricas de rendimiento de escritura en Cloud Storage. Las cargas de trabajo se ejecutaron en un e2-standard2
trabajador con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.
100 M de registros | 1 KB | 1 columna | Rendimiento (bytes) | Rendimiento (elementos) |
---|---|---|
Escribir | 130 MB/s | 130.000 elementos por segundo |
Estas métricas se basan en sencillas canalizaciones por lotes. Su objetivo es comparar el rendimiento entre conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de las canalizaciones de Dataflow es complejo y depende del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos, y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDKs de lenguaje. Para obtener más información, consulta Rendimiento de Beam IO.
Prácticas recomendadas
En general, no se recomienda definir un número específico de fragmentos. De esta forma, el corredor puede seleccionar un valor adecuado para tu escala. Para habilitar el particionado automático, llama a
.withAutoSharding()
, no a.withNumShards(0)
. Si ajustas el número de fragmentos, te recomendamos que escribas entre 100 MB y 1 GB por fragmento. Sin embargo, el valor óptimo puede depender de la carga de trabajo.Cloud Storage puede escalar a un número muy elevado de solicitudes por segundo. Sin embargo, si tu canalización tiene picos grandes en el volumen de escritura, considera la posibilidad de escribir en varios segmentos para evitar que se sobrecargue temporalmente un único segmento de Cloud Storage.
En general, escribir en Cloud Storage es más eficiente cuando cada escritura es mayor (1 KB o más). Escribir registros pequeños en un gran número de archivos puede empeorar el rendimiento por byte.
Cuando genere nombres de archivo, considere la posibilidad de usar nombres de archivo no secuenciales para distribuir la carga. Para obtener más información, consulta Usar una convención de nomenclatura que distribuya la carga de forma uniforme entre los intervalos de claves.
Cuando asignes nombres a los archivos, no uses el signo @ seguido de un número o un asterisco (*). Para obtener más información, consulta "@*" y "@N" son especificaciones de fragmentación reservadas.
Ejemplo: escribir archivos de texto en Cloud Storage
En el siguiente ejemplo se crea un flujo de procesamiento por lotes que escribe archivos de texto con compresión GZIP:
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.
Si la entrada PCollection
no está limitada, debes definir una ventana o un
activador en la colección y, a continuación, especificar las escrituras en ventanas llamando a
TextIO.Write.withWindowedWrites
.
Python
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.
En la ruta de salida, especifica una ruta de Cloud Storage que incluya el nombre del segmento y un prefijo de nombre de archivo. Por ejemplo, si especificas gs://my_bucket/output/file
, el conector TextIO
escribe en el segmento de Cloud Storage llamado my_bucket
y los archivos de salida tienen el prefijo output/file*
.
De forma predeterminada, el conector TextIO
fragmenta los archivos de salida con una convención de nomenclatura como esta: <file-prefix>-00000-of-00001
. Si quieres, puedes especificar un sufijo de nombre de archivo y un esquema de compresión, como se muestra en el ejemplo.
Para asegurar las escrituras idempotentes, Dataflow escribe en un archivo temporal y, a continuación, copia el archivo temporal completado en el archivo final.
Para controlar dónde se almacenan estos archivos temporales, usa el método withTempDirectory
.
Siguientes pasos
- Lee la documentación de la API
TextIO
. - Consulta la lista de plantillas proporcionadas por Google.