Crear un flujo de procesamiento de Dataflow con Go

En esta página se muestra cómo usar el SDK de Apache Beam para Go para crear un programa que defina una canalización. A continuación, ejecuta el flujo de procesamiento de forma local y en el servicio Dataflow. Para obtener una introducción a la canalización WordCount, consulta el vídeo Cómo usar WordCount en Apache Beam.

Antes de empezar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  12. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sustituye PROJECT_ID por el ID del proyecto.
    • Sustituye PROJECT_NUMBER por el número de tu proyecto. Para encontrar el número de tu proyecto, consulta el artículo Identificar proyectos o usa el comando gcloud projects describe.
    • Sustituye SERVICE_ACCOUNT_ROLE por cada rol individual.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Estándar).
    • Define la ubicación de almacenamiento de la siguiente manera: US (Estados Unidos).
    • Sustituye BUCKET_NAME por un nombre de segmento único. No incluyas información sensible en el nombre del segmento, ya que este espacio de nombres es público y visible para todos los usuarios.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copia el Google Cloud ID de proyecto y el nombre del segmento de Cloud Storage. Necesitará estos valores más adelante en esta guía de inicio rápido.
    • Configurar un entorno de desarrollo

      El SDK de Apache Beam es un modelo de programación de código abierto para flujos de datos. Defines un flujo de procesamiento con un programa de Apache Beam y, a continuación, eliges un ejecutor, como Dataflow, para ejecutarlo.

      Te recomendamos que uses la versión más reciente de Go cuando trabajes con el SDK de Apache Beam para Go. Si no tienes instalada la versión más reciente de Go, consulta la guía de descarga e instalación de Go para descargar e instalar Go en tu sistema operativo.

      Para comprobar la versión de Go que has instalado, ejecuta el siguiente comando en tu terminal local:

      go version

      Ejecutar el ejemplo de recuento de palabras de Beam

      El SDK de Apache Beam para Go incluye un wordcountejemplo de flujo de procesamiento. El ejemplo de wordcount hace lo siguiente:

      1. Lee un archivo de texto como entrada. De forma predeterminada, lee un archivo de texto ubicado en un segmento de Cloud Storage con el nombre de recurso gs://dataflow-samples/shakespeare/kinglear.txt.
      2. Analiza cada línea en palabras.
      3. Realiza un recuento de frecuencia de las palabras tokenizadas.

      Para ejecutar la versión más reciente del ejemplo de Beam wordcount en tu máquina local, sigue estos pasos:

      1. Usa el comando git clone para clonar el repositorio de GitHub apache/beam:

        git clone https://github.com/apache/beam.git
      2. Cambia al directorio beam/sdks/go:

        cd beam/sdks/go
      3. Usa el siguiente comando para ejecutar la canalización:

        go run examples/wordcount/wordcount.go \
          --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output outputs

        La marca input especifica el archivo que se va a leer y la marca output especifica el nombre del archivo de salida del recuento de frecuencias.

      Una vez que se haya completado el flujo de trabajo, consulta los resultados:

      more outputs*

      Para salir, pulsa q.

      Modificar el código del flujo de procesamiento

      La canalización de Beam wordcount distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos se muestra cómo crear tu propio módulo de Go, modificar la pipeline wordcount para que no distinga entre mayúsculas y minúsculas, y ejecutarla en Dataflow.

      Crear un módulo de Go

      Para modificar el código de la canalización, sigue estos pasos.

      1. Crea un directorio para tu módulo de Go en la ubicación que quieras:

        mkdir wordcount
        cd wordcount
      2. Crea un módulo de Go. En este ejemplo, usa example/dataflow como ruta del módulo.

        go mod init example/dataflow
      3. Descarga la copia más reciente del código wordcount del repositorio de GitHub de Apache Beam. Coloca este archivo en el directorio wordcount que has creado.

      4. Si usas un sistema operativo que no sea Linux, debes obtener el paquete Go unix. Este paquete es necesario para ejecutar flujos de procesamiento en el servicio Dataflow.

        go get -u golang.org/x/sys/unix
      5. Comprueba que el archivo go.mod coincida con el código fuente del módulo:

        go mod tidy

      Ejecutar el flujo de procesamiento sin modificar

      Verifica que la canalización wordcount sin modificar se ejecuta de forma local.

      1. En el terminal, compila y ejecuta el flujo de procesamiento de forma local:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Consulta los resultados:

         more outputs*
      3. Para salir, pulsa q.

      Cambiar el código de la canalización

      Para cambiar la canalización de forma que no distinga entre mayúsculas y minúsculas, modifica el código para aplicar la función strings.ToLower a todas las palabras.

      1. Abre el archivo wordcount.go en el editor que prefieras.

      2. Examina el bloque init (se han eliminado los comentarios para que sea más claro):

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
         }
        
      3. Añade una línea para registrar la función strings.ToLower:

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
           register.Function1x1(strings.ToLower)
         }
        
      4. Examina la función CountWords:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Count the number of times each word occurs.
           return stats.Count(s, col)
         }
        
      5. Para convertir las palabras a minúsculas, añade un ParDo que aplique strings.ToLower a cada palabra:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Map all letters to lowercase.
           lowercaseWords := beam.ParDo(s, strings.ToLower, col)
        
           // Count the number of times each word occurs.
           return stats.Count(s, lowercaseWords)
         }
        
      6. Guarda el archivo.

      Ejecutar el flujo de procesamiento actualizado de forma local

      Ejecuta la wordcount canalización actualizada de forma local y comprueba que el resultado ha cambiado.

      1. Crea y ejecuta el flujo de procesamiento wordcount modificado:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Consulta los resultados de salida de la canalización modificada. Todas las palabras deben estar en minúsculas.

         more outputs*
      3. Para salir, pulsa q.

      Ejecutar el flujo de procesamiento en el servicio Dataflow

      Para ejecutar el ejemplo wordcount actualizado en el servicio Dataflow, usa el siguiente comando:

      go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://BUCKET_NAME/results/outputs \
          --runner dataflow \
          --project PROJECT_ID \
          --region DATAFLOW_REGION \
          --staging_location gs://BUCKET_NAME/binaries/

      Haz los cambios siguientes:

      • BUCKET_NAME: el nombre del segmento de Cloud Storage.

      • PROJECT_ID: el ID del proyecto. Google Cloud

      • DATAFLOW_REGION: la región en la que quieres desplegar la tarea de Dataflow. Por ejemplo, europe-west1. Para ver una lista de las ubicaciones disponibles, consulta Ubicaciones de Dataflow. La marca --region anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.

      Ver los resultados

      Puedes ver una lista de tus trabajos de Dataflow en laGoogle Cloud consola. En la Google Cloud consola, ve a la página Trabajos de Dataflow.

      Ir a Tareas

      En la página Tareas se muestran los detalles de tu wordcount tarea, incluido el estado, que al principio es En curso y, después, Completada.

      Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un segmento de Cloud Storage. Para ver los resultados, usa laGoogle Cloud consola o el terminal local.

      Consola

      Para ver los resultados en la Google Cloud consola, ve a la página Segmentos de Cloud Storage.

      Ir a Contenedores

      En la lista de segmentos de tu proyecto, haz clic en el segmento de almacenamiento que has creado antes. Los archivos de salida que ha creado tu trabajo se muestran en el directorio results.

      Terminal

      Consulta los resultados desde tu terminal o mediante Cloud Shell.

      1. Para ver una lista de los archivos de salida, usa el comando gcloud storage ls:

        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

        Sustituye BUCKET_NAME por el nombre del segmento de Cloud Storage de salida especificado.

      2. Para ver los resultados en los archivos de salida, usa el comando gcloud storage cat:

        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Limpieza

      Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.

      1. Elimina el segmento:
        gcloud storage buckets delete BUCKET_NAME
      2. Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      Siguientes pasos