Schnellstart mit Python

In dieser Kurzanleitung erfahren Sie, wie Sie das Apache Beam SDK für Python verwenden, um ein Programm zu erstellen, das eine Pipeline definiert. Anschließend führen Sie die Pipeline mit einem direkten lokalen Runner oder einem cloudbasierten Runner wie Dataflow aus.

Hinweis

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, und Cloud Resource Manager APIs aktivieren.

    Aktivieren Sie die APIs

  5. Erstellen Sie ein Dienstkonto:

    1. Wechseln Sie in der Google Cloud Console zur Seite Dienstkonto erstellen.

      Zur Seite „Dienstkonto erstellen“
    2. Wählen Sie Ihr Projekt aus.
    3. Geben Sie im Feld Dienstkontoname einen Namen ein. Die Google Cloud Console füllt das Feld Dienstkonto-ID anhand dieses Namens aus.

      Geben Sie im Feld Dienstkontobeschreibung eine Beschreibung ein. Beispiel: Service account for quickstart.

    4. Klicken Sie auf Erstellen und fortfahren.
    5. Weisen Sie dem Dienstkonto die Rolle Project > Owner zu.

      Wenn Sie die Rolle zuweisen möchten, suchen Sie die Liste Rolle auswählen und wählen Sie Project > Owner aus.

    6. Klicken Sie auf Weiter.
    7. Klicken Sie auf Fertig, um das Erstellen des Dienstkontos abzuschließen.

      Schließen Sie das Browserfenster nicht. Sie verwenden es in der nächsten Aufgabe.

  6. Erstellen Sie einen Dienstkontoschlüssel:

    1. Klicken Sie in der Google Cloud Console auf die E-Mail-Adresse des von Ihnen erstellten Dienstkontos.
    2. Klicken Sie auf Schlüssel.
    3. Klicken Sie auf Schlüssel hinzufügen und dann auf Neuen Schlüssel erstellen.
    4. Klicken Sie auf Erstellen. Daraufhin wird eine JSON-Schlüsseldatei auf Ihren Computer heruntergeladen.
    5. Klicken Sie auf Schließen.
  7. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihre Anmeldedaten enthält. Diese Variable gilt nur für Ihre aktuelle Shell-Sitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable neu festlegen.

  8. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  9. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  10. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, und Cloud Resource Manager APIs aktivieren.

    Aktivieren Sie die APIs

  11. Erstellen Sie ein Dienstkonto:

    1. Wechseln Sie in der Google Cloud Console zur Seite Dienstkonto erstellen.

      Zur Seite „Dienstkonto erstellen“
    2. Wählen Sie Ihr Projekt aus.
    3. Geben Sie im Feld Dienstkontoname einen Namen ein. Die Google Cloud Console füllt das Feld Dienstkonto-ID anhand dieses Namens aus.

      Geben Sie im Feld Dienstkontobeschreibung eine Beschreibung ein. Beispiel: Service account for quickstart.

    4. Klicken Sie auf Erstellen und fortfahren.
    5. Weisen Sie dem Dienstkonto die Rolle Project > Owner zu.

      Wenn Sie die Rolle zuweisen möchten, suchen Sie die Liste Rolle auswählen und wählen Sie Project > Owner aus.

    6. Klicken Sie auf Weiter.
    7. Klicken Sie auf Fertig, um das Erstellen des Dienstkontos abzuschließen.

      Schließen Sie das Browserfenster nicht. Sie verwenden es in der nächsten Aufgabe.

  12. Erstellen Sie einen Dienstkontoschlüssel:

    1. Klicken Sie in der Google Cloud Console auf die E-Mail-Adresse des von Ihnen erstellten Dienstkontos.
    2. Klicken Sie auf Schlüssel.
    3. Klicken Sie auf Schlüssel hinzufügen und dann auf Neuen Schlüssel erstellen.
    4. Klicken Sie auf Erstellen. Daraufhin wird eine JSON-Schlüsseldatei auf Ihren Computer heruntergeladen.
    5. Klicken Sie auf Schließen.
  13. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf den Pfad der JSON-Datei fest, die Ihre Anmeldedaten enthält. Diese Variable gilt nur für Ihre aktuelle Shell-Sitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable neu festlegen.

  14. Cloud Storage-Bucket erstellen:
    1. Wechseln Sie in der Google Cloud Console zur Cloud Storage-Seite Buckets.

      Zur Seite „Buckets“

    2. Klicken Sie auf Bucket erstellen.
    3. Geben Sie auf der Seite Bucket erstellen die Bucket-Informationen ein. Klicken Sie auf Weiter, um mit dem nächsten Schritt fortzufahren.
      • Geben Sie unter Bucket benennen einen eindeutigen Bucket-Namen ein. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
      • Gehen Sie unter Speicherort für Daten auswählen folgendermaßen vor:
        • Wählen Sie eine Option für Standorttyp aus.
        • Wählen Sie eine Standort-Option aus.
      • Wählen Sie unter Standardspeicherklasse für Ihre Daten auswählen Folgendes aus: Standard.
      • Wählen Sie unter Zugriffssteuerung für Objekte auswählen eine Option für die Zugriffssteuerung aus.
      • Geben Sie für Erweiterte Einstellungen (optional) eine Verschlüsselungsmethode, eine Aufbewahrungsrichtlinie oder Bucket-Labels an.
    4. Klicken Sie auf Erstellen.
  15. Kopieren Sie die Google Cloud-Projekt-ID und den Namen des Cloud Storage-Buckets. Sie benötigen diese Werte später in diesem Dokument.

