Modèles Dataflow de traitement par flux fournis par Google

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Google fournit un ensemble de modèles Dataflow Open Source.

Ces modèles Dataflow peuvent vous aider à traiter des tâches de données volumineuses, y compris l'importation, l'exportation, la sauvegarde et la restauration de données, ainsi que les opérations d'API groupées, le tout sans avoir à utiliser un environnement de développement dédié. Les modèles sont basés sur Apache Beam et exploitent Dataflow pour transformer les données.

Pour obtenir des informations générales sur les modèles, consultez la page Modèles Dataflow. Pour obtenir la liste de tous les modèles fournis par Google, consultez la page Premiers pas avec les modèles fournis par Google.

Ce guide répertorie les modèles de streaming.

Abonnement Pub/Sub vers BigQuery

Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Le champ data des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON. Par exemple, vous pouvez insérer des messages contenant les valeurs du champ data au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant un type de données de chaîne ("string").
  • La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable Table BigQuery des messages qui n'ont pas pu atteindre la table de sortie, au format <my-project>:<my-dataset>.<my-table>. Si elle n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, OUTPUT_TABLE_SPEC_error_records est utilisé à la place.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

Exécuter le modèle Abonnement Pub/Sub vers BigQuery

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Subscription to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.

Sujet Pub/Sub vers BigQuery

Le modèle Sujet Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un sujet Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline :

  • Le champ data des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON. Par exemple, vous pouvez insérer des messages contenant les valeurs du champ data au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant un type de données de chaîne ("string").
  • La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.

Paramètres de modèle

Paramètres Description
inputTopic Sujet d'entrée Pub/Sub à lire, au format projects/<project>/topics/<topic>.
outputTableSpec Emplacement de la table de sortie BigQuery, au format <my-project>:<my-dataset>.<my-table>
outputDeadletterTable La table BigQuery des messages n'ayant pas pu atteindre la table de sortie Elle doit être au format <my-project>:<my-dataset>.<my-table>. Si elle n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, <outputTableSpec>_error_records est utilisé à la place.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

Exécuter le modèle Sujet Pub/Sub vers BigQuery

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • DATASET : votre ensemble de données BigQuery.
  • TABLE_NAME : nom de votre table BigQuery.

Pub/Sub Avro vers BigQuery

Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub d'entrée doit exister.
  • Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
  • Le sujet Pub/Sub non traité doit exister.
  • L'ensemble de données BigQuery de sortie doit exister.

Paramètres de modèle

Paramètres Description
schemaPath Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc.
inputSubscription Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table>. Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur.
writeDisposition (Facultatif) La propriété WriteDisposition de BigQuery. Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Par défaut, WRITE_APPEND.
createDisposition (Facultatif) La propriété CreateDisposition de BigQuery. Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Par défaut, CREATE_IF_NEEDED.

Exécuter le modèle Pub/Sub Avro vers BigQuery

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

Proto Pub/Sub vers BigQuery

Le modèle proto Pub/Sub vers BigQuery est un pipeline de streaming qui ingère les données proto d'un abonnement Pub/Sub dans une table BigQuery. Les erreurs qui se produisent lors de l'écriture dans la table BigQuery sont insérées en flux continu dans un sujet Pub/Sub non traité.

Une fonction définie par l'utilisateur (UDF) JavaScript peut être fournie pour transformer les données. Les erreurs lors de l'exécution de l'UDF peuvent être envoyées à un sujet Pub/Sub distinct ou au même sujet non traité que les erreurs BigQuery.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub d'entrée doit exister.
  • Le fichier de schéma des enregistrements proto doit exister dans Cloud Storage.
  • Le sujet Pub/Sub de sortie doit exister.
  • L'ensemble de données BigQuery de sortie doit exister.
  • Si la table BigQuery existe, elle doit posséder un schéma correspondant aux données proto, quelle que soit la valeur de createDisposition.

Paramètres de modèle

