Crear un flujo de procesamiento de Dataflow con Go
En esta página se muestra cómo usar el SDK de Apache Beam para Go para crear un programa que defina una canalización. A continuación, ejecuta el flujo de procesamiento de forma local y en el servicio Dataflow. Para obtener una introducción a la canalización WordCount, consulta el vídeo Cómo usar WordCount en Apache Beam.
Antes de empezar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Sustituye
PROJECT_ID
por el ID del proyecto. - Sustituye
PROJECT_NUMBER
por el número de tu proyecto. Para encontrar el número de tu proyecto, consulta el artículo Identificar proyectos o usa el comandogcloud projects describe
. - Sustituye
SERVICE_ACCOUNT_ROLE
por cada rol individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Estándar). -
Define la ubicación de almacenamiento de la siguiente manera:
US
(Estados Unidos). -
Sustituye
BUCKET_NAME
por un nombre de segmento único. No incluyas información sensible en el nombre del segmento, ya que este espacio de nombres es público y visible para todos los usuarios. - Copia el Google Cloud ID de proyecto y el nombre del segmento de Cloud Storage. Necesitará estos valores más adelante en esta guía de inicio rápido.
- Lee un archivo de texto como entrada. De forma predeterminada, lee un archivo de texto ubicado en un segmento de Cloud Storage con el nombre de recurso
gs://dataflow-samples/shakespeare/kinglear.txt
. - Analiza cada línea en palabras.
- Realiza un recuento de frecuencia de las palabras tokenizadas.
Usa el comando
git clone
para clonar el repositorio de GitHubapache/beam
:git clone https://github.com/apache/beam.git
Cambia al directorio
beam/sdks/go
:cd beam/sdks/go
Usa el siguiente comando para ejecutar la canalización:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
La marca
input
especifica el archivo que se va a leer y la marcaoutput
especifica el nombre del archivo de salida del recuento de frecuencias.Crea un directorio para tu módulo de Go en la ubicación que quieras:
mkdir wordcount
cd wordcount
Crea un módulo de Go. En este ejemplo, usa
example/dataflow
como ruta del módulo.go mod init example/dataflow
Descarga la copia más reciente del código
wordcount
del repositorio de GitHub de Apache Beam. Coloca este archivo en el directoriowordcount
que has creado.Si usas un sistema operativo que no sea Linux, debes obtener el paquete Go
unix
. Este paquete es necesario para ejecutar flujos de procesamiento en el servicio Dataflow.go get -u golang.org/x/sys/unix
Comprueba que el archivo
go.mod
coincida con el código fuente del módulo:go mod tidy
En el terminal, compila y ejecuta el flujo de procesamiento de forma local:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Consulta los resultados:
more outputs*
Para salir, pulsa q.
Abre el archivo
wordcount.go
en el editor que prefieras.Examina el bloque
init
(se han eliminado los comentarios para que sea más claro):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Añade una línea para registrar la función
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Examina la función
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Para convertir las palabras a minúsculas, añade un ParDo que aplique
strings.ToLower
a cada palabra:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Guarda el archivo.
Crea y ejecuta el flujo de procesamiento
wordcount
modificado:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Consulta los resultados de salida de la canalización modificada. Todas las palabras deben estar en minúsculas.
more outputs*
Para salir, pulsa q.
BUCKET_NAME
: el nombre del segmento de Cloud Storage.PROJECT_ID
: el ID del proyecto. Google CloudDATAFLOW_REGION
: la región en la que quieres desplegar la tarea de Dataflow. Por ejemplo,europe-west1
. Para ver una lista de las ubicaciones disponibles, consulta Ubicaciones de Dataflow. La marca--region
anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.Para ver una lista de los archivos de salida, usa el comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Sustituye
BUCKET_NAME
por el nombre del segmento de Cloud Storage de salida especificado.Para ver los resultados en los archivos de salida, usa el comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
-
Elimina el segmento:
gcloud storage buckets delete BUCKET_NAME
Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Configurar un entorno de desarrollo
El SDK de Apache Beam es un modelo de programación de código abierto para flujos de datos. Defines un flujo de procesamiento con un programa de Apache Beam y, a continuación, eliges un ejecutor, como Dataflow, para ejecutarlo.
Te recomendamos que uses la versión más reciente de Go cuando trabajes con el SDK de Apache Beam para Go. Si no tienes instalada la versión más reciente de Go, consulta la guía de descarga e instalación de Go para descargar e instalar Go en tu sistema operativo.
Para comprobar la versión de Go que has instalado, ejecuta el siguiente comando en tu terminal local:
go version
Ejecutar el ejemplo de recuento de palabras de Beam
El SDK de Apache Beam para Go incluye un
wordcount
ejemplo de flujo de procesamiento. El ejemplo dewordcount
hace lo siguiente:Para ejecutar la versión más reciente del ejemplo de Beam
wordcount
en tu máquina local, sigue estos pasos:Una vez que se haya completado el flujo de trabajo, consulta los resultados:
more outputs*
Para salir, pulsa q.
Modificar el código del flujo de procesamiento
La canalización de Beam
wordcount
distingue entre palabras en mayúsculas y minúsculas. En los siguientes pasos se muestra cómo crear tu propio módulo de Go, modificar la pipelinewordcount
para que no distinga entre mayúsculas y minúsculas, y ejecutarla en Dataflow.Crear un módulo de Go
Para modificar el código de la canalización, sigue estos pasos.
Ejecutar el flujo de procesamiento sin modificar
Verifica que la canalización
wordcount
sin modificar se ejecuta de forma local.Cambiar el código de la canalización
Para cambiar la canalización de forma que no distinga entre mayúsculas y minúsculas, modifica el código para aplicar la función
strings.ToLower
a todas las palabras.Ejecutar el flujo de procesamiento actualizado de forma local
Ejecuta la
wordcount
canalización actualizada de forma local y comprueba que el resultado ha cambiado.Ejecutar el flujo de procesamiento en el servicio Dataflow
Para ejecutar el ejemplo
wordcount
actualizado en el servicio Dataflow, usa el siguiente comando:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner dataflow \ --project PROJECT_ID \ --region DATAFLOW_REGION \ --staging_location gs://BUCKET_NAME/binaries/
Haz los cambios siguientes:
Ver los resultados
Puedes ver una lista de tus trabajos de Dataflow en laGoogle Cloud consola. En la Google Cloud consola, ve a la página Trabajos de Dataflow.
En la página Tareas se muestran los detalles de tu
wordcount
tarea, incluido el estado, que al principio es En curso y, después, Completada.Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un segmento de Cloud Storage. Para ver los resultados, usa laGoogle Cloud consola o el terminal local.
Consola
Para ver los resultados en la Google Cloud consola, ve a la página Segmentos de Cloud Storage.
En la lista de segmentos de tu proyecto, haz clic en el segmento de almacenamiento que has creado antes. Los archivos de salida que ha creado tu trabajo se muestran en el directorio
results
.Terminal
Consulta los resultados desde tu terminal o mediante Cloud Shell.
Limpieza
Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.
Siguientes pasos
-
Set the storage class to