Die Pipeline "Cloud Storage Text für BigQuery mit Python-UDF" ist eine Batchpipeline, die in Cloud Storage gespeicherte Textdateien liest, sie mit einer benutzerdefinierten Python-Funktion (User-Defined Function, UDF) transformiert und das Ergebnis an eine BigQuery-Tabelle anhängt.
Pipelineanforderungen
- Erstellen Sie eine JSON-Datei, die Ihr BigQuery-Schema beschreibt.
Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen
BigQuery Schema
bereit, dessen Inhalt dem Muster{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
folgt.Die Batchvorlage "Cloud Storage Text für BigQuery" unterstützt nicht den Import von Daten in Felder des Typs
STRUCT
(Eintrag) in der BigQuery-Zieltabelle.Der folgende JSON-Code beschreibt ein BigQuery-Beispielschema:
{ "BigQuery Schema": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" }, ] }
- Erstellen Sie eine JavaScript-Datei (
.py
) mit Ihrer UDF, die die Logik für die Transformation der Textzeilen bereitstellt. Ihre Funktion muss einen JSON-String zurückgeben.Diese Funktion teilt beispielsweise jede Zeile einer CSV-Datei auf und gibt nach der Transformation der Werte einen JSON-String zurück.
import json def process(value): data = value.split(',') obj = { 'name': data[0], 'age': int(data[1]) } return json.dumps(obj)
Vorlagenparameter
Parameter | Beschreibung |
---|---|
JSONPath |
Der gs:// -Pfad zur JSON-Datei, die Ihr BigQuery-Schema definiert und in Cloud Storage gespeichert wird. Beispiel: gs://path/to/my/schema.json . |
pythonExternalTextTransformGcsPath |
Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py .
|
pythonExternalTextTransformFunctionName |
Der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten. |
inputFilePattern |
Der gs:// -Pfad zum Text in Cloud Storage, den Sie verarbeiten möchten. Beispiel: gs://path/to/my/text/data.txt . |
outputTable |
Der BigQuery-Tabellenname, den Sie zum Speichern Ihrer verarbeiteten Daten erstellen möchten.
Wenn Sie eine vorhandene BigQuery-Tabelle wiederverwenden, werden die Daten an die Zieltabelle angehängt.
z. B. my-project-name:my-dataset.my-table . |
bigQueryLoadingTemporaryDirectory |
Das temporäre Verzeichnis für den BigQuery-Ladevorgang.
Beispiel: gs://my-bucket/my-files/temp_dir . |
useStorageWriteApi |
Optional: Wenn true , verwendet die Pipeline die
BigQuery Storage Write API. Der Standardwert ist false . Weitere Informationen finden Sie unter BigQuery Storage Write API verwenden.
|
useStorageWriteApiAtLeastOnce |
Optional: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie
"Mindestens einmal"-Semantik verwenden, legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false .
|
Benutzerdefinierte Funktion
Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Funktionsspezifikation
UDFs haben die folgende Spezifikation:
- Eingabe: eine Textzeile aus einer Cloud Storage-Eingabedatei
- Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \ --region REGION_NAME \ --parameters \ pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
PYTHON_FUNCTION
: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten.PATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_PYTHON_UDF_FILE
: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: Der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennamePATH_TO_TEMP_DIR_ON_GCS
: Der Cloud Storage-Pfad zum temporären Verzeichnis
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
PYTHON_FUNCTION
: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten.PATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_PYTHON_UDF_FILE
: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: Der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennamePATH_TO_TEMP_DIR_ON_GCS
: Der Cloud Storage-Pfad zum temporären Verzeichnis
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.