Daten mithilfe von Workflows aus Cloud Storage in BigQuery laden

Last reviewed 2021-05-12 UTC

In dieser Anleitung wird gezeigt, wie Sie serverlose Workflows mit Workflows, Cloud Functions und Firestore zuverlässig ausführen können, um Rohdaten wie Ereignisprotokolle aus Cloud Storage in BigQuery zu laden. Analyseplattformen haben in der Regel ein Orchestrierungstool, mit dem Daten regelmäßig mithilfe von BigQuery-Jobs in BigQuery geladen und dann mithilfe von SQL-Anweisungen, einschließlich Anweisungen in der BigQuery-Prozedursprache, um Geschäftsmesswerte bereitgestellt werden. Diese Anleitung richtet sich an Entwickler und Architekten, die serverlose, ereignisgesteuerte Pipelines zur Datenverarbeitung erstellen möchten. Es wird davon ausgegangen, dass Sie mit YAML, SQL und Python vertraut sind.

Architektur

Das folgende Diagramm zeigt die allgemeine Architektur einer serverlosen ELT-Pipeline (Extrahieren, Laden und Transformieren), die Workflows verwendet.

Pipeline extrahieren, laden und transformieren.

Betrachten Sie im obigen Diagramm eine Einzelhandelsplattform, die regelmäßig Verkaufsereignisse als Dateien aus verschiedenen Geschäften erfasst und die Dateien anschließend in einen Cloud Storage-Bucket schreibt. Die Ereignisse werden verwendet, um Geschäftsmesswerte durch Importieren und Verarbeiten in BigQuery bereitzustellen. Diese Architektur bietet ein zuverlässiges und serverloses Orchestrierungssystem zum Importieren Ihrer Dateien in BigQuery und ist in die beiden folgenden Module unterteilt:

  • Dateiliste: Verwaltet die Liste der nicht verarbeiteten Dateien, die einem Cloud Storage-Bucket in einer Firestore-Sammlung hinzugefügt wurden. Dieses Modul arbeitet mit einer Cloud Functions-Funktion, die durch das Speicherereignis Objekt finalisieren ausgelöst wird. Dieses wird generiert, wenn dem Cloud Storage-Bucket eine neue Datei hinzugefügt wird. Der Dateiname wird an das Array files der Sammlung mit dem Namen new in Firestore angehängt.
  • Workflow: Führt die geplanten Workflows werden aus. Cloud Scheduler löst einen Workflow aus, der eine Reihe von Schritten gemäß einer YAML-basierten Syntax ausführt, um das Laden zu orchestrieren und dann die Daten durch Aufrufen von Cloud Functions in BigQuery zu transformieren. Die Schritte im Workflow rufen Cloud Functions auf, um die folgenden Aufgaben auszuführen:

    • Erstellen und starten Sie einen BigQuery-Ladejob.
    • Fragen Sie den Ladejobstatus ab.
    • Erstellen und starten Sie den Transformationsabfragejob.
    • Fragen Sie den Status des Transformationsjobs ab.

Durch die Verwendung der Transaktionen zum Verwalten der Liste der neuen Dateien in Firestore wird sichergestellt, dass keine Datei übersehen wird, wenn sie von einem Workflow in BigQuery importiert werden. Separate Ausführungen des Workflows sind idempotent, indem Job-Metadaten und der Status in Firestore gespeichert werden.

Lernziele

  • Erstellen Sie eine Firestore-Datenbank.
  • Richten Sie einen Cloud Functions-Trigger ein, um Dateien zu verfolgen, die dem Cloud Storage-Bucket in Firestore hinzugefügt wurden.
  • Cloud Functions zum Ausführen und Überwachen von BigQuery-Jobs bereitstellen.
  • Einen Workflow bereitstellen und ausführen, um den Prozess zu automatisieren.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  2. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  3. Cloud Build, Cloud Functions, Identity and Access Management, Resource Manager, and Workflows APIs aktivieren.

    Aktivieren Sie die APIs

  4. Rufen Sie die Seite Willkommen auf und notieren Sie sich die Projekt-ID. Sie benötigen sie in einem späteren Schritt.

    Zur Begrüßungsseite

  5. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

Umgebung vorbereiten