Paramètres Description
protoSchemaPath Emplacement Cloud Storage du fichier de schéma proto autonome. Par exemple, gs://path/to/my/file.pb. Ce fichier peut être généré avec l'option --descriptor_set_out de la commande protoc. L'option --include_imports garantit que le fichier est autonome.
fullMessageName Nom complet du message proto. Par exemple, package.name.MessageName, où package.name est la valeur fournie pour l'instruction package, et non pour l'instruction java_package.
inputSubscription Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec Emplacement de la table de sortie BigQuery. Par exemple, my-project:my_dataset.my_table. Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du fichier de schéma d'entrée.
preserveProtoFieldNames (Facultatif) true pour conserver le nom du champ Proto d'origine au format JSON. false pour utiliser des noms JSON plus standards. Par exemple, false remplace field_name par fieldName. (Par défaut : false)
bigQueryTableSchemaPath (Facultatif) Chemin d'accès Cloud Storage vers le chemin d'accès du schéma BigQuery. Par exemple, gs://path/to/my/schema.json. S'il n'est pas fourni, le schéma est obtenu à partir du schéma Proto.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
udfOutputTopic (Facultatif) Sujet Pub/Sub stockant les erreurs UDF. Par exemple : projects/<project-id>/topics/<topic-name> Si cet élément n'est pas fourni, les erreurs UDF sont envoyées au même sujet que outputTopic.
writeDisposition (Facultatif) La disposition WriteDisposition de BigQuery. Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Valeur par défaut : WRITE_APPEND.
createDisposition (Facultatif) La disposition CreateDisposition de BigQuery. Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Valeur par défaut : CREATE_IF_NEEDED.

Exécuter le modèle Proto Pub/Sub vers BigQuery

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Proto to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple, gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME : nom du message Proto (par exemple, package.name.MessageName)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • UNPROCESSED_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple, gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME : nom du message Proto (par exemple, package.name.MessageName)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • UNPROCESSED_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

Pub/Sub vers Pub/Sub

Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution.
  • L'abonnement Pub/Sub source doit être un abonnement pull.
  • Le sujet Pub/Sub de destination doit exister avant l'exécution.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Sujet Cloud Pub/Sub dans lequel écrire la sortie. Par exemple, projects/<project-id>/topics/<topic-name>.
filterKey (Facultatif) Filtrez les événements en fonction d'une clé d'attribut. Aucun filtre n'est appliqué si filterKey n'est pas spécifié.
filterValue (Facultatif) Valeur d'attribut de filtre à utiliser dans le cas où un filterKey est fourni. Une valeur filterValue nulle est utilisée par défaut.

Exécuter le modèle Pub/Sub vers Pub/Sub

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.

Pub/Sub vers Splunk

Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk via la solution HEC (HTTP Event Collector) de Splunk. Le cas d'utilisation le plus courant de ce modèle est l'exportation de journaux vers Splunk. Pour découvrir un exemple du workflow sous-jacent, consultez la section Déployer des exportations de journaux prêtes pour la production vers Splunk à l'aide de Dataflow.

Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript définie par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub non traité en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.

Pour ajouter une couche de protection à votre jeton HEC, vous pouvez également transmettre une clé Cloud KMS ainsi que le paramètre de jeton HEC encodé en base64 chiffré avec cette clé. Pour en savoir plus sur le chiffrement des paramètres du jeton HEC, consultez la page sur le point de terminaison du chiffrement de l'API Cloud KMS.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
  • Le sujet Pub/Sub non traité doit exister avant l'exécution du pipeline.
  • Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
  • Le jeton de la solution HEC de Splunk doit être généré et disponible.

Paramètres de modèle

