Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.
Pipelineanforderungen
- Das Pub/Sub-Eingabeabo muss vorhanden sein.
- Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
- Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
- Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
Vorlagenparameter
Erforderliche Parameter
- schemaPath : Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel:
gs://path/to/my/schema.avsc
. - inputSubscription : Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. (Beispiel: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>).
- outputTableSpec : Der Speicherort der BigQuery-Ausgabetabelle, in die die Ausgabe geschrieben werden soll. Beispiel:
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
.Abhängig von der angegebenencreateDisposition
kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden. - outputTopic : Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/topics/<TOPIC_NAME>).
Optionale Parameter
- useStorageWriteApiAtLeastOnce : Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf „true“ fest. Wenn Sie die „Genau einmal“-Semantik verwenden möchten, legen Sie den Parameter auf
false
fest. Dieser Parameter gilt nur, wennuseStorageWriteApi
true
ist. Der Standardwert istfalse
. - writeDisposition : Der BigQuery-WriteDisposition-Wert (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Beispiel:
WRITE_APPEND
,WRITE_EMPTY
oderWRITE_TRUNCATE
. Die Standardeinstellung istWRITE_APPEND
. - createDisposition : BigQuery-CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Beispiele:
CREATE_IF_NEEDED
undCREATE_NEVER
. Die Standardeinstellung istCREATE_IF_NEEDED
. - useStorageWriteApi : Wenn "true", verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false
. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Avro to BigQuery templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Ersetzen Sie Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleDEADLETTER_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Ersetzen Sie Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleDEADLETTER_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.