Exécuter des modèles classiques

Une fois que vous avez créé et préproduit votre modèle Dataflow, l'étape suivante consiste à l'exécuter à l'aide de Google Cloud Console, de l'API REST ou de l'outil de ligne de commande gcloud. Vous pouvez déployer les tâches du modèle Dataflow à partir de nombreux environnements, y compris l'environnement App Engine standard, Cloud Functions et d'autres environnements restreints.

Remarque : Outre le fichier de modèle, l'exécution du pipeline basé sur un modèle s'appuie sur des fichiers gérés et préparés au moment de la création du modèle. Si vous déplacez ou supprimez les fichiers intermédiaires, votre tâche de pipeline échouera.

Utiliser Cloud Console

Vous pouvez utiliser Cloud Console pour exécuter des modèles Dataflow personnalisés et fournis par Google.

Modèles fournis par Google

Pour exécuter un modèle fourni par Google, procédez comme suit :

  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  4. Bouton "Créer une tâche à partir d'un modèle" dans la console Cloud Platform
  5. Sélectionnez le modèle fourni par Google que vous souhaitez exécuter dans le menu déroulant Dataflow template (Modèle Dataflow).
  6. Formulaire d'exécution d'un modèle WordCount
  7. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  8. Saisissez vos valeurs de paramètres dans les champs fournis. La section Additional Parameters (Paramètres supplémentaires) peut être ignorée lorsque vous utilisez un modèle fourni par Google.
  9. Cliquez sur Run Job (Exécuter la tâche).

Modèles personnalisés

Pour exécuter un modèle personnalisé, procédez comme suit :

  1. Accédez à la page Dataflow dans Cloud Console.
  2. Accéder à la page Dataflow
  3. Cliquez sur CREATE JOB FROM TEMPLATE (Créer une tâche à partir d'un modèle).
  4. Bouton "Créer une tâche à partir d'un modèle" dans la console Cloud Platform
  5. Sélectionnez Custom Template (Modèle personnalisé) dans le menu déroulant Dataflow template (Modèle Dataflow).
  6. Formulaire d'exécution d'un modèle personnalisé
  7. Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).
  8. Saisissez le chemin d'accès Cloud Storage vers votre fichier de modèle dans le champ "Template GCS Path" (Chemin d'accès GCS du modèle).
  9. Si votre modèle a besoin d'autres paramètres, cliquez sur Add parameter (Ajouter un paramètre) dans la section Additional Parameters (Paramètres supplémentaires). Saisissez le nom (Name) et la valeur (Value) du paramètre. Répétez cette opération pour chaque paramètre requis.
  10. Cliquez sur Run Job (Exécuter la tâche).

Utiliser l'API REST

Pour exécuter un modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.

Consultez la documentation de référence de l'API REST concernant la requête projects.templates.launch pour en savoir plus sur les paramètres disponibles.

Exemple 1 : Création d'une tâche par lot à partir d'un modèle personnalisé

Cet exemple de requête projects.templates.launch crée une tâche par lot à partir d'un modèle qui lit un fichier texte et génère un fichier texte de sortie. Si la requête aboutit, le corps de la réponse contient une instance de LaunchTemplateResponse.

Vous devez modifier les valeurs suivantes :

  • Remplacez YOUR_PROJECT_ID par l'ID du projet.
  • Remplacez LOCATION par le point de terminaison régional Dataflow de votre choix.
  • Remplacez JOB_NAME par le nom de tâche de votre choix.
  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
  • Définissez gcsPath sur l'emplacement Cloud Storage du fichier de modèle.
  • Définissez parameters sur votre liste de paires clé/valeur.
  • Définissez tempLocation sur un emplacement pour lequel vous disposez d'une autorisation en écriture. Cette valeur est nécessaire pour exécuter les modèles fournis par Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Exemple 2 : Création d'une tâche par flux à partir d'un modèle personnalisé

Cet exemple de requête projects.templates.launch crée une tâche par flux à partir d'un modèle qui lit un sujet Pub/Sub et écrit dans une table BigQuery. Cette dernière doit déjà exister avec le schéma approprié. Si la requête aboutit, le corps de la réponse contient une instance de LaunchTemplateResponse.

