Crea una canalización de Dataflow con Python

En esta guía de inicio rápido, aprenderás a usar el SDK de Apache Beam para Python a fin de compilar un programa que defina una canalización. Luego, deberás ejecutar la canalización a través de un ejecutor local directo o uno basado en la nube, como Dataflow. Para obtener una introducción a la canalización de WordCount, consulta el video Cómo usar WordCount en Apache Beam.


Para seguir la guía paso a paso sobre esta tarea directamente en la consola de Google Cloud, haz clic en Guiarme:

GUIARME


Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore y Cloud Resource Manager:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  8. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.
  9. Instala Google Cloud CLI.
  10. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  11. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Google Cloud:

      gcloud projects create PROJECT_ID

      Reemplaza PROJECT_ID por un nombre para el proyecto de Google Cloud que estás creando.

    • Selecciona el proyecto de Google Cloud que creaste:

      gcloud config set project PROJECT_ID

      Reemplaza PROJECT_ID por el nombre del proyecto de Google Cloud.

  12. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  13. Habilita las APIs de Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore y Cloud Resource Manager:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  15. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.
  16. Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza PROJECT_NUMBER por el número del proyecto. Para encontrar el número de tu proyecto, consulta Identifica proyectos o usa el comando gcloud projects describe.
    • Reemplaza SERVICE_ACCOUNT_ROLE por cada rol individual.
  17. Crea un bucket de Cloud Storage y configúralo de la siguiente manera:
    • Establece la clase de almacenamiento en S (Estándar).
    • Configura la ubicación de almacenamiento de la siguiente manera: US (Estados Unidos).
    • Reemplaza BUCKET_NAME con un nombre de bucket único. No incluyas información sensible en el nombre del bucket porque su espacio de nombres es global y públicamente visible.
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Copia el ID del proyecto de Google Cloud y el nombre del bucket de Cloud Storage. Necesitarás estos valores más adelante en el documento.

Configure su entorno

En esta sección, usa el símbolo del sistema para configurar un entorno virtual de Python aislado a fin de ejecutar tu proyecto de canalización con venv. Este proceso te permite aislar las dependencias de un proyecto de las dependencias de otros proyectos.

Si no tienes un símbolo del sistema disponible, puedes usar Cloud Shell. Cloud Shell ya tiene el administrador de paquetes para Python 3 instalado, por lo que puedes omitir la creación de un entorno virtual.

Para instalar Python y, luego, crear un entorno virtual, sigue estos pasos:

  1. Verifica que Python 3 y pip se estén ejecutando en el sistema:
    python --version
    python -m pip --version
    
  2. Si es necesario, instala Python 3 y, luego, configura un entorno virtual de Python: sigue las instrucciones proporcionadas en las secciones Instala Python y Configura venv de la página Configura un entorno de desarrollo de Python. Si usas Python 3.10 o una versión posterior, también debes habilitar Dataflow Runner v2. Para usar Runner v1, usa Python 3.9 o versiones anteriores.

Una vez completada la guía de inicio rápido, puedes ejecutar deactivate para desactivar el entorno virtual.

Obtén el SDK de Apache Beam

El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir una canalización con un programa de Apache Beam y, luego, elegir un ejecutor, como Dataflow, para ejecutar tu canalización.

Para descargar y, luego, instalar el SDK de Apache Beam, sigue estos pasos:

  1. Verifica que estés en el entorno virtual de Python que creaste en la sección anterior. Asegúrate de que el mensaje comience con <env_name>, en el que env_name es el nombre del entorno virtual.
  2. Instala el estándar de empaquetado de rueda de Python:
    pip install wheel
    
  3. Instala la versión más reciente del SDK de Apache Beam para Python:
  4. pip install 'apache-beam[gcp]'

    En Microsoft Windows, usa el siguiente comando:

    pip install apache-beam[gcp]

    Según la conexión, la instalación puede tardar un poco.

Ejecute la canalización de forma local:

Si deseas ver cómo se ejecuta una canalización de manera local, usa un módulo de Python listo para el ejemplo wordcount que se incluye en el paquete apache_beam.

