Questions fréquentes

La section suivante contient des réponses aux questions les plus fréquentes relatives à Cloud Dataflow.

Questions d'ordre général

Où puis-je obtenir de l'aide supplémentaire ?

Vous pouvez accéder à l'assistance Google Cloud Platform (GCP) afin d'obtenir une formule d'assistance pour GCP, y compris Cloud Dataflow.

Vous pouvez également rechercher votre question sur StackOverflow ou en poser une nouvelle. Veuillez spécifier le tag google-cloud-dataflow lorsque vous posez une question. Ce groupe est surveillé par des ingénieurs de Google qui se feront un plaisir de répondre à vos questions.

Vous pouvez aussi envoyer des questions, demandes de fonctionnalités, rapports de bug et autres commentaires sur le forum UserVoice.

Est-il possible de partager des données entre plusieurs instances de pipeline ?

Aucun mécanisme de communication spécifique à Dataflow ne permet de partager des données ni de traiter du contexte entre plusieurs pipelines. Vous pouvez toutefois recourir à un service de stockage durable tel que Cloud Storage ou à un cache en mémoire tel que App Engine pour partager des données entre plusieurs instances de pipeline.

Cloud Dataflow possède-t-il un mécanisme de planification intégré permettant d'exécuter des pipelines à un moment ou à un intervalle donné ?

Vous pouvez automatiser l'exécution des pipelines :

Comment savoir quelle version du SDK Cloud Dataflow est installée ou s'exécute dans mon environnement ?

Les détails d'installation dépendent de votre environnement de développement. Si vous utilisez Maven, plusieurs versions du SDK Cloud Dataflow peuvent être "installées" dans un ou plusieurs dépôts Maven locaux.

Java

Pour connaître la version du SDK Cloud Dataflow qu'exécute un pipeline donné, vous pouvez consulter la sortie de la console lors de l'exécution avec DataflowPipelineRunner ou BlockingDataflowPipelineRunner. La console affiche un message semblable au suivant, qui contient les informations relatives à la version du SDK Cloud Dataflow :

Python

Pour connaître la version du SDK Cloud Dataflow qu'exécute un pipeline donné, vous pouvez consulter la sortie de la console lors de l'exécution avec DataflowRunner. La console affiche un message semblable au suivant, qui contient les informations relatives à la version du SDK Cloud Dataflow :

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interagir avec votre tâche Cloud Dataflow

Puis-je accéder aux nœuds de calcul (VM Compute Engine) de ma tâche pendant l'exécution de mon pipeline ?

Vous pouvez afficher les instances de VM d'un pipeline donné à l'aide de la console Google Cloud Platform. Vous pouvez ensuite vous connecter en SSH à chaque instance depuis la console. Sachez toutefois que le service Cloud Dataflow s'arrête automatiquement et nettoie les instances de VM dès que votre tâche se termine ou échoue.

Dans l'interface de surveillance de Cloud Dataflow, pourquoi ne puis-je pas voir le temps CPU réservé de ma tâche en flux continu ?

Le service Cloud Dataflow n'indique le temps CPU réservé qu'une fois une tâche terminée. Dans le cas d'une tâche illimitée, le temps CPU réservé n'est indiqué qu'après son annulation ou son échec.

Dans l'interface de surveillance de Cloud Dataflow, pourquoi les informations sur l'état et le filigrane des tâches ne sont-elles pas disponibles pour les tâches en flux continu récemment mises à jour ?

Les opérations de mise à jour apportent plusieurs modifications qui mettent quelques minutes à se propager sur l'interface de surveillance de Cloud Dataflow. Essayez d'actualiser l'interface de surveillance cinq minutes après la mise à jour de la tâche.

Pourquoi mes transformations composites personnalisées sont-elles développées dans l'interface de surveillance de Cloud Dataflow ?

Dans votre code de pipeline, vous avez peut-être appelé votre transformation composite de la manière suivante :

result = transform.apply(input);

Les transformations composites appelées de cette façon omettent l'imbrication attendue et peuvent donc apparaître développées dans l'interface de surveillance de Cloud Dataflow. Votre pipeline peut également générer des erreurs ou des avertissements relatifs aux noms uniques stables au moment de l'exécution du pipeline.

Pour éviter ces problèmes, assurez-vous d'appeler vos transformations à l'aide du format recommandé :

result = input.apply(transform);

