Vorlage "Pub/Sub für Pub/Sub"

Die Vorlage "Pub/Sub für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und in ein anderes Pub/Sub-Thema schreibt. Die Pipeline akzeptiert auch einen optionalen Nachrichtenattributschlüssel und einen Wert, die zum Filtern der Nachrichten verwendet werden können, die in das Pub/Sub-Thema geschrieben werden sollen. Sie können diese Vorlage verwenden, um Nachrichten mit einem optionalen Nachrichtenfilter von einem Pub/Sub-Abo in ein anderes Pub/Sub-Thema zu kopieren.

Pipelineanforderungen

  • Das als Quelle dienende Pub/Sub-Abo muss vor der Ausführung vorhanden sein.
  • Das als Quelle dienende Pub/Sub-Abo muss ein Pull-Abo sein.
  • Das Pub/Sub-Thema, in das geschrieben werden soll, muss vor der Ausführung vorhanden sein.

Vorlagenparameter

Erforderliche Parameter

  • inputSubscription ist das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll, im Format "projects/your-project-id/subscriptions/your-subscription-name" (Beispiel: projects/your-project-id) /subscriptions/your-subscription-name).
  • outputTopic ist der Name des Themas, zu dem Daten veröffentlicht werden sollen, im Format "projects/your-project-id/topics/your-topic-name" (Beispiel: projects/your-project- id/topics/your-topic-name).

Optionale Parameter

  • filterKey : ist der Attributschlüssel, nach dem Ereignisse gefiltert werden. Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • filterValue : Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Pub/Sub templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

Nächste Schritte