Paramètres Description
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Par exemple, projects/<project-id>/subscriptions/<subscription-name>.
token (Facultatif) Jeton d'authentification HEC Splunk. Doit être spécifié si tokenSource est défini sur PLAINTEXT ou KMS.
url URL HEC Splunk. Il doit être routable depuis le VPC dans lequel le pipeline est exécuté. Par exemple, https://splunk-hec-host:8088.
outputDeadletterTopic Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
batchCount (Facultatif) Taille de lot pour l'envoi de plusieurs événements vers Splunk. Valeur par défaut 1 (pas de traitement par lots).
parallelism (Facultatif) Nombre maximal de demandes en parallèle. Valeur par défaut 1 (aucun parallélisme).
disableCertificateValidation (Facultatif) Désactiver la validation du certificat SSL. Valeur par défaut "false" (validation activée). Si la valeur est "true", les certificats ne sont pas validés (tous les certificats sont approuvés) et le paramètre "rootCaCertificatePath" est ignoré.
includePubsubMessage (Facultatif) Inclure le message Pub/Sub complet dans la charge utile. Valeur "false" par défaut (seul l'élément de données est inclus dans la charge utile).
tokenSource Source du jeton. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager est utilisé. Si tokenSource est défini sur KMS, tokenKMSEncryptionKey et le token chiffré doivent être spécifiés. Si tokenSource est défini sur SECRET_MANAGER, tokenSecretId doit être spécifié. Si tokenSource est défini sur PLAINTEXT, token doit être spécifié.
tokenKMSEncryptionKey (Facultatif) Clé Cloud KMS permettant de déchiffrer la chaîne du jeton HEC. Ce paramètre doit être spécifié si tokenSource est défini sur KMS. Si la clé Cloud KMS est fournie, la chaîne du jeton HEC doit être transmise sous forme chiffrée.
tokenSecretId (Facultatif) ID du secret fourni par Secret Manager pour le jeton. Ce paramètre doit être spécifié si tokenSource est défini sur SECRET_MANAGER. Format requis : projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath (Facultatif) URL complète du certificat CA racine dans Cloud Storage. Par exemple, gs://mybucket/mycerts/privateCA.crt. Le certificat fourni dans Cloud Storage doit être encodé au format DER et peut être fourni en encodage binaire ou imprimable (base64). Si le certificat est fourni avec un encodage en base64, il doit être délimité par "------BEGIN CERTIFICATE-----" au début et par "-----END CERTIFICATE-----" à la fin. Si ce paramètre est fourni, ce fichier de certificat CA privé est extrait et ajouté au trust store du nœud de calcul Dataflow pour vérifier le certificat SSL du point de terminaison HEC de Splunk. Si ce paramètre n'est pas fourni, le trust store par défaut est utilisé.
enableBatchLogs (Facultatif) Spécifie si les journaux doivent être activés pour les lots écrits dans Splunk. Valeur par défaut : true
enableGzipHttpCompression (Facultatif) Indique si les requêtes HTTP envoyées à la solution HEC de Splunk doivent être compressées (codage de contenu gzip). Valeur par défaut : true

Exécuter le modèle Pub/Sub vers Splunk

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Splunk template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
  • ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
  • ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)

Pub/Sub vers fichiers Avro dans Cloud Storage

Le modèle Pub/Sub vers fichiers Avro dans Cloud Storage est un pipeline de streaming qui lit les données d'un sujet Pub/Sub et écrit des fichiers Avro dans le bucket Cloud Storage spécifié.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub d'entrée doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Pub/Sub permettant de s'abonner à la consultation de messages. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Répertoire de sortie dans lequel les fichiers de sortie Avro seront archivés. Doit inclure / à la fin. Exemple : gs://example-bucket/example-directory/.
avroTempDirectory Répertoire des fichiers Avro temporaires. Doit inclure / à la fin. Par exemple : gs://example-bucket/example-directory/.
outputFilenamePrefix (Facultatif) Préfixe du nom de fichier de sortie pour les fichiers Avro.
outputFilenameSuffix (Facultatif) Suffixe du nom de fichier de sortie pour les fichiers Avro.
outputShardTemplate [Facultatif] Modèle de partition du fichier de sortie. Spécifié en tant que séquences répétées des lettres S ou N. Exemple : SSS-NNN. Celles-ci sont remplacées par le numéro de partition ou par le nombre total de partitions, respectivement. Si ce paramètre n'est pas spécifié, le format du modèle par défaut est W-P-SS-of-NN.