Pourquoi les informations sur ma tâche en cours n'apparaissent-elles plus dans l'interface de surveillance de Cloud Dataflow alors qu'elles s'affichaient auparavant ?

Un problème connu peut actuellement affecter certaines tâches Cloud Dataflow qui s'exécutent depuis un mois ou plus. Ces tâches peuvent ne pas se charger dans l'interface de surveillance de Cloud Dataflow ou présenter des informations obsolètes, même si elles étaient visibles auparavant.

Vous pouvez toujours consulter l'état de votre tâche dans la liste des tâches lorsque vous utilisez l'interface de surveillance ou l'interface de ligne de commande de Cloud Dataflow. Vous ne pouvez toutefois pas afficher les informations relatives à votre tâche si ce problème est présent.

Programmer avec le SDK Apache Beam pour Java

Puis-je transmettre des données supplémentaires (hors bande) à une opération ParDo existante ?

Oui. Vous pouvez appliquer différents schémas en fonction de votre cas d'utilisation :

  • Vous pouvez sérialiser des informations sous forme de champs dans votre sous-classe DoFn.
  • Toutes les variables référencées par les méthodes dans une fonction DoFn anonyme sont automatiquement sérialisées.
  • Vous pouvez calculer des données dans DoFn.startBundle().
  • Vous pouvez transmettre des données via ParDo.withSideInputs.

Pour en savoir plus, consultez la documentation ParDo, en particulier les sections "Creating a DoFn" (Créer une fonction DoFn) et "Side Inputs" (Entrées secondaires), ainsi que la documentation de référence de l'API pour Java concernant ParDo.

Comment les exceptions Java sont-elles traitées dans Cloud Dataflow ?

Votre pipeline peut générer des exceptions lors du traitement des données. Certaines de ces erreurs sont transitoires (par exemple, difficulté temporaire à accéder à un service externe), mais certaines sont permanentes, telles que les erreurs causées par des données d'entrée corrompues ou impossibles à analyser, ou par des pointeurs vides lors du calcul.

Cloud Dataflow traite les éléments dans des ensembles arbitraires et relance l'ensemble complet lorsqu'une erreur est générée pour l'un des éléments qu'il contient. Lors de l'exécution en mode de traitement par lots, les ensembles comprenant un élément défaillant sont relancés quatre fois. Le pipeline échoue complètement lorsqu'un ensemble échoue quatre fois. Lors de l'exécution en mode de traitement par flux, un ensemble comprenant un élément défaillant est relancé indéfiniment, ce qui risque de bloquer votre pipeline de manière permanente.

Les exceptions détectées dans le code utilisateur (par exemple, dans vos instances DoFn) sont signalées dans l'interface de surveillance de Cloud Dataflow. Si vous exécutez votre pipeline avec BlockingDataflowPipelineRunner, vous apercevrez également des messages d'erreur dans votre console ou votre fenêtre de terminal.

Vous pouvez envisager d'ajouter des gestionnaires d'exceptions pour vous prémunir contre les erreurs dans votre code. Par exemple, si vous souhaitez supprimer les éléments qui font échouer une validation d'entrée personnalisée effectuée dans une transformation ParDo, utilisez un bloc "try" ou "catch" dans ParDo pour traiter l'exception et supprimer l'élément. Vous pouvez également surveiller le nombre d'erreurs grâce à Aggregator.

Programmer avec le SDK Cloud Dataflow pour Python

Comment traiter les erreurs NameError ?

Si vous obtenez une erreur NameError lorsque vous exécutez votre pipeline à l'aide du service Cloud Dataflow, mais que tout fonctionne normalement lorsque vous l'exécutez localement (c'est-à-dire à l'aide de DirectRunner), il est possible que vos fonctions DoFn utilisent des valeurs de l'espace de noms global qui ne sont pas disponibles sur le nœud de calcul Cloud Dataflow.

Par défaut, les importations, les fonctions et les variables globales définies dans la session principale ne sont pas enregistrées lors de la sérialisation d'une tâche Cloud Dataflow. Par exemple, si vos fonctions DoFn sont définies dans le fichier principal et référencent les importations et les fonctions dans l'espace de noms global, vous pouvez définir l'option de pipeline --save_main_session sur True. L'état de l'espace de noms global sera alors picklé et chargé sur le nœud de calcul Cloud Dataflow.

