Le pipeline de texte Cloud Storage vers BigQuery avec la fonction Python définie par l'utilisateur est un pipeline par lots qui lit les fichiers texte stockés dans Cloud Storage, les transforme à l'aide d'une fonction Python définie par l'utilisateur et ajoute le résultat à une table BigQuery.
Conditions requises pour ce pipeline
- Créez un fichier JSON décrivant votre schéma BigQuery.
Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé
BigQuery Schema
et que son contenu suit le modèle{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
.Le modèle de traitement par lots du texte Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs
STRUCT
(Enregistrement) de la table BigQuery cible.Le code JSON suivant décrit un exemple de schéma BigQuery :
{ "BigQuery Schema": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" }, ] }
- Créez un fichier Python (
.py
) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Votre fonction doit renvoyer une chaîne JSON.Par exemple, cette fonction divise chaque ligne d'un fichier CSV et renvoie une chaîne JSON après avoir transformé les valeurs.
import json def process(value): data = value.split(',') obj = { 'name': data[0], 'age': int(data[1]) } return json.dumps(obj)
Paramètres de modèle
Paramètre | Description |
---|---|
JSONPath |
Chemin d'accès gs:// vers le fichier JSON qui définit votre schéma BigQuery, stocké dans Cloud Storage. Exemple :gs://path/to/my/schema.json |
pythonExternalTextTransformGcsPath |
URI Cloud Storage du fichier de code Python qui définit la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py .
|
pythonExternalTextTransformFunctionName |
Nom de la fonction définie par l'utilisateur Python (UDF) que vous souhaitez utiliser. |
inputFilePattern |
Chemin d'accès gs:// vers le texte de Cloud Storage que vous souhaitez traiter. Par exemple, gs://path/to/my/text/data.txt . |
outputTable |
Nom de la table BigQuery que vous souhaitez créer pour stocker vos données traitées.
Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination.
Par exemple, my-project-name:my-dataset.my-table . |
bigQueryLoadingTemporaryDirectory |
Répertoire temporaire du processus de chargement BigQuery.
Par exemple, gs://my-bucket/my-files/temp_dir . |
useStorageWriteApi |
Facultatif : si la valeur est true , le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false . Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true . Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false . Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true . La valeur par défaut est false .
|
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
- Entrée : ligne de texte provenant d'un fichier d'entrée Cloud Storage.
- Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \ --region REGION_NAME \ --parameters \ pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
PYTHON_FUNCTION
: Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_PYTHON_UDF_FILE
: URI Cloud Storage du fichier de code Python contenant la fonction définie par l'utilisateur que vous souhaitez utiliser. Exemple :gs://my-bucket/my-udfs/my_file.py
PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQueryPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
PYTHON_FUNCTION
: Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_PYTHON_UDF_FILE
: URI Cloud Storage du fichier de code Python contenant la fonction définie par l'utilisateur que vous souhaitez utiliser. Exemple :gs://my-bucket/my-udfs/my_file.py
PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQueryPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.