Creare una pipeline di elaborazione di BigQuery per il servizio Knative con Eventarc


Questo tutorial mostra come utilizzare Eventarc per creare una pipeline di elaborazione che pianifica query su un set di dati BigQuery pubblico, genera grafici in base ai dati e condivide i link ai grafici tramite email.

Obiettivi

In questo tutorial creerai ed eseguirai il deployment di tre servizi Knative Serving in esecuzione in un cluster Google Kubernetes Engine (GKE) e che ricevono eventi utilizzando Eventarc:

  1. Query Runner: attivato quando i job Cloud Scheduler pubblicano un messaggio in un argomento Pub/Sub; questo servizio utilizza l'API BigQuery per recuperare i dati da un set di dati pubblico sul COVID-19 e salva i risultati in una nuova tabella BigQuery.
  2. Creatore di grafici: attivato quando il servizio di esecuzione delle query pubblica un messaggio in un argomento Pub/Sub; questo servizio genera grafici utilizzando la libreria di tracciamento Python, Matplotlib, e salva i grafici in un bucket Cloud Storage.
  3. Notifier: attivato dai log di controllo quando il servizio di creazione dei grafici archivia un grafico in un bucket Cloud Storage; questo servizio utilizza il servizio email SendGrid per inviare i link dei grafici a un indirizzo email.

Il seguente diagramma mostra l'architettura di alto livello:

