Vorlage "Pub/Sub für BigQuery mit Python-UDF"

Die Vorlage "Pub/Sub für BigQuery mit Python" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus Pub/Sub liest und in eine BigQuery-Tabelle schreibt. Optional können Sie eine benutzerdefinierte Funktion (UDF) in Python bereitstellen, um die eingehenden Nachrichten zu verarbeiten.

Pipelineanforderungen

  • Die BigQuery-Tabelle muss vorhanden sein und ein Schema haben.
  • Die Pub/Sub-Nachrichtendaten müssen das JSON-Format verwenden oder Sie müssen eine UDF bereitstellen, die die Nachrichtendaten in JSON konvertiert. Die JSON-Daten müssen mit dem BigQuery-Tabellenschema übereinstimmen. Wenn die JSON-Nutzlasten beispielsweise als {"k1":"v1", "k2":"v2"} formatiert sind, muss die BigQuery-Tabelle zwei Stringspalten mit den Namen k1 und k2 haben.
  • Geben Sie den Parameter inputSubscription oder inputTopic an, aber nicht beides.

Vorlagenparameter

Erforderliche Parameter

  • outputTableSpec: Die BigQuery-Tabelle, in die Daten geschrieben werden sollen, formatiert als PROJECT_ID:DATASET_NAME.TABLE_NAME.

Optionale Parameter

  • inputTopic: Das Pub/Sub-Thema, aus dem gelesen werden soll, formatiert als projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: Das Pub/Sub-Abo, aus dem gelesen werden soll, formatiert als projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: Die BigQuery-Tabelle, die für Nachrichten verwendet werden soll, die die Ausgabetabelle nicht erreicht haben, formatiert als PROJECT_ID:DATASET_NAME.TABLE_NAME. Wenn die Tabelle nicht vorhanden ist, wird sie beim Ausführen der Pipeline erstellt. Wenn dieser Parameter nicht angegeben ist, wird stattdessen der Wert OUTPUT_TABLE_SPEC_error_records verwendet.
  • useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die „Mindestens einmal“-Semantik verwenden (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf „true“ fest. Wenn Sie die „Genau einmal“-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
  • useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
  • pythonExternalTextTransformGcsPath: Das Cloud Storage-Pfadmuster für den Python-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel: gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName: Der Name der Funktion, die aus Ihrer Python-Datei aufgerufen werden soll. Verwenden Sie nur Buchstaben, Ziffern und Unterstriche. Beispiel: 'transform' or 'transform_udf1'.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
  • Führen Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to BigQuery with Python UDF templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
    8. Klicken Sie auf Job ausführen.

    gcloud

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Ersetzen Sie Folgendes:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    Nächste Schritte