En esta página, se describe cómo usar DataflowTemplateOperator
para iniciar canalizaciones de Dataflow desde Cloud Composer. La canalización de Cloud Storage Text a BigQuery es una canalización por lotes que te permite subir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones, y enviar los resultados a BigQuery.
Antes de iniciar el flujo de trabajo, necesitaremos crear las siguientes entidades:
Una tabla de BigQuery vacía con un conjunto de datos vacío que contendrá las siguientes columnas de información:
location
,average_temperature
,month
y, de forma opcional,inches_of_rain
,is_current
ylatest_measurement
.Un archivo JSON que normalizará los datos del archivo
.txt
al formato correcto para el esquema de nuestra tabla de BigQuery. El objeto JSON tendrá un arreglo deBigQuery Schema
, en el que cada objeto contendrá un nombre de columna, un tipo de entrada y si es o no un campo obligatorio.Un archivo
.txt
de entrada que contendrá los datos que deseamos subir por lotes en nuestra tabla de BigQuery.Una función definida por el usuario escrita en JavaScript que transformará cada línea del archivo
.txt
en las variables relevantes para nuestra tabla.Un archivo de grafo acíclico dirigido (DAG) que apunta a la ubicación de los archivos mencionados antes.
A continuación, subiremos el archivo
.txt
, el archivo de UDF.js
y el archivo de esquema.json
a un depósito de Storage. También subiremos el DAG a nuestro entorno de Cloud Composer.Una vez que se suba el DAG, se iniciará una tarea de Airflow. La tarea iniciará una canalización de Cloud Dataflow que aplicará la función definida por el usuario a nuestro archivo
.txt
, y la formateará según el esquema JSON.Por último, los datos se subirán a la tabla de BigQuery que creamos antes.
Costos
En este instructivo, se usan los componentes facturables de Google Cloud, incluidos los siguientes:
- Cloud Composer
- Dataflow
- Cloud Storage
- BigQuery
Prerequisites
- Asegúrate de haber creado un entorno de Cloud Composer.
- La versión mínima requerida de Cloud Composer es la 1.9.0. Para verificar la versión de la imagen, consulta los detalles del entorno.
- En este instructivo, se requiere estar familiarizado con JavaScript para escribir la función definida por el usuario.
-
Habilita las API de Cloud Composer, Dataflow, Cloud Storage, BigQuery.
Configura tu entorno
Crea una tabla de BigQuery vacía con una definición de esquema
Primero, crearás una tabla de BigQuery con una definición de esquema. Usarás esta definición de esquema más adelante en este instructivo. En esta tabla de BigQuery, se incluirán los resultados de la carga por lotes.
Para crear una tabla vacía con una definición de esquema, haz lo siguiente:
Console
En Console, vaya a la página de BigQuery.
En el panel de navegación, en la sección Recursos, expande tu proyecto.
En el lado derecho de la ventana, en el panel de detalles, haz clic en Crear conjunto de datos.
- En la página Crear conjunto de datos, en la sección ID del conjunto de datos, asigna el nombre
average_weather
al conjunto de datos. Deja todos los demás campos con su estado predeterminado.
Haz clic en Create dataset.
Regresa al panel de navegación, en la sección Recursos, expande tu proyecto. Luego, haz clic en el conjunto de datos
average_weather
.En el lado derecho de la ventana, en el panel de detalles, haz clic en Crear tabla (Create table).
En la página Crear tabla, en la sección Origen, selecciona Tabla vacía.
En la página Crear tabla, en la sección Destino, haz lo siguiente:
En Nombre del conjunto de datos, selecciona el conjunto de datos
average_weather
.En el campo Nombre de la tabla, ingresa el nombre
average_weather
.Verifica que Tipo de tabla esté establecido en Tabla nativa.
En la sección Esquema, ingresa la definición del esquema.
Ingresa la información del esquema de forma manual de la siguiente manera:
Habilita Editar como texto y, luego, ingresa el esquema de la tabla como un arreglo JSON. Escribe los siguientes campos para esta opción:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Usa Agregar campo para ingresar el esquema de forma manual.
En Configuración de partición y agrupamiento en clústeres, deja los valores predeterminados:
No partitioning
.En la sección Opciones avanzadas, en Encriptación, deja el valor predeterminado:
Google-managed key
. De forma predeterminada, Compute Engine encripta el contenido en reposo del cliente.Haga clic en Create table.
bq
Usa el comando bq mk
con la marca --location
para crear un conjunto de datos vacío.
Reemplaza PROJECT_ID por tu ID del proyecto y LOCATION por tu ubicación preferida. Recomendamos elegir la misma región en la que se encuentra tu entorno de Composer para minimizar la latencia.
Copia el siguiente comando para crear un conjunto de datos del clima global promedio:
bq --location=LOCATION mk \ --dataset \ PROJECT_ID:average_weather
Para crear una tabla vacía en este conjunto de datos con nuestra definición de esquema, reemplaza PROJECT_ID por el ID del proyecto en el comando a continuación y, luego, ingrésalo en la terminal:
bq mk \ --table \ PROJECT_ID:average_weather.average_weather \ location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Una vez que se crea la tabla, puedes actualizar el vencimiento, la descripción y las etiquetas. También puedes modificar la definición de esquema.
Python
Antes de ejecutar la muestra, asegúrate de ejecutar el siguiente comando para instalar la biblioteca en tu entorno:
pip install google.cloud.bigquery
Guarda este código como dataflowtemplateoperator_create_dataset_and_table_helper.py
y actualiza las variables en él para reflejar tu proyecto y ubicación; luego, ejecútalo con el siguiente comando:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.
Cree un bucket de almacenamiento
A continuación, deberás crear un bucket de almacenamiento que contenga todos los archivos necesarios para el flujo de trabajo. El DAG que crees en el futuro hará referencia a los archivos que subiste a este bucket de almacenamiento. Para crear un bucket de almacenamiento nuevo, sigue estos pasos:
Console
Abre Cloud Storage en la consola.
Haz clic en Crear depósito para abrir el formulario de creación de depósitos.
Ingresa la información del depósito y haz clic en Continuar para completar cada paso:
Especifica un Nombre único a nivel global para tu depósito (se le hará referencia como bucketName en el resto del instructivo).
Selecciona Región para el tipo de ubicación. Luego, selecciona una Ubicación en la que se almacenarán los datos del depósito de forma permanente.
Selecciona Estándar como la clase de almacenamiento predeterminada para tus datos.
Selecciona el control de acceso Uniforme para acceder a los objetos.
Haga clic en Done
gsutil
- Usa el comando
gsutil mb
:gsutil mb gs://bucketName/
Muestras de código
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de C# para Compute Engine.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go de la Guía de inicio rápido de Compute Engine: Usa las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Compute Engine para Go.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Ruby.
Crea un esquema de BigQuery con formato JSON para la tabla de salida
Crea un archivo de esquema de BigQuery con formato JSON que coincida con la tabla de salida que creaste antes. Ten en cuenta que los nombres, tipos y modos de campo deben coincidir con los definidos antes en el esquema de tu tabla de BigQuery. Este archivo normalizará los datos del archivo .txt
en un formato compatible con el esquema de BigQuery. Asígnale un nombre a este filtro jsonSchema.json
.
{ "BigQuery Schema": [ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" }] }
Crea un archivo JavaScript(.js
) para el formato de tus datos
En este archivo, definirás tu UDF (función definida por el usuario) que proporciona la lógica para transformar las líneas de texto en el archivo de entrada. Ten en cuenta que esta función toma cada línea de texto en tu archivo de entrada como su propio argumento, por lo que la función se ejecutará una vez por cada línea de tu archivo de entrada. Asígnale un nombre a este filtro transformCSVtoJSON.js
.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la Guía de inicio rápido de Compute Engine: Usa las bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Compute Engine para Node.js.
Crea tu archivo de entrada
Este archivo contendrá la información que deseas subir a la tabla de BigQuery.
Copia este archivo de forma local y asígnale el nombre inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16 POINT(41.8781 87.6298),23,'October',13,false,2015-02-13 POINT(48.8566 2.3522),80,'December',null,true,null POINT(6.5244 3.3792),15,'March',14,true,null
Sube los archivos a tu bucket de Storage y crea una carpeta de etapa de pruebas
Sube los siguientes archivos al bucket de Storage que creaste antes:
- Esquema de BigQuery con formato JSON (
.json
) - Función definida por el usuario de JavaScript (
transformCSVtoJSON.js
) El archivo de entrada del texto que deseas procesar (
.txt
)
Console
- En Google Cloud Console, ve a la página Navegador de Cloud Storage.
En la lista de depósitos, haz clic en el depósito bucketName.
En la pestaña Objetos para el depósito, realiza una de las siguientes dos acciones:
Arrastra y suelta los archivos deseados desde tu escritorio o administrador de archivos en el panel principal de la consola.
Haz clic en el botón Subir archivos, selecciona los archivos que deseas subir en el cuadro de diálogo que aparece y haz clic en Abrir.
gsutil
Usa el comando gsutil cp
:
gsutil cp [OBJECT_LOCATION] gs://bucketName
Donde:
[OBJECT_LOCATION]
es la ruta de acceso local a tu objeto. Por ejemplo,Desktop/dog.png
.[bucketName]
es el nombre de depósito único a nivel global que creaste antes.
Si es correcto, la respuesta se parece al siguiente ejemplo:
Operation completed over 1 objects/58.8 KiB.
Muestras de código
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Ruby.
Configuración de DataflowTemplateOperator
Antes de ejecutar la muestra, asegúrate de configurar las variables de entorno apropiadas.
Puedes hacerlo con gcloud
o la IU de Airflow:
gcloud
Ingresa los siguientes comandos:
CLI 1.10 de Airflow
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set project_id PROJECT_ID
En el ejemplo anterior, se ilustra lo siguiente:
ENVIRONMENT
es el nombre del entorno de Cloud Composer.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.PROJECT_ID
es tu ID del proyecto de Google Cloud.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set gce_zone GCE_ZONE
Donde:
GCE_ZONE
es tu zona de Compute Engine. Obtén más información sobre las diferencias entre zonas y regiones.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set bucket_path BUCKET_PATH
En el ejemplo anterior, se ilustra lo siguiente:
BUCKET_PATH
es la ubicación del depósito de Cloud Storage que creaste antes.
CLI 2.0 de Airflow
gcloud beta composer environments run ENVIRONMENT \ --location LOCATION \ variables set -- \ project_id PROJECT_ID
En el ejemplo anterior, se ilustra lo siguiente:
ENVIRONMENT
es el nombre del entorno de Cloud Composer.LOCATION
es la región en la que se encuentra el entorno de Cloud Composer.PROJECT_ID
es tu ID del proyecto de Google Cloud.
gcloud beta composer environments run ENVIRONMENT \ --location LOCATION \ variables set -- \ gce_zone GCE_ZONE
Donde:
GCE_ZONE
es tu zona de Compute Engine. Obtén más información sobre las diferencias entre zonas y regiones.
gcloud beta composer environments run ENVIRONMENT \ --location LOCATION \ variables set -- \ bucket_path BUCKET_PATH
En el ejemplo anterior, se ilustra lo siguiente:
BUCKET_PATH
es la ubicación del depósito de Cloud Storage que creaste antes.
IU de Airflow
En la barra de herramientas, haz clic en Administrador > Variables.
Haga clic en Crear.
Ingresa la siguiente información:
- Key:
project_id
- Val: PROJECT_ID es el ID de tu proyecto de Google Cloud.
- Key:
Haz clic en Guardar y agregar otro.
Ingresa la siguiente información:
- Key:
bucket_path
- Val: BUCKET_PATH es la ubicación de tu depósito de Cloud Storage (p. ej., “gs://my-bucket”).
- Key:
Haz clic en Guardar y agregar otro.
Ingresa la siguiente información:
- Key:
gce_zone
- Val:
GCE_ZONE
es tu zona de Compute Engine. Obtén más información sobre las diferencias entre zonas y regiones.
- Key:
Haz clic en Guardar.
Ahora harás referencia a los archivos que creaste antes para crear un DAG que inicie el flujo de trabajo de Dataflow. Copia este DAG y guárdalo de forma local como composer-dataflow-dag.py
.
Airflow 2
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.
Airflow 1
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la Guía de inicio rápido de Compute Engine: usa bibliotecas cliente. Si quieres obtener más información, consulta la Documentación de referencia de la API de Compute Engine para Python.
Sube el DAG a Cloud Storage
Sube tu DAG a tu carpeta de entorno. Una vez que la carga se haya completado de forma correcta, deberías poder verla si haces clic en el vínculo Carpeta de DAG en la página Entornos de Cloud Composer.
Visualiza el estado de una tarea
- Ve a la interfaz web de Airflow.
- En la página de los DAG, haz clic en el nombre del DAG (p. ej.,
composerDataflowDAG
). - En la página de detalles de los DAG, haz clic en Graph View.
Verifica el estado:
Fallida: La tarea tiene un cuadro rojo a su alrededor. También puedes mantener el puntero sobre la tarea y buscar Estado: Error.
Después de unos minutos, deberías ver tus resultados en Dataflow y BigQuery.
Visualiza tu trabajo en Dataflow
Ve a la IU web de Dataflow. IR A LA IU WEB DE DATAFLOW
El trabajo recibirá el nombre
dataflow_operator_transform_csv_to_bq
con un ID único al final del nombre con un guion, como el siguiente:.
Haz clic en el nombre para ver los detalles del trabajo. Obtén más información sobre los detalles del trabajo de Dataflow.
Visualiza tus resultados en BigQuery
Ir a la IU web de BigQuery. IR A LA IU WEB DE BIGQUERY
Puede enviar una consulta mediante SQL estándar. Usa la siguiente consulta para ver las filas que se agregaron a su tabla:
SELECT * FROM projectId.average_weather.average_weather
Realice una limpieza
Para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform, puedes borrar los recursos que usaste en este instructivo:
- Borra el entorno de Cloud Composer.
- Borra el depósito de Cloud Storage para el entorno de Cloud Composer. Si borras el entorno de Cloud Composer, no se borra el bucket.
- Detén el trabajo de Dataflow.
- Borra la tabla de BigQuery y el conjunto de datos de BigQuery.