Questions fréquentes

La section suivante contient des réponses aux questions les plus fréquentes liées à Dataflow.

Questions générales

Où puis-je obtenir de l'aide supplémentaire ?

Vous pouvez accéder à l'assistance Google Cloud afin d'obtenir une formule d'assistance pour Google Cloud, y compris Dataflow.

Vous pouvez également rechercher votre question sur StackOverflow ou en poser une nouvelle. Veuillez spécifier le tag google-cloud-dataflow lorsque vous posez une question. Ce groupe est surveillé par des ingénieurs de Google qui se feront un plaisir de répondre à vos questions.

Vous pouvez aussi envoyer des questions, demandes de fonctionnalités, rapports de bug et autres commentaires sur le forum UserVoice.

Est-il possible de partager des données entre plusieurs instances de pipeline ?

Aucun mécanisme de communication spécifique à Dataflow ne permet de partager des données ni de traiter du contexte entre plusieurs pipelines. Vous pouvez toutefois recourir à un service de stockage durable tel que Cloud Storage ou à un cache en mémoire tel que App Engine pour partager des données entre plusieurs instances de pipeline.

Cloud Dataflow possède-t-il un mécanisme de planification intégré permettant d'exécuter des pipelines à un moment ou à un intervalle donné ?

Vous pouvez automatiser l'exécution des pipelines :

Comment savoir quelle version du SDK Dataflow est installée ou s'exécute dans mon environnement ?

Les détails d'installation dépendent de votre environnement de développement. Si vous utilisez Maven, plusieurs versions du SDK Dataflow peuvent être "installées" dans un ou plusieurs dépôts Maven locaux.

Java

Pour connaître la version du SDK Dataflow exécutée par un pipeline spécifique, vous pouvez consulter la sortie de la console lors de l'exécution avec DataflowPipelineRunner ou BlockingDataflowPipelineRunner. La console affiche un message semblable au suivant, qui contient les informations liées à la version du SDK Dataflow :

Python

Pour connaître la version du SDK Dataflow exécutée par un pipeline spécifique, vous pouvez consulter la sortie de la console lors de l'exécution avec DataflowRunner. La console affiche un message semblable au suivant, qui contient les informations liées à la version du SDK Dataflow :

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interagir avec votre tâche Cloud Dataflow

Puis-je accéder aux nœuds de calcul (VM Compute Engine) de ma tâche pendant l'exécution de mon pipeline ?

Vous pouvez afficher les instances de VM d'un pipeline donné à l'aide de Google Cloud Console. Vous pouvez ensuite vous connecter en SSH à chaque instance depuis la console. Sachez toutefois que le service Dataflow s'arrête automatiquement et nettoie les instances de VM dès que votre tâche se termine ou échoue.

Dans l'interface de surveillance de Cloud Dataflow, pourquoi ne puis-je pas voir le temps CPU réservé de ma tâche en flux continu ?

Le service Dataflow n'indique le temps CPU réservé qu'une fois une tâche terminée. Dans le cas d'une tâche illimitée, le temps CPU réservé n'est indiqué qu'après son annulation ou son échec.

Dans l'interface de surveillance de Cloud Dataflow, pourquoi les informations sur l'état et le filigrane des tâches ne sont-elles pas disponibles pour les tâches en flux continu récemment mises à jour ?

Les opérations de mise à jour apportent plusieurs modifications qui mettent quelques minutes à se propager sur l'interface de surveillance de Dataflow. Essayez d'actualiser l'interface de surveillance cinq minutes après la mise à jour de la tâche.

Pourquoi mes transformations composites personnalisées sont-elles développées dans l'interface de surveillance de Dataflow ?

Dans votre code de pipeline, vous avez peut-être appelé votre transformation composite de la manière suivante :

result = transform.apply(input);

Les transformations composites appelées de cette façon omettent l'imbrication attendue et peuvent donc apparaître développées dans l'interface de surveillance de Dataflow. Votre pipeline peut également générer des erreurs ou des avertissements relatifs aux noms uniques stables au moment de l'exécution du pipeline.

Pour éviter ces problèmes, assurez-vous d'appeler vos transformations à l'aide du format recommandé :

result = input.apply(transform);

Pourquoi les informations sur ma tâche en cours n'apparaissent-elles plus dans l'interface de surveillance de Cloud Dataflow alors qu'elles s'affichaient auparavant ?

Un problème connu peut actuellement affecter certaines tâches Dataflow qui s'exécutent depuis un mois ou plus. Ces tâches peuvent ne pas se charger dans l'interface de surveillance de Dataflow ou présenter des informations obsolètes, même si elles étaient visibles auparavant.

