Le modèle Datastream vers BigQuery est un pipeline de streaming qui lit les données Datastream et les réplique dans BigQuery. Le modèle lit les données depuis Cloud Storage à l'aide de notifications Pub/Sub et les réplique dans une table de préproduction BigQuery partitionnée par date. Après la réplication, le modèle exécute un objet MERGE
dans BigQuery pour sauvegarder toutes les modifications apportées par la capture de données modifiées dans une instance dupliquée de la table source.
Le modèle gère la création et la mise à jour des tables BigQuery gérées par la réplication. Lorsque le langage de définition de données (LDD) est requis, un rappel à Datastream extrait le schéma de la table source et le convertit en types de données BigQuery. Les opérations suivantes sont acceptées :
- Des tables sont créées à mesure que des données sont insérées.
- Les nouvelles colonnes sont ajoutées aux tables BigQuery avec des valeurs initiales nulles.
- Les colonnes supprimées sont ignorées dans BigQuery, et les valeurs futures sont nulles.
- Les colonnes renommées sont ajoutées à BigQuery en tant que nouvelles colonnes.
- Les modifications de type ne sont pas transmises à BigQuery.
Il est recommandé d'exécuter ce pipeline en utilisant le mode de traitement en flux continu de type "au moins une fois", car le modèle effectue une déduplication lorsqu'il fusionne les données d'une table BigQuery temporaire vers la table BigQuery principale. Cette étape du pipeline signifie qu'il n'y a aucun avantage supplémentaire à utiliser le mode de traitement en flux continu de type "exactement une fois".
Conditions requises pour ce pipeline
- Un flux DataStream prêt à répliquer ou qui réplique déjà des données.
- Les notifications Pub/Sub pour Cloud Storage sont activées pour les données DataStream.
- Les ensembles de données de destination BigQuery sont créés et un compte administrateur a été accordé au compte de service Compute Engine.
- Une clé primaire est nécessaire dans la table source pour que la table dupliquée de destination soit créée.
- Une base de données source MySQL ou Oracle. Les bases de données PostgreSQL et SQL Server ne sont pas compatibles.
Paramètres de modèle
Paramètres obligatoires
- inputFilePattern: emplacement du fichier de sortie Datastream dans Cloud Storage, au format
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat: format des fichiers de sortie générés par Datastream. Les valeurs autorisées sont
avro
etjson
. La valeur par défaut estavro
. - gcsPubSubSubscription: abonnement Pub/Sub utilisé par Cloud Storage pour informer Dataflow des nouveaux fichiers disponibles pour le traitement, au format:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate: nom de l'ensemble de données contenant les tables de préproduction. Ce paramètre accepte les modèles, par exemple
{_metadata_dataset}_log
oumy_dataset_log
. Normalement, il s'agit du nom d'un ensemble de données. La valeur par défaut est{_metadata_dataset}
. - outputDatasetTemplate: nom de l'ensemble de données contenant les tables répliquées. Ce paramètre accepte les modèles, par exemple
{_metadata_dataset}
oumy_dataset
. Normalement, il s'agit du nom d'un ensemble de données. La valeur par défaut est{_metadata_dataset}
. - deadLetterQueueDirectory: chemin d'accès utilisé par Dataflow pour écrire la sortie de la file d'attente de lettres mortes. Ce chemin d'accès ne doit pas être identique à celui de la sortie du fichier Datastream. La valeur par défaut est
empty
.
Paramètres facultatifs
- streamName: nom ou modèle du flux à interroger pour les informations de schéma. La valeur par défaut est {_metadata_stream}. La valeur par défaut est généralement suffisante.
- rfcStartDateTime: date et heure de début à utiliser pour récupérer des données depuis Cloud Storage (https://tools.ietf.org/html/rfc3339). La valeur par défaut est :
1970-01-01T00:00:00.00Z
. - fileReadConcurrency: nombre de fichiers DataStream simultanés à lire. La valeur par défaut est
10
. - outputProjectId: ID du projet Google Cloud contenant les ensembles de données BigQuery dans lesquels exporter des données. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
- outputStagingTableNameTemplate: modèle à utiliser pour nommer les tables de préproduction. Exemple :
{_metadata_table}
La valeur par défaut est{_metadata_table}_log
. - outputTableNameTemplate: modèle à utiliser pour le nom des tables répliquées, par exemple
{_metadata_table}
. La valeur par défaut est{_metadata_table}
. - ignoreFields: champs à ignorer dans BigQuery, sous forme de liste d'éléments séparés par une virgule. La valeur par défaut est
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Par exemple,_metadata_stream,_metadata_schema
. - mergeFrequencyMinutes: nombre de minutes entre les fusions pour une table donnée. La valeur par défaut est
5
. - dlqRetryMinutes: nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. La valeur par défaut est
10
. - dataStreamRootUrl: URL racine de l'API Datastream. La valeur par défaut est https://datastream.googleapis.com/.
- applyMerge: indique si les requêtes MERGE doivent être désactivées pour la tâche. La valeur par défaut est
true
. - mergeConcurrency: nombre de requêtes MERGE BigQuery simultanées. Ne prend effet que lorsque applyMerge est défini sur "True". La valeur par défaut est
30
. - partitionRetentionDays: nombre de jours à utiliser pour la conservation des partitions lors de l'exécution de fusions BigQuery. La valeur par défaut est
1
. - useStorageWriteApiAtLeastOnce: ce paramètre ne prend effet que si
Use BigQuery Storage Write API
est activé. Si la valeur esttrue
, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique "exactement une fois" est utilisée. La valeur par défaut estfalse
. - javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par lUDF;utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples. - javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est
0
, l'actualisation de l'UDF est désactivée. La valeur par défaut est0
. - pythonTextTransformGcsPath: modèle de chemin d'accès Cloud Storage pour le code Python contenant vos fonctions définies par l'utilisateur. Exemple :
gs://your-bucket/your-transforms/*.py
- pythonRuntimeVersion: version d'exécution à utiliser pour cette fonction définie par l'utilisateur Python.
- pythonTextTransformFunctionName: nom de la fonction à appeler à partir de votre fichier JavaScript. N'utilisez que des lettres, des chiffres et des traits de soulignement. Exemple :
transform_udf1
- runtimeRetries: nombre de tentatives d'exécution d'un runtime avant échec. La valeur par défaut est 5.
- useStorageWriteApi: si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est
false
. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. La valeur par défaut est 0. - storageWriteApiTriggeringFrequencySec: spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre.
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Datastream to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nom de votre ensemble de données BigQuery.BIGQUERY_TABLE
: votre modèle de table BigQuery. Par exemple,{_metadata_schema}_{_metadata_table}_log
.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nom de votre ensemble de données BigQuery.BIGQUERY_TABLE
: votre modèle de table BigQuery. Par exemple,{_metadata_schema}_{_metadata_table}_log
.
Étape suivante
- Apprenez à mettre en œuvre Datastream et Dataflow pour l'analyse.
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.