Exécuter le modèle Pub/Sub vers Cloud Storage Avro

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Avro Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom du sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
  • FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
  • FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
  • SHARD_TEMPLATE : modèle de partition de sortie préféré

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom du sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage
  • FILENAME_PREFIX : préfixe du nom de fichier de sortie préféré
  • FILENAME_SUFFIX : suffixe du nom de fichier de sortie préféré
  • SHARD_TEMPLATE : modèle de partition de sortie préféré

Sujet Pub/Sub vers des fichiers texte dans Cloud Storage

Le modèle Cloud Pub/Sub vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Cloud Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Les messages publiés sur le thème doivent être au format texte.
  • Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>.
outputDirectory Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output-.
outputFilenameSuffix Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NNW correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01.

Exécuter le modèle Pub/Sub vers des fichiers texte dans Cloud Storage

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Text Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • STAGING_LOCATION: emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TEMP_LOCATION : emplacement de l'écriture de fichiers temporaires (par exemple, gs://your-bucket/temp)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom du bucket Cloud Storage

Sujet Pub/Sub ou abonnement vers des fichiers texte dans Cloud Storage

Le sujet Pub/Sub ou l'abonnement vers texte Cloud Storage est un pipeline de streaming qui lit les enregistrements de Pub/Sub et les enregistre sous forme d'une série de fichiers Cloud Storage au format texte. Le modèle peut être utilisé comme moyen rapide d'enregistrer des données dans Pub/Sub pour une utilisation ultérieure. Par défaut, le modèle génère un nouveau fichier toutes les 5 minutes.

Conditions requises pour ce pipeline :

  • Le sujet Pub/Sub ou l'abonnement doivent exister avant l'exécution.
  • Les messages publiés sur le thème doivent être au format texte.
  • Les messages publiés sur le thème ne doivent contenir aucune nouvelle ligne. Notez que chaque message Pub/Sub est enregistré sur une ligne unique dans le fichier de sortie.

Paramètres de modèle

Paramètres Description
inputTopic Sujet Pub/Sub à partir duquel lire l'entrée. Le nom du sujet doit être au format projects/<project-id>/topics/<topic-name>. Si ce paramètre est fourni, inputSubscription ne doit pas être fourni.
inputSubscription Abonnement Pub/Sub à partir duquel lire l'entrée. Le nom de l'abonnement doit être au format projects/<project-id>/subscription/<subscription-name>. Si ce paramètre est fourni, inputTopic ne doit pas être fourni.
outputDirectory Chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Par exemple : gs://bucket-name/path/. Cette valeur doit se terminer par une barre oblique.
outputFilenamePrefix Préfixe à placer sur chaque fichier ciblé sur une fenêtre. Par exemple, output-.
outputFilenameSuffix Suffixe à placer sur chaque fichier ciblé sur une fenêtre, généralement une extension de fichier telle que .txt ou .csv.
outputShardTemplate Le modèle de segment définit la partie dynamique de chaque fichier ciblé sur une fenêtre. Par défaut, le pipeline utilise un seul segment pour la sortie vers le système de fichiers dans chaque fenêtre. Cela signifie que toutes les données sortent dans un seul fichier par fenêtre. Le outputShardTemplate devient par défaut W-P-SS-of-NNW correspond à la plage de dates de la fenêtre, P correspond aux informations du volet, S correspond au numéro de segment et N au nombre de segments. Dans le cas d'un fichier unique, la partie SS-of-NN de outputShardTemplate est 00-of-01.
windowDuration (Facultatif) La durée de fenêtre correspond à l'intervalle au cours duquel les données sont écrites dans le répertoire de sortie. Configurez la durée en fonction du débit du pipeline. Par exemple, un débit plus élevé peut nécessiter des tailles de fenêtre plus petites pour que les données s'intègrent à la mémoire. La valeur par défaut est "5m", avec une durée minimale de 1 s. Les formats autorisés sont les suivants : [int]s (pour les secondes, exemple : 5s), [int]m (pour les minutes, exemple : 12m), [int]h (pour les heures, exemple : 2h).

