Crea una canalización de Dataflow con Python
En esta guía de inicio rápido, aprenderás a usar el SDK de Apache Beam para Python a fin de compilar un programa que defina una canalización. Luego, deberás ejecutar la canalización a través de un ejecutor local directo o uno basado en la nube, como Dataflow. Para obtener una introducción a la canalización de WordCount, consulta el video Cómo usar WordCount en Apache Beam.
Para seguir la guía paso a paso sobre esta tarea directamente en la consola de Google Cloud, haz clic en Guiarme:
Antes de comenzar
- Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.
-
Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore y Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
EMAIL_ADDRESS
por tu dirección de correo electrónico. - Reemplaza
ROLE
por cada rol individual.
- Reemplaza
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.
-
Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore y Cloud Resource Manager:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
EMAIL_ADDRESS
por tu dirección de correo electrónico. - Reemplaza
ROLE
por cada rol individual.
- Reemplaza
Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
PROJECT_NUMBER
por el número del proyecto. Para encontrar el número de tu proyecto, consulta Identifica proyectos o usa el comandogcloud projects describe
. - Reemplaza
SERVICE_ACCOUNT_ROLE
por cada rol individual.
-
Crea un bucket de Cloud Storage y configúralo de la siguiente manera:
-
Establece la clase de almacenamiento en
S
(Estándar). -
Configura la ubicación de almacenamiento de la siguiente manera:
US
(Estados Unidos). -
Reemplaza
BUCKET_NAME
con un nombre de bucket único. No incluyas información sensible en el nombre del bucket porque su espacio de nombres es global y públicamente visible.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Establece la clase de almacenamiento en
- Copia el ID del proyecto de Google Cloud y el nombre del bucket de Cloud Storage. Necesitarás estos valores más adelante en el documento.
Configure su entorno
En esta sección, usa el símbolo del sistema para configurar un entorno virtual de Python aislado a fin de ejecutar tu proyecto de canalización con venv. Este proceso te permite aislar las dependencias de un proyecto de las dependencias de otros proyectos.
Si no tienes un símbolo del sistema disponible, puedes usar Cloud Shell. Cloud Shell ya tiene el administrador de paquetes para Python 3 instalado, por lo que puedes omitir la creación de un entorno virtual.
Para instalar Python y, luego, crear un entorno virtual, sigue estos pasos:
- Verifica que Python 3 y
pip
se estén ejecutando en el sistema:python --version python -m pip --version
- Si es necesario, instala Python 3 y, luego, configura un entorno virtual de Python: sigue las instrucciones proporcionadas en las secciones Instala Python y Configura venv de la página Configura un entorno de desarrollo de Python. Si usas Python 3.10 o una versión posterior, también debes habilitar Dataflow Runner v2. Para usar Runner v1, usa Python 3.9 o versiones anteriores.
Una vez completada la guía de inicio rápido, puedes ejecutar deactivate
para desactivar el entorno virtual.
Obtén el SDK de Apache Beam
El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir una canalización con un programa de Apache Beam y, luego, elegir un ejecutor, como Dataflow, para ejecutar tu canalización.
Para descargar y, luego, instalar el SDK de Apache Beam, sigue estos pasos:
- Verifica que estés en el entorno virtual de Python que creaste en la sección anterior.
Asegúrate de que el mensaje comience con
<env_name>
, en el queenv_name
es el nombre del entorno virtual. - Instala el estándar de empaquetado de rueda de Python:
pip install wheel
- Instala la versión más reciente del SDK de Apache Beam para Python:
pip install 'apache-beam[gcp]'
En Microsoft Windows, usa el siguiente comando:
pip install apache-beam[gcp]
Según la conexión, la instalación puede tardar un poco.
Ejecute la canalización de forma local:
Si deseas ver cómo se ejecuta una canalización de manera local, usa un módulo de Python listo para el ejemplo wordcount
que se incluye en el paquete apache_beam
.
La canalización wordcount
de ejemplo realiza lo siguiente:
Toma un archivo de texto como entrada.
Este archivo de texto se encuentra en un bucket de Cloud Storage con el nombre del recurso
gs://dataflow-samples/shakespeare/kinglear.txt
.- Analiza cada línea en palabras.
- Realiza un recuento de frecuencia en las palabras con asignación de token.
Para almacenar en etapa intermedia la canalización wordcount
de forma local, sigue estos pasos:
- Desde tu terminal local, ejecuta el
wordcount
de ejemplo:python -m apache_beam.examples.wordcount \ --output outputs
- Visualiza el resultado de la canalización:
more outputs*
- Para salir, presiona q.
wordcount.py
en GitHub de Apache Beam.
Ejecuta la canalización en el servicio de Dataflow
En esta sección, ejecuta la canalización de ejemplowordcount
desde el paquete apache_beam
en el servicio de Dataflow. En este ejemplo, se especifica DataflowRunner
como parámetro para --runner
.
- Ejecuta la canalización:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Reemplaza lo siguiente:
DATAFLOW_REGION
: Es la región en la que deseas implementar el trabajo de Dataflow, por ejemplo,europe-west1
La opción
--region
anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.BUCKET_NAME
: Es el nombre del bucket de Cloud Storage que copiaste antes.PROJECT_ID
: el ID del proyecto de Google Cloud que copiaste antes
Ve tus resultados
Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un bucket de Cloud Storage. En esta sección, debes verificar que la canalización se esté ejecutando a través de la consola de Google Cloud o la terminal local.
Consola de Google Cloud
Para ver los resultados en la consola de Google Cloud, sigue estos pasos:
- En la consola de Google Cloud, ve a la página Trabajos de Dataflow.
En la página Trabajos, se muestran detalles del trabajo
wordcount
, incluido un estado En ejecución primero y, luego, Correcto. - Ve a la página Buckets de Cloud Storage:
En la lista de buckets de tu proyecto, haz clic en el bucket de almacenamiento que creaste antes.
En el directorio
wordcount
, se muestran los archivos de salida que creó tu trabajo.
Terminal local
Consulta los resultados desde tu terminal o mediante Cloud Shell.
- Para enumerar los archivos de salida, usa el comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Para ver los resultados en los archivos de salida, usa el comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Reemplaza BUCKET_NAME
por el nombre del bucket de Cloud Storage que se usó en el programa de canalización.
Modifica el código de canalización
La canalización dewordcount
en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas.
En los siguientes pasos, se muestra cómo modificar la canalización, de modo que la canalización wordcount
no distinga entre mayúsculas y minúsculas.
- En tu máquina local, descarga la copia más reciente del código
wordcount
del repositorio de GitHub de Apache Beam. - Desde la terminal local, ejecuta la canalización:
python wordcount.py --output outputs
- Observa los resultados.
more outputs*
- Para salir, presiona q.
- En el editor que prefieras, abre el archivo
wordcount.py
. - Dentro de la función
run
, examina los pasos de la canalización:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Después de
split
, las líneas se dividen en palabras como strings. - Para convertir en minúsculas las strings, modifica la línea después de
split
:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Esta modificación asigna la funciónstr.lower
a cada palabra. Esta línea es equivalente abeam.Map(lambda word: str.lower(word))
. - Guarda el archivo y ejecuta el trabajo
wordcount
modificado:python wordcount.py --output outputs
- Visualiza los resultados de la canalización modificada:
more outputs*
- Para salir, presiona q.
- Ejecuta la canalización modificada en el servicio de Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Reemplaza lo siguiente:
DATAFLOW_REGION
: la región en la que deseas implementar el trabajo de DataflowBUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.PROJECT_ID
: es el ID del proyecto de Google Cloud
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.
- En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.
- Haz clic en la casilla de verificación del bucket que deseas borrar.
- Para borrar el bucket, haz clic en Borrar y sigue las instrucciones.
Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.
gcloud auth application-default revoke
-
Opcional: Revoca credenciales desde gcloud CLI.
gcloud auth revoke