Modèle de texte Cloud Storage vers BigQuery (Flux) avec UDF Python

Le pipeline Texte Cloud Storage vers BigQuery est un pipeline de streaming qui diffuse les fichiers texte stockés dans Cloud Storage, les transforme à l'aide d'une fonction définie par l'utilisateur Python (UDF) que vous fournissez et ajoute le résultat à BigQuery.

Le pipeline fonctionne indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation Watch qui est une fonction DoFn non compatible avec le drainage.

Conditions requises pour ce pipeline

  • Créez un fichier JSON décrivant le schéma de la table de sortie dans BigQuery.

    Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé fields et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemple :

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Créez un fichier Python (.py) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Votre fonction doit renvoyer une chaîne JSON.

    L'exemple suivant divise chaque ligne d'un fichier CSV, crée un objet JSON avec les valeurs et renvoie une chaîne JSON:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Paramètres de modèle

Paramètre Description
pythonExternalTextTransformGcsPath URI Cloud Storage du fichier de code Python qui définit la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Nom de la fonction définie par l'utilisateur Python (UDF) que vous souhaitez utiliser.
JSONPath Emplacement Cloud Storage de votre fichier de schéma BigQuery, décrit comme fichier JSON. Exemple : gs://path/to/my/schema.json.
outputTable Nom complet de la table BigQuery. Par exemple : my-project:dataset.table
inputFilePattern Emplacement Cloud Storage du texte que vous souhaitez traiter. Exemple : gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Répertoire temporaire pour le processus de chargement de BigQuery. Par exemple : gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table des messages qui n'ont pas pu atteindre la table de sortie. Exemple : my-project:dataset.my-unprocessed-table. Si elle n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.

Fonction définie par l'utilisateur

Ce modèle nécessite une fonction définie par l'utilisateur (UDF) qui analyse les fichiers d'entrée, comme décrit dans la section Exigences du pipeline. Le modèle appelle l'UDF pour chaque ligne de texte de chaque fichier d'entrée. Pour en savoir plus sur la création d'UDF, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : une ligne de texte provenant d'un fichier d'entrée.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • PYTHON_FUNCTION : Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_PYTHON_UDF_FILE : URI Cloud Storage du fichier de code Python contenant la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • PYTHON_FUNCTION : Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_PYTHON_UDF_FILE : URI Cloud Storage du fichier de code Python contenant la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire

Étape suivante