Kurzanleitung für Python

Auf dieser Seite erfahren Sie, wie Sie Ihre Python-Entwicklungsumgebung einrichten, das Apache Beam SDK für Python abrufen und eine Beispielpipeline ausführen und ändern.

Eine Apache Beam-Pipeline kann auch interaktiv mit einem Apache Beam-Notebook entwickelt und ausgeführt werden. Wenn Sie bereits ein Google Cloud-Projekt eingerichtet haben, ist die WordCount-Beispielpipeline in diesem Schnellstart als Beispiel-Notebook verfügbar.

Vorbereitung

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore und Cloud Resource Manager APIs aktivieren.

    Aktivieren Sie die APIs

  5. Authentifizierung einrichten:
    1. Wechseln Sie in der Cloud Console zur Seite Dienstkontoschlüssel erstellen.

      Zur Seite „Dienstkontoschlüssel erstellen“
    2. Wählen Sie aus der Liste Dienstkonto die Option Neues Dienstkonto aus.
    3. Geben Sie im Feld Dienstkontoname einen Namen ein.
    4. Wählen Sie in der Liste Rolle die Option Projekt > Inhaber

    5. Klicken Sie auf Erstellen. Eine JSON-Datei mit Ihrem Schlüssel wird auf Ihren Computer heruntergeladen.
  6. Legen Sie für die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS den Pfad der JSON-Datei fest, die Ihren Dienstkontoschlüssel enthält. Diese Variable gilt nur für Ihre aktuelle Shellsitzung. Wenn Sie eine neue Sitzung öffnen, müssen Sie die Variable noch einmal festlegen.

  7. Cloud Storage-Bucket erstellen:
    1. Wechseln Sie in der Cloud Console zum Cloud Storage-Browser.

      Zum Cloud Storage-Browser

    2. Klicken Sie auf Bucket erstellen.
    3. Geben Sie im Dialogfeld Bucket erstellen die folgenden Attribute an:
      • Name: Eindeutiger Bucket-Name. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist.
      • Standardspeicherklase: Standard
      • Ein Standort, an dem Bucket-Daten gespeichert werden.
    4. Klicken Sie auf Erstellen.

Umgebung einrichten

  1. Verwenden Sie das Apache Beam SDK für Python mit pip und Python-Version 3.6, 3.7 oder 3.8. Prüfen Sie mit folgendem Befehl, ob Python und pip funktionieren:
    python --version
    python -m pip --version
    Wenn Ihnen Python nicht vorliegt, finden Sie die Installationsschritte für Ihr Betriebssystem auf der Seite Python installieren.
  2. Dataflow unterstützt nicht mehr Pipelines mit Python 2. Weitere Informationen finden Sie auf der Seite Python 2-Support in Google Cloud.

  3. Richten Sie für diese Kurzanleitung eine virtuelle Python-Umgebung ein und aktivieren Sie sie.

    Nachdem Sie die Kurzanleitung durchgearbeitet haben, können Sie die virtuelle Umgebung mit dem Befehl deactivate deaktivieren.

Weitere Informationen zur Verwendung von Python in Google Cloud finden Sie auf der Seite Python-Entwicklungsumgebung einrichten.

Hinweis: Starten Sie Python 3-Pipelines mit Apache Beam 2.16.0 oder höher, um optimale Ergebnisse zu erhalten. Apache Beam SDK Version 2.24.0 ist die letzte Version, die Python 2 und Python 3.5 unterstützt. Eine Zusammenfassung der neuesten Python 3-Verbesserungen in Apache Beam finden Sie in der Apache Beam-Problemverfolgung.

Apache Beam SDK abrufen

Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren diese Pipelines mit einem Apache Beam-Programm und können einen Runner wie Dataflow zum Ausführen Ihrer Pipeline auswählen.

Installieren Sie die neueste Version des Apache Beam SDK für Python. Führen Sie dazu folgenden Befehl in einer virtuellen Umgebung aus:

pip install 'apache-beam[gcp]'

WordCount lokal ausführen

Das WordCount-Beispiel zeigt eine Pipeline, die folgende Schritte ausführt:
  1. Sie nimmt eine Textdatei als Eingabe an.
  2. Sie parst jede Zeile und unterteilt sie in Wörter.
  3. Sie misst die Häufigkeit der tokenisierten Wörter.

Führen Sie das Modul wordcount aus dem Paket apache_beam mit dem folgenden Befehl auf Ihrem lokalen Computer aus:

python -m apache_beam.examples.wordcount \
  --output outputs
Sie finden die Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen gs://dataflow-samples/shakespeare/kinglear.txt. Führen Sie folgenden Befehl aus, um die Ausgabe abzurufen:
more outputs*

Drücken Sie zum Beenden die Taste q.

Die lokale Ausführung der Pipeline ermöglicht Ihnen, Ihr Apache Beam-Programm zu testen und Fehler zu beheben. Sie können den Quellcode wordcount.py auf dem GitHub für Apache Beam ansehen.

WordCount im Dataflow-Dienst ausführen

Sie können das wordcount-Modul aus dem apache_beam-Paket im Dataflow-Dienst durch Angabe von DataflowRunner im runner-Feld ausführen und eine Region auswählen, in der die Pipeline ausgeführt werden soll.

Definieren Sie zuerst die Variablen PROJECT, BUCKET und REGION:

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
REGION=DATAFLOW_REGION
Führen Sie diese Pipeline mit folgendem Befehl aus:
python -m apache_beam.examples.wordcount \
  --region $REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/results/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/

Ergebnisse mit der GCP ansehen

Wenn Sie eine Pipeline mit Dataflow ausführen, befinden sich die Ergebnisse in einem Cloud Storage-Bucket.

Mit dem Tool gsutil können Sie die Ergebnisse über Ihr Terminal ansehen.

Listen Sie die Ausgabedateien mit folgendem Befehl auf:

gsutil ls -lh "gs://$BUCKET/results/outputs*"  
Sehen Sie sich die Ergebnisse in diesen Dateien mit folgendem Befehl an:
gsutil cat "gs://$BUCKET/results/outputs*"

So sehen Sie sich die Ergebnisse über die Monitoring-UI an:
  1. Öffnen Sie die Dataflow-Monitoring-UI.
    Zur Dataflow-Web-UI

    Ihr Job wordcount sollte zuerst den Status Running (Wird ausgeführt) und dann Succeeded (Erfolgreich) haben:

    Der Cloud Dataflow-WordCount-Job mit dem Status "Succeeded" (Erfolgreich).
  2. Öffnen Sie den Cloud Storage-Browser in der Google Cloud Console.
    Zum Cloud Storage-Browser

    Im Verzeichnis wordcount sollten die vom Job erstellte Ausgabedateien zu sehen sein:

    Ergebnisverzeichnis mit Ausgabedateien aus dem WordCount-Job.

Pipelinecode ändern

Die WordCount-Pipeline in den vorherigen Beispielen unterscheidet zwischen groß- und kleingeschriebenen Wörtern. Die folgende Schritt-für-Schritt-Anleitung veranschaulicht, wie Sie die WordCount-Pipeline ändern, damit sie nicht mehr zwischen Groß- und Kleinschreibung unterscheidet.
  1. Laden Sie die neueste Version des Codes WordCount aus dem Apache Beam GitHub-Repository herunter.
  2. Führen Sie die Pipeline auf Ihrem lokalen Computer aus:
    python wordcount.py --output outputs
  3. Sehen Sie sich die Ergebnisse mit dem folgenden Befehl an:
    more outputs*
    Drücken Sie zum Beenden die Taste q.
  4. Öffnen Sie die Datei wordcount.py in einem beliebigen Texteditor.
  5. Sehen Sie sich die Pipelineschritte in der Funktion run an. Nach split werden die Zeilen in Wörter als Strings unterteilt.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. Ändern Sie die Zeile nach split so, dass die Strings in Kleinbuchstaben umgewandelt werden.
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    Durch diese Änderung wird die Funktion str.lower jedem Wort zugeordnet. Diese Zeile entspricht beam.Map(lambda word: str.lower(word)).
  7. Speichern Sie die Datei und führen Sie den geänderten WordCount-Job auf Ihrem lokalen Computer aus:
    python wordcount.py --output outputs
  8. Sehen Sie sich die Ergebnisse der geänderten Pipeline mit dem folgenden Befehl an:
    more outputs*
    Drücken Sie zum Beenden die Taste q.

Bereinigen

So vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Kurzanleitung verwendeten Ressourcen in Rechnung gestellt werden:

  1. Wechseln Sie in der Cloud Console zur Seite Cloud Storage-Browser.

    Zum Cloud Storage-Browser

  2. Klicken Sie auf das Kästchen neben dem Bucket, der gelöscht werden soll.
  3. Klicken Sie zum Löschen des Buckets auf Löschen .

Nächste Schritte

Apache Beam™ ist eine Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.