Umgebung einrichten

In diesem Abschnitt richten Sie über die Eingabeaufforderung eine isolierte virtuelle Python-Umgebung ein, um Ihr Pipeline-Projekt mit venv auszuführen. Auf diese Weise können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren.

Wenn Ihnen im Moment keine Eingabeaufforderung zur Verfügung steht, können Sie Cloud Shell verwenden. Der Paketmanager für Python 3 ist in Cloud Shell bereits installiert, sodass Sie mit dem Erstellen einer virtuellen Umgebung fortfahren können.

So installieren Sie Python und erstellen dann eine virtuelle Umgebung:

  1. Prüfen Sie, ob Python 3 und pip in Ihrem System ausgeführt werden:
    python --version
    python -m pip --version
    
  2. Installieren Sie gegebenenfalls Python 3 und richten Sie dann eine virtuelle Python-Umgebung ein. Folgen Sie dazu der Anleitung in den Abschnitten Python installieren und venv einrichten auf der Seite Python-Entwicklungsumgebung einrichten.

Nachdem Sie die Kurzanleitung durchgearbeitet haben, können Sie die virtuelle Umgebung mit dem Befehl deactivate deaktivieren.

Apache Beam SDK abrufen

Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren eine Pipeline mit einem Apache Beam-Programm und wählen dann einen Runner wie Dataflow aus, um Ihre Pipeline auszuführen.

So laden Sie das Apache Beam SDK herunter und installieren es:

  1. Prüfen Sie, ob Sie sich in der virtuellen Python-Umgebung befinden, die Sie im vorherigen Abschnitt erstellt haben. Die Eingabeaufforderung beginnt mit <env_name>, wobei env_name der Name der virtuellen Umgebung ist.
  2. Installieren Sie den Paketstandard Python-Rad:
    pip install wheel
    
  3. Installieren Sie die neueste Version des Apache Beam SDK für Python:
  4. pip install 'apache-beam[gcp]'

    Je nach Verbindung kann die Installation etwas dauern.

Pipeline lokal ausführen

Wenn Sie sehen möchten, wie eine Pipeline lokal ausgeführt wird, verwenden Sie ein fertiges Python-Modul für das Beispiel wordcount, das im Paket apache_beam enthalten ist.

Das Pipeline-Beispiel wordcount führt Folgendes aus:

  1. Sie nimmt eine Textdatei als Eingabe an.

    Sie finden die Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Sie parst jede Zeile und unterteilt sie in Wörter.
  3. Sie misst die Häufigkeit der tokenisierten Wörter.

Führen Sie die folgenden Schritte aus, um die Pipeline wordcount lokal bereitzustellen:

  1. Führen Sie auf Ihrem lokalen Terminal das Beispiel wordcount aus:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. Sehen Sie sich die Ausgabe der Pipeline an:
    more outputs*
  3. Drücken Sie zum Beenden q.
Wenn Sie die Pipeline lokal ausführen, können Sie das Apache Beam-Programm testen und eventuelle Fehler beheben. Sie können den Quellcode von wordcount.py auf dem GitHub für Apache Beam ansehen.

