Migra el esquema y los datos desde Apache Hive

En este documento, se describe cómo migrar tus datos, tu configuración de seguridad y tus canalizaciones de Apache Hive a BigQuery.

También puedes usar la traducción de SQL por lotes para migrar tus secuencias de comandos de SQL de forma masiva o la traducción de SQL interactiva para traducir consultas ad hoc. Apache HiveQL es compatible por completo con ambos servicios de traducción de SQL.

Prepárate para la migración

En las siguientes secciones, se describe cómo recopilar información sobre las estadísticas de tablas, los metadatos y la configuración de seguridad para ayudarte a migrar tu almacén de datos de Hive a BigQuery.

Recopila información de la tabla de origen

Recopila información sobre las tablas de Hive de origen, como la cantidad de filas, la cantidad de columnas, los tipos de datos de las columnas, el tamaño, el formato de entrada de los datos y la ubicación. Esta información es útil en el proceso de migración y para validar la migración de datos. Si tienes una tabla de Hive llamada employees en una base de datos llamada corp, usa los siguientes comandos para recopilar información de la tabla:

# Find the number of rows in the table
hive> SELECT COUNT(*) FROM corp.employees;

# Output all the columns and their data types
hive> DESCRIBE corp.employees;

# Output the input format and location of the table
hive> SHOW CREATE TABLE corp.employees;
Output:
…
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  'hdfs://demo_cluster/user/hive/warehouse/corp/employees'
TBLPROPERTIES (# Get the total size of the table data in bytes
shell> hdfs dfs -du -s TABLE_LOCATION

Conversión de formato de tabla de origen

Algunos de los formatos que admite Hive no se pueden transferir directamente a BigQuery.

Hive admite el almacenamiento de datos en los siguientes formatos:

  • Archivo de texto
  • Archivo RC
  • Archivo de secuencia
  • Archivo Avro
  • Archivo ORC
  • Archivo Parquet

BigQuery es compatible con la carga de datos desde Cloud Storage en cualquiera de los siguientes formatos de archivo:

  • CSV
  • JSON (delimitado por saltos de línea)
  • Avro
  • ORC
  • Parquet

BigQuery puede cargar archivos de datos en formatos Avro, ORC y Parquet directamente sin la necesidad de usar archivos de esquema. En el caso de los archivos de texto sin formato CSV o JSON (delimitado por saltos de línea), puedes copiar los datos en una tabla de Hive en formato Avro o convertir el esquema de la tabla a un Esquema JSONde BigQuery para proporcionar durante la transferencia.

Recopila la configuración del control de acceso de Hive

Hive y BigQuery tienen diferentes mecanismos de control de acceso. Recopila toda la configuración de control de acceso de Hive, como funciones, grupos, miembros y privilegios que se les otorgaron. Asigna un modelo de seguridad en BigQuery a nivel de conjunto de datos y, luego, implementa una LCA detallada. Por ejemplo, un usuario de Hive se puede asignar a una Cuenta de Google, y un grupo HDFS se puede asignar a un Grupo de Google. El acceso se puede establecer a nivel del conjunto de datos. Usa los siguientes comandos para recopilar la configuración de control de acceso en Hive:

# List all the users
> hdfs dfs -ls /user/ | cut -d/ -f3

# Show all the groups that a specific user belongs to
> hdfs groups user_name

# List all the roles
hive> SHOW ROLES;

# Show all the roles assigned to a specific group
hive> SHOW ROLE GRANT GROUP group_name

# Show all the grants for a specific role
hive> SHOW GRANT ROLE role_name;

# Show all the grants for a specific role on a specific object
hive> SHOW GRANT ROLE role_name on object_type object_name;

En Hive, puedes acceder a los archivos HDFS detrás de las tablas directamente si tienes los permisos necesarios. En las tablas estándar de BigQuery, después de cargar los datos en la tabla, los datos se almacenan en el almacenamiento de BigQuery. Puedes leer datos mediante la API de lectura de almacenamiento de BigQuery, pero aún se aplica toda la seguridad a nivel de IAM, de las filas y de las columnas. Si usas tablas externas de BigQuery para consultar los datos en Cloud Storage, IAM también controla el acceso a Cloud Storage.

Puedes crear una tabla de BigLake que te permita usar conectores para consultar los datos con Apache Spark, Trino o Apache Hive. La API de BigQuery Storage aplica políticas de administración a nivel de fila y de columna para todas las tablas de BigLake en Cloud Storage o BigQuery.

Migración de datos

La migración de datos de Hive desde tu clúster de origen local o cualquier otro clúster de origen basado en la nube a BigQuery tiene dos pasos:

  1. Copia datos de un clúster de origen a Cloud Storage
  2. Carga datos de Cloud Storage a BigQuery

En las siguientes secciones, se describe la migración de datos de Hive, la validación de datos migrados y el manejo de la migración de datos transferidos de forma continua. Los ejemplos se escriben para tablas que no son ACID.

Datos de la columna de partición

En Hive, los datos en las tablas particionadas se almacenan en una estructura de directorio. Cada partición de la tabla está asociada con un valor particular de columna de partición. Los archivos de datos no contienen ningún dato de las columnas de partición. Usa el comando SHOW PARTITIONS para enumerar las diferentes particiones en una tabla particionada.

En el siguiente ejemplo, se muestra que la tabla de Hive de origen está particionada en las columnas joining_date y department. Los archivos de datos de esta tabla no contienen datos relacionados con estas dos columnas.

hive> SHOW PARTITIONS corp.employees_partitioned
joining_date="2018-10-01"/department="HR"
joining_date="2018-10-01"/department="Analyst"
joining_date="2018-11-01"/department="HR"

Una forma de copiar estas columnas es convertir la tabla particionada en una tabla no particionada antes de cargarla en BigQuery:

  1. Crear una tabla no particionada con un esquema similar a la tabla particionada.
  2. Carga datos en la tabla no particionada desde la tabla particionada de origen.
  3. Copia estos archivos de datos en la tabla no particionada de almacenamiento en etapa intermedia en Cloud Storage.
  4. Carga los datos en BigQuery con el comando bq load y proporciona el nombre de la columna de partición de tipo TIMESTAMP o DATE, si corresponde, como el argumento time_partitioning_field.

Copiar datos en Cloud Storage

El primer paso en la migración de datos es copiarlos en Cloud Storage. Usa Hadoop DistCp para copiar los datos del clúster local o de otra nube a Cloud Storage. Almacena tus datos en un bucket en la misma región o multirregión que el conjunto de datos en el que deseas almacenar los datos en BigQuery. Por ejemplo, si deseas usar un conjunto de datos de BigQuery existente como destino en la región de Tokio, debes elegir un bucket regional de Cloud Storage en Tokio para conservar los datos.

Después de seleccionar la ubicación del bucket de Cloud Storage, puedes usar el siguiente comando para enumerar todos los archivos de datos presentes en la ubicación de la tabla de Hive employees:

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

Copia todos los archivos anteriores en Cloud Storage:

> hadoop distcp
hdfs://demo_cluster/user/hive/warehouse/corp/employees
gs://hive_data/corp/employees

Ten en cuenta que se te cobra por almacenar los datos en Cloud Storage según los precios de almacenamiento de datos.

Puede haber directorios de etapa de pruebas que contengan archivos intermedios creados para trabajos de consulta. Debes asegurarte de borrar esos directorios antes de ejecutar el comando bq load.

Carga datos

BigQuery es compatible con datos de carga por lotes en muchos formatos desde Cloud Storage. Asegúrate de que el dataset de BigQuery en el que deseas cargar tus datos exista antes de crear un trabajo de carga.

En el siguiente comando, se muestran los datos copiados de Hive para una tabla que no es de ACID:

> gcloud storage ls gs://hive_data/corp/employees/
gs://hive-migration/corp/employees/
gs://hive-migration/corp/employees/000000_0
gs://hive-migration/corp/employees/000001_0
gs://hive-migration/corp/employees/000002_0

Para cargar tus datos de Hive en BigQuery, usa el comando bq load. Puedes usar un carácter comodín * en la URL para cargar datos desde varios archivos que comparten un prefijo de objeto común. Por ejemplo, usa el comando siguiente para cargar todos los archivos que comparten el prefijo gs://hive_data/corp/employees/:

bq load --source_format=AVRO corp.employees gs://hive_data/corp/employees/*

Debido a que los trabajos pueden tardar mucho tiempo en completarse, los puedes ejecutar de forma asíncrona si configuras la marca --sync como False. Cuando se ejecuta el comando bq load, se muestra el ID del trabajo de carga creado, por lo que puedes usar este comando para sondear el estado del trabajo. Estos datos incluyen detalles como el tipo de trabajo, el estado del trabajo y el usuario que ejecutó el trabajo.

Consulta cada estado de trabajo de carga con su ID de trabajo respectivo y verifica si hay algún trabajo que falló con errores. En caso de falla, BigQuery usa un enfoque de “Todos” o “Ninguno” mientras se cargan datos en una tabla. Puedes intentar resolver los errores y volver a crear de forma segura otro trabajo de carga. Si deseas obtener más información, consulta errores de solución de problemas.

Asegúrate de tener suficiente cuota de trabajo de carga por tabla y proyecto. Si excedes la cuota, el trabajo de carga falla con un error quotaExceeded.

Ten en cuenta que no se te cobrará por una operación de carga para cargar datos en BigQuery desde Cloud Storage. Una vez que se cargan los datos en BigQuery, estos quedan sujetos a los precios de almacenamiento de la herramienta. Cuando los trabajos de carga se completan de forma correcta, puedes borrar los archivos restantes en Cloud Storage a fin de evitar que se generen cargos por el almacenamiento de datos redundantes.

Validación

Después de cargar los datos con éxito, puedes validar los datos migrados si comparas la cantidad de filas en las tablas de Hive y BigQuery. Visualiza la información de la tabla para obtener detalles sobre las tablas de BigQuery, como la cantidad de filas, columnas, campos de partición o campos de agrupamiento en clústeres. Para obtener una validación adicional, considera probar la herramienta de validación de datos.

Transferencia continua

Si transfieres datos de forma continua a una tabla de Hive, realiza una migración inicial y, luego, migra solo los cambios de datos incrementales a BigQuery. Es común crear secuencias de comandos que se ejecuten de forma repetida para encontrar y cargar datos nuevos. Hay muchas formas de hacerlo y en las siguientes secciones se describe un enfoque posible.

Puedes realizar un seguimiento del progreso de la migración en una tabla de base de datos de Cloud SQL, que se describe como una tabla de seguimiento en las siguientes secciones. Durante la primera ejecución de la migración, almacena el progreso en la tabla de seguimiento. Para las ejecuciones posteriores de migración, usa la información de la tabla de seguimiento a fin de detectar si se transfirieron datos adicionales y si se pueden migrar a BigQuery.

Selecciona una columna de identificador de tipo INT64, TIMESTAMP o DATE para distinguir los datos incrementales. Esto se conoce como columna incremental.

La siguiente tabla es un ejemplo de una tabla sin particiones que usa un tipo TIMESTAMP para su columna incremental:

+-----------------------------+-----------+-----------+-----------+-----------+
| timestamp_identifier        | column_2  | column_3  | column_4  | column_5  |
+-----------------------------+-----------+-----------+-----------+-----------+
| 2018-10-10 21\:56\:41       |           |           |           |           |
| 2018-10-11 03\:13\:25       |           |           |           |           |
| 2018-10-11 08\:25\:32       |           |           |           |           |
| 2018-10-12 05\:02\:16       |           |           |           |           |
| 2018-10-12 15\:21\:45       |           |           |           |           |
+-----------------------------+-----------+-----------+-----------+-----------+

La siguiente tabla es un ejemplo de una tabla particionada en una columna de tipo DATE partition_column. Tiene una columna incremental de tipo int_identifier de número entero en cada partición.

+---------------------+---------------------+----------+----------+-----------+
| partition_column    | int_identifier      | column_3 | column_4 | column_5  |
+---------------------+---------------------+----------+----------+-----------+
| 2018-10-01          | 1                   |          |          |           |
| 2018-10-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-10-01          | 1000                |          |          |           |
| 2018-11-01          | 1                   |          |          |           |
| 2018-11-01          | 2                   |          |          |           |
| ...                 | ...                 |          |          |           |
| 2018-11-01          | 2000                |          |          |           |
+---------------------+---------------------+----------+----------+-----------+

En las siguientes secciones, se describe la migración de datos de Hive en función de si está particionada o no, o si tiene columnas incrementales.

Tabla no particionada sin columnas incrementales

Si suponemos que no hay compactaciones de archivos en Hive, Hive crea archivos de datos nuevos cuando se transfieren datos nuevos. Durante la primera ejecución, almacena la lista de archivos en la tabla de seguimiento y completa la migración inicial de la tabla de Hive mediante la copia de estos archivos en Cloud Storage y su carga en BigQuery.

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 3 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0

Después de la migración inicial, algunos datos se transfieren en Hive. Solo necesitas migrar estos datos incrementales a BigQuery. En las ejecuciones de migración posteriores, vuelve a enumerar los archivos de datos y compáralos con la información de la tabla de seguimiento para detectar los archivos de datos nuevos que no se migraron.

> hdfs dfs -ls hdfs://demo_cluster/user/hive/warehouse/corp/employees
Found 5 items
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000000_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000001_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000002_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000003_0
hdfs://demo_cluster/user/hive/warehouse/corp/employees/000004_0

En este ejemplo, hay dos archivos nuevos en la ubicación de la tabla. Para migrar los datos, copia estos archivos nuevos en Cloud Storage y cárgalos a la tabla de BigQuery existente.

Tabla no particionada con columnas incrementales

En este caso, puedes usar el valor máximo de las columnas incrementales para determinar si se agregaron datos nuevos. Mientras realizas la migración inicial, consulta la tabla de Hive para recuperar el valor máximo de la columna incremental y almacénalo en la tabla de seguimiento:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2018-12-31 22:15:04

En las ejecuciones posteriores de migración, repite la misma consulta para recuperar el valor máximo actual de la columna incremental y compáralo con el valor máximo anterior de la tabla de seguimiento a fin de verificar si existen datos incrementales:

hive> SELECT MAX(timestamp_identifier) FROM corp.employees;
2019-01-04 07:21:16

Si el valor máximo actual es mayor que el valor máximo anterior, indica que los datos incrementales se transfirieron a la tabla de Hive como en el ejemplo. Para migrar los datos incrementales, crea una tabla de etapa de pruebas y carga solo los datos incrementales en ella.

hive> CREATE TABLE stage_employees LIKE corp.employees;
hive> INSERT INTO TABLE stage_employees SELECT * FROM corp.employees WHERE timestamp_identifier>"2018-12-31 22:15:04" and timestamp_identifier<="2019-01-04 07:21:16"

Para migrar la tabla de etapa de pruebas, enumera los archivos de datos de HDFS, cópialos en Cloud Storage y cárgalos a la tabla de BigQuery existente.

Tabla particionada sin columnas incrementales

La transferencia de datos a una tabla particionada puede crear particiones nuevas, agregar datos incrementales a las particiones existentes o realizar ambas acciones. En esta situación, puedes identificar esas particiones actualizadas, pero no puedes identificar con facilidad qué datos se agregaron a estas particiones existentes, ya que no hay una columna incremental para distinguir. Otra opción es tomar y mantener instantáneas de HDFS, pero las instantáneas generan problemas de rendimiento para Hive, por lo que suele estar inhabilitada.

Mientras migras la tabla por primera vez, ejecuta el comando SHOW PARTITIONS y almacena la información sobre las diferentes particiones en la tabla de seguimiento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

En el resultado anterior, se muestra que la tabla employees tiene dos particiones. A continuación, se proporciona una versión simplificada de la tabla de seguimiento para mostrar cómo se puede almacenar esta información.

partition_information file_path gcs_copy_status gcs_file_path bq_job_id ...
partition_column =2018-10-01
partition_column =2018-11-01

En las ejecuciones de migración posteriores, ejecuta el comando SHOW PARTITIONS de nuevo para enumerar todas las particiones y compararlas con la información de partición de la tabla de seguimiento a fin de verificar si hay particiones nuevas que no se hayan migrado.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

Si se identifican particiones nuevas como en el ejemplo, crea una tabla de etapa de pruebas y carga solo las particiones nuevas en ella desde la tabla de origen. Para migrar la tabla de etapa de pruebas, copia los archivos en Cloud Storage y súbelos a la tabla de BigQuery existente.

Tabla particionada con columnas incrementales

En esta situación, la tabla de Hive está particionada y hay una columna incremental en cada partición. Incrementos de datos transferidos de forma continua en este valor de columna. Aquí tienes la capacidad de migrar las particiones nuevas como se describe en la sección anterior y también puedes migrar datos incrementales que se transfirieron a las particiones existentes.

Cuando migres la tabla por primera vez, almacena los valores mínimo y máximo de la columna incremental en cada partición junto con la información sobre las particiones de la tabla en la tabla de seguimiento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";
1 1000

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-11-01";
1 2000

En el resultado anterior, se muestra que los empleados de la tabla tienen dos particiones y los valores mínimo y máximo de la columna incremental en cada partición. A continuación, se proporciona una versión simplificada de la tabla de seguimiento para mostrar cómo se puede almacenar esta información.

partition_information inc_col_min inc_col_max file_path gcs_copy_status ...
partition_column =2018-10-01 1 1000
partition_column =2018-11-01 1 2000

En las ejecuciones posteriores, ejecuta las mismas consultas para recuperar el valor máximo actual en cada partición y compáralo con el valor máximo anterior de la tabla de seguimiento.

hive> SHOW PARTITIONS corp.employees
partition_column=2018-10-01
partition_column=2018-11-01
partition_column=2018-12-01
partition_column=2019-01-01

hive> SELECT MIN(int_identifier),MAX(int_identifier) FROM corp.employees WHERE partition_column="2018-10-01";

En el ejemplo, se identificaron dos particiones nuevas y se transfirieron algunos datos incrementales en la partición existente partition_column=2018-10-01. Si hay datos incrementales, crea una tabla de etapa de pruebas, carga solo los datos incrementales en la tabla de etapa de pruebas, copia los datos en Cloud Storage y carga los datos en la tabla BigQuery existente.

Configuración de seguridad

BigQuery usa IAM para administrar el acceso a los recursos. Las funciones predefinidas de BigQuery proporcionan acceso detallado a un servicio específico y están diseñadas para admitir casos de uso y patrones de control de acceso comunes. Puedes usar las funciones personalizadas para proporcionar un acceso aún más detallado mediante la personalización de un conjunto de permisos.

Los controles de acceso para tablas y conjuntos de datos especifican las operaciones que los usuarios, grupos y cuentas de servicio pueden realizar en las tablas, vistas y conjuntos de datos. Las vistas autorizadas te permiten compartir resultados de consultas con usuarios y grupos específicos sin darles acceso a los datos de origen subyacentes. Con la seguridad a nivel de fila y la seguridad a nivel de columna, puedes restringir quién puede acceder a qué filas o columnas dentro de una tabla. El enmascaramiento de datos te permite ocultar de manera selectiva los datos de las columnas de grupos de usuarios y permitir el acceso a la columna.

Cuando aplicas controles de acceso, puedes otorgar acceso a los siguientes usuarios y grupos:

  • Usuario por correo electrónico: otorga a una Cuenta de Google individual acceso al conjunto de datos.
  • Grupo por correo electrónico: otorga a todos los miembros de un Grupo de Google acceso al conjunto de datos.
  • Dominio: otorga a todos los usuarios y grupos en un dominio de Google acceso al conjunto de datos.
  • Todos los usuarios autenticados: otorga a todos los titulares de la Cuenta de Google acceso al conjunto de datos (hace que el conjunto de datos sea público).
  • Propietarios del proyecto: otorga a todos los propietarios del proyecto acceso al conjunto de datos.
  • Visualizadores del proyecto: otorga a todos los visualizadores del proyecto acceso al conjunto de datos.
  • Editores del proyecto: otorga a todos los editores del proyecto acceso al conjunto de datos.
  • Vista autorizada: otorga acceso de lectura al conjunto de datos.

Cambios en la canalización de datos

En las siguientes secciones, se analiza cómo cambiar tus canalizaciones de datos cuando migras de Hive a BigQuery.

Sqoop

Si tu canalización existente usa Sqoop a fin de importar datos a HDFS o Hive para su procesamiento, modifica el trabajo a fin de importar datos a Cloud Storage.

Si importas datos a HDFS, elige una de las siguientes opciones:

Si deseas que Sqoop importe datos a Hive que se ejecute en Google Cloud, haz que apunte directamente a la tabla de Hive y usa Cloud Storage como el almacén de Hive en lugar de HDFS. Para ello, configura la propiedad hive.metastore.warehouse.dir en un bucket de Cloud Storage.

Puedes ejecutar tu trabajo de Sqoop sin administrar un clúster de Hadoop mediante Dataproc para enviar trabajos de Sqoop a fin de importar datos a BigQuery.

Spark SQL y HiveQL

El traductor de SQL por lotes o el traductor de SQL interactivo pueden traducir automáticamente tu SQL de Spark o HiveQL a GoogleSQL.

Si no deseas migrar tu SQL de Spark o HiveQL a BigQuery, puedes usar Dataproc o el conector de BigQuery con Apache Spark.

ETL en Hive

Si hay trabajos de ETL existentes en Hive, puedes modificarlos de las siguientes maneras para migrarlos desde Hive:

  • Convierte el trabajo de ETL de Hive en un trabajo de BigQuery con el traductor de SQL por lotes.
  • Usa Apache Spark para leer y escribir en BigQuery mediante el conector de BigQuery. Puedes usar Dataproc para ejecutar tus trabajos de Spark de manera rentable con la ayuda de clústeres efímeros.
  • Vuelve a escribir tus canalizaciones con el SDK de Apache Beam y ejecútalas en Dataflow.
  • Usa SQL de Apache Beam para reescribir tus canalizaciones.

Para administrar tu canalización de ETL, puedes usar Cloud Composer (Apache Airflow) y Plantillas de flujo de trabajo de Dataproc. Cloud Composer proporciona una herramienta para convertir los flujos de trabajo de Oozie en los flujos de trabajo de Cloud Composer.

Dataflow

Si deseas mover tu canalización de ETL de Hive a servicios en la nube completamente administrados, considera escribir tus canalizaciones de datos mediante el SDK de Apache Beam y ejecutarlas en Dataflow.

Dataflow es un servicio administrado para ejecutar canalizaciones de procesamiento de datos. Ejecuta programas escritos con el framework de código abierto Apache Beam. Apache Beam es un modelo de programación unificado que te permite desarrollar canalizaciones de transmisión y por lotes.

Si tus canalizaciones de datos son movimiento de datos estándar, puedes usar plantillas de Dataflow para crear canalizaciones de Dataflow con rapidez sin escribir código. Puedes consultar esta plantilla proporcionada por Google que te permite leer archivos de texto de Cloud Storage, aplicar transformaciones y escribir los resultados en una tabla de BigQuery.

Para simplificar aún más el procesamiento de datos, también puedes probar Beam SQL, que te permite procesar datos mediante instrucciones similares a las de SQL.