Erstellen Sie zum Vorbereiten Ihrer Umgebung eine Firestore-Datenbank, klonen Sie die Codebeispiele aus dem GitHub-Repository, erstellen Sie Ressourcen mit Terraform, bearbeiten Sie die YAML-Datei für Workflows und installieren Sie Anforderungen für den Dateigenerator.

  1. So erstellen Sie eine Firestore-Datenbank:

    1. Rufen Sie in der Google Cloud Console die Seite Firestore auf.

      Firestore aufrufen

    2. Klicken Sie auf Nativen Modus auswählen.

    3. Wählen Sie im Menü Standort auswählen die Region aus, in der Sie die Firestore-Datenbank hosten möchten. Wir empfehlen, eine Region in der Nähe Ihres physischen Standorts auszuwählen.

    4. Klicken Sie auf Datenbank erstellen.

  2. Klonen Sie in Cloud Shell das Quell-Repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Erstellen Sie in Cloud Shell die folgenden Ressourcen mit Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: Ihre Google Cloud-Projekt-ID
    • REGION: Ein bestimmter geografischer Standort von Google Cloud zum Hosten Ihrer Ressourcen, z. B. us-central1
    • ZONE: Ein Standort innerhalb einer Region zum Hosten Ihrer Ressourcen, z. B. us-central1-b

    Es sollte eine Meldung wie die folgende angezeigt werden: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Mit Terraform können Sie Infrastruktur in großem Maßstab sicher und vorhersehbar erstellen, ändern und aktualisieren. Die folgenden Ressourcen werden in Ihrem Projekt erstellt:

    • Dienstkonten mit den erforderlichen Berechtigungen, um einen sicheren Zugriff auf Ihre Ressourcen zu gewährleisten
    • Ein BigQuery-Dataset mit dem Namen serverless_elt_dataset und eine Tabelle namens word_count, um die eingehenden Dateien zu laden
    • Ein Cloud Storage-Bucket mit dem Namen ${project_id}-ordersbucket für das Staging von Eingabedateien
    • Die folgenden fünf Cloud Functions-Funktionen:
      • file_add_handler fügt den Namen der Dateien hinzu, die dem Cloud Storage-Bucket in der Firestore-Sammlung hinzugefügt wurden.
      • create_job erstellt einen neuen BigQuery-Ladejob und verknüpft Dateien in der Firebase-Sammlung mit dem Job.
      • create_query erstellt einen neuen BigQuery-Abfragejob.
      • poll_bigquery_job ruft den Status eines BigQuery-Jobs ab.
      • run_bigquery_job startet einen BigQuery-Job.
  4. Rufen Sie die URLs für die Cloud Functions-Funktionen create_job, create_query, poll_job und run_bigquery_job ab, die Sie im vorherigen Schritt bereitgestellt haben.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    Die Ausgabe sieht in etwa so aus:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Notieren Sie sich diese URLs, da sie bei der Bereitstellung Ihres Workflows benötigt werden.

Workflow erstellen und bereitstellen

  1. Öffnen Sie in Cloud Shell die Quelldatei workflow.yaml für den Workflow:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Ersetzen Sie Folgendes:

    • CREATE_JOB_URL: die URL der Funktion zum Erstellen eines neuen Jobs
    • POLL_BIGQUERY_JOB_URL: die URL der Funktion, die den Status eines laufenden Jobs abfragt
    • RUN_BIGQUERY_JOB_URL: die URL der Funktion zum Starten eines BigQuery-Ladejobs
    • CREATE_QUERY_URL: die URL der Funktion zum Starten eines BigQuery-Abfragejobs
    • BQ_REGION: die BigQuery-Region, in der Daten gespeichert sind, z. B. US
    • BQ_DATASET_TABLE_NAME: der Name der BigQuery-Dataset-Tabelle im Format PROJECT_ID.serverless_elt_dataset.word_count
  2. Stellen Sie die Datei workflow bereit:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Ersetzen Sie Folgendes:

    • WORKFLOW_NAME: der eindeutige Name des Workflows
    • WORKFLOW_REGION: die Region, in der der Workflow bereitgestellt wird, z. B. us-central1
    • WORKFLOW_DESCRIPTION: die Beschreibung des Workflows
  3. Erstellen Sie eine virtuelle Python 3-Umgebung und installieren Sie die Installationsanforderungen für den Dateigenerator:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Zu importierende Dateien generieren

Das Python-Skript gen.py generiert zufällige Inhalte im Avro-Format. Das Schema ist mit der BigQuery-Tabelle word_count identisch. Diese Avro-Dateien werden in den angegebenen Cloud Storage-Bucket kopiert.

Generieren Sie die Dateien in Cloud Shell:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Ersetzen Sie Folgendes:

  • RECORDS_PER_FILE: die Anzahl der Datensätze in einer einzelnen Datei
  • NUM_FILES: die Gesamtanzahl der hochzuladenden Dateien
  • FILE_PREFIX: das Präfix für die Namen der generierten Dateien

Dateieinträge in Firestore ansehen

Wenn die Dateien in Cloud Storage kopiert werden, wird die Cloud Functions-Funktion handle_new_file ausgelöst. Diese Funktion fügt die Dateiliste dem Dateilistenarray im Dokument new in der Firestore-Sammlung jobs hinzu.

Rufen Sie in der Google Cloud Console die Firestore-Seite Daten auf, um die Dateiliste anzusehen.

Zu "Daten"

Liste der Dateien, die der Sammlung hinzugefügt wurden.

Workflow auslösen

