Crear un flujo de procesamiento de Dataflow con Python
En este documento se muestra cómo usar el SDK de Apache Beam para Python para crear un programa que defina una canalización. A continuación, ejecuta la canalización mediante un ejecutor local directo o un ejecutor basado en la nube, como Dataflow. Para obtener una introducción a la canalización WordCount, consulta el vídeo Cómo usar WordCount en Apache Beam.
Para seguir las instrucciones paso a paso de esta tarea directamente en la Google Cloud consola, haga clic en Ayúdame:
Antes de empezar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Sustituye
PROJECT_ID
por el ID del proyecto. - Sustituye
PROJECT_NUMBER
por el número de tu proyecto. Para encontrar el número de tu proyecto, consulta el artículo Identificar proyectos o usa el comandogcloud projects describe
. - Sustituye
SERVICE_ACCOUNT_ROLE
por cada rol individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Estándar). -
Define la ubicación de almacenamiento de la siguiente manera:
US
(Estados Unidos). -
Sustituye
BUCKET_NAME
por un nombre de segmento único. No incluyas información sensible en el nombre del segmento, ya que este espacio de nombres es público y visible para todos los usuarios. - Copia el Google Cloud ID de proyecto y el nombre del segmento de Cloud Storage. Necesitará estos valores más adelante en este documento.
- Comprueba que tengas Python 3 y
pip
en tu sistema:python --version python -m pip --version
- Si es necesario, instala Python 3 y, a continuación, configura un entorno virtual de Python. Para ello, sigue las instrucciones que se indican en las secciones Instalar Python y Configurar venv de la página Configurar un entorno de desarrollo de Python.
- Comprueba que estás en el entorno virtual de Python que has creado en la sección anterior.
Asegúrate de que la petición empiece por
<env_name>
, dondeenv_name
es el nombre del entorno virtual. - Instala la versión más reciente del SDK de Apache Beam para Python:
Usa un archivo de texto como entrada.
Este archivo de texto se encuentra en un segmento de Cloud Storage con el nombre de recurso
gs://dataflow-samples/shakespeare/kinglear.txt
.- Analiza cada línea en palabras.
- Realiza un recuento de frecuencia de las palabras tokenizadas.
- En tu terminal local, ejecuta el ejemplo
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Consulta la salida del flujo de procesamiento:
more outputs*
- Para salir, pulsa q.
- Ejecuta el flujo de procesamiento:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Haz los cambios siguientes:
DATAFLOW_REGION
: la región en la que quieres implementar el trabajo de Dataflow. Por ejemplo,europe-west1
La marca
--region
anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.BUCKET_NAME
: el nombre del segmento de Cloud Storage que has copiado antesPROJECT_ID
: el ID de proyecto Google Cloud que has copiado antes
- En la Google Cloud consola, ve a la página Trabajos de Dataflow.
En la página Tareas se muestran los detalles de tu tarea
wordcount
, incluido el estado En curso al principio y, después, Completada. - Ve a la página Segmentos de Cloud Storage.
En la lista de segmentos de tu proyecto, haz clic en el segmento de almacenamiento que has creado antes.
En el directorio
wordcount
, se muestran los archivos de salida que ha creado tu tarea.- Para ver una lista de los archivos de salida, usa el comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Para ver los resultados en los archivos de salida, usa el comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
- En tu máquina local, descarga la copia más reciente del código
wordcount
del repositorio de GitHub de Apache Beam. - En el terminal local, ejecuta la canalización:
python wordcount.py --output outputs
- Consulta los resultados:
more outputs*
- Para salir, pulsa q.
- Abre el archivo
wordcount.py
en el editor que prefieras. - Dentro de la función
run
, examine los pasos de la canalización:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Después de
split
, las líneas se dividen en palabras como cadenas. - Para convertir las cadenas a minúsculas, modifica la línea que aparece después de
split
: Esta modificación asigna la funcióncounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
a cada palabra. Esta línea equivale abeam.Map(lambda word: str.lower(word))
. - Guarda el archivo y ejecuta el trabajo
wordcount
modificado:python wordcount.py --output outputs
- Consulta los resultados de la canalización modificada:
more outputs*
- Para salir, pulsa q.
- Ejecuta el flujo de procesamiento modificado en el servicio Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Haz los cambios siguientes:
DATAFLOW_REGION
: la región en la que quieres desplegar la tarea de DataflowBUCKET_NAME
: nombre del segmento de Cloud StoragePROJECT_ID
: ID de tu proyecto Google Cloud
-
Elimina el segmento:
gcloud storage buckets delete BUCKET_NAME
Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Configurar un entorno
En esta sección, usa la línea de comandos para configurar un entorno virtual de Python aislado con el que ejecutar tu proyecto de flujo de trabajo mediante venv. Este proceso te permite aislar las dependencias de un proyecto de las de otros proyectos.
Si no tienes un símbolo del sistema disponible, puedes usar Cloud Shell. Cloud Shell ya tiene instalado el gestor de paquetes de Python 3, por lo que puedes pasar directamente a crear un entorno virtual.
Para instalar Python y crear un entorno virtual, sigue estos pasos:
Una vez que hayas completado la guía de inicio rápido, puedes desactivar el entorno virtual ejecutando
deactivate
.Obtener el SDK de Apache Beam
El SDK de Apache Beam es un modelo de programación de código abierto para flujos de datos. Defines un flujo de procesamiento con un programa de Apache Beam y, a continuación, eliges un ejecutor, como Dataflow, para ejecutarlo.
Para descargar e instalar el SDK de Apache Beam, sigue estos pasos:
pip install apache-beam[gcp]
Ejecutar el flujo de procesamiento de forma local
Para ver cómo se ejecuta una canalización de forma local, usa un módulo de Python predefinido para el
wordcount
ejemplo que se incluye en el paqueteapache_beam
.El ejemplo de la canalización
wordcount
hace lo siguiente:Para poner en fase la canalización de
wordcount
de forma local, sigue estos pasos:wordcount.py
en GitHub de Apache Beam.Ejecutar el flujo de procesamiento en el servicio Dataflow
En esta sección, ejecuta el flujo de procesamiento de ejemplowordcount
del paqueteapache_beam
en el servicio Dataflow. En este ejemplo, se especificaDataflowRunner
como parámetro de--runner
.Ver los resultados
Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un segmento de Cloud Storage. En esta sección, comprueba que la canalización se está ejecutando mediante la Google Cloud consola o la terminal local.
Google Cloud consola
Para ver los resultados en la consola de Google Cloud , sigue estos pasos:
Terminal local
Consulta los resultados desde tu terminal o mediante Cloud Shell.
Sustituye
BUCKET_NAME
por el nombre del segmento de Cloud Storage que se usa en el programa de la canalización.Modificar el código del flujo de procesamiento
Lawordcount
en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos se muestra cómo modificar la canalización para que no distinga entre mayúsculas y minúsculas.wordcount
Limpieza
Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.
Siguientes pasos
-
Set the storage class to