La canalización wordcount de ejemplo realiza lo siguiente:

  1. Toma un archivo de texto como entrada.

    Este archivo de texto se encuentra en un bucket de Cloud Storage con el nombre del recurso gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Analiza cada línea en palabras.
  3. Realiza un recuento de frecuencia en las palabras con asignación de token.

Para almacenar en etapa intermedia la canalización wordcount de forma local, sigue estos pasos:

  1. Desde tu terminal local, ejecuta el wordcount de ejemplo:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Visualiza el resultado de la canalización:
    more outputs*
  3. Para salir, presiona q.
Ejecutar la canalización de manera local te permite probar y depurar el programa de Apache Beam. Puedes ver el código fuente wordcount.py en GitHub de Apache Beam.

Ejecuta la canalización en el servicio de Dataflow

En esta sección, ejecuta la canalización de ejemplo wordcount desde el paquete apache_beam en el servicio de Dataflow. En este ejemplo, se especifica DataflowRunner como parámetro para --runner.
  • Ejecuta la canalización:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Reemplaza lo siguiente:

    • DATAFLOW_REGION: Es la región en la que deseas implementar el trabajo de Dataflow, por ejemplo, europe-west1

      La opción --region anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.

    • BUCKET_NAME: Es el nombre del bucket de Cloud Storage que copiaste antes.
    • PROJECT_ID: el ID del proyecto de Google Cloud que copiaste antes

Ve tus resultados

Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un bucket de Cloud Storage. En esta sección, debes verificar que la canalización se esté ejecutando a través de la consola de Google Cloud o la terminal local.

Consola de Google Cloud

Para ver los resultados en la consola de Google Cloud, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Trabajos de Dataflow.

    Ir a Trabajos

    En la página Trabajos, se muestran detalles del trabajo wordcount, incluido un estado En ejecución primero y, luego, Correcto.

  2. Ve a la página Buckets de Cloud Storage:

    Ir a Buckets

  3. En la lista de buckets de tu proyecto, haz clic en el bucket de almacenamiento que creaste antes.

    En el directorio wordcount, se muestran los archivos de salida que creó tu trabajo.

Terminal local

Consulta los resultados desde tu terminal o mediante Cloud Shell.

  1. Para enumerar los archivos de salida, usa el comando gcloud storage ls:
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. Reemplaza BUCKET_NAME por el nombre del bucket de Cloud Storage que se usó en el programa de canalización.

  3. Para ver los resultados en los archivos de salida, usa el comando gcloud storage cat:
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

Modifica el código de canalización

La canalización de wordcount en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos, se muestra cómo modificar la canalización, de modo que la canalización wordcount no distinga entre mayúsculas y minúsculas.
  1. En tu máquina local, descarga la copia más reciente del código wordcount del repositorio de GitHub de Apache Beam.
  2. Desde la terminal local, ejecuta la canalización:
    python wordcount.py --output outputs
  3. Observa los resultados.
    more outputs*
  4. Para salir, presiona q.
  5. En el editor que prefieras, abre el archivo wordcount.py.
  6. Dentro de la función run, examina los pasos de la canalización:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Después de split, las líneas se dividen en palabras como strings.

  7. Para convertir en minúsculas las strings, modifica la línea después de split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Esta modificación asigna la función str.lower a cada palabra. Esta línea es equivalente a beam.Map(lambda word: str.lower(word)).
  8. Guarda el archivo y ejecuta el trabajo wordcount modificado:
    python wordcount.py --output outputs
  9. Visualiza los resultados de la canalización modificada:
    more outputs*
  10. Para salir, presiona q.
  11. Ejecuta la canalización modificada en el servicio de Dataflow:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    Reemplaza lo siguiente:

    • DATAFLOW_REGION: la región en la que deseas implementar el trabajo de Dataflow
    • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
    • PROJECT_ID: es el ID del proyecto de Google Cloud

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.

  1. En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  2. Haz clic en la casilla de verificación del bucket que deseas borrar.
  3. Para borrar el bucket, haz clic en Borrar y sigue las instrucciones.
  4. Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Opcional: Revoca las credenciales de autenticación que creaste y borra el archivo local de credenciales.

    gcloud auth application-default revoke
  6. Opcional: Revoca credenciales desde gcloud CLI.

    gcloud auth revoke

¿Qué sigue?