Vorlage "Cloud Storage Text für BigQuery" mit Python-UDF

Die Pipeline "Cloud Storage Text für BigQuery mit Python-UDF" ist eine Batchpipeline, die in Cloud Storage gespeicherte Textdateien liest, sie mit einer benutzerdefinierten Python-Funktion (User-Defined Function, UDF) transformiert und das Ergebnis an eine BigQuery-Tabelle anhängt.

Pipelineanforderungen

  • Erstellen Sie eine JSON-Datei, die Ihr BigQuery-Schema beschreibt.

    Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen BigQuery Schema bereit, dessen Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt.

    Die Batchvorlage "Cloud Storage Text für BigQuery" unterstützt nicht den Import von Daten in Felder des Typs STRUCT (Eintrag) in der BigQuery-Zieltabelle.

    Der folgende JSON-Code beschreibt ein BigQuery-Beispielschema:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • Erstellen Sie eine JavaScript-Datei (.py) mit Ihrer UDF, die die Logik für die Transformation der Textzeilen bereitstellt. Ihre Funktion muss einen JSON-String zurückgeben.

    Diese Funktion teilt beispielsweise jede Zeile einer CSV-Datei auf und gibt nach der Transformation der Werte einen JSON-String zurück.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Vorlagenparameter

Parameter Beschreibung
JSONPath Der gs://-Pfad zur JSON-Datei, die Ihr BigQuery-Schema definiert und in Cloud Storage gespeichert wird. Beispiel: gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten.
inputFilePattern Der gs://-Pfad zum Text in Cloud Storage, den Sie verarbeiten möchten. Beispiel: gs://path/to/my/text/data.txt.
outputTable Der BigQuery-Tabellenname, den Sie zum Speichern Ihrer verarbeiteten Daten erstellen möchten. Wenn Sie eine vorhandene BigQuery-Tabelle wiederverwenden, werden die Daten an die Zieltabelle angehängt. z. B. my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory Das temporäre Verzeichnis für den BigQuery-Ladevorgang. Beispiel: gs://my-bucket/my-files/temp_dir.
useStorageWriteApi Optional: Wenn true, verwendet die Pipeline die BigQuery Storage Write API. Der Standardwert ist false. Weitere Informationen finden Sie unter BigQuery Storage Write API verwenden.
useStorageWriteApiAtLeastOnce Optional: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie "Mindestens einmal"-Semantik verwenden, legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: eine Textzeile aus einer Cloud Storage-Eingabedatei
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PYTHON_FUNCTION: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_PYTHON_UDF_FILE: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: Der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • PATH_TO_TEMP_DIR_ON_GCS: Der Cloud Storage-Pfad zum temporären Verzeichnis

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PYTHON_FUNCTION: der Name der benutzerdefinierten Python-Funktion (UDF), die Sie verwenden möchten.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_PYTHON_UDF_FILE: Der Cloud Storage-URI der Python-Codedatei, in der die benutzerdefinierte Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: Der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • PATH_TO_TEMP_DIR_ON_GCS: Der Cloud Storage-Pfad zum temporären Verzeichnis

Nächste Schritte