Vous devez modifier les valeurs suivantes :

  • Remplacez YOUR_PROJECT_ID par l'ID du projet.
  • Remplacez LOCATION par le point de terminaison régional Dataflow de votre choix.
  • Remplacez JOB_NAME par le nom de tâche de votre choix.
  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
  • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
  • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et remplacez YOUR_TABLE_NAME par le nom de votre table BigQuery.
  • Définissez gcsPath sur l'emplacement Cloud Storage du fichier de modèle.
  • Définissez parameters sur votre liste de paires clé/valeur.
  • Définissez tempLocation sur un emplacement pour lequel vous disposez d'une autorisation en écriture. Cette valeur est nécessaire pour exécuter les modèles fournis par Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
            "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Exemple 3 : Mise à jour d'une tâche par flux à partir d'un modèle personnalisé

Cet exemple de requête projects.templates.launch montre comment mettre à jour une tâche par flux à partir d'un modèle.

  1. Exécutez Exemple 2 : Création d'une tâche par flux à partir d'un modèle personnalisé pour démarrer une tâche par flux à partir d'un modèle.
  2. Envoyez la requête HTTP POST ci-dessous, avec les valeurs modifiées suivantes :
    • Remplacez YOUR_PROJECT_ID par l'ID du projet.
    • Remplacez LOCATION par le point de terminaison régional Dataflow de votre choix.
    • Remplacez JOB_NAME par le nom de tâche de votre choix.
    • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
    • Remplacez YOUR_TOPIC_NAME par le nom de votre sujet Pub/Sub.
    • Remplacez YOUR_DATASET par votre ensemble de données BigQuery et remplacez YOUR_TABLE_NAME par le nom de votre table BigQuery.
    • Définissez gcsPath sur l'emplacement Cloud Storage du fichier de modèle.
    • Définissez parameters sur votre liste de paires clé/valeur.
    • Définissez tempLocation sur un emplacement pour lequel vous disposez d'une autorisation en écriture. Cette valeur est nécessaire pour exécuter les modèles fournis par Google.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
            "update": true
        }
    
  3. Accédez à l'interface de surveillance de Dataflow et vérifiez qu'une nouvelle tâche portant le même nom a été créée. Cette tâche est associée à l'état Updated (Mis à jour).

Utiliser les bibliothèques clientes des API Google

Pensez à utiliser les bibliothèques clientes des API Google pour effectuer facilement des appels aux API REST de Dataflow. Cet exemple de script utilise la bibliothèque cliente des API Google pour Python.

Dans cet exemple, vous devez définir les variables suivantes :

  • project : définie sur l'ID de votre projet.
  • job : définie sur le nom de tâche unique de votre choix.
  • template : définie sur l'emplacement Cloud Storage du fichier de modèle.
  • parameters : définie sur un dictionnaire contenant les paramètres du modèle.
from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template,
    body={
        'jobName': job,
        'parameters': parameters,
    }
)

response = request.execute()

Pour en savoir plus sur les options disponibles, reportez-vous à la section traitant de la méthode projects.templates.launch dans la documentation de référence de l'API REST de Cloud Dataflow.

Utiliser gcloud

Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.

L'outil de ligne de commande gcloud permet d'exécuter un modèle personnalisé ou fourni par Google à l'aide de la commande gcloud dataflow jobs run. Des exemples d'exécution de modèles fournis par Google sont présentés sur la page de modèles fournis par Google.

Pour les exemples de modèles personnalisés ci-après, définissez les valeurs suivantes :

  • Remplacez JOB_NAME par le nom de tâche de votre choix.
  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
  • Vous devez inclure l'indicateur --gcs-location. Définissez --gcs-location sur l'emplacement Cloud Storage du fichier de modèle.
  • Définissez --parameters sur la liste de paramètres séparés par des virgules à appliquer à la tâche. Les espaces entre virgules et valeurs ne sont pas autorisés.

Exemple 1 : Modèle personnalisé, tâche par lot

Cet exemple crée une tâche par lot à partir d'un modèle qui lit un fichier texte et génère un fichier texte de sortie.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output

La requête renvoie une réponse au format suivant :

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Exemple 2 : Modèle personnalisé, tâche de diffusion

Cet exemple crée une tâche par flux à partir d'un modèle qui lit un sujet Pub/Sub et écrit dans une table BigQuery. Cette dernière doit déjà exister avec le schéma approprié.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

La requête renvoie une réponse au format suivant :

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Pour obtenir la liste complète des options disponibles pour la commande gcloud dataflow jobs run, consultez la documentation de référence de l'outil gcloud.

Surveillance et dépannage

L'interface de surveillance de Dataflow vous permet de surveiller vos tâches Dataflow. En cas d'échec d'une tâche, vous trouverez des conseils de dépannage, des stratégies de débogage et un catalogue des erreurs courantes dans le guide Résoudre les problèmes liés à votre pipeline.