Vorlage "Pub/Sub Proto für BigQuery mit Python"

Die Vorlage „Pub/Sub Proto für BigQuery“ ist eine Streamingpipeline, die Proto-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Sie können eine benutzerdefinierte Funktion (User-defined Function, UDF) zum Transformieren von Daten bereitstellen. Fehler während der Ausführung der UDF können entweder an ein separates Pub/Sub-Thema oder an dasselbe nicht verarbeitete Thema wie die BigQuery-Fehler gesendet werden.

Pipelineanforderungen

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Proto-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Ausgabethema muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
  • Wenn die BigQuery-Tabelle vorhanden ist, muss sie ein Schema haben, das mit den Proto-Daten unabhängig vom createDisposition-Wert übereinstimmt.

Vorlagenparameter

Parameter Beschreibung
protoSchemaPath Der Cloud Storage-Speicherort der eigenständigen Proto-Schemadatei. z. B. gs://path/to/my/file.pb. Diese Datei kann mit dem Flag --descriptor_set_out des Befehls protoc generiert werden. Das Flag --include_imports garantiert, dass die Datei unabhängig ist.
fullMessageName Der vollständige Proto-Nachrichtenname. Beispiel: package.name.MessageName, wobei package.name der Wert für die Anweisung package und nicht für die Anweisung java_package ist.
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. z. B. my-project:my_dataset.my_table. Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit der Eingabeschemadatei erstellt werden.
preserveProtoFieldNames Optional: true, um den ursprünglichen Proto-Feldnamen in JSON beizubehalten. false, um weitere JSON-Standardnamen zu verwenden. Zum Beispiel würde false field_name in fieldName ändern. (Standard: false)
bigQueryTableSchemaPath Optional: Cloud Storage-Pfad zum BigQuery-Schemapfad. Beispiel: gs://path/to/my/schema.json. Falls nicht angegeben ist, wird das Schema aus dem Proto-Schema abgeleitet.
pythonExternalTextTransformGcsPath Optional: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Optional: Der Name der benutzerdefinierten Python-Funktion, die Sie verwenden möchten.
udfOutputTopic Optional: Das Pub/Sub-Thema, in dem die UDF-Fehler gespeichert werden. z. B. projects/<project-id>/topics/<topic-name> Wenn nicht angegeben, werden UDF-Fehler an dasselbe Thema wie outputTopic gesendet.
writeDisposition Optional: Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standard: WRITE_APPEND.
createDisposition Optional: Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standard: CREATE_IF_NEEDED.
useStorageWriteApi Optional: Wenn true, verwendet die Pipeline die BigQuery Storage Write API. Der Standardwert ist false. Weitere Informationen finden Sie unter BigQuery Storage Write API verwenden.
useStorageWriteApiAtLeastOnce Optional: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie "Mindestens einmal"-Semantik verwenden, legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
numStorageWriteApiStreams Optional: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
storageWriteApiTriggeringFrequencySec Optional: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
  • Führen Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Proto to BigQuery with Python UDF templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Klicken Sie auf Job ausführen.

    gcloud

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    Ersetzen Sie Folgendes:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
    • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
    • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
    • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden möchten

      Sie können die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
    • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
    • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
    • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

    Nächste Schritte