Notez que si vous possédez des objets ne pouvant pas être picklés dans votre espace de noms global, vous obtenez une erreur de pickling. Si l'erreur concerne un module qui devrait être disponible dans la distribution Python, vous pouvez l'importer localement (là où il est utilisé) pour résoudre le problème.

Par exemple, au lieu du code suivant :

import re
…
def myfunc():
  # use re module

utilisez les lignes ci-dessous :

def myfunc():
  import re
  # use re module

Si vos fonctions DoFn couvrent plusieurs fichiers, vous devez suivre une approche différente pour empaqueter votre workflow et gérer les dépendances.

E/S de pipeline

La source et le récepteur TextIO acceptent-ils les fichiers compressés tels que GZip ?

Oui. Cloud Dataflow Java peut lire les fichiers compressés avec gzip et bzip2. Consultez la documentation de TextIO pour en savoir plus.

Puis-je utiliser une expression régulière pour cibler des fichiers spécifiques avec la source TextIO ?

Cloud Dataflow est compatible avec les schémas généraux de caractères génériques. Votre expression glob peut apparaître n'importe où dans le chemin de fichier. Cloud Dataflow n'accepte toutefois pas les caractères génériques récursifs (**).

La source d'entrée TextIO est-elle compatible avec JSON ?

Oui. Vos données sources doivent toutefois être délimitées par un saut de ligne pour que le service Cloud Dataflow puisse paralléliser l'entrée et la sortie.

Pourquoi le rééquilibrage dynamique du travail ne s'active-t-il pas avec ma source personnalisée ?

Le rééquilibrage dynamique du travail est activé par la valeur de renvoi de la méthode getProgress() de votre source personnalisée. La mise en œuvre par défaut de getProgress() renvoie null. Pour garantir l'activation de l'autoscaling, assurez-vous que votre source personnalisée remplace getProgress() afin de renvoyer une valeur appropriée.

Comment accéder aux ensembles de données BigQuery ou aux sujets ou abonnements Pub/Sub appartenant à un autre projet Google Cloud Platform (c'est-à-dire, différent de celui avec lequel j'utilise Cloud Dataflow) ?

Consultez le guide Sécurité et autorisations de Cloud Dataflow pour découvrir comment accéder aux données BigQuery ou Cloud Pub/Sub d'un projet GCP différent de celui avec lequel vous utilisez Cloud Dataflow.

Traitement par flux

Comment exécuter mon pipeline en mode de traitement par flux ?

Vous pouvez définir l'indicateur --streaming sur la ligne de commande lorsque vous exécutez votre pipeline. Vous pouvez également définir le mode de traitement par flux de façon automatisée lorsque vous construisez votre pipeline.

Quels sources et récepteurs de données sont compatibles avec le mode de traitement par flux ?

Vous pouvez lire des données en flux continu depuis Cloud Pub/Sub et en écrire dans Cloud Pub/Sub ou BigQuery.

Quelles sont les limites actuelles relatives au mode de traitement par flux ?

Le mode de traitement par flux de Cloud Dataflow présente les limites suivantes :

  • Les sources de traitement par lots ne sont pas encore compatibles avec le mode de traitement par flux.
  • Les fonctionnalités de scaling automatique du service Cloud Dataflow sont compatibles en version bêta.

On dirait que mon pipeline de traitement par flux qui lit des données depuis Pub/Sub ralentit. Que puis-je faire ?

Votre projet ne dispose peut-être pas d'un quota Cloud Pub/Sub suffisant. Vous pouvez vérifier si le quota de votre projet est insuffisant en recherchant les erreurs client 429 (Rate limit exceeded) :

  1. Accédez à la console Google Cloud Platform.
  2. Dans le menu de gauche, sélectionnez API et services.
  3. Dans le champ de recherche, recherchez Cloud Pub/Sub.
  4. Cliquez sur l'onglet Utilisation.
  5. Vérifiez les codes de réponse et recherchez les codes d'erreur client (4xx).

Pourquoi ma tâche en flux continu ne s'étend-elle pas correctement lorsque j'augmente la taille du pool de nœuds de calcul de mon pipeline ?

Lorsque vous mettez à jour une tâche Cloud Dataflow et augmentez le nombre de nœuds de calcul de la nouvelle tâche, vous ne pouvez spécifier qu'un nombre de nœuds égal à la valeur --maxNumWorkers spécifiée pour votre tâche d'origine. Le nombre de nœuds de calcul (et donc de ressources de disques persistants) ne peut pas dépasser le nombre alloué au début de votre tâche d'origine.