Pipeline im Dataflow-Dienst ausführen

In diesem Abschnitt führen Sie die wordcount-Beispielpipeline aus dem Paket apache_beam im Dataflow-Dienst aus. In diesem Beispiel wird DataflowRunner als Parameter für --runner angegeben.
  • Führen Sie die Pipeline aus:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://STORAGE_BUCKET/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://STORAGE_BUCKET/tmp/

    Dabei gilt:

    • DATAFLOW_REGION: der regionale Endpunkt, an dem Sie den Dataflow-Job bereitstellen möchten, z. B. europe-west1.

      Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

    • STORAGE_BUCKET: Cloud Storage-Name, den Sie zuvor kopiert haben.
    • PROJECT_ID: Google Cloud-Projekt-ID, die Sie zuvor kopiert haben.

Ergebnisse ansehen

Wenn Sie eine Pipeline mit Dataflow ausführen, werden die Ergebnisse in einem Cloud Storage-Bucket gespeichert. Prüfen Sie in diesem Abschnitt, ob die Pipeline mit der Cloud Console oder dem lokalen Terminal ausgeführt wird.

Cloud Console

So rufen Sie Ihre Ergebnisse in der Cloud Console auf:

  1. Rufen Sie in der Cloud Console die Seite Jobs von Dataflow auf.

    Zu Jobs

    Auf der Seite Jobs werden Details zum wordcount-Job angezeigt, z. B. der Status Aktiv und dann Erfolgreich.

  2. Zum Cloud Storage-Browser

    Browser aufrufen

  3. Klicken Sie in der Liste der Buckets in Ihrem Projekt auf den Storage-Bucket, den Sie zuvor erstellt haben.

    Im Verzeichnis wordcount werden die von Ihrem Job erstellten Ausgabedateien angezeigt.

Lokales Terminal

Um die Ergebnisse von Ihrem Terminal aus aufzurufen, verwenden Sie das gsutil-Tool. Sie können die Befehle auch über Cloud Shell ausführen.

  1. Listen Sie die Ausgabedateien auf:
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"  
  2. Ersetzen Sie STORAGE_BUCKET durch den Namen des Cloud Pipeline-Buckets, der im Pipelineprogramm verwendet wird.

  3. Sehen Sie sich die Ergebnisse in den Ausgabedateien an:
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

Pipelinecode ändern

Die wordcount-Pipeline in den vorherigen Beispielen unterscheidet zwischen groß- und kleingeschriebenen Wörtern. In den folgenden Schritten wird gezeigt, wie Sie die Pipeline so ändern, dass die Groß- und Kleinschreibung bei der wordcount-Pipeline nicht berücksichtigt wird.
  1. Laden Sie auf Ihrem lokalen Computer die neueste Kopie des wordcount-Codes aus dem Apache Beam GitHub-Repository herunter.
  2. Führen Sie die Pipeline über das lokale Terminal aus:
    python wordcount.py --output outputs
  3. Rufen Sie die Ergebnisse auf:
    more outputs*
  4. Drücken Sie zum Beenden q.
  5. Öffnen Sie die Datei wordcount.py in einem Editor Ihrer Wahl.
  6. Sehen Sie sich die Pipelineschritte in der Funktion run an:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    Nach split werden die Zeilen in Wörter als Strings unterteilt.

  7. Wenn Sie die Strings in Kleinbuchstaben darstellen möchten, ändern Sie die Zeile nach split:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    Durch diese Änderung wird die Funktion str.lower jedem Wort zugeordnet. Diese Zeile entspricht beam.Map(lambda word: str.lower(word)).
  8. Speichern Sie die Datei und führen Sie den geänderten Job wordcount aus:
    python wordcount.py --output outputs
  9. Sehen Sie sich die Ergebnisse der geänderten Pipeline an:
    more outputs*
  10. Drücken Sie zum Beenden q.

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen in Rechnung gestellt werden:

  1. Wechseln Sie in der Cloud Console zur Seite Cloud Storage-Buckets.

    Buckets aufrufen

  2. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  3. Klicken Sie zum Löschen des Buckets auf Löschen und folgen Sie der Anleitung.

Nächste Schritte

Apache Beam™ ist eine Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.