Vorlage "Pub/Sub-Abo für BigQuery"

Die Vorlage "Pub/Sub-Abo für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Pipelineanforderungen

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise können Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen. Das Tabellenschema muss mit den JSON-Eingabeobjekten übereinstimmen.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle im Format <my-project>:<my-dataset>.<my-table> für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen OUTPUT_TABLE_SPEC_error_records verwendet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der dem Schema der BigQuery-Zieltabelle entspricht.
  • Führen Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Subscription to BigQuery template aus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
    8. Klicken Sie auf Job ausführen.

    gcloud

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
    

    Dabei gilt:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }
    

    Dabei gilt:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    Nächste Schritte