Il s'agit d'un problème connu relatif au service Cloud Dataflow qui fait actuellement l'objet d'une enquête.

Autoscaling de streaming

Remarque : Le SDK Cloud Dataflow pour Python n'est actuellement pas compatible avec l'autoscaling en flux continu.

Comment puis-je définir un nombre de nœuds de calcul fixe ?

L'autoscaling en flux continu n'est pas activé par défaut. Vous devez activer cette fonctionnalité vous-même. Comme la sémantique des options actuelles ne change pas, aucune action n'est requise de votre part si vous souhaitez continuer à utiliser un nombre de nœuds de calcul fixe.

J'ai peur que l'autoscaling augmente ma facture. Comment puis-je limiter cette fonctionnalité ?

Vous pouvez spécifier --maxNumWorkers pour limiter la plage de scaling utilisée lors du traitement de votre tâche.

Quelle est la plage de scaling des pipelines d'autoscaling en flux continu ?

Le nombre de nœuds de calcul utilisés pour les pipelines d'autoscaling en flux continu varie entre N/15 et N nœuds de calcul (où N correspond à la valeur --maxNumWorkers). Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --maxNumWorkers=15 pour que le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul.

La valeur --maxNumWorkers ne peut pas dépasser 1 000.

Quel est le nombre maximal de nœuds de calcul utilisables par l'autoscaling ?

Cloud Dataflow utilise le quota d'instances Compute Engine ou la valeur maxNumWorkers de votre projet comme limite, selon la valeur la plus basse.

Puis-je désactiver l'autoscaling sur mon pipeline de traitement par flux ?

Oui. Définissez --autoscalingAlgorithm=NONE et mettez à jour les spécifications de cluster fixe (comme décrit dans la documentation sur le scaling manuel) en attribuant à numWorkers une valeur située dans la plage de scaling.

Puis-je modifier la plage de scaling de mon pipeline de traitement par flux ?

Oui, mais cette opération n'est pas possible à l'aide de la méthode Update*. Vous devez arrêter votre pipeline en exécutant la méthode Cancel ou Drain, puis le redéployer avec la nouvelle valeur maxNumWorkers souhaitée.

Configurer votre projet Google Cloud Platform afin d'utiliser Cloud Dataflow

Comment savoir si le projet que j'utilise avec Cloud Dataflow détient un bucket Cloud Storage sur lequel je souhaite lire ou écrire des données ?

Pour savoir si votre projet GCP détient un bucket Cloud Storage spécifique, vous pouvez utiliser la commande de console suivante :

gsutil acl get gs://<your-bucket>

La commande génère une chaîne JSON semblable à celle-ci :

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

Les entrées qui vous intéressent sont celles marquées du rôle "owner" (propriétaire). Le numéro projectNumber associé vous indique quel projet détient ce bucket. Si ce numéro ne correspond pas à celui de votre projet, vous devez effectuer l'une des opérations suivantes :

  • Créer un bucket appartenant à votre projet
  • Accorder aux comptes appropriés un accès au bucket

Comment créer un bucket appartenant à mon projet Cloud Dataflow ?

Pour créer un bucket dans le projet GCP avec lequel vous utilisez Cloud Dataflow, vous pouvez utiliser la commande de console suivante :

gsutil mb -p <Project to own the bucket> <bucket-name>

Comment accorder au projet Google Cloud Platform avec lequel j'utilise Cloud Dataflow un accès en lecture ou en écriture à un bucket détenu par un autre projet ?

Consultez le guide Sécurité et autorisations de Cloud Dataflow pour découvrir comment votre pipeline Cloud Dataflow peut accéder aux ressources GCP appartenant à un autre projet GCP.

Lorsque j'essaie d'exécuter ma tâche Cloud Dataflow, j'obtiens l'erreur "Vous devez activer certaines API Cloud dans votre projet pour pouvoir exécuter cette tâche Cloud Dataflow". Que dois-je faire ?

Pour exécuter une tâche Cloud Dataflow, vous devez activer les API GCP suivantes dans votre projet :

  • API Compute Engine (Compute Engine)
  • API Cloud Logging
  • Cloud Storage
  • API Cloud Storage JSON
  • API BigQuery
  • Cloud Pub/Sub
  • API Cloud Datastore

Consultez le guide de démarrage sur l'activation des API GCP pour en savoir plus.