Schnellstart: Streamverarbeitung mit Dataflow

Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream- (Echtzeit-) und Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung mit dem Apache Beam SDK, das eine Vielzahl von Windowing- und Sitzungsanalyse-Primitiven sowie ein Ökosystem von Quell- und Sink-Connectors bietet. In diesem Schnellstart erfahren Sie, wie Sie Dataflow für Folgendes verwenden:

  • Nachrichten lesen, die in einem Pub/Sub-Thema veröffentlicht wurden
  • Windowing (oder Gruppieren) von Nachrichten nach Zeitstempel
  • Nachrichten in Cloud Storage schreiben

Dieser Schnellstart bietet eine Einführung in die Verwendung von Dataflow in Java und Python. SQL wird ebenfalls unterstützt.

Sie können auch UI-basierte Dataflow-Vorlagen verwenden, wenn Sie keine benutzerdefinierte Datenverarbeitung durchführen möchten.

Hinweis

  1. Befolgen Sie die Anleitungen zum Installieren und Initialisieren des Cloud SDK.
  2. Aktivieren Sie die Abrechnung für Ihr Projekt.
  3. Um diesen Schnellstart abzuschließen, müssen Sie die folgenden APIs aktivieren: Compute Engine, Google Cloud Operations Suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager und App Engine.

    APIs aktivieren

    Es kann einen Moment dauern, bis die APIs in der Konsole angezeigt werden.

  4. Dienstkontoschlüssel erstellen

    Dienstkontoschlüssel erstellen

    1. Wählen Sie aus der Liste Dienstkonto die Option Neues Dienstkonto aus.
    2. Geben Sie im Feld Name des Dienstkontos einen Namen ein.
    3. Wählen Sie in der Liste Rolle die Option Projekt > Inhaber aus.
    4. Klicken Sie auf Erstellen.

    Der Schlüssel wird an den Standardordner für Downloads Ihres Browsers gesendet.

  5. Setzen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS so, dass sie auf den Dienstkontoschlüssel verweist.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Erstellen Sie Variablen für Ihren Bucket und Ihr Projekt. Cloud Storage-Bucket-Namen müssen global eindeutig sein.

    BUCKET_NAME=bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    
  7. Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:

    gsutil mb gs://$BUCKET_NAME
    
  8. Erstellen Sie ein Pub/Sub-Thema in diesem Projekt:

    gcloud pubsub topics create cron-topic
    
  9. Erstellen Sie einen Cloud Scheduler-Job in diesem Projekt. Der Job veröffentlicht eine Nachricht zu einem Cloud Pub/Sub-Thema in Intervallen von einer Minute.

    Wenn für das Projekt keine App Engine-Anwendung vorhanden ist, wird bei diesem Schritt eine erstellt.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"
    

    Starten Sie den Job.

    gcloud scheduler jobs run publisher-job
    
  10. Verwenden Sie den folgenden Befehl, um das Schnellstart-Repository zu klonen und zum Beispielcodeverzeichnis zu gehen:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Streamen Sie Nachrichten von Pub/Sub zu Cloud Storage

Codebeispiel

In diesem Beispielcode wird Dataflow für Folgendes verwendet:

  • Pub/Sub-Nachrichten lesen
  • Windowing (oder Gruppieren) von Nachrichten in festen Intervallen nach Veröffentlichungszeitstempeln.
  • Die Nachrichten in jedem Fenster in Dateien in Cloud Storage schreiben.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }

class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )

if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Pipeline starten

Verwenden Sie den folgenden Befehl, um die Pipeline zu starten:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp

Job- und Pipeline-Fortschritt beobachten

Sie können den Fortschritt des Jobs in der Dataflow-Konsole verfolgen.

Zur Dataflow-Konsole

Beobachten Sie den Fortschritt des Jobs

Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:

  • Jobstruktur
  • Jobprotokolle
  • Anzeigebereich-Messwerte

Beobachten Sie den Fortschritt des Jobs

Es kann einige Minuten dauern, bis die Ausgabedateien in Cloud Storage angezeigt werden.

Beobachten Sie den Fortschritt des Jobs

Alternativ können Sie die folgende Befehlszeile verwenden, um zu prüfen, welche Dateien geschrieben wurden.

gsutil ls gs://${BUCKET_NAME}/samples/

Die Ausgabe sollte so aussehen:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32
gs://{$BUCKET_NAME}/samples/output-22:32-22:34
gs://{$BUCKET_NAME}/samples/output-22:34-22:36
gs://{$BUCKET_NAME}/samples/output-22:36-22:38

Clean-up

  1. Cloud Scheduler-Job löschen

    gcloud scheduler jobs delete publisher-job
    
  2. Verwenden Sie Ctrl+C, um das Programm in Ihrem Terminal zu beenden.

  3. Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, ohne sie zu leeren.

  4. Thema löschen

    gcloud pubsub topics delete cron-topic
    
  5. Löschen Sie die von der Pipeline erstellten Dateien.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  6. Entfernen Sie den Cloud Storage-Bucket.

    gsutil rb gs://${BUCKET_NAME}
    

Nächste Schritte