Nachrichten aus Pub/Sub mit Dataflow streamen

Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream- (Echtzeit-) und Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung mit dem Apache Beam SDK, das eine Vielzahl von Windowing- und Sitzungsanalyse-Primitiven sowie ein Ökosystem von Quell- und Sink-Connectors bietet. In diesem Schnellstart erfahren Sie, wie Sie Dataflow für Folgendes verwenden:

  • Nachrichten lesen, die in einem Pub/Sub-Thema veröffentlicht wurden
  • Windowing (oder Gruppieren) von Nachrichten nach Zeitstempel
  • Nachrichten in Cloud Storage schreiben

Dieser Schnellstart bietet eine Einführung in die Verwendung von Dataflow in Java und Python. SQL wird ebenfalls unterstützt. Diese Kurzanleitung wird auch als Google Cloud Skills Boost-Anleitung mit temporären Anmeldedaten für den Einstieg angeboten.

Sie können auch UI-basierte Dataflow-Vorlagen verwenden, wenn Sie keine benutzerdefinierte Datenverarbeitung durchführen möchten.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Aktivieren Sie die Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Richten Sie die Authentifizierung ein:

    1. Erstellen Sie das Dienstkonto:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ersetzen Sie SERVICE_ACCOUNT_NAME mit einem Namen für das Dienstkonto.

    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin für jede der folgenden IAM-Rollen einmal aus:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • ROLE: die zu gewährende Rolle
    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • USER_EMAIL: E-Mail-Adresse Ihres Google-Kontos
  8. Installieren Sie die Google Cloud CLI.
  9. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  10. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  11. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  12. Aktivieren Sie die Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Richten Sie die Authentifizierung ein:

    1. Erstellen Sie das Dienstkonto:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ersetzen Sie SERVICE_ACCOUNT_NAME mit einem Namen für das Dienstkonto.

    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin für jede der folgenden IAM-Rollen einmal aus:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • ROLE: die zu gewährende Rolle
    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • USER_EMAIL: E-Mail-Adresse Ihres Google-Kontos
  14. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Pub/Sub-Projekt einrichten

  1. Erstellen Sie Variablen für Ihren Bucket, Ihr Projekt und Ihre Region. Cloud Storage-Bucket-Namen müssen global eindeutig sein. Wählen Sie eine Dataflow-Region in der Nähe des Standorts, an dem die Befehle in dieser Kurzanleitung ausgeführt werden. Der Wert der Variablen REGION muss ein gültiger Regionsname sein. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    
  2. Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:

    gsutil mb gs://$BUCKET_NAME
  3. Erstellen Sie ein Pub/Sub-Thema in diesem Projekt:

    gcloud pubsub topics create $TOPIC_ID
  4. Erstellen Sie einen Cloud Scheduler-Job in diesem Projekt. Der Job veröffentlicht eine Nachricht zu einem Pub/Sub-Thema in Intervallen von einer Minute.

    Wenn für das Projekt keine App Engine-Anwendung vorhanden ist, wird in diesem Schritt eine erstellt.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Starten Sie den Job.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Verwenden Sie die folgenden Befehle, um das Schnellstart-Repository zu klonen und zum Beispielcodeverzeichnis zu gehen:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Streamen Sie Nachrichten von Pub/Sub zu Cloud Storage

Codebeispiel

In diesem Beispielcode wird Dataflow für Folgendes verwendet:

  • Pub/Sub-Nachrichten lesen
  • Windowing (oder Gruppieren) von Nachrichten in festen Intervallen nach Veröffentlichungszeitstempeln.
  • Die Nachrichten in jedem Fenster in Dateien in Cloud Storage schreiben.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Pipeline starten

Führen Sie den folgenden Befehl aus, um die Pipeline zu starten:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

Der vorherige Befehl wird lokal ausgeführt und startet einen Dataflow-Job, der in der Cloud ausgeführt wird. Wenn der Befehl JOB_MESSAGE_DETAILED: Workers have started successfully zurückgibt, beenden Sie das lokale Programm mit Ctrl+C.

Job- und Pipeline-Fortschritt beobachten

Sie können den Fortschritt des Jobs in der Dataflow-Konsole verfolgen.

Zur Dataflow-Konsole

Beobachten Sie den Fortschritt des Jobs

Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:

  • Jobstruktur
  • Jobprotokolle
  • Anzeigebereich-Messwerte

Beobachten Sie den Fortschritt des Jobs

Es kann einige Minuten dauern, bis die Ausgabedateien in Cloud Storage angezeigt werden.

Beobachten Sie den Fortschritt des Jobs

Alternativ können Sie die folgende Befehlszeile verwenden, um zu prüfen, welche Dateien geschrieben wurden.

gsutil ls gs://${BUCKET_NAME}/samples/

Die Ausgabe sollte so aussehen:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Bereinigen

Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

  1. Cloud Scheduler-Job löschen

    gcloud scheduler jobs delete publisher-job --location=$REGION
    
  2. Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, ohne sie zu leeren.

  3. Thema löschen

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Löschen Sie die von der Pipeline erstellten Dateien.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Entfernen Sie den Cloud Storage-Bucket.

    gsutil rb gs://${BUCKET_NAME}
    

  6. Löschen Sie das Dienstkonto:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  8. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte