Pub/Sub Lite-Nachrichten mit Dataflow streamen

Alternativ zum Schreiben und Ausführen eigener Datenverarbeitungsprogramme können Sie Dataflow mit dem Pub/Sub Lite-E/A-Connector für Apache Beam verwenden. Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream-Modus (Echtzeit) und im Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Dataflow führt zuverlässig Programme aus, die mit dem Apache Beam SDK entwickelt wurden, das einen erweiterbaren Satz leistungsstarker zustandsorientierter Verarbeitungsabstraktionen sowie E/A-Connectors zu anderen Streaming- und Batchsystemen enthält.

In dieser Kurzanleitung wird beschrieben, wie Sie eine Apache Beam-Pipeline schreiben, die folgende Aufgaben ausführt:

  • Nachrichten aus Pub/Sub Lite lesen
  • Windowing (oder Gruppieren) von Nachrichten nach Veröffentlichungszeitstempel
  • Nachrichten in Cloud Storage schreiben

Außerdem erfahren Sie, wie Sie:

  • Die Pipeline zur Ausführung in Dataflow senden
  • Eine Flexible Dataflow-Vorlage aus einer Pipeline erstellen

Für diese Anleitung wird Maven benötigt, aber es ist auch möglich, das Beispielprojekt von Maven in Gradle zu konvertieren. Weitere Informationen finden Sie unter Optional: Von Maven in Gradle konvertieren.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Aktivieren Sie die Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Richten Sie die Authentifizierung ein:

    1. Erstellen Sie das Dienstkonto:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ersetzen Sie SERVICE_ACCOUNT_NAME mit einem Namen für das Dienstkonto.

    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin für jede der folgenden IAM-Rollen einmal aus:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • ROLE: die zu gewährende Rolle
    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • USER_EMAIL: E-Mail-Adresse Ihres Google-Kontos
  8. Installieren Sie die Google Cloud CLI.
  9. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  10. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  11. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  12. Aktivieren Sie die Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Richten Sie die Authentifizierung ein:

    1. Erstellen Sie das Dienstkonto:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ersetzen Sie SERVICE_ACCOUNT_NAME mit einem Namen für das Dienstkonto.

    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin für jede der folgenden IAM-Rollen einmal aus:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • ROLE: die zu gewährende Rolle
    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Dabei gilt:

      • SERVICE_ACCOUNT_NAME: der Name des Dienstkontos
      • PROJECT_ID: die Projekt-ID, unter der Sie das Dienstkonto erstellt haben
      • USER_EMAIL: E-Mail-Adresse Ihres Google-Kontos
  14. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Pub/Sub Lite-Projekt einrichten

  1. Erstellen Sie Variablen für Ihren Cloud Storage-Bucket, Ihr Projekt und Ihre Dataflow-Region. Cloud Storage-Bucket-Namen müssen global einmalig sein. Die Dataflow-Region muss eine gültige Region sein, in der Sie Ihren Job ausführen können. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:

       gsutil mb gs://$BUCKET
    

Zonales Lite-Thema und -Abo für Pub/Sub Lite erstellen

Erstellen Sie ein zonales Lite Pub/Sub Lite-Thema und Lite-Abo.

Wählen Sie als Lite-Speicherort einen unterstützten Pub/Sub Lite-Standort aus. Außerdem müssen Sie eine Zone für die Region angeben. Beispiel: us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Nachrichten zu Dataflow streamen

Beispielcode der Kurzanleitung herunterladen

Klonen Sie das Schnellstart-Repository und wechseln Sie zum Beispielcode-Verzeichnis.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Beispielcode

In diesem Beispielcode wird Dataflow für Folgendes verwendet:

  • Nachrichten aus einem Pub/Sub Lite-Abo als unbegrenzte Quelle lesen.
  • Gruppieren Sie Nachrichten anhand ihrer Veröffentlichungszeitstempel und verwenden Sie feste Zeitfenster und den Standardtrigger.
  • Schreiben Sie die gruppierten Nachrichten in Dateien in Cloud Storage.

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Dataflow-Pipeline starten

Führen Sie den folgenden Befehl aus, um die Pipeline in Dataflow zu starten:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

Der vorherige Befehl startet einen Dataflow-Job. Folgen Sie dem Link in der Konsolenausgabe, um in der Dataflow-Monitoringkonsole auf den Job zuzugreifen.

Jobfortschritt beobachten

Beobachten Sie den Fortschritt des Jobs in der Dataflow-Konsole.

Zur Dataflow-Konsole

Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:

  • Jobgrafik
  • Ausführungsdetails
  • Jobmesswerte

Veröffentlichen Sie einige Nachrichten in Ihrem Lite-Thema.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Es kann einige Minuten dauern, bis die Nachrichten in den Worker-Logs angezeigt werden.

Prüfen Sie mit dem folgenden Befehl, welche Dateien in Cloud Storage geschrieben wurden.

gsutil ls "gs://$BUCKET/samples/"

Die Ausgabe sollte so aussehen:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Verwenden Sie den folgenden Befehl, um den Inhalt einer Datei aufzurufen:

gsutil cat "gs://$BUCKET/samples/your-filename"

Optional: Dataflow-Vorlage erstellen

Optional können Sie auf der Grundlage Ihrer Pipeline eine benutzerdefinierte Dataflow-Flex-Vorlage erstellen. Mit Dataflow-Vorlagen können Sie Jobs mit unterschiedlichen Eingabeparametern über die Google Cloud Console oder die Befehlszeile ausführen, ohne eine vollständige Java-Entwicklungsumgebung einrichten zu müssen.

  1. Erstellen Sie eine fette JAR-Datei, die alle Abhängigkeiten Ihrer Pipeline enthält. Nachdem der Befehl ausgeführt wurde, sollte target/pubsublite-streaming-bundled-1.0.jar angezeigt werden.

    mvn clean package -DskipTests=true
    
  2. Geben Sie Namen und Speicherorte für die Vorlagendatei und das Vorlagencontainer-Image an.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Erstellen Sie eine benutzerdefinierte Flex-Vorlage. Eine erforderliche metadata.json-Datei mit der erforderlichen Spezifikation zum Ausführen des Jobs.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Führen Sie einen Job mit der benutzerdefinierten Flex-Vorlage aus.

Console

  1. Job aus Vorlage erstellen.

  2. Geben Sie einen Jobnamen ein.

  3. Geben Sie Ihre Dataflow-Region ein.

  4. Wählen Sie Ihre benutzerdefinierte Vorlage aus.

  5. Geben Sie den Vorlagenpfad ein.

  6. Geben Sie die erforderlichen Parameter ein.

  7. Klicken Sie auf Job ausführen.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Bereinigen

Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.

  1. Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, anstatt sie zu leeren.

  2. Löschen Sie das Thema und das Abo.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Löschen Sie die von der Pipeline erstellten Dateien.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Löschen Sie das Vorlagen-Image und die Vorlagendatei, sofern vorhanden.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Entfernen Sie den Cloud Storage-Bucket.

    gsutil rb gs://$BUCKET
    

  6. Löschen Sie das Dienstkonto:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:

    gcloud auth application-default revoke
  8. Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.

    gcloud auth revoke

Nächste Schritte