Vous pouvez toujours consulter l'état de votre tâche dans la liste des tâches lorsque vous utilisez l'interface de surveillance ou l'interface de ligne de commande de Dataflow. Vous ne pouvez toutefois pas afficher les informations relatives à votre tâche si ce problème est présent.

Programmer avec le SDK Apache Beam pour Java

Puis-je transmettre des données supplémentaires (hors bande) à une opération ParDo existante ?

Oui. Vous pouvez appliquer différents schémas en fonction de votre cas d'utilisation :

  • Vous pouvez sérialiser des informations sous forme de champs dans votre sous-classe DoFn.
  • Toutes les variables référencées par les méthodes dans une fonction DoFn anonyme sont automatiquement sérialisées.
  • Vous pouvez calculer des données dans DoFn.startBundle().
  • Vous pouvez transmettre des données via ParDo.withSideInputs.

Pour en savoir plus, consultez la documentation ParDo, en particulier les sections "Creating a DoFn" (Créer une fonction DoFn) et "Side Inputs" (Entrées secondaires), ainsi que la documentation de référence de l'API pour Java qui est consacrée à ParDo.

Comment les exceptions Java sont-elles traitées dans Cloud Dataflow ?

Votre pipeline peut générer des exceptions lors du traitement des données. Certaines de ces erreurs sont transitoires (par exemple, difficulté temporaire à accéder à un service externe), mais certaines sont permanentes, telles que les erreurs causées par des données d'entrée corrompues ou impossibles à analyser, ou par des pointeurs vides lors du calcul.

Dataflow traite les éléments dans des ensembles arbitraires et relance l'ensemble complet lorsqu'une erreur est générée pour l'un des éléments qu'il contient. Lors de l'exécution en mode de traitement par lots, les ensembles comprenant un élément défaillant sont relancés quatre fois. Le pipeline échoue complètement lorsqu'un ensemble échoue quatre fois. Lors de l'exécution en mode de traitement par flux, un ensemble comprenant un élément défaillant est relancé indéfiniment, ce qui risque de bloquer votre pipeline de manière permanente.

Les exceptions détectées dans le code utilisateur (par exemple, dans vos instances DoFn) sont signalées dans l'interface de surveillance de Dataflow. Si vous exécutez votre pipeline avec BlockingDataflowPipelineRunner, des messages d'erreur s'affichent également dans la console ou la fenêtre de terminal.

Envisagez d'ajouter des gestionnaires d'exceptions pour éviter toute erreur dans votre code. Par exemple, si vous souhaitez supprimer des éléments qui font échouer une validation d'entrée personnalisée effectuée dans une transformation ParDo, utilisez un bloc "try" ou "catch" dans ParDo pour gérer les exceptions et supprimer les éléments. Pour effectuer le suivi du nombre d'erreurs, vous pouvez utiliser des transformations d'agrégation.

Programmer avec le SDK Cloud Dataflow pour Python

Comment gérer les erreurs NameError ?

Si vous obtenez une erreur NameError lorsque vous exécutez votre pipeline à l'aide du service Dataflow alors que tout fonctionne normalement lorsque vous l'exécutez localement (c'est-à-dire à l'aide de DirectRunner), il est possible que vos fonctions DoFn utilisent des valeurs de l'espace de noms global qui ne sont pas disponibles sur le nœud de calcul Dataflow.

Par défaut, les importations, les fonctions et les variables globales définies dans la session principale ne sont pas enregistrées lors de la sérialisation d'une tâche Dataflow. Par exemple, si vos fonctions DoFn sont définies dans le fichier principal, et si elles référencent les importations et fonctions de l'espace de noms global, vous pouvez définir l'option de pipeline --save_main_session sur True. L'état de l'espace de noms global sera alors picklé et chargé sur le nœud de calcul Dataflow.

Notez que si vous possédez des objets ne pouvant pas être picklés dans votre espace de noms global, vous obtenez une erreur de pickling. Si l'erreur concerne un module qui devrait être disponible dans la distribution Python, vous pouvez l'importer localement (là où il est utilisé) pour résoudre le problème.

Par exemple, au lieu du code suivant :

import re
…
def myfunc():
  # use re module

utilisez les lignes ci-dessous :

def myfunc():
  import re
  # use re module

Si vos fonctions DoFn couvrent plusieurs fichiers, vous devez utiliser une approche différente pour mettre en package votre workflow et gérer les dépendances.

E/S pipeline

La source et le récepteur TextIO acceptent-ils les fichiers compressés tels que GZip ?

