Compila una canalización de procesamiento de BigQuery para Knative serving con Eventarc


En este instructivo, se muestra cómo usar Eventarc para compilar una canalización de procesamiento que programe consultas a un conjunto de datos públicos de BigQuery, genere gráficos basados en los datos y comparta vínculos a los gráficos por correo electrónico.

Objetivos

En este instructivo, compilarás e implementarás tres servicios de Knative serving que se ejecutan en un clúster de Google Kubernetes Engine (GKE) y que reciben eventos mediante Eventarc:

  1. Ejecutor de consultas: Se activa cuando los trabajos de Cloud Scheduler publican un mensaje en un tema de Pub/Sub. Este servicio usa la API de BigQuery para recuperar datos de un conjunto de datos públicos sobre el COVID-19 y guardar los resultados en una nueva tabla de BigQuery.
  2. Creador de gráficos: Se activa cuando el servicio del ejecutor de consultas publica un mensaje en un tema de Pub/Sub. Este servicio genera gráficos con la biblioteca de trazado de Python Matplotlib y guarda los gráficos en un bucket de Cloud Storage.
  3. Notificador: Se activa mediante registros de auditoría cuando el servicio Creador de gráficos almacena un gráfico en un bucket de Cloud Storage; este servicio usa el servicio de correo electrónico SendGrid para enviar vínculos de los gráficos a una dirección de correo electrónico.

En el siguiente diagrama, se muestra la arquitectura de alto nivel:

Canalización de procesamiento de BigQuery

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  12. En Cloud Storage, habilita el registro de auditoría para los tipos de acceso a los datos ADMIN_READ, DATA_WRITE y DATA_READ.

    1. Lee la política de Identity and Access Management (IAM) asociada con tu organización, carpeta o proyecto de Google Cloud y almacénala en un archivo temporal:
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. En un editor de texto, abre /tmp/policy.yaml y agrega o cambia solo la configuración del registro de auditoría en la sección auditConfigs:

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. Escribe tu nueva política de IAM:

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      Si el comando anterior informa de un conflicto con otro cambio, repite estos pasos y comienza por la lectura de la política de IAM. Para obtener más información, consulta Configura los registros de auditoría de acceso a los datos con la API.

  13. Establece los valores predeterminados que se usan en este instructivo:
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION

    Reemplaza PROJECT_ID con el ID del proyecto.

Crea una clave de API de SendGrid

SendGrid es un proveedor de correo electrónico basado en la nube que te permite enviar correos electrónicos sin tener que mantener servidores de correo electrónico.

  1. Accede a Sendgrid y ve a Configuración > Claves de API.
  2. Haz clic en Crear clave de API.
  3. Selecciona los permisos de la clave. La clave debe tener como mínimo permisos de envío de correo electrónico para enviar correos electrónicos.
  4. Haz clic en Guardar para crear la clave.
  5. SendGrid genera una clave nueva. Esta es la única copia de la clave, así que asegúrate de copiarla y guardarla para más adelante.

Cree un clúster de GKE

Crea un clúster con Workload Identity Federation for GKE habilitado para que pueda acceder a los servicios de Google Cloud desde aplicaciones que se ejecutan dentro de GKE. También necesitas Workload Identity Federation for GKE para reenviar eventos con Eventarc.

  1. Crea un clúster de GKE para Knative serving con los complementos CloudRun, HttpLoadBalancing y HorizontalPodAutoscaling habilitados:

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2  \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. Espera unos minutos a que finalice la creación del clúster. Durante el proceso, es posible que veas advertencias que puedes ignorar de forma segura. Cuando se crea el clúster, el resultado es similar al siguiente:

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Crea un repositorio estándar de Artifact Registry para almacenar tu imagen de contenedor de Docker:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    Reemplaza REPOSITORY por un nombre único para el repositorio.

Configura la cuenta de servicio de GKE

Configura una cuenta de servicio de GKE para que actúe como la cuenta de servicio de procesamiento predeterminada.

  1. Crea una vinculación de Identity and Access Management (IAM) entre las cuentas de servicio:

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. Agrega la anotación iam.gke.io/gcp-service-account a la cuenta de servicio de GKE mediante la dirección de correo electrónico de la cuenta de servicio de procesamiento:

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

Habilita los destinos de GKE

Para permitir que Eventarc administre recursos en el clúster de GKE, habilita los destinos de GKE y vincula la cuenta de servicio de Eventarc con los roles necesarios.

  1. Habilita los destinos de GKE para Eventarc:

    gcloud eventarc gke-destinations init
  2. Cuando se te solicite vincular los roles necesarios, ingresa y.

    Los siguientes roles están vinculados:

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

Crea una cuenta de servicio y vincula los roles de acceso

Antes de crear el activador de Eventarc, configura una cuenta de servicio administrada por el usuario y asígnale roles específicos para que Eventarc pueda reenviar los eventos de Pub/Sub.

  1. Crea una cuenta de servicio llamada TRIGGER_GSA:

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. Otorga las funciones pubsub.subscriber, monitoring.metricWriter y eventarc.eventReceiver a la cuenta de servicio:

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"

Cree un bucket de Cloud Storage

Crea un bucket de Cloud Storage para guardar los gráficos. Asegúrate de que el bucket y los gráficos estén disponibles de forma pública y en la misma región que tu servicio de GKE:

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region) --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

Clona el repositorio

Clona el repositorio de GitHub.

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

Implementa el servicio de notificador

