Google fournit un ensemble de modèles Dataflow Open Source.
Ces modèles Dataflow peuvent vous aider à traiter des tâches de données volumineuses, y compris l'importation, l'exportation, la sauvegarde et la restauration de données, ainsi que les opérations d'API groupées, le tout sans avoir à utiliser un environnement de développement dédié. Les modèles sont basés sur Apache Beam et exploitent Dataflow pour transformer les données.
Pour obtenir des informations générales sur les modèles, consultez la page Modèles Dataflow. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.Ce guide répertorie les modèles de streaming.
Abonnement Pub/Sub vers BigQuery
Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.
Conditions requises pour ce pipeline :
- Le champ
data
des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON. Par exemple, vous pouvez insérer des messages contenant les valeurs du champdata
au format{"k1":"v1", "k2":"v2"}
dans une table BigQuery comportant deux colonnes nomméesk1
etk2
, en utilisant un type de données de chaîne ("string"). - La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscription |
Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table> |
outputDeadletterTable |
Table BigQuery des messages qui n'ont pas pu atteindre la table de sortie, au format <my-project>:<my-dataset>.<my-table> .
Si elle n'existe pas, elle est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, OUTPUT_TABLE_SPEC_error_records est utilisé à la place. |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
Exécuter le modèle Abonnement Pub/Sub vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Subscription to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)SUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubDATASET
: votre ensemble de données BigQuery.TABLE_NAME
: nom de votre table BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)SUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubDATASET
: votre ensemble de données BigQuery.TABLE_NAME
: nom de votre table BigQuery.
Sujet Pub/Sub vers BigQuery
Le modèle Sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.
Conditions requises pour ce pipeline :
- Le champ
data
des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON. Par exemple, vous pouvez insérer des messages contenant les valeurs du champdata
au format{"k1":"v1", "k2":"v2"}
dans une table BigQuery comportant deux colonnes nomméesk1
etk2
, en utilisant un type de données de chaîne ("string"). - La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.
Paramètres de modèle
Paramètres | Description |
---|---|
inputTopic |
Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table> |
outputDeadletterTable |
La table BigQuery des messages n'ayant pas pu atteindre la table de sortie Elle doit être au format <my-project>:<my-dataset>.<my-table> .
Si elle n'existe pas, elle est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place. |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
Exécuter le modèle Sujet Pub/Sub vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubDATASET
: votre ensemble de données BigQuery.TABLE_NAME
: nom de votre table BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubDATASET
: votre ensemble de données BigQuery.TABLE_NAME
: nom de votre table BigQuery.
Pub/Sub Avro vers BigQuery
Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
- Le sujet Pub/Sub non traité doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
Paramètres de modèle
Paramètres | Description |
---|---|
schemaPath |
Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc . |
inputSubscription |
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription> . |
outputTopic |
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table> .
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur. |
writeDisposition |
(Facultatif) La propriété WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND , WRITE_EMPTY ou WRITE_TRUNCATE . Par défaut, WRITE_APPEND . |
createDisposition |
(Facultatif) La propriété CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER . Par défaut, CREATE_IF_NEEDED . |
Exécuter le modèle Pub/Sub Avro vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Proto Pub/Sub vers BigQuery
Le modèle proto Pub/Sub vers BigQuery est un pipeline de streaming qui ingère les données proto d'un abonnement Pub/Sub dans une table BigQuery.
Les erreurs qui se produisent lors de l'écriture dans la table BigQuery sont insérées en flux continu dans un sujet Pub/Sub non traité.
Une fonction définie par l'utilisateur (UDF) JavaScript peut être fournie pour transformer les données. Les erreurs lors de l'exécution de l'UDF peuvent être envoyées à un sujet Pub/Sub distinct ou au même sujet non traité que les erreurs BigQuery.
Conditions requises pour ce pipeline :
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements proto doit exister dans Cloud Storage.
- Le sujet Pub/Sub de sortie doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
- Si la table BigQuery existe, elle doit posséder un schéma correspondant aux données proto, quelle que soit la valeur de
createDisposition
.
Paramètres de modèle
Paramètres | Description |
---|---|
protoSchemaPath |
Emplacement Cloud Storage du fichier de schéma proto autonome. Par exemple, gs://path/to/my/file.pb .
Ce fichier peut être généré avec l'option --descriptor_set_out de la commande protoc .
L'option --include_imports garantit que le fichier est autonome. |
fullMessageName |
Nom complet du message proto. Par exemple, package.name.MessageName , où package.name est la valeur fournie pour l'instruction package , et non pour l'instruction java_package . |
inputSubscription |
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription> . |
outputTopic |
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery. Par exemple, my-project:my_dataset.my_table .
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du fichier de schéma d'entrée. |
preserveProtoFieldNames |
(Facultatif) true pour conserver le nom du champ Proto d'origine au format JSON. false pour utiliser des noms JSON plus standards.
Par exemple, false remplace field_name par fieldName . (Par défaut : false ) |
bigQueryTableSchemaPath |
(Facultatif) Chemin d'accès Cloud Storage vers le chemin d'accès du schéma BigQuery. Par exemple, gs://path/to/my/schema.json . S'il n'est pas fourni, le schéma est obtenu à partir du schéma Proto. |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
udfOutputTopic |
(Facultatif) Sujet Pub/Sub stockant les erreurs UDF. Par exemple : projects/<project-id>/topics/<topic-name> Si cet élément n'est pas fourni, les erreurs UDF sont envoyées au même sujet que outputTopic . |
writeDisposition |
(Facultatif) La disposition WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND , WRITE_EMPTY ou WRITE_TRUNCATE . Valeur par défaut : WRITE_APPEND . |
createDisposition |
(Facultatif) La disposition CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER . Valeur par défaut : CREATE_IF_NEEDED . |
Exécuter le modèle Proto Pub/Sub vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Proto to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Pub/Sub vers Pub/Sub
Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.
Conditions requises pour ce pipeline :
- L'abonnement Pub/Sub source doit exister avant l'exécution.
- L'abonnement Pub/Sub source doit être un abonnement pull.
- Le sujet Pub/Sub de destination doit exister avant l'exécution.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscription |
Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name> . |
outputTopic |
Sujet Cloud Pub/Sub dans lequel écrire la sortie. Par exemple, projects/<project-id>/topics/<topic-name> . |
filterKey |
(Facultatif) Filtrez les événements en fonction d'une clé d'attribut. Aucun filtre n'est appliqué si filterKey n'est pas spécifié. |
filterValue |
(Facultatif) Valeur d'attribut de filtre à utiliser dans le cas où un filterKey est fourni. Une valeur filterValue nulle est utilisée par défaut. |
Exécuter le modèle Pub/Sub vers Pub/Sub
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Pub/Sub template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)SUBSCRIPTION_NAME
: nom de l'abonnement Pub/SubTOPIC_NAME
: nom du sujet Pub/SubFILTER_KEY
: clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.FILTER_VALUE
: valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)SUBSCRIPTION_NAME
: nom de l'abonnement Pub/SubTOPIC_NAME
: nom du sujet Pub/SubFILTER_KEY
: clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.FILTER_VALUE
: valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
Pub/Sub vers Splunk
Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk via la solution HEC (HTTP Event Collector) de Splunk. Le cas d'utilisation le plus courant de ce modèle est l'exportation de journaux vers Splunk. Pour découvrir un exemple du workflow sous-jacent, consultez la section Déployer des exportations de journaux prêtes pour la production vers Splunk à l'aide de Dataflow.
Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript définie par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub non traité en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.
Pour ajouter une couche de protection à votre jeton HEC, vous pouvez également transmettre une clé Cloud KMS ainsi que le paramètre de jeton HEC encodé en base64 chiffré avec cette clé. Pour en savoir plus sur le chiffrement des paramètres du jeton HEC, consultez la page sur le point de terminaison du chiffrement de l'API Cloud KMS.
Conditions requises pour ce pipeline :
- L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
- Le sujet Pub/Sub non traité doit exister avant l'exécution du pipeline.
- Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
- Le jeton de la solution HEC de Splunk doit être généré et disponible.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscription |
Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name> . |
token |
(Facultatif) Jeton d'authentification HEC Splunk. Doit être spécifié si tokenSource est défini sur PLAINTEXT ou KMS. |
url |
URL HEC Splunk. Il doit être routable depuis le VPC dans lequel le pipeline est exécuté. Par exemple, https://splunk-hec-host:8088. |
outputDeadletterTopic |
Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name> . |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
batchCount |
(Facultatif) Taille de lot pour l'envoi de plusieurs événements vers Splunk. Valeur par défaut 1 (pas de traitement par lots). |
parallelism |
(Facultatif) Nombre maximal de demandes en parallèle. Valeur par défaut 1 (aucun parallélisme). |
disableCertificateValidation |
(Facultatif) Désactiver la validation du certificat SSL. Valeur par défaut "false" (validation activée). Si la valeur est "true", les certificats ne sont pas validés (tous les certificats sont approuvés) et le paramètre "rootCaCertificatePath" est ignoré. |
includePubsubMessage |
(Facultatif) Inclure le message Pub/Sub complet dans la charge utile. Valeur "false" par défaut (seul l'élément de données est inclus dans la charge utile). |
tokenSource |
Source du jeton. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager est utilisé.
Si tokenSource est défini sur KMS, tokenKMSEncryptionKey et le token chiffré doivent être spécifiés.
Si tokenSource est défini sur SECRET_MANAGER, tokenSecretId doit être spécifié.
Si tokenSource est défini sur PLAINTEXT, token doit être spécifié.
|
tokenKMSEncryptionKey |
(Facultatif) Clé Cloud KMS permettant de déchiffrer la chaîne du jeton HEC. Ce paramètre doit être spécifié si tokenSource est défini sur KMS.
Si la clé Cloud KMS est fournie, la chaîne du jeton HEC doit être transmise sous forme chiffrée. |
tokenSecretId |
(Facultatif) ID du secret fourni par Secret Manager pour le jeton. Ce paramètre doit être spécifié si tokenSource est défini sur SECRET_MANAGER.
Format requis : projects/<project-id>/secrets/<secret-name>/versions/<secret-version> . |
rootCaCertificatePath |
(Facultatif) URL complète du certificat CA racine dans Cloud Storage. Par exemple, gs://mybucket/mycerts/privateCA.crt . Le certificat fourni dans Cloud Storage doit être encodé au format DER et peut être fourni en encodage binaire ou imprimable (base64).
Si le certificat est fourni avec un encodage en base64, il doit être délimité par "------BEGIN CERTIFICATE-----" au début et par "-----END CERTIFICATE-----" à la fin. Si ce paramètre est fourni, ce fichier de certificat CA privé est extrait et ajouté au trust store du nœud de calcul Dataflow pour vérifier le certificat SSL du point de terminaison HEC de Splunk.
Si ce paramètre n'est pas fourni, le trust store par défaut est utilisé. |
enableBatchLogs |
(Facultatif) Spécifie si les journaux doivent être activés pour les lots écrits dans Splunk. Valeur par défaut : true |
enableGzipHttpCompression |
(Facultatif) Indique si les requêtes HTTP envoyées à la solution HEC de Splunk doivent être compressées (codage de contenu gzip). Valeur par défaut : true |
Exécuter le modèle Pub/Sub vers Splunk
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Splunk template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION,\ rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: nom de l'abonnement Pub/SubTOKEN
: jeton HTTP Event Collector de SplunkURL
: chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple,https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: nom du sujet Pub/SubJAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).BATCH_COUNT
: taille de lot à utiliser pour envoyer plusieurs événements vers SplunkPARALLELISM
: nombre de requêtes parallèles à utiliser pour envoyer des événements vers SplunkDISABLE_VALIDATION
:true
si vous souhaitez désactiver la validation du certificat SSLROOT_CA_CERTIFICATE_PATH
: chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple,gs://your-bucket/privateCA.crt
)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION", "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: nom de l'abonnement Pub/SubTOKEN
: jeton HTTP Event Collector de SplunkURL
: chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple,https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: nom du sujet Pub/SubJAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).BATCH_COUNT
: taille de lot à utiliser pour envoyer plusieurs événements vers SplunkPARALLELISM
: nombre de requêtes parallèles à utiliser pour envoyer des événements vers SplunkDISABLE_VALIDATION
:true
si vous souhaitez désactiver la validation du certificat SSLROOT_CA_CERTIFICATE_PATH
: chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple,gs://your-bucket/privateCA.crt
)
Pub/Sub vers fichiers Avro dans Cloud Storage
Le modèle Pub/Sub vers fichiers Avro dans Cloud Storage est un pipeline de streaming qui lit les données d'un sujet Pub/Sub et écrit des fichiers Avro dans le bucket Cloud Storage spécifié.
Conditions requises pour ce pipeline :
- Le sujet Pub/Sub d'entrée doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres | Description |
---|---|
inputTopic |
Sujet Pub/Sub permettant de s'abonner à la consultation de messages. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name> . |
outputDirectory |
Répertoire de sortie dans lequel les fichiers de sortie Avro seront archivés. Doit inclure / à la fin.
Exemple : gs://example-bucket/example-directory/ . |
avroTempDirectory |
Répertoire des fichiers Avro temporaires. Doit inclure / à la fin. Par exemple : gs://example-bucket/example-directory/ . |
outputFilenamePrefix |
(Facultatif) Préfixe du nom de fichier de sortie pour les fichiers Avro. |
outputFilenameSuffix |
(Facultatif) Suffixe du nom de fichier de sortie pour les fichiers Avro. |
outputShardTemplate |
[Facultatif] Modèle de partition du fichier de sortie. Spécifié en tant que séquences répétées des lettres S ou N . Exemple : SSS-NNN . Celles-ci sont remplacées par le numéro de partition ou par le nombre total de partitions, respectivement. Si ce paramètre n'est pas spécifié, le format du modèle par défaut est W-P-SS-of-NN . |
Exécuter le modèle Pub/Sub vers Cloud Storage Avro
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Avro Files on Cloud Storage template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ avroTempDirectory=gs://BUCKET_NAME/temp/
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom du sujet Pub/SubBUCKET_NAME
: nom du bucket Cloud StorageFILENAME_PREFIX
: préfixe du nom de fichier de sortie préféréFILENAME_SUFFIX
: suffixe du nom de fichier de sortie préféréSHARD_TEMPLATE
: modèle de partition de sortie préféré
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom du sujet Pub/SubBUCKET_NAME
: nom du bucket Cloud StorageFILENAME_PREFIX
: préfixe du nom de fichier de sortie préféréFILENAME_SUFFIX
: suffixe du nom de fichier de sortie préféréSHARD_TEMPLATE
: modèle de partition de sortie préféré
Sujet Pub/Sub vers des fichiers texte dans Cloud Storage
Le modèle Cloud Pub/Sub vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Cloud Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.
Conditions requises pour ce pipeline :
- Le sujet Pub/Sub doit exister avant l'exécution.
- Les messages publiés sur le thème doivent être au format texte.
- Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.
Paramètres de modèle
Paramètres | Description |
---|---|
inputTopic |
Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name> . |
outputDirectory |
Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/ . Cette valeur doit se terminer par une barre oblique. |
outputFilenamePrefix |
Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output- . |
outputFilenameSuffix |
Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv . |
outputShardTemplate |
Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NN où W correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01 .
|
Exécuter le modèle Pub/Sub vers des fichiers texte dans Cloud Storage
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Text Files on Cloud Storage template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom du bucket Cloud Storage
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom du bucket Cloud Storage
Sujet Pub/Sub ou abonnement vers des fichiers texte dans Cloud Storage
Le sujet Pub/Sub ou l'abonnement vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.
Conditions requises pour ce pipeline :
- Le sujet Pub/Sub ou l'abonnement doivent exister avant l'exécution.
- Les messages publiés sur le thème doivent être au format texte.
- Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.
Paramètres de modèle
Paramètres | Description |
---|---|
inputTopic |
Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name> . Si ce paramètre est fourni, inputSubscription ne doit pas être fourni. |
inputSubscription |
Abonnement Pub/Sub à partir duquel lire l'entrée. Le nom de l'abonnement doit être au format projects/<project-id>/subscription/<subscription-name> . Si ce paramètre est fourni, inputTopic ne doit pas être fourni. |
outputDirectory |
Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/ . Cette valeur doit se terminer par une barre oblique. |
outputFilenamePrefix |
Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output- . |
outputFilenameSuffix |
Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv . |
outputShardTemplate |
Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NN où W correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01 .
|
windowDuration |
(Facultatif) La durée de fenêtre correspond à l'intervalle au cours duquel les données sont écrites dans le répertoire de sortie. Configurez la durée en fonction du débit du pipeline. Par exemple, un débit plus élevé peut nécessiter des tailles de fenêtre plus petites pour que les données s'intègrent à la mémoire. La valeur par défaut est "5m", avec une durée minimale de 1 s. Les formats autorisés sont les suivants : [int]s (pour les secondes, exemple : 5s), [int]m (pour les minutes, exemple : 12m), [int]h (pour les heures, exemple : 2h). |
Exécuter le modèle Sujet Pub/Sub ou l'abonnement vers les fichiers texte dans Cloud Storage
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template jobs run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubBUCKET_NAME
: nom de votre bucket Cloud Storage
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
SUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubBUCKET_NAME
: nom de votre bucket Cloud Storage
Pub/Sub vers MongoDB
Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur. Toute erreur survenue en raison d'une non-concordance du schéma ou d'un format JSON non valide, ou pendant l'exécution de transformations, est enregistrée avec le message d'entrée dans une table BigQuery destinée aux messages non traités. Si la table des enregistrements non traités n'existe pas avant l'exécution, le pipeline la crée automatiquement.
Conditions requises pour ce pipeline :
- L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
- Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscription |
Nom de l'abonnement Pub/Sub. Par exemple :
|
mongoDBUri |
Liste de serveurs MongoDB séparés par une virgule. Par exemple : 192.285.234.12:27017,192.287.123.11:27017 |
database |
Base de données dans MongoDB pour stocker la collection. Exemple : my-db . |
collection |
Nom de la collection dans la base de données MongoDB. Exemple : my-collection . |
deadletterTable |
Table BigQuery qui stocke les messages en raison d'échecs (schéma non correspondant, format JSON non valide, etc.). Exemple : project-id:dataset-name.table-name . |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
batchSize |
(Facultatif) Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. Valeur par défaut : 1000 |
batchSizeBytes |
(Facultatif) Taille du lot en octets. Valeur par défaut : 5242880 |
maxConnectionIdleTime |
(Facultatif) Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. Valeur par défaut : 60000 |
sslEnabled |
(Facultatif) Valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. Valeur par défaut : true |
ignoreSSLCertificate |
(Facultatif) Valeur booléenne indiquant si le certificat SSL doit être ignoré. Valeur par défaut : true |
withOrdered |
(Facultatif) Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. Valeur par défaut : true |
withSSLInvalidHostNameAllowed |
(Facultatif) Valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. Valeur par défaut : true |
Exécuter le modèle Pub/Sub vers MongoDB
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
INPUT_SUBSCRIPTION
: abonnement Pub/Sub (par exemple,
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: adresses du serveur MongoDB (par exemple,192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: nom de la base de données MongoDB (par exemple,users
)COLLECTION
: nom de la collection MongoDB (par exemple,profiles
)UNPROCESSED_TABLE
: nom de la table BigQuery (par exemple,your-project:your-dataset.your-table-name
)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
INPUT_SUBSCRIPTION
: abonnement Pub/Sub (par exemple,
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: adresses du serveur MongoDB (par exemple,192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: nom de la base de données MongoDB (par exemple,users
)COLLECTION
: nom de la collection MongoDB (par exemple,profiles
)UNPROCESSED_TABLE
: nom de la table BigQuery (par exemple,your-project:your-dataset.your-table-name
)
Pub/Sub vers Elasticsearch
Le modèle Pub/Sub vers Elasticsearch est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub, exécute une fonction définie par l'utilisateur et les écrit dans Elasticsearch sous forme de documents. Le modèle Dataflow utilise la fonctionnalité de flux de données d'Elasticsearch pour stocker les données de séries temporelles sur plusieurs index tout en vous attribuant une seule ressource nommée pour les requêtes. Les flux de données sont parfaitement adaptés aux journaux, aux métriques, aux traces et aux autres données générées en continu stockées dans Pub/Sub.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
- Hôte Elasticsearch accessible au public sur une instance GCP ou sur Elastic Cloud avec Elasticsearch version 7.0 ou ultérieure. Pour en savoir plus, consultez la section Google Cloud Integration for Elastic (Intégration de Google Cloud pour Elastic).
- Sujet Pub/Sub pour le résultat de l'erreur.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscription |
Abonnement Pub/Sub à consommer. Le nom doit être au format projects/<project-id>/subscriptions/<subscription-name> . |
connectionUrl |
URL Elasticsearch au format https://hostname:[port] , ou spécifiez CloudID si vous utilisez Elastic Cloud. |
apiKey |
Clé API encodée en base64 utilisée pour l'authentification. |
errorOutputTopic |
Sujet de sortie Pub/Sub pour la publication d'enregistrements ayant échoué au format projects/<project-id>/topics/<topic-name> . |
dataset |
(Facultatif) Type de journaux envoyés via Pub/Sub pour lesquels nous disposons d'un tableau de bord prêt à l'emploi. Les valeurs de types de journaux connues sont audit, vpcflow et firewall. Valeur par défaut : pubsub |
namespace |
(Facultatif) Regroupement arbitraire, tel qu'un environnement (développement, production ou QA), une équipe ou une unité commerciale stratégique. Valeur par défaut : default |
batchSize |
(Facultatif) Taille de lot en nombre de documents. Valeur par défaut : 1000 |
batchSizeBytes |
(Facultatif) Taille de lot en octets. Valeur par défaut : 5242880 (5 Mo). |
maxRetryAttempts |
(Facultatif) Nombre maximal de nouvelles tentatives, doit être supérieur 0. Valeur par défaut : no retries |
maxRetryDuration |
(Facultatif) Durée maximale de la nouvelle tentative en millisecondes, doit être supérieure à 0. Valeur par défaut : no retries |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
propertyAsIndex |
(Facultatif) Propriété du document indexée dont la valeur spécifie les métadonnées _index à inclure dans le document de la requête groupée (prioritaire sur une UDF _index ). Valeur par défaut = none |
propertyAsId |
(Facultatif) Propriété du document indexée dont la valeur spécifie les métadonnées _id à inclure dans le document de la requête groupée (prioritaire sur une UDF _id ). Valeur par défaut = none |
javaScriptIndexFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _index à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptIndexFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _index à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptIdFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _id à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptIdFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _id à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptTypeFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _type à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptTypeFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _type à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptIsDeleteFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false" . Valeur par défaut = none |
javaScriptIsDeleteFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false" . Valeur par défaut = none |
usePartialUpdate |
(Facultatif) Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. Valeur par défaut : false |
bulkInsertMethod |
(Facultatif) Indique s'il faut utiliser INDEX (index, upsert autorisé) ou CREATE (création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. Valeur par défaut : CREATE |
Exécuter le modèle Pub/Sub vers Elasticsearch
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Elasticsearch template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
ERROR_OUTPUT_TOPIC
: sujet Pub/Sub pour le résultat d'erreurSUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubCONNECTION_URL
: URL ElasticsearchDATASET
: type de journalNAMESPACE
: espace de noms pour un ensemble de donnéesAPIKEY
: clé API encodée en base64 pour l'authentification
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
ERROR_OUTPUT_TOPIC
: sujet Pub/Sub pour le résultat d'erreurSUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubCONNECTION_URL
: URL ElasticsearchDATASET
: type de journalNAMESPACE
: espace de noms pour un ensemble de donnéesAPIKEY
: clé API encodée en base64 pour l'authentification
Datastream vers Cloud Spanner
Le modèle Datastream vers Cloud Spanner est un pipeline de streaming qui lit les événements Datastream d'un bucket Cloud Storage et les écrit dans une base de données Cloud Spanner. Il est destiné à la migration de données de sources Datastream vers Cloud Spanner.
Toutes les tables requises pour la migration doivent exister dans la base de données Cloud Spanner de destination avant l'exécution du modèle. Par conséquent, la migration du schéma d'une base de données source vers Cloud Spanner doit être terminée avant la migration des données. Des données peuvent exister dans les tables avant la migration. Ce modèle ne propage pas les modifications du schéma Datastream dans la base de données Cloud Spanner.
La cohérence des données n'est garantie à la fin de la migration que lorsque toutes les données ont été écrites dans Cloud Spanner. Pour stocker des informations de tri pour chaque enregistrement écrit dans Cloud Spanner, ce modèle crée une table supplémentaire (appelée "shadow table" ou table fictive) pour chaque table de la base de données Cloud Spanner. Cela permet de garantir la cohérence à la fin de la migration. Les tables fictives ne sont pas supprimées après la migration et peuvent être utilisées à des fins de validation à la fin de la migration.
Toutes les erreurs qui se produisent pendant l'opération, telles que les incohérences de schéma, les fichiers JSON non valides ou les erreurs résultant de l'exécution des transformations, sont enregistrées dans une file d'attente d'erreurs. La file d'attente d'erreurs est un dossier Cloud Storage qui stocke tous les événements Datastream ayant rencontré des erreurs ainsi que le motif de l'erreur au format texte. Les erreurs peuvent être temporaires ou permanentes, et sont stockées dans des dossiers Cloud Storage appropriés dans la file d'attente d'erreurs. Les erreurs temporaires font l'objet de nouvelles tentatives automatiques, contrairement aux erreurs permanentes. En cas d'erreurs permanentes, vous avez la possibilité de corriger les événements de modification et de les déplacer vers le bucket pouvant faire l'objet de nouvelles tentatives pendant l'exécution du modèle.
Conditions requises pour ce pipeline :
- Un flux Datastream dans l'état En cours d'exécution ou Non démarré.
- Un bucket Cloud Storage dans lequel les événements Datastream sont répliqués
- Une base de données Cloud Spanner avec tables existantes. Ces tables peuvent être vides ou contenir des données.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFilePattern |
Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Il s'agit généralement du chemin racine d'un flux. |
streamName |
Nom ou modèle du flux à interroger pour obtenir des informations de schéma et le type de source. |
instanceId |
Instance Cloud Spanner sur laquelle les modifications sont répliquées. |
databaseId |
Base de données Cloud Spanner dans laquelle les modifications sont répliquées. |
projectId |
ID du projet Cloud Spanner. |
deadLetterQueueDirectory |
(Facultatif) Il s'agit du chemin d'accès au fichier permettant de stocker la sortie de la file d'attente d'erreurs. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. |
inputFileFormat |
(Facultatif) Format du fichier de sortie généré par Datastream. Par exemple, avro,json . Par défaut, avro . |
shadowTablePrefix |
(Facultatif) Préfixe utilisé pour nommer les tables fantômes. Valeur par défaut : shadow_ |
Exécuter le modèle Datastream vers Cloud Spanner
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to Spanner template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ streamName=STREAM_NAME,\ instanceId=CLOUDSPANNER_INSTANCE,\ databaseId=CLOUDSPANNER_DATABASE,\ deadLetterQueueDirectory=DLQ
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple :gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: votre instance Cloud SpannerCLOUDSPANNER_DATABASE
: votre base de données Cloud Spanner.DLQ
: chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "streamName": "STREAM_NAME" "instanceId": "CLOUDSPANNER_INSTANCE" "databaseId": "CLOUDSPANNER_DATABASE" "deadLetterQueueDirectory": "DLQ" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage utilisé pour stocker les événements de flux de données. Par exemple :gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: votre instance Cloud SpannerCLOUDSPANNER_DATABASE
: votre base de données Cloud Spanner.DLQ
: chemin d'accès Cloud Storage au répertoire de la file d'attente d'erreurs.
Fichiers texte dans Cloud Storage vers BigQuery (Flux)
Les fichiers texte dans Cloud Storage vers BigQuery sont un pipeline de streaming qui vous permet de lire des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide de la fonction JavaScript définie par l'utilisateur (UDF) que vous fournissez, et d'ajouter le résultat à BigQuery.
Le pipeline fonctionne indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation Watch
qui est une fonction DoFn
non compatible avec le drainage.
Conditions requises pour ce pipeline :
- Créez un fichier JSON décrivant le schéma de la table de sortie dans BigQuery.
Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé
fields
et que son contenu suit le modèle{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Exemple :{ "fields": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING", "mode": "REQUIRED" }, { "name": "coffee", "type": "STRING", "mode": "REQUIRED" } ] }
- Créez un fichier JavaScript (
.js
) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Notez que votre fonction doit renvoyer une chaîne JSON.Par exemple, cette fonction divise chaque ligne d'un fichier CSV et renvoie une chaîne JSON après avoir transformé les valeurs.
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
Paramètres de modèle
Paramètres | Description |
---|---|
javascriptTextTransformGcsPath |
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
JSONPath |
Emplacement Cloud Storage de votre fichier de schéma BigQuery, décrit comme fichier JSON.
Exemple : gs://path/to/my/schema.json . |
javascriptTextTransformFunctionName |
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
outputTable |
Nom complet de la table BigQuery.
Par exemple : my-project:dataset.table |
inputFilePattern |
Emplacement Cloud Storage du texte que vous souhaitez traiter.
Exemple : gs://my-bucket/my-files/text.txt . |
bigQueryLoadingTemporaryDirectory |
Répertoire temporaire pour le processus de chargement de BigQuery.
Par exemple : gs://my-bucket/my-files/temp_dir |
outputDeadletterTable |
Table des messages qui n'ont pas pu atteindre la table de sortie.
Exemple : my-project:dataset.my-unprocessed-table . Si elle n'existe pas, elle est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place. |
Exécuter le modèle Texte Cloud Storage vers BigQuery (Flux)
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Text Files on Cloud Storage to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQuery.BIGQUERY_UNPROCESSED_TABLE
: nom de votre table BigQuery pour les messages non traitésPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.PATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaPATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).PATH_TO_TEXT_DATA
: chemin d'accès Cloud Storage à votre ensemble de données texteBIGQUERY_TABLE
: nom de votre table BigQuery.BIGQUERY_UNPROCESSED_TABLE
: nom de votre table BigQuery pour les messages non traitésPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaire
Fichiers texte sur Cloud Storage vers Pub/Sub (Flux)
Le modèle crée un pipeline de streaming qui interroge en permanence les nouveaux fichiers texte chargés dans Cloud Storage, lit chaque fichier ligne par ligne et publie des chaînes dans un sujet Pub/Sub. Le modèle publie les enregistrements dans un fichier délimité par une nouvelle ligne contenant des enregistrements JSON ou un fichier CSV dans un sujet Pub/Sub pour un traitement en temps réel. Vous pouvez utiliser ce modèle pour relire les données dans Pub/Sub.
Le pipeline s'exécute indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation "Watch", qui est une fonction "SplittableDoFn" qui n'est pas compatible avec le drainage.
Actuellement, l'intervalle d'interrogation est fixé à 10 secondes. Ce modèle ne définit aucun horodatage sur les enregistrements individuels. Par conséquent, l'heure de l'événement est égale à l'heure de publication pendant l'exécution. Si votre pipeline dépend d'une heure d'événement précise pour le traitement, ne l'utilisez pas.
Conditions requises pour ce pipeline :
- Les fichiers d'entrée doivent être au format CSV ou JSON délimité par une nouvelle ligne. Les enregistrements couvrant plusieurs lignes dans les fichiers sources peuvent entraîner des problèmes en aval, car chaque ligne dans les fichiers est publiée sous forme de message à Pub/Sub.
- Le sujet Pub/Sub doit exister avant l'exécution.
- Le pipeline fonctionne indéfiniment et doit être terminé manuellement.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFilePattern |
Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/files/*.json ou gs://bucket-name/path/*.csv . |
outputTopic |
Sujet d'entrée Pub/Sub dans lequel écrire. Le nom doit être au format projects/<project-id>/topics/<topic-name> . |
Exécuter le modèle de fichiers texte dans Cloud Storage vers Pub/Sub (Flux)
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom de votre bucket Cloud StorageFILE_PATTERN
: modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple,path/*.csv
)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom de votre bucket Cloud StorageFILE_PATTERN
: modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple,path/*.csv
)
Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP)
Le modèle Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP) est un pipeline de streaming qui lit les fichiers CSV d'un bucket Cloud Storage, appelle l'API Cloud Data Loss Prevention (Cloud DLP) pour les anonymiser puis écrit les données anonymisées dans la table BigQuery spécifiée. Ce modèle accepte l'utilisation d'un modèle d'inspection Cloud DLP et d'un modèle de suppression d'identification Cloud DLP. Cela permet aux utilisateurs d'inspecter du texte pour identifier les données potentiellement sensibles, et de supprimer l'identification des données sensibles détectées. Ils ont également la possibilité de supprimer l'identification de données structurées lorsque des colonnes sont spécifiées et qu'aucune inspection n'est nécessaire. Il convient également de noter que ce modèle n'est pas compatible avec un chemin régional pour l'emplacement du modèle d'anonymisation. Seul un chemin global est accepté.
Conditions requises pour ce pipeline :
- Les données d'entrée à tokeniser doivent exister.
- Les modèles Cloud DLP doivent exister (par exemple, DeidentifyTemplate et InspectTemplate). Pour en savoir plus, consultez la page Modèles Cloud DLP.
- L'ensemble de données BigQuery doit exister.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFilePattern |
Fichiers csv à partir desquels lire les enregistrements de données d'entrée. Les caractères génériques sont également acceptés. Par exemple, gs://mybucket/my_csv_filename.csv ou gs://mybucket/file-*.csv .
|
dlpProjectId |
ID du projet Cloud DLP propriétaire de la ressource API Cloud DLP.
Il peut s'agir du projet propriétaire des modèles Cloud DLP ou d'un autre projet.
Par exemple, my_dlp_api_project . |
deidentifyTemplateName |
Modèle d'anonymisation Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} .
Exemple :projects/my_project/deidentifyTemplates/100 |
datasetName |
Ensemble de données BigQuery pour l'envoi des résultats tokenisés. |
batchSize |
Taille du lot pour l'envoi des données à inspecter et/ou à détokeniser. Dans le cas d'un fichier csv, batchSize correspond au nombre de lignes dans un lot. Les utilisateurs doivent déterminer la taille du lot en fonction de la taille des enregistrements et de la taille du fichier. Notez que l'API Cloud DLP limite la taille de la charge utile à 524 Ko par appel d'API. |
inspectTemplateName |
(Facultatif) Modèle d'inspection Cloud DLP à utiliser pour les requêtes d'API, spécifié à l'aide du schéma projects/{template_project_id}/identifyTemplates/{idTemplateId} .
Exemple :projects/my_project/identifyTemplates/100 |
Exécuter le modèle Masquage/Tokenisation de données de Cloud Storage vers BigQuery (à l'aide de Cloud DLP)
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputFilePattern=INPUT_DATA,\ datasetName=DATASET_NAME,\ batchSize=BATCH_SIZE_VALUE,\ dlpProjectId=DLP_API_PROJECT_ID,\ deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\ inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
Remplacez les éléments suivants :
DLP_API_PROJECT_ID
: ID de votre projet dans l'API Cloud DLPJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)INPUT_DATA
: chemin d'accès de votre fichier d'entréeDEIDENTIFY_TEMPLATE
: numéro de modèle d'anonymisation Cloud DLPDATASET_NAME
: nom de l'ensemble de données BigQueryINSPECT_TEMPLATE_NUMBER
: numéro du modèle d'inspection Cloud DLPBATCH_SIZE_VALUE
: taille du lot (nombre de lignes par API pour les données CSV)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern":INPUT_DATA, "datasetName": "DATASET_NAME", "batchSize": "BATCH_SIZE_VALUE", "dlpProjectId": "DLP_API_PROJECT_ID", "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE", "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowDLP_API_PROJECT_ID
: ID de votre projet dans l'API Cloud DLPJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TEMP_LOCATION
: emplacement de l'écriture de fichiers temporaires (par exemple,gs://your-bucket/temp
)INPUT_DATA
: chemin d'accès de votre fichier d'entréeDEIDENTIFY_TEMPLATE
: numéro de modèle d'anonymisation Cloud DLPDATASET_NAME
: nom de l'ensemble de données BigQueryINSPECT_TEMPLATE_NUMBER
: numéro du modèle d'inspection Cloud DLPBATCH_SIZE_VALUE
: taille du lot (nombre de lignes par API pour les données CSV)
Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub (Flux)
Le modèle Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub est un pipeline de streaming qui lit les messages Pub/Sub avec des données modifiées provenant d'une base de données MySQL et écrit les enregistrements dans BigQuery. Un connecteur Debezium enregistre les modifications apportées à la base de données MySQL et publie les données modifiées dans Pub/Sub. Le modèle lit ensuite les messages Pub/Sub et les écrit dans BigQuery.
Vous pouvez utiliser ce modèle pour synchroniser des bases de données MySQL et des tables BigQuery. Le pipeline écrit les données modifiées dans une table de préproduction BigQuery et met à jour une table BigQuery par intermittence en répliquant la base de données MySQL.
Conditions requises pour ce pipeline :
- Le connecteur Debezium doit être déployé.
- Les messages Pub/Sub doivent être sérialisés dans une classe Beam Row.
Paramètres de modèle
Paramètres | Description |
---|---|
inputSubscriptions |
Liste des abonnements en entrée Pub/Sub à lire séparés par une virgule, au format <subscription>,<subscription>, ... . |
changeLogDataset |
Ensemble de données BigQuery permettant de stocker les tables de préproduction, au format <my-dataset> . |
replicaDataset |
Emplacement de l'ensemble de données BigQuery dans lequel stocker les tables dupliquées, au format <my-dataset> . |
updateFrequencySecs |
(Facultatif) Intervalle auquel le pipeline met à jour la table BigQuery en répliquant la base de données MySQL. |
Exécuter la capture de données modifiées avec Debezium et MySQL à partir du modèle Pub/Sub vers BigQuery
Pour exécuter ce modèle, procédez comme suit :
- Sur votre ordinateur local, clonez le dépôt DataflowTemplates.
- Accédez au répertoire
v2/cdc-parent
. - Assurez-vous que le connecteur Debezium est déployé.
- Exécutez le modèle Dataflow à l'aide de Maven :
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowSUBSCRIPTIONS
: liste des noms de vos abonnements Pub/Sub, séparés par une virguleCHANGELOG_DATASET
: ensemble de données BigQuery pour les données du journal des modificationsREPLICA_DATASET
: ensemble de données BigQuery pour les tables dupliquées
Apache Kafka vers BigQuery
Le modèle Apache Kafka vers BigQuery est un pipeline de streaming qui ingère les données textuelles issues d'Apache Kafka, exécute une fonction définie par l'utilisateur et génère les enregistrements obtenus dans BigQuery. Les erreurs survenant lors de la transformation des données, de l'exécution de la fonction définie par l'utilisateur ou de l'insertion dans la table de sortie sont enregistrées dans une table d'erreurs distincte dans BigQuery. Si la table d'erreurs n'existe pas avant l'exécution, elle est créée.
Conditions requises pour ce pipeline
- La table BigQuery de sortie doit exister.
- Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
- Les sujets Apache Kafka doivent exister et les messages doivent être encodés dans un format JSON valide.
Paramètres de modèle
Paramètres | Description |
---|---|
outputTableSpec |
Emplacement de la table de sortie BigQuery dans lequel écrire les messages Apache Kafka, au format my-project:dataset.table . |
inputTopics |
Sujets d'entrée Apache Kafka à lire dans une liste d'éléments séparés par une virgule. Par exemple : messages |
bootstrapServers |
Adresse hôte des serveurs de courtiers Apache Kafka en cours d'exécution dans une liste d'éléments séparés par une virgule, chaque adresse hôte au format 35.70.252.199:9092 . |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
outputDeadletterTable |
(Facultatif) Table BigQuery pour les messages qui n'ont pas pu atteindre la table de sortie, au format my-project:dataset.my-deadletter-table . Si elle n'existe pas, la table est créée lors de l'exécution du pipeline.
Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place. |
Exécuter le modèle Apache Kafka vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
BIGQUERY_TABLE
: nom de votre table BigQuery.KAFKA_TOPICS
: liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.KAFKA_SERVER_ADDRESSES
: liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple :35.70.252.199:9092
. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
BIGQUERY_TABLE
: nom de votre table BigQuery.KAFKA_TOPICS
: liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage du fichier.js
contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple,gs://my-bucket/my-udfs/my_file.js
).JAVASCRIPT_FUNCTION
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.KAFKA_SERVER_ADDRESSES
: liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple :35.70.252.199:9092
. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.
Pour en savoir plus, consultez la page Écrire des données de Kafka vers BigQuery avec Dataflow.
DataStream vers BigQuery (flux)
Le modèle Datastream vers BigQuery est un pipeline de streaming qui lit les données Datastream et les réplique dans BigQuery. Le modèle lit les données depuis Cloud Storage à l'aide de notifications Pub/Sub et les réplique dans une table de préproduction BigQuery partitionnée par date. Après la réplication, le modèle exécute un objet MERGE
dans BigQuery pour sauvegarder toutes les modifications apportées par la capture de données modifiées dans une instance dupliquée de la table source.
Le modèle gère la création et la mise à jour des tables BigQuery gérées par la réplication. Lorsque le langage de définition de données (LDD) est requis, un rappel à Datastream extrait le schéma de la table source et le convertit en types de données BigQuery. Les opérations suivantes sont acceptées :
- Des tables sont créées à mesure que des données sont insérées.
- Les nouvelles colonnes sont ajoutées aux tables BigQuery avec des valeurs initiales nulles.
- Les colonnes supprimées sont ignorées dans BigQuery, et les valeurs futures sont nulles.
- Les colonnes renommées sont ajoutées à BigQuery en tant que nouvelles colonnes.
- Les modifications de type ne sont pas transmises à BigQuery.
Conditions requises pour ce pipeline :
- Un flux DataStream prêt à répliquer ou qui réplique déjà des données.
- Les notifications Pub/Sub pour Cloud Storage sont activées pour les données DataStream.
- Les ensembles de données de destination BigQuery sont créés et un compte administrateur a été accordé au compte de service Compute Engine.
- Une clé primaire est nécessaire dans la table source pour que la table dupliquée de destination soit créée.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFilePattern |
Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Cet emplacement de fichier est généralement le chemin racine du flux. |
gcsPubSubSubscription |
Abonnement Pub/Sub avec notifications de fichier Datastream. Par exemple, projects/my-project-id/subscriptions/my-subscription-id . |
inputFileFormat |
Format du fichier de sortie généré par Datastream. Par exemple, avro,json . Par défaut, avro . |
outputStagingDatasetTemplate |
Nom d'un ensemble de données existant contenant les tables de préproduction. Vous pouvez inclure le modèle {_metadata_dataset} en tant qu'espace réservé qui est remplacé par le nom de l'ensemble de données ou le schéma source (par exemple {_metadata_dataset}_log ). |
outputDatasetTemplate |
Nom d'un ensemble de données existant devant contenir les tables dupliquées Vous pouvez inclure le modèle {_metadata_dataset} en tant qu'espace réservé qui est remplacé par le nom de l'ensemble de données ou le schéma source (par exemple, {_metadata_dataset} ). |
deadLetterQueueDirectory |
Chemin d'accès au fichier de stockage des messages non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions. |
outputStagingTableNameTemplate |
(Facultatif) Modèle pour le nom des tables de préproduction. La valeur par défaut est {_metadata_table}_log. Si vous répliquez plusieurs schémas, la solution suggérée est {_metadata_schema}_{_metadata_table}_log . |
outputTableNameTemplate |
(Facultatif) Modèle pour le nom des tables dupliquées. Par défaut, {_metadata_table} . Si vous répliquez plusieurs schémas, la solution suggérée est {_metadata_schema}_{_metadata_table} . |
outputProjectId |
(Facultatif) Projet associé aux ensembles de données BigQuery dans lesquels exporter des données. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté. |
streamName |
(Facultatif) Nom ou modèle du flux à interroger pour les informations de schéma. Par défaut, {_metadata_stream} . |
mergeFrequencyMinutes |
(Facultatif) Nombre de minutes entre les fusions pour une table donnée. Par défaut, 5. |
dlqRetryMinutes |
(Facultatif) Nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Par défaut, 10. |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
Exécuter le modèle Datastream vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Datastream to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nom de votre ensemble de données BigQuery.BIGQUERY_TABLE
: votre modèle de table BigQuery. Par exemple,{_metadata_schema}_{_metadata_table}_log
.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: nom de votre ensemble de données BigQuery.BIGQUERY_TABLE
: votre modèle de table BigQuery. Par exemple,{_metadata_schema}_{_metadata_table}_log
.
Datastream vers MySQL ou PostgreSQL (flux)
Le modèle Datastream vers SQL est un pipeline de streaming qui lit les données Datastream et les réplique dans n'importe quelle base de données MySQL ou PostgreSQL. Le modèle lit les données depuis Cloud Storage à l'aide de notifications Pub/Sub et les réplique dans des tables dupliquées SQL.
Le modèle n'est pas compatible avec le langage de définition de données (LDD) et attend que toutes les tables existent déjà dans la base de données. La réplication utilise des transformations avec état Dataflow pour filtrer les données obsolètes et assurer la cohérence des données dans le désordre. Par exemple, si une version plus récente d'une ligne est déjà transmise, une version tardive de cette ligne est ignorée. Le langage de manipulation de données (LMD) qui s'exécute constitue le meilleur moyen de répliquer parfaitement les données cibles. Les instructions LMD exécutées respectent les règles suivantes :
- Si une clé primaire existe, les opérations d'insertion et de mise à jour utilisent une syntaxe upsert (par exemple,
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Si des clés primaires existent, les suppressions sont répliquées en tant que suppression LMD.
- Si aucune clé primaire n'existe, les opérations d'insertion et de mise à jour sont insérées dans la table.
- Si aucune clé primaire n'existe, les suppressions sont ignorées.
Si vous utilisez les utilitaires Oracle vers Postgres, ajoutez ROWID
dans SQL en tant que clé primaire lorsqu'il n'en existe pas.
Conditions requises pour ce pipeline :
- Un flux DataStream prêt à répliquer ou qui réplique déjà des données.
- Les notifications Pub/Sub pour Cloud Storage sont activées pour les données DataStream.
- Une base de données PostgreSQL a été ajoutée au schéma requis.
- L'accès réseau entre les nœuds de calcul Dataflow et PostgreSQL est configuré.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFilePattern |
Emplacement des fichiers Datastream à répliquer dans Cloud Storage. Cet emplacement de fichier est généralement le chemin racine du flux. |
gcsPubSubSubscription |
Abonnement Pub/Sub avec notifications de fichier Datastream. Par exemple, projects/my-project-id/subscriptions/my-subscription-id . |
inputFileFormat |
Format du fichier de sortie généré par Datastream. Par exemple, avro,json . Par défaut, avro . |
databaseHost |
Hôte SQL sur lequel se connecter. |
databaseUser |
L'utilisateur SQL disposant de toutes les autorisations requises pour écrire dans toutes les tables de réplication. |
databasePassword |
Mot de passe de l'utilisateur SQL donné. |
databasePort |
(Facultatif) Port de la base de données SQL auquel se connecter. Par défaut, 5432. |
databaseName |
(Facultatif) Nom de la base de données SQL à laquelle se connecter. Par défaut, postgres. |
streamName |
(Facultatif) Nom ou modèle du flux à interroger pour les informations de schéma. Par défaut, {_metadata_stream} . |
Exécuter le modèle Datastream vers SQL
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Datastream to SQL template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: adresse IP de votre hôte SQL.DATABASE_USER
: votre utilisateur SQL.DATABASE_PASSWORD
: votre mot de passe SQL
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: chemin d'accès Cloud Storage aux données Datastream. Par exemple :gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: abonnement Pub/Sub à partir duquel lire les fichiers modifiés. Exemple :projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: adresse IP de votre hôte SQL.DATABASE_USER
: votre utilisateur SQL.DATABASE_PASSWORD
: votre mot de passe SQL
Pub/Sub vers Java Database Connectivity (JDBC)
Le modèle Pub/Sub vers Java Database Connectivity (JDBC) est un pipeline de streaming qui ingère les données d'un abonnement Cloud Pub/Sub préexistant en tant que chaînes JSON et écrit les enregistrements obtenus dans JDBC.
Conditions requises pour ce pipeline :
- L'abonnement Cloud Pub/Sub doit exister avant l'exécution du pipeline.
- La source JDBC doit exister avant l'exécution du pipeline.
- Le sujet de lettres mortes Cloud Pub/Sub doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres | Description |
---|---|
driverClassName |
Nom de la classe du pilote JDBC. Par exemple, com.mysql.jdbc.Driver . |
connectionUrl |
Chaîne d'URL de connexion JDBC. Par exemple, jdbc:mysql://some-host:3306/sampledb . Peut être transmis en tant que chaîne encodée en base64 et chiffrée avec une clé Cloud KMS. |
driverJars |
Chemins Cloud Storage séparés par une virgule pour les pilotes JDBC. Par exemple, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar . |
username |
(Facultatif) Nom d'utilisateur à utiliser pour la connexion JDBC. Peut être transmis en tant que chaîne encodée en base64 et chiffrée avec une clé Cloud KMS. |
password |
(Facultatif) Mot de passe à utiliser pour la connexion JDBC. Peut être transmis en tant que chaîne encodée en base64 et chiffrée avec une clé Cloud KMS. |
connectionProperties |
(Facultatif) Chaîne de propriétés à utiliser pour la connexion JDBC. Le format de la chaîne doit être [propertyName=property;]* . Exemple :unicode=true;characterEncoding=UTF-8 |
statement |
Instruction à exécuter sur la base de données. L'instruction doit spécifier les noms de colonnes de la table dans n'importe quel ordre. Seules les valeurs des noms de colonnes spécifiés sont lues à partir du fichier JSON et ajoutées à l'instruction. Par exemple : INSERT INTO tableName (column1, column2) VALUES (?,?) |
inputSubscription |
Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription> . |
outputDeadletterTopic |
Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name> . |
KMSEncryptionKey |
(Facultatif) Clé de chiffrement Cloud KMS permettant de déchiffrer le nom d'utilisateur, le mot de passe et la chaîne de connexion. Si la clé Cloud KMS est transmise, le nom d'utilisateur, le mot de passe et la chaîne de connexion doivent tous être transmis de manière chiffrée. |
extraFilesToStage |
Chemins d'accès Cloud Storage ou secrets Secret Manager séparés par une virgule afin que les fichiers soient traités dans le nœud de calcul. Ces fichiers seront enregistrés dans le répertoire /extra_files de chaque nœud de calcul. Exemple :gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> |
Exécuter le modèle Pub/Sub vers Java Database Connectivity (JDBC)
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to JDBC template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ statement=SQL_STATEMENT,\ inputSubscription=INPUT_SUBSCRIPTION,\ outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
REGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
DRIVER_CLASS_NAME
: nom de la classe du piloteJDBC_CONNECTION_URL
: URL de connexion JDBCDRIVER_PATHS
: chemin(s) d'accès Cloud Storage séparé(s) par des virgules vers le(s) pilote(s) JDBCCONNECTION_USERNAME
: nom d'utilisateur de la connexion JDBCCONNECTION_PASSWORD
: mot de passe de la connexion JDBCCONNECTION_PROPERTIES
: les propriétés de connexion JDBC, le cas échéantSQL_STATEMENT
: instruction SQL à exécuter sur la base de donnéesINPUT_SUBSCRIPTION
: abonnement Pub/Sub en entrée à lireOUTPUT_DEADLETTER_TOPIC
: sujet Pub/Sub pour transférer les messages non distribuablesKMS_ENCRYPTION_KEY
: clé de chiffrement Cloud KMS
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "statement": "SQL_STATEMENT", "inputSubscription": "INPUT_SUBSCRIPTION", "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
LOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
DRIVER_CLASS_NAME
: nom de la classe du piloteJDBC_CONNECTION_URL
: URL de connexion JDBCDRIVER_PATHS
: chemin(s) d'accès Cloud Storage séparé(s) par des virgules vers le(s) pilote(s) JDBCCONNECTION_USERNAME
: nom d'utilisateur de la connexion JDBCCONNECTION_PASSWORD
: mot de passe de la connexion JDBCCONNECTION_PROPERTIES
: les propriétés de connexion JDBC, le cas échéantSQL_STATEMENT
: instruction SQL à exécuter sur la base de donnéesINPUT_SUBSCRIPTION
: abonnement Pub/Sub en entrée à lireOUTPUT_DEADLETTER_TOPIC
: sujet Pub/Sub pour transférer les messages non distribuablesKMS_ENCRYPTION_KEY
: clé de chiffrement Cloud KMS
Flux de modifications de Cloud Spanner vers Cloud Storage
Le modèle de flux de modification Cloud Spanner vers Cloud Storage est un pipeline de streaming qui diffuse les enregistrements de modification des données Spanner et les écrit dans un bucket Cloud Storage à l'aide de l'exécuteur Dataflow V2.
Le pipeline regroupe les enregistrements de flux de modification dans des fenêtres en fonction de leur horodatage, chaque fenêtre représentant une durée dont vous pouvez configurer la durée avec ce modèle. Tous les enregistrements dont les horodatages appartiennent à la fenêtre sont garantis dans la fenêtre. Il ne peut pas y avoir d'arrivées tardives. Vous pouvez également définir un certain nombre de partitions de sortie. Le pipeline crée un fichier de sortie Cloud Storage par fenêtre et par partition. Dans un fichier de sortie, les enregistrements ne sont pas ordonnés. Les fichiers de sortie peuvent être au format JSON ou AVRO, en fonction de la configuration utilisateur.
Notez que vous pouvez réduire la latence du réseau et les coûts de transport réseau en exécutant la tâche Dataflow à partir de la même région que votre instance Cloud Spanner ou bucket Cloud Storage. Si vous utilisez des sources, des récepteurs, des emplacements de fichiers de préproduction ou des emplacements de fichiers temporaires situés en dehors de la région associée à votre tâche, vos données peuvent être envoyées d'une région à l'autre. Obtenez plus d'informations sur les points de terminaison régionaux Dataflow.
En savoir plus sur les flux de modification, la création de pipelines Dataflow de flux de modification et les bonnes pratiques.
Conditions requises pour ce pipeline :
- L'instance Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données Cloud Spanner doit exister avant l'exécution du pipeline.
- L'instance de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- Le flux de modifications Cloud Spanner doit exister avant l'exécution du pipeline.
- Le bucket de sortie Cloud Storage doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres | Description |
---|---|
spannerInstanceId |
ID d'instance Cloud Spanner à partir duquel lire les flux de modifications. |
spannerDatabase |
Base de données Cloud Spanner à partir de laquelle lire les flux de modifications. |
spannerMetadataInstanceId |
ID d'instance Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerMetadataDatabase |
Base de données Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerChangeStreamName |
Nom du flux de modifications Cloud Spanner à lire. |
gcsOutputDirectory |
Emplacement du fichier de sortie des flux de modification dans Cloud Storage au format "gs://${BUCKET}/${ROOT_PATH}/". |
outputFilenamePrefix |
(Facultatif) Préfixe du nom de fichier des fichiers dans lesquels écrire. Le préfixe de fichier par défaut est défini sur "output". |
spannerProjectId |
(Facultatif) Projet à partir duquel lire les flux de modifications. Il s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modification est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté. |
startTimestamp |
(Facultatif) Date de début (DateTime) inclusive, à utiliser pour lire les flux de modifications. Ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage de démarrage du pipeline, c'est-à-dire l'heure actuelle. |
endTimestamp |
(Facultatif) Date de début (DateTime) inclusive, à utiliser pour lire les flux de modifications. Ex-2021-10-12T07:20:50.52Z. Elle est définie par défaut sur une période infinie dans le futur. |
outputFileFormat |
(Facultatif) Format du fichier Cloud Storage de sortie. Les formats autorisés sont TEXT et AVRO. La valeur par défaut est AVRO. |
windowDuration |
(Facultatif) La durée de fenêtre correspond à l'intervalle au cours duquel les données sont écrites dans le répertoire de sortie. Configurez la durée en fonction du débit du pipeline. Par exemple, un débit plus élevé peut nécessiter des tailles de fenêtre plus petites pour que les données s'intègrent à la mémoire. La valeur par défaut est "5m", avec une durée minimale de 1 s. Les formats autorisés sont les suivants : [int]s (pour les secondes, exemple : 5s), [int]m (pour les minutes, exemple : 12m), [int]h (pour les heures, exemple : 2h). |
rpcPriority |
(Facultatif) Priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes : [HIGH,MEDIUM,LOW]. (Par défaut : HIGH) |
numShards |
(Facultatif) Nombre maximal de partitions de sortie générées lors de l'écriture. Le nombre par défaut est 20. Un nombre plus élevé de segments entraîne un débit plus élevé pour l'écriture dans Cloud Storage, mais potentiellement un coût d'agrégation de données plus élevé entre les partitions lors du traitement des fichiers Cloud Storage de sortie. |
spannerMetadataTableName |
(Facultatif) Cloud Spanner change le nom de la table des métadonnées du connecteur à utiliser. Si aucune valeur n'est fournie, une table de métadonnées de flux Cloud Spanner est automatiquement créée pendant le flux de pipeline. Ce paramètre doit être fourni lors de la mise à jour d'un pipeline existant et ne doit pas être renseigné si ce n'est pas le cas. |
Exécuter le modèle Cloud Spanner pour diffuser des flux vers Cloud Storage
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to Google Cloud Storage template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ gcsOutputDirectory=GCS_OUTPUT_DIRECTORY
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
REGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerGCS_OUTPUT_DIRECTORY
: emplacement du fichier de sortie des flux de modifications
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
LOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerGCS_OUTPUT_DIRECTORY
: emplacement du fichier de sortie des flux de modifications
Flux de modifications de Cloud Spanner vers BigQuery
Le modèle de flux de modification Cloud Spanner vers BigQuery est un pipeline de streaming qui diffuse les enregistrements de modification des données Cloud Spanner et les écrit dans les tables BigQuery à l'aide de l'exécuteur Dataflow V2.
Si les tables BigQuery nécessaires n'existent pas, le pipeline les crée. Sinon, vous utilisez des tables BigQuery existantes. Le schéma des tables BigQuery existantes doit contenir les colonnes de suivi correspondantes des tables Cloud Spanner et les colonnes de métadonnées supplémentaires (consultez la description des champs de métadonnées de la liste suivante) qui ne sont pas explicitement ignorées par l'option "ignoreFields". Chaque nouvelle ligne BigQuery inclut toutes les colonnes surveillées par le flux de modification de la ligne correspondante dans la table Cloud Spanner à l'horodatage de l'enregistrement de modification.
Toutes les colonnes du flux de modifications surveillées sont incluses dans chaque ligne de la table BigQuery, qu'elles soient modifiées ou non par une transaction Cloud Spanner. Les colonnes non surveillées ne sont pas incluses dans la ligne BigQuery. Toute modification de Cloud Spanner inférieure au filigrane Dataflow est appliquée aux tables BigQuery ou est stockée dans la file d'attente de lettres mortes pour nouvelle tentative. Les lignes BigQuery sont insérées dans le désordre par rapport à l'ordre d'horodatage de commit Cloud Spanner d'origine.
Les champs de métadonnées suivants sont ajoutés aux tables BigQuery :
- _metadata_spanner_mod_type : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_spanner_table_name : nom de la table Cloud Spanner. Notez qu'il ne s'agit pas du nom de la table de métadonnées du connecteur.
- _metadata_spanner_commit_timestamp : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_spanner_server_transaction_id : extrait de l'enregistrement de modification du flux de données de changement.
- _metadata_spanner_record_sequence : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_spanner_is_last_record_in_transaction_in_partition : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_spanner_number_of_records_in_transaction : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_spanner_number_of_partitions_in_transaction : extrait de l'enregistrement de modification des données de flux de modification.
- _metadata_big_query_commit_timestamp : horodatage de commit correspondant à l'insertion de la ligne dans BigQuery.
Remarque :
- Ce modèle ne propage pas les modifications de schéma de Cloud Spanner vers BigQuery. Étant donné qu'une modification du schéma dans Cloud Spanner est susceptible de perturber le pipeline, vous devrez peut-être le recréer après la modification du schéma.
- Pour les types de capture de valeur
OLD_AND_NEW_VALUES
etNEW_VALUES
, lorsque l'enregistrement de modification de données contient une modification UPDATE, le modèle doit effectuer une lecture non actualisée dans Cloud Spanner à l'horodatage de validation de l'enregistrement de modification de données, afin de récupérer les colonnes non modifiées mais surveillées. Veillez à configurer correctement la "version_retention_period" de votre base de données pour la lecture non actualisée. Pour le type de capture de valeurNEW_ROW
, le modèle est plus efficace, car l'enregistrement de modification des données capture la nouvelle ligne complète, y compris les colonnes qui ne sont pas mises à jour, et le modèle n'a pas besoin d'effectuer une lecture non actualisée. - Afin de réduire les coûts de latence et de transport réseau, une tâche Dataflow peut être exécutée à partir de la même région que votre instance Cloud Spanner ou vos tables BigQuery. Si vous utilisez des sources, des récepteurs, des emplacements de fichiers de préproduction ou des emplacements de fichiers temporaires situés en dehors de la région associée à votre tâche, vos données peuvent être envoyées d'une région à l'autre. Obtenez plus d'informations sur les points de terminaison régionaux Dataflow.
- Ce modèle accepte tous les types de données Cloud Spanner valides, mais si le type BigQuery est plus précis que le type Cloud Spanner, la transformation peut occasionner une perte de précision. Plus précisément :
- Pour le type JSON de Cloud Spanner, les membres d'un objet sont ordonnés de façon lexicographique, mais il n'existe aucune garantie similaire pour le type JSON de BigQuery.
- Cloud Spanner accepte le type TIMESTAMP en nanosecondes, BigQuery n'accepte le type TIMESTAMP qu'en microsecondes.
En savoir plus sur les flux de modification, la création de pipelines Dataflow de flux de modification et les bonnes pratiques.
Conditions requises pour ce pipeline :
- L'instance Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données Cloud Spanner doit exister avant l'exécution du pipeline.
- L'instance de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- Le flux de modifications Cloud Spanner doit exister avant l'exécution du pipeline.
- L'ensemble de données BigQuery doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres | Description |
---|---|
spannerInstanceId |
Instance Cloud Spanner à partir de laquelle lire les flux de modifications. |
spannerDatabase |
Base de données Cloud Spanner à partir de laquelle lire les flux de modifications. |
spannerMetadataInstanceId |
Instance Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerMetadataDatabase |
Base de données Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerChangeStreamName |
Nom du flux de modifications Cloud Spanner à lire. |
bigQueryDataSet |
Ensemble de données BigQuery pour la sortie des flux de modifications. |
spannerProjectId |
(Facultatif) Projet à partir duquel lire les flux de modifications. Il s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modification est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté. |
spannerMetadataTableName |
(Facultatif) Cloud Spanner change le nom de la table des métadonnées du connecteur à utiliser. Si aucune valeur n'est fournie, une table des métadonnées du connecteur de flux de modifications Cloud Spanner est automatiquement créée pendant le flux de pipeline. Ce paramètre doit être fourni lors de la mise à jour d'un pipeline existant et ne doit pas être renseigné si ce n'est pas le cas. |
rpcPriority |
(Facultatif) Priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes : [HIGH,MEDIUM,LOW]. (Par défaut : HIGH) |
startTimestamp |
(Facultatif) Date de début (DateTime) inclusive, à utiliser pour lire les flux de modifications. Ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage du démarrage du pipeline, c'est-à-dire l'heure actuelle. |
endTimestamp |
(Facultatif) Date de début (DateTime) inclusive, à utiliser pour lire les flux de modifications. Ex-2021-10-12T07:20:50.52Z. Elle est définie par défaut sur une période infinie dans le futur. |
bigQueryProjectId |
(Facultatif) Projet BigQuery. Le projet par défaut est la tâche Dataflow. |
bigQueryChangelogTableNameTemplate |
(Facultatif) Modèle du nom des tables de journal des modifications BigQuery. La valeur par défaut est {_metadata_spanner_table_name}_changelog. |
deadLetterQueueDirectory |
(Facultatif) Chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions. |
dlqRetryMinutes |
(Facultatif) Nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Valeur par défaut : 10 |
ignoreFields |
(Facultatif) Liste des champs séparés par une virgule (sensibles à la casse) à ignorer. Il peut s'agir de champs de tables surveillées ou de champs de métadonnées ajoutés par le pipeline. Les champs ignorés ne seront pas insérés dans BigQuery. |
Exécuter le modèle de flux Cloud Spanner vers BigQuery
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
REGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerBIGQUERY_DATASET
: ensemble de données BigQuery pour la sortie des flux de modifications
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
LOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerBIGQUERY_DATASET
: ensemble de données BigQuery pour la sortie des flux de modifications
Flux de modifications de Cloud Spanner vers Pub/Sub
Les flux de modification Cloud Spanner vers le modèle Pub/Sub sont un pipeline de streaming qui diffuse les enregistrements de modification des données Cloud Spanner et les écrit dans des sujets Pub/Sub à l'aide de l'exécuteur Dataflow V2.
Pour générer vos données dans un nouveau sujet Pub/Sub, vous devez d'abord créer le sujet. Après la création, Pub/Sub génère et associe automatiquement un abonnement au nouveau sujet. Si vous essayez de générer des données dans un sujet Pub/Sub qui n'existe pas, le pipeline Dataflow génère une exception, et le pipeline se bloque car il tente en boucle d'établir une connexion.
Si le sujet Pub/Sub nécessaire existe déjà, vous pouvez générer des données dans ce sujet.
Pour en savoir plus, consultez les pages À propos des flux de modifications, Créer des connexions de flux de modification avec Dataflow et Bonnes pratiques concernant les flux de modifications.
Conditions requises pour ce pipeline :
- L'instance Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données Cloud Spanner doit exister avant l'exécution du pipeline.
- L'instance de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- La base de données de métadonnées Cloud Spanner doit exister avant l'exécution du pipeline.
- Le flux de modifications Cloud Spanner doit exister avant l'exécution du pipeline.
- Le sujet Pub/Sub doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres | Description |
---|---|
spannerInstanceId |
Instance Cloud Spanner à partir de laquelle lire les flux de modifications. |
spannerDatabase |
Base de données Cloud Spanner à partir de laquelle lire les flux de modifications. |
spannerMetadataInstanceId |
Instance Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerMetadataDatabase |
Base de données Cloud Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification. |
spannerChangeStreamName |
Nom du flux de modifications Cloud Spanner à lire. |
pubsubTopic |
Sujet Pub/Sub pour le résultat des flux de modifications. |
spannerProjectId |
(Facultatif) Projet à partir duquel lire les flux de modifications. Il s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modifications est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté. |
spannerMetadataTableName |
(Facultatif) Cloud Spanner change le nom de la table des métadonnées du connecteur à utiliser. Si aucune valeur n'est fournie, Cloud Spanner crée automatiquement la table de métadonnées du connecteur de flux lors de la modification du flux du pipeline. Vous devez fournir ce paramètre lorsque vous mettez à jour un pipeline existant. N'utilisez pas ce paramètre dans d'autres cas. |
rpcPriority |
(Facultatif) Priorité des requêtes pour les appels Cloud Spanner. La valeur doit être l'une des suivantes : [HIGH,MEDIUM,LOW]. (Par défaut : HIGH) |
startTimestamp |
(Facultatif) Date de début (DateTime) inclusive, à utiliser pour lire les flux de modifications. Par exemple : ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage du démarrage du pipeline, c'est-à-dire l'heure actuelle. |
endTimestamp |
(Facultatif) Date de fin (DateTime) inclusive, à utiliser pour lire les flux de modifications. Par exemple : ex-2021-10-12T07:20:50.52Z. Elle est définie par défaut sur une période infinie dans le futur. |
outputFileFormat |
(Facultatif) Format du résultat. Le résultat est encapsulé dans de nombreux messages PubsubMessages et envoyé à un sujet Pub/Sub. Les formats autorisés sont JSON et AVRO. La valeur par défaut est JSON. |
pubsubAPI |
(Facultatif) API Pub/Sub utilisée pour mettre en œuvre le pipeline. Les API autorisées sont pubsubio et native_client . Pour un petit nombre de requêtes par seconde (RPS), native_client a moins de latence. Pour un grand nombre de RPS, pubsubio fournit des performances supérieures et plus stables. La valeur par défaut est pubsubio . |
Exécuter les flux de modification Cloud Spanner vers le modèle Pub/Sub
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to Pub/Sub template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
REGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerPUBSUB_TOPIC
: sujet Pub/Sub dans lequel inscrire le résultat des flux de modifications
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
LOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
SPANNER_INSTANCE_ID
: ID de l'instance Cloud SpannerSPANNER_DATABASE
: base de données Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID d'instance de métadonnées Cloud SpannerSPANNER_METADATA_DATABASE
: base de données de métadonnées Cloud SpannerSPANNER_CHANGE_STREAM
: flux de modifications Cloud SpannerPUBSUB_TOPIC
: sujet Pub/Sub dans lequel inscrire le résultat des flux de modifications
MongoDB vers BigQuery (CDC)
Le modèle MongoDB vers BigQuery CDC (Change Data Capture, capture des données modifiées) est un pipeline de streaming qui fonctionne avec les flux de modifications MongoDB.
Le pipeline lit les enregistrements JSON envoyés à Pub/Sub via un flux de modifications MongoDB et les écrit dans BigQuery, comme spécifié par le paramètre userOption
.
Conditions requises pour ce pipeline
- L'ensemble de données BigQuery cible doit exister.
- L'instance MongoDB source doit être accessible à partir des machines de nœud de calcul Dataflow.
- Le flux de modifications qui transfère les modifications de MongoDB vers Pub/Sub doit être en cours d'exécution.
Paramètres de modèle
Paramètres | Description |
---|---|
mongoDbUri |
URI de connexion MongoDB au format mongodb+srv://:@ . |
database |
Base de données de MongoDB à partir de laquelle lire la collection. Exemple : my-db . |
collection |
Nom de la collection dans la base de données MongoDB. Exemple : my-collection . |
outputTableSpec |
Table BigQuery dans laquelle écrire. Par exemple, bigquery-project:dataset.output_table . |
userOption |
FLATTEN ou NONE . FLATTEN aplatit les documents au premier niveau. NONE stocke l'intégralité du document sous forme de chaîne JSON. |
inputTopic |
Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic> . |
Exécuter le modèle MongoDB vers BigQuery (CDC)
Console
- Accédez à la page Dataflow Créer une tâche à partir d'un modèle. Accéder à la page Créer une tâche à partir d'un modèle
- Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the MongoDB to BigQuery (CDC) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixREGION_NAME
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
OUTPUT_TABLE_SPEC
: nom de votre table BigQuery cible.MONGO_DB_URI
: votre URI MongoDB.DATABASE
: votre base de données MongoDB.COLLECTION
: votre collection MongoDB.USER_OPTION
: FLATTEN ou NONE.INPUT_TOPIC
: votre sujet d'entrée Pub/Sub.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Cloud dans lequel vous souhaitez exécuter la tâche DataflowJOB_NAME
: nom de la tâche de votre choixLOCATION
: point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/- Le nom de la version, par exemple :
2021-09-20-00_RC00
, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
OUTPUT_TABLE_SPEC
: nom de votre table BigQuery cible.MONGO_DB_URI
: votre URI MongoDB.DATABASE
: votre base de données MongoDB.COLLECTION
: votre collection MongoDB.USER_OPTION
: FLATTEN ou NONE.INPUT_TOPIC
: votre sujet d'entrée Pub/Sub.