Oui. Dataflow Java peut lire les fichiers compressés avec gzip et bzip2. Pour en savoir plus, consultez la documentation de TextIO.

Puis-je utiliser une expression régulière pour cibler des fichiers spécifiques avec la source TextIO ?

Dataflow est compatible avec les schémas généraux de caractères génériques. Votre expression glob peut apparaître n'importe où dans le chemin de fichier. Dataflow n'accepte toutefois pas les caractères génériques récursifs (**).

La source d'entrée TextIO est-elle compatible avec JSON ?

Oui. Vos données sources doivent toutefois être délimitées par un saut de ligne pour que le service Dataflow puisse paralléliser l'entrée et la sortie.

Pourquoi le rééquilibrage dynamique du travail ne s'active-t-il pas avec ma source personnalisée ?

Le rééquilibrage dynamique du travail est activé par la valeur de retour de la méthode getProgress() de votre source personnalisée. L'implémentation par défaut de getProgress() renvoie null. Pour garantir l'activation de l'autoscaling, assurez-vous que votre source personnalisée remplace getProgress()afin de renvoyer une valeur appropriée.

Comment accéder aux ensembles de données BigQuery ou aux sujets ou abonnements Pub/Sub appartenant à un autre projet Google Cloud Platform (c'est-à-dire, différent de celui avec lequel j'utilise Cloud Dataflow) ?

Consultez le guide Sécurité et autorisations de Dataflow pour découvrir comment accéder aux données BigQuery ou Pub/Sub d'un projet Google Cloud différent de celui avec lequel vous utilisez Dataflow.

Pourquoi des erreurs "rateLimitExceeded" s'affichent-elles lorsque j'utilise le connecteur BigQuery, et que dois-je faire dans ce cas ?

BigQuery applique des limites de quota à court terme lorsqu'un nombre de requêtes API trop important est envoyé dans un court laps de temps. Il est possible que votre pipeline Dataflow dépasse temporairement ce quota. Lorsque cela se produit, les requêtes API de votre pipeline Dataflow vers BigQuery peuvent échouer, ce qui peut entraîner l'apparition d'erreurs rateLimitExceeded dans les journaux des nœuds de calcul. Notez que Dataflow effectue de nouvelles tentatives en cas de défaillance. Vous pouvez donc ignorer ces erreurs. Si vous pensez que votre pipeline est fortement affecté par des erreurs rateLimitExceeded, veuillez contacter l'assistance Google Cloud.

J'utilise le connecteur BigQuery pour écrire sur BigQuery à l'aide d'insertions en flux continu, et mon débit en écriture est inférieur à celui attendu. Que puis-je faire pour résoudre ce problème ?

Le débit peut être ralenti lorsque votre pipeline dépasse le quota d'insertion en flux continu BigQuery disponible. Dans ce cas, des messages d'erreur liés au quota de BigQuery s'affichent dans les journaux des nœuds de calcul Dataflow (recherchez les erreurs quotaExceeded). Si vous rencontrez de telles erreurs, envisagez de configurer l'option de récepteur BigQuery ignoreInsertIds() (si vous utilisez le SDK Apache Beam pour Java), ou d'utiliser l'option ignore_insert_ids (si vous utilisez le SDK Apache Beam pour Python). Ainsi, vous aurez automatiquement accès à un débit d'insertion en flux continu BigQuery de 1 Go/s. Pour en savoir plus sur les mises en garde liées à la déduplication automatique des messages, consultez la documentation BigQuery. Pour augmenter le quota d'insertion en flux continu BigQuery au-delà de 1 Go/s, envoyez une demande via Cloud Console.

Si les erreurs liées aux quotas ne s'affichent pas dans les journaux des nœuds de calcul, le problème peut être dû au fait que le regroupement par défaut ou les paramètres liés au traitement par lots ne fournissent pas le parallélisme approprié pour votre pipeline. Plusieurs configurations liées au connecteur BigQuery de Dataflow peuvent être ajustées afin d'atteindre les performances attendues lorsque vous écrivez des données dans BigQuery à l'aide d'insertions en flux continu. Par exemple, pour le SDK Apache Beam pour Java, ajustez numStreamingKeys afin que sa valeur corresponde au nombre maximal de nœuds de calcul. Envisagez également d'augmenter la valeur de insertBundleParallelism afin de configurer le connecteur BigQuery de sorte qu'il utilise davantage de threads parallèles pour l'écriture dans BigQuery. Pour connaître les configurations disponibles dans le SDK Apache Beam pour Java, consultez la page BigQueryPipelineOptions. Pour connaître les configurations disponibles dans le SDK Apache Beam pour Python, consultez la page sur la transformation WriteToBigQuery.

