Le modèle crée un pipeline de streaming qui interroge en permanence les nouveaux fichiers texte chargés dans Cloud Storage, lit chaque fichier ligne par ligne et publie des chaînes dans un sujet Pub/Sub. Le modèle publie les enregistrements dans un fichier délimité par une nouvelle ligne contenant des enregistrements JSON ou un fichier CSV dans un sujet Pub/Sub pour un traitement en temps réel. Vous pouvez utiliser ce modèle pour relire les données dans Pub/Sub.
Le pipeline s'exécute indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation "Watch", qui est une fonction "SplittableDoFn" qui n'est pas compatible avec le drainage.
Actuellement, l'intervalle d'interrogation est fixé à 10 secondes. Ce modèle ne définit aucun horodatage sur les enregistrements individuels. Par conséquent, l'heure de l'événement est égale à l'heure de publication pendant l'exécution. Si le traitement de votre pipeline dépend d'une heure d'événement précise, nous vous conseillons de ne pas utiliser ce pipeline.
Conditions requises pour ce pipeline
- Les fichiers d'entrée doivent être au format CSV ou JSON délimité par une nouvelle ligne. Les enregistrements couvrant plusieurs lignes dans les fichiers sources peuvent entraîner des problèmes en aval, car chaque ligne dans les fichiers est publiée sous forme de message à Pub/Sub.
- Le sujet Pub/Sub doit exister avant l'exécution.
- Le pipeline fonctionne indéfiniment et doit être terminé manuellement.
Paramètres de modèle
Paramètres obligatoires
- inputFilePattern : modèle de fichier d'entrée à lire. (par exemple, gs://bucket-name/files/*.json).
- outputTopic : sujet d'entrée Pub/Sub dans lequel écrire. Le nom doit être au format
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. (par exemple, projects/votre-id-projet/topics/nom-de-votre-sujet).
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement par flux de type "au moins une fois", sélectionnez Au moins une fois.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom de votre bucket Cloud StorageFILE_PATTERN
: modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple,path/*.csv
)
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
STAGING_LOCATION
: emplacement des fichiers locaux de préproduction (par exemple,gs://your-bucket/staging
)TOPIC_NAME
: nom de votre sujet Pub/SubBUCKET_NAME
: nom de votre bucket Cloud StorageFILE_PATTERN
: modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple,path/*.csv
)
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.