Mit Workflows wird eine Reihe von serverlosen Aufgaben aus Google Cloud- und API-Diensten verknüpft. Einzelne Schritte in diesem Workflow werden als Cloud Functions-Funktionen ausgeführt und der Status wird in Firestore gespeichert. Alle Aufrufe von Cloud Functions-Funktionen werden mithilfe des Dienstkontos des Workflows authentifiziert.

Führen Sie den Workflow in Cloud Shell aus:

gcloud workflows execute WORKFLOW_NAME

Das folgende Diagramm zeigt die im Workflow verwendeten Schritte:

Schritte, die im Haupt- und untergeordneten Workflow verwendet werden.

Der Workflow ist in zwei Teile unterteilt: den Hauptworkflow und den untergeordneten Workflow. Der Hauptworkflow übernimmt die Joberstellung und die bedingte Ausführung, während der untergeordnete Workflow einen BigQuery-Job ausführt. Der Workflow führt folgende Vorgänge aus:

  • Die Cloud Functions-Funktion create_job erstellt ein neues Jobobjekt, ruft die Liste der zu Cloud Storage hinzugefügten Dateien aus dem Firestore-Dokument ab und verknüpft die Dateien mit dem Ladejob. Wenn keine Dateien zum Laden vorhanden sind, erstellt die Funktion keinen neuen Job.
  • Die Cloud Functions-Funktion create_query verwendet die auszuführende Abfrage zusammen mit der BigQuery-Region, in der die Abfrage ausgeführt werden soll. Die Funktion erstellt den Job in Firestore und gibt die Job-ID zurück.
  • Die Cloud Functions-Funktion run_bigquery_job ruft die ID des Jobs ab, der ausgeführt werden muss, und ruft dann die BigQuery API auf, um den Job zu senden.
  • Statt auf den Abschluss des Jobs in der Cloud Functions-Funktion zu warten, können Sie den Status des Jobs regelmäßig abfragen.
    • Die Cloud Functions-Funktion poll_bigquery_job gibt den Status des Jobs an. Sie wird so lange wiederholt aufgerufen, bis der Job abgeschlossen ist.
    • Wenn Sie eine Verzögerung zwischen Aufrufen der Cloud Functions-Funktion poll_bigquery_job hinzufügen möchten, wird über Workflows eine sleep-Routine aufgerufen.

Jobstatus ansehen

Sie können die Dateiliste und den Status des Jobs aufrufen.

  1. Rufen Sie in der Google Cloud Console die Firestore-Seite Daten auf.

    Zu "Daten"

  2. Für jeden Job wird eine eindeutige Kennzeichnung (UUID) generiert. Klicken Sie auf die Job-ID, um job_type und status aufzurufen. Jeder Job kann einen der folgenden Typen und Status haben:

    • job_type: Der Typ des Jobs, der vom Workflow ausgeführt wird, mit einem der folgenden Werte:

      • 0: Daten in BigQuery laden
      • 1: Abfrage in BigQuery ausführen
    • status: der aktuelle Status des Jobs mit einem der folgenden Werte:

      • 0: Der Job wurde erstellt, aber nicht gestartet.
      • 1: Der Job wird ausgeführt.
      • 2: Der Job wurde erfolgreich ausgeführt.
      • 3: Ein Fehler ist aufgetreten und der Job wurde nicht erfolgreich abgeschlossen.

    Das Jobobjekt enthält auch Metadatenattribute wie die Region des BigQuery-Datasets, den Namen der BigQuery-Tabelle und bei einem Abfragejob den ausgeführten Abfragestring.

Liste der Dateien mit hervorgehobenem Jobstatus.

Daten in BigQuery ansehen

Wenn Sie prüfen möchten, ob der ELT-Job erfolgreich war, prüfen Sie, ob die Daten in der Tabelle angezeigt werden.

  1. Rufen Sie in der Google Cloud Console die BigQuery-Seite Editor auf.

    Zum Editor

  2. Klicken Sie auf die Tabelle serverless_elt_dataset.word_count.

  3. Klicken Sie auf den Tab Preview (Vorschau).

    Tab "Vorschau" mit Daten in der Tabelle.

Workflow planen

Zur regelmäßigen Ausführung des Workflows nach Zeitplan können Sie Cloud Scheduler verwenden.

Bereinigen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben. Alternativ haben Sie die Möglichkeit, die einzelnen Ressourcen zu löschen.

Einzelne Ressourcen löschen

  1. Entfernen Sie in Cloud Shell alle mit Terraform erstellten Ressourcen:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Rufen Sie in der Google Cloud Console die Firestore-Seite Daten auf.

    Zu "Daten"

  3. Klicken Sie neben Jobs auf  Menü und wählen Sie Löschen aus.

    Menüpfad zum Löschen einer Sammlung.

Projekt löschen

  1. Wechseln Sie in der Google Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Nächste Schritte