Traitement par flux

Comment exécuter mon pipeline en mode de traitement par flux ?

Vous pouvez définir l'indicateur --streaming sur la ligne de commande lorsque vous exécutez votre pipeline. Vous avez également la possibilité de définir le mode de traitement par flux de façon automatisée lorsque vous construisez votre pipeline.

Quels sources et récepteurs de données sont compatibles avec le mode de traitement par flux ?

Vous pouvez lire des flux de données depuis Pub/Sub et en écrire dans Pub/Sub ou BigQuery.

Quelles sont les limites actuelles relatives au mode de traitement par flux ?

Les sources de traitement par lots ne sont pas encore compatibles avec le mode de traitement par flux.

On dirait que mon pipeline de traitement par flux qui lit des données depuis Pub/Sub ralentit. Que puis-je faire ?

Votre projet ne dispose peut-être pas d'un quota Pub/Sub suffisant. Vous pouvez savoir si le quota de votre projet est insuffisant en recherchant les erreurs client 429 (Rate limit exceeded) :

  1. Accédez à Google Cloud Console.
  2. Dans le menu de gauche, sélectionnez API et services.
  3. Dans le champ de recherche, recherchez Cloud Pub/Sub.
  4. Cliquez sur l'onglet Utilisation.
  5. Vérifiez les codes de réponse et recherchez les codes d'erreur client (4xx).

Pourquoi ma tâche en flux continu ne s'étend-elle pas correctement lorsque j'augmente la taille du pool de nœuds de calcul de mon pipeline ?

Java

Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le nombre de nœuds de calcul et de ressources de disques persistants ne peut pas dépasser la quantité allouée au début de la tâche d'origine. Lorsque vous mettez à jour une tâche Dataflow et augmentez le nombre de nœuds de calcul de la nouvelle tâche, vous ne pouvez spécifier qu'un nombre de nœuds égal à la valeur --maxNumWorkers spécifiée pour votre tâche d'origine.

Python

Pour les tâches de traitement par flux qui n'utilisent pas Streaming Engine, le nombre de nœuds de calcul et de ressources de disques persistants ne peut pas dépasser la quantité allouée au début de la tâche d'origine. Lorsque vous mettez à jour une tâche Dataflow et augmentez le nombre de nœuds de calcul de la nouvelle tâche, vous ne pouvez spécifier qu'un nombre de nœuds égal à la valeur --max_num_workers spécifiée pour votre tâche d'origine.

Autoscaling de flux

Comment puis-je définir un nombre de nœuds de calcul fixe ?

L'autoscaling en flux continu n'est pas activé par défaut. Vous devez activer cette fonctionnalité vous-même. Comme la sémantique des options actuelles ne change pas, aucune action n'est requise de votre part si vous souhaitez continuer à utiliser un nombre de nœuds de calcul fixe.

J'ai peur que l'autoscaling augmente ma facture. Comment puis-je limiter cette fonctionnalité ?

Java

Vous pouvez spécifier --maxNumWorkers pour limiter la plage de scaling utilisée pour le traitement de votre tâche.

Python

Vous pouvez spécifier --max_num_workers pour limiter la plage de scaling utilisée pour le traitement de votre tâche.

Quelle est la plage de scaling des pipelines d'autoscaling en flux continu ?

Java

Pour les tâches d'autoscaling en flux continu qui n'utilisent pas Streaming Engine, le service Dataflow alloue entre 1 et 15 disques persistants à chaque nœud de calcul. Cela signifie que le nombre minimal de nœuds de calcul utilisés pour un pipeline d'autoscaling en flux continu est de N/15, où N est la valeur de --maxNumWorkers.

Pour les tâches d'autoscaling en flux continu qui utilisent Streaming Engine, le nombre minimal de nœuds de calcul est de 1.

Dataflow équilibre le nombre de disques persistants entre les nœuds de calcul. Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --maxNumWorkers=15. Le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul (1, 2, 3, 4, 5, 8 ou 15 nœud(s) de calcul, ce qui correspond à 15, 8, 5, 4, 3, 2 ou 1 disque(s) persistant(s) par nœud de calcul, respectivement).

La valeur --maxNumWorkers ne peut pas dépasser 1 000.

Python

Pour les tâches d'autoscaling en flux continu qui n'utilisent pas Streaming Engine, le service Dataflow alloue entre 1 et 15 disques persistants à chaque nœud de calcul. Cela signifie que le nombre minimal de nœuds de calcul utilisés pour un pipeline d'autoscaling en flux continu est de N/15, où N est la valeur de --max_num_workers.