Exécuter le modèle Sujet Pub/Sub ou l'abonnement vers les fichiers texte dans Cloud Storage

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche de votre choix
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage

Pub/Sub vers MongoDB

Le modèle Pub/Sub vers MongoDB est un pipeline de streaming qui lit les messages encodés au format JSON d'un abonnement Pub/Sub et les écrit dans MongoDB sous forme de documents. Si nécessaire, ce pipeline accepte des transformations supplémentaires qui peuvent être incluses à l'aide d'une fonction JavaScript définie par l'utilisateur. Toute erreur survenue en raison d'une non-concordance du schéma ou d'un format JSON non valide, ou pendant l'exécution de transformations, est enregistrée avec le message d'entrée dans une table BigQuery destinée aux messages non traités. Si la table des enregistrements non traités n'existe pas avant l'exécution, le pipeline la crée automatiquement.

Conditions requises pour ce pipeline :

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Le cluster MongoDB doit exister et être accessible à partir des machines de nœud de calcul Dataflow.

Paramètres de modèle

Paramètres Description
inputSubscription Nom de l'abonnement Pub/Sub. Par exemple : projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Liste de serveurs MongoDB séparés par une virgule. Par exemple : 192.285.234.12:27017,192.287.123.11:27017
database Base de données dans MongoDB pour stocker la collection. Exemple : my-db.
collection Nom de la collection dans la base de données MongoDB. Exemple : my-collection.
deadletterTable Table BigQuery qui stocke les messages en raison d'échecs (schéma non correspondant, format JSON non valide, etc.). Exemple : project-id:dataset-name.table-name.
javascriptTextTransformGcsPath (Facultatif) URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facultatif) Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
batchSize (Facultatif) Taille de lot utilisée pour l'insertion par lots de documents dans MongoDB. Valeur par défaut : 1000
batchSizeBytes (Facultatif) Taille du lot en octets. Valeur par défaut : 5242880
maxConnectionIdleTime (Facultatif) Durée maximale d'inactivité autorisée en secondes avant que le délai de connexion ne s'écoule. Valeur par défaut : 60000
sslEnabled (Facultatif) Valeur booléenne indiquant si le protocole SSL est activé pour la connexion à MongoDB. Valeur par défaut : true
ignoreSSLCertificate (Facultatif) Valeur booléenne indiquant si le certificat SSL doit être ignoré. Valeur par défaut : true
withOrdered (Facultatif) Valeur booléenne permettant l'activation d'insertions groupées triées dans MongoDB. Valeur par défaut : true
withSSLInvalidHostNameAllowed (Facultatif) Valeur booléenne indiquant si un nom d'hôte non valide est autorisé pour la connexion SSL. Valeur par défaut : true

Exécuter le modèle Pub/Sub vers MongoDB

Console

  1. Accédez à la page Dataflow Créer une tâche à partir d'un modèle.
  2. Accéder à la page Créer une tâche à partir d'un modèle
  3. Dans le champ Nom de la tâche, saisissez un nom de tâche unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. Le point de terminaison régional par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter une tâche Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to MongoDB template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • REGION_NAME : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • JOB_NAME : nom de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Cloud dans lequel vous souhaitez exécuter la tâche Dataflow
  • LOCATION : point de terminaison régional où vous souhaitez déployer votre tâche Dataflow, par exemple us-central1
  • JOB_NAME : nom de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

    • latest pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates/latest/
    • Le nom de la version, par exemple :2021-09-20-00_RC00, pour utiliser une version spécifique du modèle, qui peut être imbriquée dans le dossier parent daté du bucket :gs://dataflow-templates/
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub (par exemple, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI : adresses du serveur MongoDB (par exemple, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE : nom de la base de données MongoDB (par exemple, users)
  • COLLECTION : nom de la collection MongoDB (par exemple, profiles)
  • UNPROCESSED_TABLE : nom de la table BigQuery (par exemple, your-project:your-dataset.your-table-name)