Crear un flujo de procesamiento de Dataflow con Python

En este documento se muestra cómo usar el SDK de Apache Beam para Python para crear un programa que defina una canalización. A continuación, ejecuta la canalización mediante un ejecutor local directo o un ejecutor basado en la nube, como Dataflow. Para obtener una introducción a la canalización WordCount, consulta el vídeo Cómo usar WordCount en Apache Beam.


Para seguir las instrucciones paso a paso de esta tarea directamente en la Google Cloud consola, haga clic en Ayúdame:

Guíame


Antes de empezar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  12. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sustituye PROJECT_ID por el ID del proyecto.
    • Sustituye PROJECT_NUMBER por el número de tu proyecto. Para encontrar el número de tu proyecto, consulta el artículo Identificar proyectos o usa el comando gcloud projects describe.
    • Sustituye SERVICE_ACCOUNT_ROLE por cada rol individual.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Estándar).
    • Define la ubicación de almacenamiento de la siguiente manera: US (Estados Unidos).
    • Sustituye BUCKET_NAME por un nombre de segmento único. No incluyas información sensible en el nombre del segmento, ya que este espacio de nombres es público y visible para todos los usuarios.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copia el Google Cloud ID de proyecto y el nombre del segmento de Cloud Storage. Necesitará estos valores más adelante en este documento.
    • Configurar un entorno

      En esta sección, usa la línea de comandos para configurar un entorno virtual de Python aislado con el que ejecutar tu proyecto de flujo de trabajo mediante venv. Este proceso te permite aislar las dependencias de un proyecto de las de otros proyectos.

      Si no tienes un símbolo del sistema disponible, puedes usar Cloud Shell. Cloud Shell ya tiene instalado el gestor de paquetes de Python 3, por lo que puedes pasar directamente a crear un entorno virtual.

      Para instalar Python y crear un entorno virtual, sigue estos pasos:

      1. Comprueba que tengas Python 3 y pip en tu sistema:
        python --version
        python -m pip --version
      2. Si es necesario, instala Python 3 y, a continuación, configura un entorno virtual de Python. Para ello, sigue las instrucciones que se indican en las secciones Instalar Python y Configurar venv de la página Configurar un entorno de desarrollo de Python.

      Una vez que hayas completado la guía de inicio rápido, puedes desactivar el entorno virtual ejecutando deactivate.

      Obtener el SDK de Apache Beam

      El SDK de Apache Beam es un modelo de programación de código abierto para flujos de datos. Defines un flujo de procesamiento con un programa de Apache Beam y, a continuación, eliges un ejecutor, como Dataflow, para ejecutarlo.

      Para descargar e instalar el SDK de Apache Beam, sigue estos pasos:

      1. Comprueba que estás en el entorno virtual de Python que has creado en la sección anterior. Asegúrate de que la petición empiece por <env_name>, donde env_name es el nombre del entorno virtual.
      2. Instala la versión más reciente del SDK de Apache Beam para Python:
      3. pip install apache-beam[gcp]

      Ejecutar el flujo de procesamiento de forma local

      Para ver cómo se ejecuta una canalización de forma local, usa un módulo de Python predefinido para el wordcount ejemplo que se incluye en el paquete apache_beam.

      El ejemplo de la canalización wordcount hace lo siguiente:

      1. Usa un archivo de texto como entrada.

        Este archivo de texto se encuentra en un segmento de Cloud Storage con el nombre de recurso gs://dataflow-samples/shakespeare/kinglear.txt.

      2. Analiza cada línea en palabras.
      3. Realiza un recuento de frecuencia de las palabras tokenizadas.

      Para poner en fase la canalización de wordcount de forma local, sigue estos pasos:

      1. En tu terminal local, ejecuta el ejemplo wordcount:
        python -m apache_beam.examples.wordcount \
          --output outputs
      2. Consulta la salida del flujo de procesamiento:
        more outputs*
      3. Para salir, pulsa q.
      Ejecutar el flujo de procesamiento de forma local te permite probar y depurar tu programa de Apache Beam. Puedes consultar el código fuente de wordcount.py en GitHub de Apache Beam.

      Ejecutar el flujo de procesamiento en el servicio Dataflow

      En esta sección, ejecuta el flujo de procesamiento de ejemplo wordcount del paquete apache_beam en el servicio Dataflow. En este ejemplo, se especifica DataflowRunner como parámetro de --runner.
      • Ejecuta el flujo de procesamiento:
        python -m apache_beam.examples.wordcount \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Haz los cambios siguientes:

        • DATAFLOW_REGION: la región en la que quieres implementar el trabajo de Dataflow. Por ejemplo, europe-west1

          La marca --region anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.

        • BUCKET_NAME: el nombre del segmento de Cloud Storage que has copiado antes
        • PROJECT_ID: el ID de proyecto Google Cloud que has copiado antes

      Ver los resultados

      Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un segmento de Cloud Storage. En esta sección, comprueba que la canalización se está ejecutando mediante la Google Cloud consola o la terminal local.

      Google Cloud consola

      Para ver los resultados en la consola de Google Cloud , sigue estos pasos:

      1. En la Google Cloud consola, ve a la página Trabajos de Dataflow.

        Ir a Tareas

        En la página Tareas se muestran los detalles de tu tarea wordcount, incluido el estado En curso al principio y, después, Completada.

      2. Ve a la página Segmentos de Cloud Storage.

        Ir a Contenedores

      3. En la lista de segmentos de tu proyecto, haz clic en el segmento de almacenamiento que has creado antes.

        En el directorio wordcount, se muestran los archivos de salida que ha creado tu tarea.

      Terminal local

      Consulta los resultados desde tu terminal o mediante Cloud Shell.

      1. Para ver una lista de los archivos de salida, usa el comando gcloud storage ls:
        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
      2. Sustituye BUCKET_NAME por el nombre del segmento de Cloud Storage que se usa en el programa de la canalización.

      3. Para ver los resultados en los archivos de salida, usa el comando gcloud storage cat:
        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Modificar el código del flujo de procesamiento

      La wordcount en los ejemplos anteriores distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos se muestra cómo modificar la canalización para que no distinga entre mayúsculas y minúsculas.wordcount
      1. En tu máquina local, descarga la copia más reciente del código wordcount del repositorio de GitHub de Apache Beam.
      2. En el terminal local, ejecuta la canalización:
        python wordcount.py --output outputs
      3. Consulta los resultados:
        more outputs*
      4. Para salir, pulsa q.
      5. Abre el archivo wordcount.py en el editor que prefieras.
      6. Dentro de la función run, examine los pasos de la canalización:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum))

        Después de split, las líneas se dividen en palabras como cadenas.

      7. Para convertir las cadenas a minúsculas, modifica la línea que aparece después de split:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'lowercase' >> beam.Map(str.lower)
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
        Esta modificación asigna la función str.lower a cada palabra. Esta línea equivale a beam.Map(lambda word: str.lower(word)).
      8. Guarda el archivo y ejecuta el trabajo wordcount modificado:
        python wordcount.py --output outputs
      9. Consulta los resultados de la canalización modificada:
        more outputs*
      10. Para salir, pulsa q.
      11. Ejecuta el flujo de procesamiento modificado en el servicio Dataflow:
        python wordcount.py \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Haz los cambios siguientes:

        • DATAFLOW_REGION: la región en la que quieres desplegar la tarea de Dataflow
        • BUCKET_NAME: nombre del segmento de Cloud Storage
        • PROJECT_ID: ID de tu proyecto Google Cloud

      Limpieza

      Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.

      1. Elimina el segmento:
        gcloud storage buckets delete BUCKET_NAME
      2. Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      Siguientes pasos