Le pipeline Texte Cloud Storage vers BigQuery est un pipeline de streaming qui diffuse les fichiers texte stockés dans Cloud Storage, les transforme à l'aide d'une fonction JavaScript définie par l'utilisateur (UDF) que vous fournissez et ajoute le résultat à BigQuery.
Le pipeline fonctionne indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation Watch
qui est une fonction DoFn
non compatible avec le drainage.
Conditions requises pour ce pipeline
- Créez un fichier JSON décrivant le schéma de la table de sortie dans BigQuery.
Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé
fields
et que son contenu suit le modèle{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Exemple :{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Créez un fichier JavaScript (
.js
) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Votre fonction doit renvoyer une chaîne JSON.L'exemple suivant divise chaque ligne d'un fichier CSV, crée un objet JSON avec les valeurs et renvoie une chaîne JSON:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Paramètres de modèle
Paramètres obligatoires
- inputFilePattern : chemin d'accès gs:// vers le texte que vous souhaitez traiter dans Cloud Storage (exemple : gs://your-bucket/your-file.txt).
- JSONPath : chemin d'accès gs:// vers le fichier JSON qui définit votre schéma BigQuery, stocké dans Cloud Storage. (Exemple : gs://your-bucket/your-schema.json).
- outputTable : emplacement de la table BigQuery à utiliser pour stocker les données traitées. Si vous réutilisez une table existante, celle-ci va être écrasée. (Exemple : <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>).
- javascriptTextTransformGcsPath : URI Cloud Storage du fichier
.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. (Exemple : gs://your-bucket/your-transforms/*.js). - javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples (exemple : transform_udf1). - bigQueryLoadingTemporaryDirectory : répertoire temporaire pour le processus de chargement de BigQuery (exemple : gs://your-bucket/your-files/temp-dir).
Paramètres facultatifs
- outputDeadletterTable : table des messages qui n'ont pas pu atteindre la table de sortie. En l'absence de table existante, une table va être créée lors de l'exécution du pipeline. Si aucune valeur n'est spécifiée,
<outputTableSpec>_error_records
est utilisé. (Exemple : <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>). - useStorageWriteApiAtLeastOnce : Ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
- useStorageWriteApi : si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est
false
. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. La valeur par défaut est 0. - storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. - pythonExternalTextTransformGcsPath : modèle de chemin d'accès Cloud Storage pour le code Python contenant vos fonctions définies par l'utilisateur (exemple : gs://your-bucket/your-function.py).
- javascriptTextTransformReloadIntervalMinutes : spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.
Fonction définie par l'utilisateur
Ce modèle nécessite une fonction définie par l'utilisateur (UDF) qui analyse les fichiers d'entrée, comme décrit dans la section Exigences du pipeline. Le modèle appelle l'UDF pour chaque ligne de texte de chaque fichier d'entrée. Pour en savoir plus sur la création d'UDF, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
- Entrée : une ligne de texte provenant d'un fichier d'entrée.
- Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Storage Text to BigQuery (Stream) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQuery.BIGQUERY_UNPROCESSED_TABLE
: nom de votre table BigQuery pour les messages non traitésPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQuery.BIGQUERY_UNPROCESSED_TABLE
: nom de votre table BigQuery pour les messages non traitésPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.