Desde el directorio bigquery/notifier/python, implementa un servicio de Knative serving que reciba eventos del creador de gráficos y use SendGrid para enviar vínculos por correo electrónico a los gráficos generados.

  1. Compila y envía la imagen del contenedor:

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Implementa la imagen de contenedor en Knative serving y pasa una dirección a la que se enviarán los correos electrónicos, junto con la clave de API de SendGrid:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    Reemplaza lo siguiente:

    • EMAIL_ADDRESS: una dirección de correo electrónico para enviar los vínculos a los gráficos generados
    • YOUR_SENDGRID_API_KEY: la clave de API de SendGrid que anotaste antes

Cuando veas la URL del servicio, se completará la implementación.

Crea un activador para el servicio Notificador

El activador de Eventarc para el servicio Notificador implementado en los filtros de Knative serving de los registros de auditoría de Cloud Storage donde el methodName es storage.objects.create.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    Esto crea un activador llamado trigger-notifier-gke.

Implementa el servicio del creador de gráficos

Desde el directorio bigquery/chart-creator/python, implementa un servicio de Knative serving que reciba eventos del ejecutor de consultas, recupera datos de una tabla de BigQuery para un país específico y, luego, genera un gráfico mediante Matplotlib, a partir de los datos. El gráfico se sube a un bucket de Cloud Storage.

  1. Compila y envía la imagen del contenedor:

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. Implementa la imagen de contenedor en Knative serving y pasa BUCKET:

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}

Cuando veas la URL del servicio, se completará la implementación.

Crea un activador para el servicio Creador de gráficos

El activador de Eventarc para el servicio Creador de gráficos implementado en los filtros de Knative serving de los mensajes publicados en un tema de Pub/Sub.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    Esto crea un activador llamado trigger-chart-creator-gke.

  2. Configura la variable de entorno del tema de Pub/Sub.

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))

Implementa el servicio del ejecutor de consultas

Desde el directorio processing-pipelines, implementa un servicio de Knative serving que reciba eventos de Cloud Scheduler, recupera datos de un conjunto de datos públicos del COVID-19 y guarda los resultados en una tabla de BigQuery nueva.

  1. Compila y envía la imagen del contenedor:

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  2. Implementa la imagen de contenedor en Knative serving y pasa PROJECT_ID y TOPIC_QUERY_COMPLETED:

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

Cuando veas la URL del servicio, se completará la implementación.

Crea un activador para el servicio Ejecutor de consultas

El activador de Eventarc para el servicio del ejecutor de consultas implementado en los filtros de Knative serving de los mensajes publicados en un tema de Pub/Sub.

  1. Crea el activador:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    Esto crea un activador llamado trigger-query-runner-gke.

  2. Configura una variable de entorno para el tema de Pub/Sub.

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

Programa los trabajos

La canalización de procesamiento se activa mediante dos trabajos de Cloud Scheduler.

  1. Crea una app de App Engine que sea necesaria para Cloud Scheduler y especifica una ubicación adecuada (por ejemplo, europe-west):

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. Crea dos trabajos de Cloud Scheduler que se publiquen en un tema de Pub/Sub una vez al día:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    El programa se especifica en formato unix-cron. Por ejemplo, 0 16 * * * significa que los trabajos se ejecutan a las 16:00 (4 p.m.) UTC todos los días.

Ejecuta la canalización

  1. Confirma que todos los activadores se hayan creado de forma correcta:

    gcloud eventarc triggers list

    El resultado debería ser similar al siguiente ejemplo:

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. Recupera los ID de trabajo de Cloud Scheduler:

    gcloud scheduler jobs list

    El resultado debería ser similar al ejemplo siguiente:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. Aunque los trabajos están programados para ejecutarse a diario a las 4 y 5 p.m., también puedes ejecutar los trabajos de Cloud Scheduler de forma manual:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. Después de unos minutos, confirma que haya dos gráficos en el bucket de Cloud Storage:

    gcloud storage ls gs://${BUCKET}

    El resultado debería ser similar al ejemplo siguiente:

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

¡Felicitaciones! También deberías recibir dos correos electrónicos con vínculos a los gráficos.

Realiza una limpieza

Si creaste un proyecto nuevo para este instructivo, bórralo. Si usaste un proyecto existente y quieres conservarlo sin los cambios que se agregaron en este instructivo, borra los recursos creados para el instructivo.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimina recursos de instructivos

  1. Borra los servicios de Knative serving que implementaste en este instructivo:

    gcloud run services delete SERVICE_NAME

    En el ejemplo anterior, SERVICE_NAME es el nombre del servicio que elegiste.

    También puedes borrar los servicios de Knative serving desde la consola de Google Cloud.

  2. Borra cualquier activador de Eventarc que hayas creado en este instructivo:

    gcloud eventarc triggers delete TRIGGER_NAME
    

    Reemplaza TRIGGER_NAME por el nombre de tu activador.

  3. Quita las opciones de configuración predeterminadas de Google Cloud CLI que agregaste durante la configuración del instructivo.

    gcloud config unset project
    gcloud config unset run/cluster
    gcloud config unset run/cluster_location
    gcloud config unset run/platform
    gcloud config unset eventarc/location
    gcloud config unset compute/zone
  4. Borra las imágenes de Artifact Registry.

    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
  5. Borra el bucket, junto con todos los objetos dentro del bucket:

    gcloud storage rm --recursive gs://${BUCKET}/
  6. Borra los trabajos de Cloud Scheduler:

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk

¿Qué sigue?