Pour les tâches d'autoscaling en flux continu qui utilisent Streaming Engine, le nombre minimal de nœuds de calcul est de 1.

Dataflow équilibre le nombre de disques persistants entre les nœuds de calcul. Par exemple, si votre pipeline a besoin de 3 ou 4 nœuds de calcul prêts à l'emploi, vous pouvez définir --max_num_workers=15. Le pipeline s'adapte automatiquement afin d'utiliser 1 à 15 nœuds de calcul (1, 2, 3, 4, 5, 8 ou 15 nœud(s) de calcul, ce qui correspond à 15, 8, 5, 4, 3, 2 ou 1 disque(s) persistant(s) par nœud de calcul, respectivement).

La valeur --max_num_workers ne peut pas dépasser 1 000.

Quel est le nombre maximal de nœuds de calcul utilisables par l'autoscaling ?

Java

Dataflow utilise le quota d'instances Compute Engine ou la valeur maxNumWorkers de votre projet comme limite, selon la valeur la plus basse.

Python

Dataflow utilise le quota d'instances Compute Engine ou la valeur max_num_workers de votre projet comme limite, selon la valeur la plus basse.

Puis-je désactiver l'autoscaling sur mon pipeline de traitement par flux ?

Java

Oui. Définissez --autoscalingAlgorithm=NONE. Mettez à jour le pipeline avec des spécifications de cluster fixe (comme décrit dans la documentation sur le scaling manuel) en attribuant à numWorkers une valeur située dans la plage de scaling.

Python

Oui. Définissez --autoscaling_algorithm=NONE. Mettez à jour le pipeline avec des spécifications de cluster fixe (comme décrit dans la documentation sur le scaling manuel) en attribuant à num_workers une valeur située dans la plage de scaling.

Puis-je modifier la plage de scaling de mon pipeline de traitement par flux ?

Java

Oui, mais cette opération n'est pas possible à l'aide de la méthode Update*. Vous devez arrêter votre pipeline à l'aide de l'option Annuler ou Drainer, puis le redéployer avec la nouvelle valeur maxNumWorkers souhaitée.

Python

Oui, mais cette opération n'est pas possible à l'aide de la méthode Update*. Vous devez arrêter votre pipeline à l'aide de l'option Annuler ou Drainer, puis le redéployer avec la nouvelle valeur max_num_workers souhaitée.

Configurer votre projet Google Cloud Platform afin d'utiliser Cloud Dataflow

Comment savoir si le projet que j'utilise avec Cloud Dataflow est bien propriétaire d'un bucket Cloud Storage sur lequel je souhaite lire ou écrire des données ?

Pour savoir si votre projet Google Cloud détient un bucket Cloud Storage spécifique, vous pouvez utiliser la commande de console suivante :

gsutil acl get gs://<your-bucket>

La commande génère une chaîne JSON semblable à celle-ci :

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

Les entrées qui vous intéressent sont celles marquées du rôle "owner" (propriétaire). La valeur projectNumber associée vous indique quel projet est propriétaire de ce bucket. Si ce numéro ne correspond pas à celui de votre projet, vous devez effectuer l'une des opérations suivantes :

  • Créer un bucket appartenant à votre projet
  • Accorder aux comptes appropriés un accès au bucket

Comment créer un bucket appartenant à mon projet Cloud Dataflow ?

Pour créer un bucket dans le projet Google Cloud avec lequel vous utilisez Cloud Dataflow, vous pouvez utiliser la commande de console suivante :

gsutil mb -p <Project to own the bucket> <bucket-name>

Comment accorder au projet Google Cloud Platform avec lequel j'utilise Cloud Dataflow un accès en lecture ou en écriture à un bucket détenu par un autre projet ?

Consultez le guide Sécurité et autorisations de Dataflow pour découvrir comment votre pipeline Dataflow peut accéder aux ressources Google Cloud appartenant à un autre projet Google Cloud.

Lorsque j'essaie d'exécuter ma tâche Cloud Dataflow, j'obtiens l'erreur "Vous devez activer certaines API Cloud dans votre projet pour pouvoir exécuter cette tâche Cloud Dataflow". Que dois-je faire ?

Pour exécuter une tâche Dataflow, vous devez activer les API Google Cloud suivantes dans votre projet :

  • API Compute Engine (Compute Engine)
  • API Cloud Logging
  • Cloud Storage
  • API Cloud Storage JSON
  • API BigQuery
  • Pub/Sub
  • API Datastore

Consultez le guide de démarrage sur l'activation des API Google Cloud pour en savoir plus.