Modèle de flux de modifications Spanner vers la base de données source

Pipeline de streaming. Lit les données des flux de modifications Spanner et les écrit dans une source.

Paramètres de modèle

Paramètre Description
changeStreamName Nom du flux de modifications Spanner à partir duquel le pipeline lit les données.
instanceId Nom de l'instance Spanner dans laquelle le flux de modifications est présent.
databaseId Nom de la base de données Spanner que le flux de modifications surveille.
spannerProjectId Nom du projet Spanner.
metadataInstance Instance permettant de stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API de flux de modification.
metadataDatabase Base de données permettant de stocker les métadonnées utilisées par le connecteur pour contrôler la consommation des données de l'API de flux de modification.
sourceShardsFilePath Chemin d'accès à un fichier Cloud Storage contenant des informations sur le profil de connexion pour les fragments sources.
startTimestamp Facultatif: Code temporel de début pour la lecture des modifications. La valeur par défaut est vide.
endTimestamp Facultatif: Code temporel de fin pour la lecture des modifications. Si aucun code temporel n'est fourni, la lecture se poursuit indéfiniment. La valeur par défaut est vide.
shadowTablePrefix Facultatif: préfixe utilisé pour nommer les tables fantômes. Valeur par défaut : shadow_
sessionFilePath Facultatif: Chemin d'accès de la session dans Cloud Storage contenant des informations de mappage de HarbourBridge.
filtrationMode Facultatif: Mode de filtrage. Spécifie comment supprimer certains enregistrements en fonction d'un critère. Les modes acceptés sont les suivants: none (ne filtre rien), forward_migration (filtre les enregistrements écrits à l'aide du pipeline de migration avant). La valeur par défaut est forward_migration.
shardingCustomJarPath Facultatif: emplacement du fichier JAR personnalisé dans Cloud Storage contenant la logique de personnalisation pour extraire l'ID de fragment. Si vous définissez ce paramètre, définissez le paramètre shardingCustomJarPath. La valeur par défaut est vide.
shardingCustomClassName Facultatif: nom de classe complet avec l'implémentation de l'ID de fragment personnalisé. Si shardingCustomJarPath est spécifié, ce paramètre est obligatoire. La valeur par défaut est vide.
shardingCustomParameters Facultatif: chaîne contenant les paramètres personnalisés à transmettre à la classe de partitionnement personnalisée. La valeur par défaut est vide.
sourceDbTimezoneOffset (Facultatif) Décalage du fuseau horaire par rapport à l'UTC pour la base de données source. Exemple de valeur: +10:00. La valeur par défaut est +00:00.
dlqGcsPubSubSubscription Facultatif: abonnement Pub/Sub utilisé dans une règle de notification Cloud Storage pour le répertoire de nouvelle tentative de la file d'attente de lettres mortes en mode normal. Le nom doit être au format projects/<project-id>/subscriptions/<subscription-name>. Lorsqu'il est défini, les valeurs deadLetterQueueDirectory et dlqRetryMinutes sont ignorées.
skipDirectoryName Facultatif: les enregistrements ignorés lors de la réplication inverse sont écrits dans ce répertoire. Le nom de répertoire par défaut est "skip".
maxShardConnections Facultatif: nombre maximal de connexions qu'une partition donnée peut accepter. La valeur par défaut est 10000.
deadLetterQueueDirectory Facultatif: chemin d'accès utilisé pour stocker la sortie de la file d'attente d'erreurs. Le chemin d'accès par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow.
dlqMaxRetryCount Facultatif: nombre maximal de nouvelles tentatives en raison d'erreurs temporaires via la file d'attente de lettres mortes. Valeur par défaut : 500.
runMode Facultatif: type de mode d'exécution. Valeurs autorisées: regular, retryDLQ. Valeur par défaut : regular Spécifiez retryDLQ pour ne réessayer que les enregistrements de file d'attente de messages non distribués graves.
dlqRetryMinutes Facultatif : nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. Valeur par défaut : 10

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Spanner Change Streams to Source Database template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud CLI

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • CHANGE_STREAM_NAME: nom du flux de modifications à lire
  • INSTANCE_ID : ID de l'instance Cloud Spanner
  • DATABASE_ID : ID de la base de données Cloud Spanner
  • SPANNER_PROJECT_ID : ID du projet Cloud Spanner
  • METADATA_INSTANCE: instance Cloud Spanner pour stocker les métadonnées lors de la lecture à partir de flux de modifications
  • METADATA_DATABASE: la base de données Cloud Spanner pour stocker les métadonnées lors de la lecture à partir de flux de modifications
  • SOURCE_SHARDS_FILE_PATH: chemin d'accès au fichier GCS contenant les détails du fragment source

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • CHANGE_STREAM_NAME: nom du flux de modifications à lire
  • INSTANCE_ID : ID de l'instance Cloud Spanner
  • DATABASE_ID : ID de la base de données Cloud Spanner
  • SPANNER_PROJECT_ID : ID du projet Cloud Spanner
  • METADATA_INSTANCE: instance Cloud Spanner pour stocker les métadonnées lors de la lecture à partir de flux de modifications
  • METADATA_DATABASE: la base de données Cloud Spanner pour stocker les métadonnées lors de la lecture à partir de flux de modifications
  • SOURCE_SHARDS_FILE_PATH: chemin d'accès au fichier GCS contenant les détails du fragment source