Dataflow-Pipeline mit Go erstellen

Auf dieser Seite erfahren Sie, wie Sie mit dem Apache Beam SDK für Go ein Programm erstellen, das eine Pipeline definiert. Anschließend führen Sie die Pipeline lokal und im Dataflow-Dienst aus. Eine Einführung in die WordCount-Pipeline finden Sie im Video How to use WordCount in Apache Beam.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  4. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  12. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie PROJECT_NUMBER durch die Projekt-ID. Ihre Projektnummer finden Sie unter Projekte identifizieren oder verwenden Sie den Befehl gcloud projects describe.
    • Ersetzen Sie SERVICE_ACCOUNT_ROLE durch jede einzelne Rolle.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Legen Sie als Speicherort Folgendes fest: US (USA).
    • Ersetzen Sie BUCKET_NAME durch einen eindeutigen Bucket-Namen. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Kopieren Sie die Google Cloud Projekt-ID und den Namen des Cloud Storage-Bucket. Sie benötigen diese Werte später in dieser Kurzanleitung.
    • Entwicklungsumgebung einrichten

      Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren eine Pipeline mit einem Apache Beam-Programm und wählen dann einen Runner wie Dataflow aus, um Ihre Pipeline auszuführen.

      Wir empfehlen die Verwendung der aktuellen Version von Go, wenn Sie mit dem Apache Beam SDK für Go arbeiten. Wenn Sie die aktuelle Version von Go nicht installiert haben, können Sie Go im Leitfaden zum Herunterladen und Installieren von Go für Ihr Betriebssystem verwenden.

      Führen Sie den folgenden Befehl in Ihrem lokalen Terminal aus, um die Version von Go zu prüfen, die Sie installiert haben:

      go version

      Beispiel für Beam-Wordcount ausführen

      Das Apache Beam SDK für Go enthält ein wordcount-Pipeline-Beispiel. Das wordcount-Beispiel führt Folgendes aus:

      1. Liest eine Textdatei als Eingabe. Standardmäßig wird eine Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen gs://dataflow-samples/shakespeare/kinglear.txt gelesen.
      2. Sie parst jede Zeile und unterteilt sie in Wörter.
      3. Sie misst die Häufigkeit der tokenisierten Wörter.

      Führen Sie die folgenden Schritte aus, um die neueste Version des Beam-wordcount-Beispiels auf Ihrem lokalen Computer auszuführen.

      1. Klonen Sie das GitHub-Repository apache/beam mit dem Befehl git clone:

        git clone https://github.com/apache/beam.git
      2. Wechseln Sie in das Verzeichnis beam/sdks/go:

        cd beam/sdks/go
      3. Verwenden Sie den folgenden Befehl, um die Pipeline auszuführen:

        go run examples/wordcount/wordcount.go \
          --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output outputs

        Das Flag input gibt die zu lesende Datei an und das Flag output gibt den Dateinamen für die Ausgabe der Häufigkeit an.

      Sehen Sie sich nach Abschluss der Pipeline die Ausgabeergebnisse an:

      more outputs*

      Drücken Sie zum Beenden q.

      Pipelinecode ändern

      Die Beam-wordcount-Pipeline unterscheidet zwischen Groß- und Kleinbuchstaben. Die folgenden Schritte zeigen, wie Sie Ihr eigenes Go-Modul erstellen, die wordcount-Pipeline so ändern, dass sie Groß- und Kleinschreibung nicht beachtet, und sie auf Dataflow ausführen.

      Go-Modul erstellen

      Gehen Sie so vor, um Änderungen am Pipelinecode vorzunehmen.

      1. Erstellen Sie ein Verzeichnis für Ihr Go-Modul an einem Speicherort Ihrer Wahl:

        mkdir wordcount
        cd wordcount
      2. Go-Modul erstellen Verwenden Sie für dieses Beispiel example/dataflow als Modulpfad.

        go mod init example/dataflow
      3. Laden Sie die neueste Version des Codes wordcount aus dem Apache Beam GitHub-Repository herunter. Legen Sie diese Datei in dem Verzeichnis wordcount ab, das Sie erstellt haben.

      4. Wenn Sie ein Nicht-Linux-Betriebssystem verwenden, müssen Sie das Go-Paket unix abrufen. Dieses Paket ist erforderlich, um Pipelines im Dataflow-Dienst auszuführen.

        go get -u golang.org/x/sys/unix
      5. Prüfen Sie, ob die Datei go.mod mit dem Quellcode des Moduls übereinstimmt:

        go mod tidy

      Unveränderte Pipeline ausführen

      Prüfen Sie, ob die unveränderte wordcount-Pipeline lokal ausgeführt wird.

      1. Erstellen Sie die Pipeline über das Terminal und führen Sie sie lokal aus:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Ausgabeergebnisse ansehen:

         more outputs*
      3. Drücken Sie zum Beenden q.

      Pipelinecode ändern

      Wenn Sie die Pipeline so ändern möchten, dass sie nicht mehr zwischen Groß- und Kleinschreibung unterscheidet, ändern Sie den Code so, dass die Funktion strings.ToLower auf alle Wörter angewendet wird.

      1. Öffnen Sie die Datei wordcount.go in einem Editor Ihrer Wahl.

      2. Sehen Sie sich den init-Block an (Kommentare wurden entfernt, um die Übersichtlichkeit zu verbessern):

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
         }
        
      3. Fügen Sie eine neue Zeile hinzu, um die Funktion strings.ToLower zu registrieren:

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
           register.Function1x1(strings.ToLower)
         }
        
      4. Sehen Sie sich die Funktion CountWords an:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Count the number of times each word occurs.
           return stats.Count(s, col)
         }
        
      5. Fügen Sie zum Entfernen der Kleinbuchstaben ein ParDo hinzu, das strings.ToLower auf jedes Wort anwendet:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Map all letters to lowercase.
           lowercaseWords := beam.ParDo(s, strings.ToLower, col)
        
           // Count the number of times each word occurs.
           return stats.Count(s, lowercaseWords)
         }
        
      6. Speichern Sie die Datei.

      Aktualisierte Pipeline lokal ausführen

      Führen Sie die aktualisierte wordcount-Pipeline lokal aus und prüfen Sie, ob sich die Ausgabe geändert hat.

      1. Erstellen Sie die geänderte wordcount-Pipeline und führen Sie sie aus:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Sehen Sie sich die Ausgabeergebnisse der geänderten Pipeline an. Alle Wörter sollten Kleinbuchstaben sein.

         more outputs*
      3. Drücken Sie zum Beenden q.

      Pipeline im Dataflow-Dienst ausführen

      Führen Sie den folgenden Befehl aus, um das aktualisierte wordcount-Beispiel im Dataflow-Dienst auszuführen:

      go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://BUCKET_NAME/results/outputs \
          --runner dataflow \
          --project PROJECT_ID \
          --region DATAFLOW_REGION \
          --staging_location gs://BUCKET_NAME/binaries/

      Dabei gilt:

      • BUCKET_NAME: der Name des Cloud Storage-Buckets.

      • PROJECT_ID: die Google Cloud Projekt-ID.

      • DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten. Beispiel: europe-west1 Eine Liste der verfügbaren Dataflow-Standorte finden Sie hier. Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

      Ergebnisse ansehen

      Sie können eine Liste Ihrer Dataflow-Jobs in derGoogle Cloud Console aufrufen. Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.

      ZU JOBS

      Auf der Seite Jobs werden Details zum wordcount-Job angezeigt, z. B. der Status Aktiv und dann Erfolgreich.

      Wenn Sie eine Pipeline mit Dataflow ausführen, werden die Ergebnisse in einem Cloud Storage-Bucket gespeichert. Sehen Sie sich die Ausgabeergebnisse entweder mit derGoogle Cloud Console oder mit dem lokalen Terminal an.

      Console

      Wechseln Sie in der Google Cloud Console unter „Cloud Storage“ zur Seite Buckets, um Ihre Ergebnisse anzusehen.

      Buckets aufrufen

      Klicken Sie in der Liste der Buckets in Ihrem Projekt auf den Storage-Bucket, den Sie zuvor erstellt haben. Die von Ihrem Job erstellten Ausgabedateien werden im Verzeichnis results angezeigt.

      Terminal

      Sehen Sie sich die Ergebnisse über Ihr Terminal oder mithilfe von Cloud Shell an.

      1. Verwenden Sie den Befehl gcloud storage ls, um die Ausgabedateien aufzulisten:

        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

        Ersetzen Sie BUCKET_NAME durch den Namen des angegebenen Cloud Storage-Ausgabe-Buckets.

      2. Verwenden Sie den Befehl gcloud storage cat, um die Ergebnisse in den Ausgabedateien aufzurufen:

        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Bereinigen

      Löschen Sie das Google Cloud -Projekt mit den Ressourcen, damit Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

      1. Löschen Sie den Bucket:
        gcloud storage buckets delete BUCKET_NAME
      2. Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      Nächste Schritte