Pipeline di elaborazione BigQuery

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi utenti di Google Cloud potrebbero avere diritto a una prova senza costi.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  8. Install the Google Cloud CLI.

  9. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  10. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verify that billing is enabled for your Google Cloud project.

  13. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  14. Per Cloud Storage, abilita l'audit logging per i tipi di accesso ai dati ADMIN_READ, DATA_WRITE e DATA_READ.

    1. Leggi il criterio Identity and Access Management (IAM) associato al tuo Google Cloud progetto, alla tua cartella o alla tua organizzazione e archivialo in un file temporaneo:
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. In un editor di testo, apri /tmp/policy.yaml e aggiungi o modifica solo la configurazione del log di controllo nella sezione auditConfigs:

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. Scrivi il nuovo criterio IAM:

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      Se il comando precedente segnala un conflitto con un'altra modifica, allora ripeti questi passaggi, a partire dalla lettura del criterio IAM. Per ulteriori informazioni, consulta Configurare gli audit log di accesso ai dati con l'API.

  15. Imposta i valori predefiniti utilizzati in questo tutorial:
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION

    Sostituisci PROJECT_ID con l'ID progetto.

  16. Creare una chiave API SendGrid

    SendGrid è un provider email basato su cloud che ti consente di inviare email senza dover gestire i server di posta.

    1. Accedi a SendGrid e vai a Impostazioni > Chiavi API.
    2. Fai clic su Crea chiave API.
    3. Seleziona le autorizzazioni per la chiave. Per inviare email, la chiave deve disporre come minimo delle autorizzazioni per l'invio di email.
    4. Fai clic su Salva per creare la chiave.
    5. SendGrid genera una nuova chiave. Si tratta dell'unica copia della chiave, quindi assicurati di copiarla e salvarla per utilizzarla in un secondo momento.

    Crea un cluster GKE

    Crea un cluster con Workload Identity Federation for GKE abilitato in modo che possa accedere ai servizi Google Cloud dalle applicazioni in esecuzione in GKE. Per inoltrare gli eventi utilizzando Eventarc, devi anche utilizzare Workload Identity Federation for GKE.

    1. Crea un cluster GKE per Knative Serving con i componenti aggiuntivi CloudRun, HttpLoadBalancing e HorizontalPodAutoscaling abilitati:

      gcloud beta container clusters create $CLUSTER_NAME \
          --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
          --machine-type=n1-standard-4 \
          --enable-autoscaling --min-nodes=2 --max-nodes=10 \
          --no-issue-client-certificate --num-nodes=2  \
          --logging=SYSTEM,WORKLOAD \
          --monitoring=SYSTEM \
          --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
          --zone us-central1 \
          --release-channel=rapid \
          --workload-pool=$PROJECT_ID.svc.id.goog
      
    2. Attendi qualche minuto per il completamento della creazione del cluster. Durante la procedura, potresti visualizzare avvisi che puoi ignorare tranquillamente. Una volta creato il cluster, l'output è simile al seguente:

      Creating cluster ...done.
      Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
      
    3. Crea un repository standard di Artifact Registry per archiviare l'immagine container Docker:

      gcloud artifacts repositories create REPOSITORY \
          --repository-format=docker \
          --location=$CLUSTER_LOCATION

      Sostituisci REPOSITORY con un nome univoco per il repository.

    Configura il account di servizio GKE

    Configura un account di servizio GKE in modo che funga da account di servizio Compute predefinito.

    1. Crea un binding IAM (Identity and Access Management) tra i service account:

      PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
      
      gcloud iam service-accounts add-iam-policy-binding \
          --role roles/iam.workloadIdentityUser \
          --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
          $PROJECT_NUMBER-compute@developer.gserviceaccount.com
    2. Aggiungi l'annotazione iam.gke.io/gcp-service-account al account di servizio GKE, utilizzando l'indirizzo email del account di servizio di Compute:

      kubectl annotate serviceaccount \
          --namespace default \
          default \
          iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

    Abilita le destinazioni GKE

    Per consentire a Eventarc di gestire le risorse nel cluster GKE, attiva le destinazioni GKE e associa il account di servizio Eventarc ai ruoli richiesti.

    1. Abilita le destinazioni GKE per Eventarc:

      gcloud eventarc gke-destinations init
    2. Al prompt per associare i ruoli richiesti, inserisci y.

      Sono associati i seguenti ruoli:

      • roles/compute.viewer
      • roles/container.developer
      • roles/iam.serviceAccountAdmin

    Crea un account di servizio e associa i ruoli di accesso

    Prima di creare il trigger Eventarc, configura un account di servizio gestito dall'utente e concedigli ruoli specifici in modo che Eventarc possa inoltrare gli eventi Pub/Sub.

    1. Crea un account di servizio chiamato TRIGGER_GSA:

      TRIGGER_GSA=eventarc-bigquery-triggers
      gcloud iam service-accounts create $TRIGGER_GSA
    2. Concedi i ruoli pubsub.subscriber, monitoring.metricWriter e eventarc.eventReceiver al account di servizio:

      PROJECT_ID=$(gcloud config get-value project)
      
      gcloud projects add-iam-policy-binding $PROJECT_ID \
          --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
          --role "roles/pubsub.subscriber"
      
      gcloud projects add-iam-policy-binding $PROJECT_ID \
          --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
          --role "roles/monitoring.metricWriter"
      
      gcloud projects add-iam-policy-binding $PROJECT_ID \
          --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
          --role "roles/eventarc.eventReceiver"

    Crea un bucket Cloud Storage

    Crea un bucket Cloud Storage per salvare i grafici. Assicurati che il bucket e i grafici siano disponibili pubblicamente e nella stessa regione del tuo servizio GKE:

    export BUCKET="$(gcloud config get-value core/project)-charts"
    gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region)
    gcloud storage buckets update gs://${BUCKET} --uniform-bucket-level-access
    gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

    Clona il repository

    Clona il repository GitHub.

    git clone https://github.com/GoogleCloudPlatform/eventarc-samples
    cd eventarc-samples/processing-pipelines

    Esegui il deployment del servizio di notifica

    Dalla directory bigquery/notifier/python, esegui il deployment di un servizio Knative serving che riceve eventi di creazione di grafici e utilizza SendGrid per inviare via email i link ai grafici generati.

    1. Crea ed esegui il push dell'immagine container:

      pushd bigquery/notifier/python
      export SERVICE_NAME=notifier
      docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
      docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
      popd
    2. Esegui il deployment dell'immagine container in Knative serving, passando un indirizzo a cui inviare le email e la chiave API SendGrid:

      export TO_EMAILS=EMAIL_ADDRESS
      export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
      gcloud run deploy ${SERVICE_NAME} \
          --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
          --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

      Sostituisci quanto segue:

      • EMAIL_ADDRESS: un indirizzo email a cui inviare i link ai grafici generati
      • YOUR_SENDGRID_API_KEY: la chiave API SendGrid che hai annotato in precedenza

    Quando vedi l'URL del servizio, il deployment è completato.

    Crea un trigger per il servizio di notifica

    Il trigger Eventarc per il servizio di notifica di cui è stato eseguito il deployment su Knative serving filtra gli audit log di Cloud Storage in cui methodName è storage.objects.create.

    1. Crea il trigger:

      gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
          --destination-gke-cluster=$CLUSTER_NAME \
          --destination-gke-location=$CLUSTER_LOCATION \
          --destination-gke-namespace=default \
          --destination-gke-service=$SERVICE_NAME \
          --destination-gke-path=/ \
          --event-filters="type=google.cloud.audit.log.v1.written" \
          --event-filters="serviceName=storage.googleapis.com" \
          --event-filters="methodName=storage.objects.create" \
          --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

      Viene creato un trigger denominato trigger-notifier-gke.

    Esegui il deployment del servizio di creazione dei grafici

    Dalla directory bigquery/chart-creator/python, esegui il deployment di un servizio Knative Serving che riceve eventi di esecuzione di query, recupera i dati da una tabella BigQuery per un paese specifico e poi genera un grafico utilizzando Matplotlib dai dati. Il grafico viene caricato in un bucket Cloud Storage.

    1. Crea ed esegui il push dell'immagine container:

      pushd bigquery/chart-creator/python
      export SERVICE_NAME=chart-creator
      docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
      docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
      popd
    2. Esegui il deployment dell'immagine container in Knative serving, passando BUCKET:

      gcloud run deploy ${SERVICE_NAME} \
          --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
          --update-env-vars BUCKET=${BUCKET}

    Quando vedi l'URL del servizio, il deployment è completato.

    Crea un trigger per il servizio di creazione di grafici

    Il trigger Eventarc per il servizio di creazione di grafici di cui è stato eseguito il deployment su Knative serving filtra i messaggi pubblicati in un argomento Pub/Sub.

    1. Crea il trigger:

      gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
          --destination-gke-cluster=$CLUSTER_NAME \
          --destination-gke-location=$CLUSTER_LOCATION \
          --destination-gke-namespace=default \
          --destination-gke-service=$SERVICE_NAME \
          --destination-gke-path=/ \
          --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
          --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

      Viene creato un trigger denominato trigger-chart-creator-gke.

    2. Imposta la variabile di ambiente dell'argomento Pub/Sub.

      export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))

    Esegui il deployment del servizio di esecuzione delle query

    Dalla directory processing-pipelines, implementa un servizio Knative che riceve eventi Cloud Scheduler, recupera i dati da un set di dati pubblico sul COVID-19 e salva i risultati in una nuova tabella BigQuery.

    1. Crea ed esegui il push dell'immagine container:

      export SERVICE_NAME=query-runner
      docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
      docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    2. Esegui il deployment dell'immagine container in Knative serving, passando PROJECT_ID e TOPIC_QUERY_COMPLETED:

      gcloud run deploy ${SERVICE_NAME} \
          --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
          --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

    Quando vedi l'URL del servizio, il deployment è completato.

    Crea un trigger per il servizio Query Runner

    Il trigger Eventarc per il servizio di esecuzione delle query di cui è stato eseguito il deployment su Knative Serving filtra i messaggi pubblicati in un argomento Pub/Sub.

    1. Crea il trigger:

      gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
          --destination-gke-cluster=$CLUSTER_NAME \
          --destination-gke-location=$CLUSTER_LOCATION \
          --destination-gke-namespace=default \
          --destination-gke-service=$SERVICE_NAME \
          --destination-gke-path=/ \
          --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
          --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

      Viene creato un trigger denominato trigger-query-runner-gke.

    2. Imposta una variabile di ambiente per l'argomento Pub/Sub.

      export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

    Pianifica i job

    La pipeline di elaborazione viene attivata da due job Cloud Scheduler.

    1. Crea un'app App Engine richiesta da Cloud Scheduler e specifica una località appropriata (ad esempio, europe-west):

      export APP_ENGINE_LOCATION=LOCATION
      gcloud app create --region=${APP_ENGINE_LOCATION}
    2. Crea due job Cloud Scheduler che pubblicano in un argomento Pub/Sub una volta al giorno:

      gcloud scheduler jobs create pubsub cre-scheduler-uk \
          --schedule="0 16 * * *" \
          --topic=${TOPIC_QUERY_SCHEDULED} \
          --message-body="United Kingdom"
      gcloud scheduler jobs create pubsub cre-scheduler-cy \
          --schedule="0 17 * * *" \
          --topic=${TOPIC_QUERY_SCHEDULED} \
          --message-body="Cyprus"

      La pianificazione è specificata nel formato cron Unix. Ad esempio, 0 16 * * * significa che i job vengono eseguiti alle 16:00 (4 PM) UTC ogni giorno.

    esegui la pipeline.

    1. Verifica che tutti i trigger siano stati creati correttamente:

      gcloud eventarc triggers list

      L'output dovrebbe essere simile al seguente:

      NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
      trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
      trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
      trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
      
    2. Recupera gli ID job Cloud Scheduler:

      gcloud scheduler jobs list

      L'output dovrebbe essere simile al seguente:

      ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
      cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
      cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
      
    3. Anche se i job sono pianificati per essere eseguiti ogni giorno alle 16:00 e alle 17:00, puoi anche eseguirli manualmente:

      gcloud scheduler jobs run cre-scheduler-cy
      gcloud scheduler jobs run cre-scheduler-uk
    4. Dopo qualche minuto, verifica che nel bucket Cloud Storage siano presenti due grafici:

      gcloud storage ls gs://${BUCKET}

      L'output dovrebbe essere simile al seguente:

      gs://PROJECT_ID-charts/chart-cyprus.png
      gs://PROJECT_ID-charts/chart-unitedkingdom.png
      

    Complimenti! Dovresti ricevere anche due email con i link ai grafici.

    Esegui la pulizia

    Se hai creato un nuovo progetto per questo tutorial, eliminalo. Se hai utilizzato un progetto esistente e vuoi conservarlo senza le modifiche aggiunte in questo tutorial, elimina le risorse create per il tutorial.

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Eliminare le risorse del tutorial

    1. Elimina tutti i servizi Knative serving di cui hai eseguito il deployment in questo tutorial:

      gcloud run services delete SERVICE_NAME

      Dove SERVICE_NAME è il nome del servizio che hai scelto.

      Puoi anche eliminare i servizi Knative Serving dalla console.Google Cloud

    2. Elimina tutti i trigger Eventarc che hai creato in questo tutorial:

      gcloud eventarc triggers delete TRIGGER_NAME
      

      Sostituisci TRIGGER_NAME con il nome del trigger.

    3. Rimuovi tutte le configurazioni predefinite di Google Cloud CLI che hai aggiunto durante la configurazione del tutorial.

      gcloud config unset project
      gcloud config unset run/cluster
      gcloud config unset run/cluster_location
      gcloud config unset run/platform
      gcloud config unset eventarc/location
      gcloud config unset compute/zone
    4. Elimina le immagini da Artifact Registry.

      gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
      gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
      gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
    5. Elimina il bucket e tutti gli oggetti al suo interno:

      gcloud storage rm --recursive gs://${BUCKET}/
    6. Elimina i job Cloud Scheduler:

      gcloud scheduler jobs delete cre-scheduler-cy
      gcloud scheduler jobs delete cre-scheduler-uk

    Passaggi successivi