Die Pipeline "Cloud Storage Text für BigQuery" ist eine Streamingpipeline, die in Cloud Storage gespeicherte Textdateien streamt, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User-Defined Function, UDF) transformiert und das Ergebnis an BigQuery anhängt.
Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss manuell über eine Cancel-Anweisung und kein
Drain beendet werden, aufgrund ihrer Verwendung der Watch
Transformation, die eine splittable DoFn
ist, die den Draining nicht unterstützt.
Pipelineanforderungen
- Erstellen Sie eine JSON-Datei, die das Schema Ihrer Ausgabetabelle in BigQuery beschreibt.
Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen
fields
bereit, dessen Inhalt dem Muster{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
folgt. Beispiele:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Erstellen Sie eine JavaScript-Datei (
.js
) mit Ihrer UDF, die die Logik für die Transformation der Textzeilen bereitstellt. Ihre Funktion muss einen JSON-String zurückgeben.Im folgenden Beispiel wird jede Zeile einer CSV-Datei aufgeteilt, ein JSON-Objekt mit den Werten erstellt und ein JSON-String zurückgegeben:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Vorlagenparameter
Erforderliche Parameter
- inputFilePattern: Der gs://-Pfad zum Text in Cloud Storage, den Sie verarbeiten möchten. Beispiel:
gs://your-bucket/your-file.txt
. - JSONPath: Der gs://-Pfad zur JSON-Datei, die Ihr BigQuery-Schema definiert und in Cloud Storage gespeichert wird. Beispiel:
gs://your-bucket/your-schema.json
. - outputTable: Der Speicherort der BigQuery-Tabelle zum Speichern der verarbeiteten Daten. Wenn Sie eine vorhandene Tabelle wiederverwenden, wird sie überschrieben. Beispiel:
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - javascriptTextTransformGcsPath: Der Cloud Storage-URI der Datei
.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://your-bucket/your-transforms/*.js
- javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Beispiel:transform_udf1
. - bigQueryLoadingTemporaryDirectory: Das temporäre Verzeichnis für den BigQuery-Ladevorgang. Beispiel:
gs://your-bucket/your-files/temp-dir
.
Optionale Parameter
- outputDeadletterTable: Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn eine Tabelle nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Falls nichts angegeben wird, wird
<outputTableSpec>_error_records
verwendet. Beispiel:<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
- useStorageWriteApiAtLeastOnce: Dieser Parameter wird nur wirksam, wenn
Use BigQuery Storage Write API
aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false". - useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false
. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. - pythonExternalTextTransformGcsPath: Das Cloud Storage-Pfadmuster für den Python-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel:
gs://your-bucket/your-function.py
. - javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert
0
ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist0
.
Benutzerdefinierte Funktion
Diese Vorlage erfordert eine UDF, die die Eingabedateien parst, wie unter Pipelineanforderungen beschrieben. Die Vorlage ruft die UDF für jede Textzeile in jeder Eingabedatei auf. Weitere Informationen zum Erstellen von benutzerdefinierten Funktionen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Funktionsspezifikation
UDFs haben die folgende Spezifikation:
- Eingabe: eine einzelne Textzeile aus einer Eingabedatei
- Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Storage Text to BigQuery (Stream) templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Ersetzen Sie Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
STAGING_LOCATION
: der Speicherort für das Staging lokaler Dateien (z. B.gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.PATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennameBIGQUERY_UNPROCESSED_TABLE
: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete NachrichtenPATH_TO_TEMP_DIR_ON_GCS
: der Cloud Storage-Pfad zum temporären Verzeichnis
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
STAGING_LOCATION
: der Speicherort für das Staging lokaler Dateien (z. B.gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.PATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennameBIGQUERY_UNPROCESSED_TABLE
: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete NachrichtenPATH_TO_TEMP_DIR_ON_GCS
: der Cloud Storage-Pfad zum temporären Verzeichnis
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.