Développer des notebooks Apache Beam avec l'exécuteur interactif

Utilisez l'exécuteur interactif Apache Beam avec les notebooks JupyterLab pour effectuer les tâches suivantes :

  • Développer des pipelines de manière itérative.
  • Inspecter le graphique de votre pipeline.
  • Analyser des PCollections individuels dans un workflow REPL (read-eval-print-loop).

Ces notebooks Apache Beam sont disponibles via des notebooks gérés par l'utilisateur Vertex AI Workbench, un service qui héberge des machines virtuelles de notebooks préinstallées avec les derniers frameworks de data science. et de machine learning. Dataflow n'est compatible qu'avec les instances de notebooks gérées par l'utilisateur.

Ce guide se concentre sur la fonctionnalité introduite par les notebooks Apache Beam, mais n'explique pas comment créer un notebook. Pour plus d'informations sur Apache Beam, consultez le guide de programmation d'Apache Beam.

Compatibilité et limites

  • Les notebooks Apache Beam ne sont compatibles qu'avec Python.
  • Les segments de pipeline Apache Beam exécutés dans ces notebooks s'exécutent dans un environnement de test, et non sur un exécuteur de production Apache Beam. Pour lancer les notebooks sur le service Dataflow, exportez les pipelines créés dans votre notebook Apache Beam. Pour en savoir plus, consultez la section Lancer des tâches Dataflow à partir d'un pipeline créé dans votre notebook.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

Avant de créer votre instance de notebook Apache Beam, activez des API supplémentaires pour les pipelines utilisant d'autres services, tels que Pub/Sub.

Si elle n'est pas spécifiée, l'instance de notebook est exécutée par le compte de service Compute Engine par défaut avec le rôle d'éditeur de projet IAM. Si les limites du rôle du compte de service sont explicitement définies dans le projet, assurez-vous qu'il dispose des autorisations suffisantes pour exécuter vos notebooks. Par exemple, la lecture d'un sujet Pub/Sub crée implicitement un abonnement, et votre compte de service nécessite un rôle d'éditeur Pub/Sub IAM. En revanche, la lecture d'un abonnement Pub/Sub ne nécessite qu'un rôle d'abonné Pub/Sub IAM.

Une fois que vous avez terminé ce guide, supprimez les ressources que vous avez créées pour éviter de continuer à payer des frais. Pour plus d'informations, consultez la section Nettoyer.

Lancer une instance de notebook Apache Beam

  1. Dans la console Google Cloud, accédez à la page Workbench de Dataflow.

    Accéder à Workbench

  2. Vérifiez que vous êtes bien dans l'onglet Notebooks gérés par l'utilisateur.

  3. Dans la barre d'outils, cliquez sur Créer.

  4. Dans la section Environnement, pour Environnement, sélectionnez Apache Beam.

  5. Facultatif : Si vous souhaitez exécuter des notebooks sur un GPU, dans la section Type de machine, sélectionnez un type de machine compatible avec les GPU, puis sélectionnez Installer le pilote de GPU NVIDIA automatiquement pour moi. Pour en savoir plus, consultez la page Plates-formes de GPU.

  6. Dans la section Mise en réseau, sélectionnez un sous-réseau pour la VM de notebook.

  7. Facultatif : Si vous souhaitez configurer une instance de notebook personnalisée, consultez la section Créer une instance de notebook gérée par l'utilisateur avec des propriétés spécifiques.

  8. Cliquez sur Créer. Dataflow Workbench crée une instance de notebook Apache Beam.

  9. Une fois l'instance de notebook créée, le lien Ouvrir JupyterLab devient actif. Cliquez sur Ouvrir JupyterLab.

Facultatif : installer les dépendances

Les dépendances de connecteur Apache Beam et Google Cloud sont déjà installées sur les notebooks Apache Beam. Si votre pipeline contient des connecteurs ou des PTransforms personnalisés qui dépendent de bibliothèques tierces, installez-les après avoir créé une instance de notebook. Pour en savoir plus, consultez la section Installer les dépendances dans la documentation sur les notebooks gérés par l'utilisateur.

Exemples de notebooks Apache Beam

Après avoir créé une instance de notebooks gérés par l'utilisateur, ouvrez-la dans JupyterLab. Dans l'onglet Fichiers de la barre latérale JupyterLab, le dossier Exemples contient des exemples de notebooks. Pour en savoir plus sur l'utilisation des fichiers JupyterLab, consultez la section Utiliser des fichiers du guide de l'utilisateur JupyterLab.

Les notebooks suivants sont disponibles:

  • Nombre de mots
  • Nombre de mots en flux continu
  • Données en flux continu sur les courses en taxi à New York
  • Apache Beam SQL dans les notebooks avec comparaisons aux pipelines
  • Apache Beam SQL dans les notebooks avec l'exécuteur Dataflow
  • Apache Beam SQL dans les notebooks
  • Nombre de mots Dataflow
  • Flink interactif à grande échelle
  • RunInference
  • Utiliser des GPU avec Apache Beam
  • Visualiser des données

Le dossier Tutoriels contient des tutoriels supplémentaires qui expliquent les principes de base d'Apache Beam. Les tutoriels suivants sont disponibles:

  • Opérations de base
  • Opérations au niveau des éléments
  • Agrégations
  • Windows
  • Opérations d'E/S
  • Streaming
  • Exercices finaux

Ces notebooks incluent du texte explicatif et des blocs de code commentés pour vous aider à comprendre les concepts d'Apache Beam et l'utilisation de l'API. Les tutoriels fournissent également des exercices pour vous entraîner à maîtriser les concepts abordés.

Les sections suivantes utilisent l'exemple de code du notebook "Nombre de mots en flux continu". Les extraits de code de ce guide et ceux du notebook "Nombre de mots en flux continu" peuvent présenter des écarts mineurs.

Créer une instance de notebook

Accédez à Fichier > Nouveau > Notebook, puis sélectionnez un noyau Apache Beam 2.22 ou version ultérieure.

Les notebooks Apache Beam sont créés par rapport à la branche principale du SDK Apache Beam. Cela signifie que la dernière version du noyau affichée dans l'interface utilisateur des notebooks peut être antérieure à la version la plus récente du SDK.

Apache Beam est installé sur votre instance de notebook. Par conséquent, incluez les modules interactive_runner et interactive_beam dans votre notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Si votre notebook utilise d'autres API Google, ajoutez les instructions d'importation suivantes :

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Définir les options d'interactivité

La ligne suivante définit la durée pendant laquelle InteractiveRunner enregistre les données d'une source illimitée. Dans cet exemple, la durée est définie sur 10 minutes.

ib.options.recording_duration = '10m'

Vous pouvez également modifier la taille limite d'enregistrement (en octets) d'une source illimitée en utilisant la propriété recording_size_limit.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

Pour voir d'autres options interactives, consultez la classe interactive_beam.options.

Créer votre pipeline

Initialisez le pipeline à l'aide d'un objet InteractiveRunner.

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

Lire et visualiser les données

L'exemple suivant montre un pipeline Apache Beam qui crée un abonnement au sujet Pub/Sub donné et effectue des lectures à partir de l'abonnement.

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

Le pipeline compte les mots par fenêtres à partir de la source. Il crée un fenêtrage fixe, chaque fenêtre ayant une durée de 10 secondes.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Une fois les données fenêtrées, les mots sont comptés par fenêtre.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

La méthode show() permet de visualiser la PCollection obtenue dans le notebook.

ib.show(windowed_word_counts, include_window_info=True)

Méthode d'affichage permettant de visualiser une PCollection sous forme de tableau.

Vous pouvez réduire le champ d'application de l'ensemble de résultats à partir de show() en définissant deux paramètres facultatifs : n et duration.

  • Définissez n de sorte que l'ensemble de résultats affiche au maximum n éléments, par exemple 20. Si n n'est pas défini, le comportement par défaut consiste à répertorier les éléments les plus récents capturés jusqu'à la fin de l'enregistrement source.
  • Définissez duration pour limiter l'ensemble de résultats à un nombre spécifié de secondes de données à compter du début de l'enregistrement source. Si duration n'est pas défini, le comportement par défaut consiste à répertorier tous les éléments jusqu'à la fin de l'enregistrement.

Si les deux paramètres facultatifs sont définis, show() s'arrête chaque fois qu'un seuil est atteint. Dans l'exemple suivant, show() renvoie au maximum 20 éléments calculés sur la base des 30 premières secondes de données provenant des sources enregistrées.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

Pour afficher les visualisations de vos données, transmettez visualize_data=True à la méthode show(). Vous pouvez appliquer plusieurs filtres à vos visualisations. La visualisation suivante vous permet de filtrer par libellé et par axe :

Méthode d'affichage permettant de visualiser une PCollection sous la forme d'un ensemble complet d'éléments d'interface utilisateur filtrables.

Pour garantir la répétabilité lors du prototypage de pipelines de traitement par flux, les appels de méthode show() réutilisent les données capturées par défaut. Pour modifier ce comportement et faire en sorte que la méthode show() extraie toujours de nouvelles données, définissez interactive_beam.options.enable_capture_replay = False. En outre, si vous ajoutez une deuxième source illimitée à votre notebook, les données de la source illimitée précédente sont supprimées.

Un DataFrame Pandas est une autre visualisation utile dans les notebooks Apache Beam. L'exemple suivant convertit d'abord les mots en minuscules, puis calcule la fréquence de chaque mot.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

La méthode collect() fournit le résultat dans un DataFrame pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Méthode de collecte représentant une PCollection dans un DataFrame pandas.

La modification et la réexécution d'une cellule sont courantes dans le développement de notebook. Lorsque vous modifiez et réexécutez une cellule dans un notebook Apache Beam, la cellule n'annule pas l'action prévue du code dans la cellule d'origine. Par exemple, si une cellule ajoute un PTransform à un pipeline, la réexécution de cette cellule ajoute un PTransform supplémentaire au pipeline. Si vous souhaitez effacer l'état, redémarrez le noyau et réexécutez les cellules.

Visualiser les données via l'outil d'inspection interactif de Beam

Il est possible que cela vous prenne beaucoup de temps d'examiner les données d'une PCollection en appelant constamment show() et collect(), en particulier lorsque le résultat prend beaucoup d'espace sur votre écran et complexifie la navigation sur le notebook. Vous pouvez également avoir besoin de comparer plusieurs PCollections côte à côte pour vérifier si une transformation fonctionne comme prévu. Par exemple, lorsqu'une PCollection subit une transformation et produit l'autre. Pour ces cas d'utilisation, l'outil d'inspection interactif de Beam est une solution plus pratique.

L'outil d'inspection interactif de Beam est fourni sous la forme d'une extension JupyterLab apache-beam-jupyterlab-sidepanel préinstallée dans le notebook Apache Beam. Avec l'extension, vous pouvez examiner de manière interactive l'état des pipelines et des données associés à chaque PCollection sans appeler explicitement show() ou collect().

Il existe trois façons d'ouvrir l'outil d'inspection :

  • Cliquez sur Interactive Beam dans la barre de menu supérieure de JupyterLab. Dans la liste déroulante, localisez Open Inspector, puis cliquez dessus pour ouvrir l'outil d'inspection.

    Ouvrir l'outil d'inspection via le menu

  • Utilisez la page du lanceur d'applications. Si aucune page du lanceur d'applications ne s'ouvre, cliquez sur File -> New Launcher pour l'ouvrir. Sur la page du lanceur d'applications, localisez Interactive Beam et cliquez sur Open Inspector pour ouvrir l'outil d'inspection.

    Ouvrir l'outil d'inspection via le lanceur

  • Utilisez la palette de commandes. Dans la barre de menu JupyterLab, cliquez sur View > Activate Command Palette. Dans la boîte de dialogue, recherchez Interactive Beam pour répertorier toutes les options de l'extension. Cliquez sur Open Inspector pour ouvrir l'outil d'inspection.

    Ouvrir l'outil d'inspection via la palette de commandes

Lorsque l'outil d'inspection est sur le point de se lancer :

  • Si un seul notebook est ouvert, l'outil d'inspection s'y connecte automatiquement.

  • Si aucun notebook n'est ouvert, une boîte de dialogue vous permet de sélectionner un noyau.

  • Si plusieurs notebooks sont ouverts, une boîte de dialogue vous permet de sélectionner la session de notebook.

    Sélectionner le notebook auquel vous souhaitez vous connecter

Il est recommandé d'ouvrir au moins un notebook et de sélectionner un noyau pour celui-ci avant d'ouvrir l'outil d'inspection. Si vous ouvrez un outil d'inspection avec un noyau avant d'ouvrir un notebook, vous devrez sélectionner Interactive Beam Inspector Session dans Use Kernel from Preferred Session pour ouvrir un notebook et vous y connecter. Un outil d'inspection et un notebook sont connectés lorsqu'ils partagent la même session, et non des sessions différentes créées à partir du même noyau. Sélectionner le même noyau dans Start Preferred Kernel crée une session indépendante des sessions existantes de notebooks ou d'outils d'inspection ouverts.

Vous pouvez ouvrir plusieurs outils d'inspection pour un notebook ouvert et les organiser en effectuant librement un glisser-déposer de leurs onglets dans l'espace de travail.

Ouvrir deux outils d'inspection et les organiser côte à côte

La page de l'outil d'inspection s'actualise automatiquement lorsque vous exécutez des cellules dans le notebook. La page répertorie les pipelines et la PCollections définis dans le notebook connecté. Les fichiers PCollections sont organisés par pipelines auxquels ils appartiennent. Vous pouvez les réduire en cliquant sur le pipeline d'en-tête.

Lorsque vous cliquez sur les éléments de la liste des pipelines et des PCollections, l'outil d'inspection affiche les visualisations correspondantes à droite :

  • S'il s'agit d'une PCollection, l'outil d'inspection affiche ses données (de manière dynamique si les données sont toujours présentes pour les PCollections illimitées) avec des widgets supplémentaires pour ajuster la visualisation après avoir cliqué sur le bouton APPLY.

    Page de l'outil d'inspection

    Étant donné que l'outil d'inspection et le notebook ouvert partagent la même session de noyau, ils se bloquent l'un l'autre. Par exemple, si le notebook est occupé à exécuter du code, l'outil d'inspection ne se met à jour qu'une fois l'exécution du notebook terminée. À l'inverse, si vous souhaitez exécuter du code immédiatement dans votre notebook pendant que l'outil d'inspection visualise une PCollection de manière dynamique, vous devez cliquer sur le bouton STOP pour arrêter la visualisation et libérer le noyau de manière préemptive dans le notebook.

  • S'il s'agit d'un pipeline, l'outil d'inspection affiche le graphique correspondant.

    Page de l'outil d'inspection

Vous remarquerez peut-être des pipelines anonymes. Ces pipelines comportent des PCollections auxquelles vous pouvez accéder, mais ils ne sont plus référencés par la session principale. Par exemple :

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

L'exemple précédent crée un pipeline vide p et un pipeline anonyme contenant un pcoll PCollection. Vous pouvez accéder au pipeline anonyme à l'aide de pcoll.pipeline.

Vous pouvez activer/désactiver le pipeline et la liste PCollection pour économiser de l'espace pour les visualisations volumineuses. Activer/désactiver la liste de gauche

Comprendre l'état d'enregistrement d'un pipeline

Outre les visualisations, vous pouvez également inspecter l'état d'enregistrement d'un pipeline ou de l'ensemble des pipelines de votre instance de notebook en appelant describe.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

La méthode describe() fournit les informations suivantes :

  • Taille totale (en octets) de tous les enregistrements du pipeline sur le disque
  • Heure de début du démarrage de la tâche d'enregistrement en arrière-plan (en secondes à partir de l'époque Unix)
  • État actuel du pipeline de la tâche d'enregistrement en arrière-plan
  • Variable Python pour le pipeline

Lancer des tâches Dataflow à partir d'un pipeline créé dans votre notebook

  1. Facultatif : Avant d'utiliser votre notebook pour exécuter des tâches Dataflow, redémarrez le noyau, réexécutez toutes les cellules et vérifiez le résultat. Si vous ignorez cette étape, les états masqués du notebook peuvent affecter le graphique de la tâche dans l'objet de pipeline.
  2. Activez l'API Dataflow.
  3. Ajoutez la déclaration d'importation suivante :

    from apache_beam.runners import DataflowRunner
    
  4. Transmettez vos options de pipeline.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    Vous pouvez ajuster les valeurs des paramètres. Par exemple, vous pouvez remplacer la valeur region par us-central1.

  5. Exécutez le pipeline avec DataflowRunner. Cette opération exécute votre tâche sur le service Dataflow.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p est un objet de pipeline issu de l'étape Créer votre pipeline.

Pour obtenir un exemple sur la façon d'exécuter cette conversion sur un notebook interactif, consultez le notebook "Nombre de mots Dataflow" de votre instance de notebook.

Vous pouvez également exporter votre notebook en tant que script exécutable, modifier le fichier .py généré à l'aide des étapes précédentes, puis déployer votre pipeline sur le service Dataflow.

Enregistrer votre notebook

Les notebooks que vous créez sont enregistrés en local dans votre instance de notebook en cours d'exécution. Si vous réinitialisez ou arrêtez l'instance de notebook pendant le développement, ces notebooks sont conservés tant qu'ils sont créés dans le répertoire /home/jupyter. Toutefois, si une instance de notebook est supprimée, ces notebooks sont également supprimés.

Pour les conserver en vue d'une utilisation ultérieure, téléchargez-les localement sur votre poste de travail, enregistrez-les dans GitHub ou exportez-les dans un autre format de fichier.

Enregistrer votre notebook sur des disques persistants supplémentaires

Si vous souhaitez conserver votre travail, tel que les notebooks et les scripts sur différentes instances de notebook, vous pouvez les stocker sur un disque persistant.

  1. Créez ou associez un disque persistant. Suivez les instructions pour utiliser ssh afin de vous connecter à la VM de l'instance de notebook et d'exécuter des commandes dans Cloud Shell ouvert.

  2. Notez le répertoire dans lequel le disque persistant est installé, par exemple /mnt/myDisk.

  3. Modifiez les détails de la VM de l'instance de notebook pour ajouter une entrée à Custom metadata : clé container-custom-params ; valeur -v /mnt/myDisk:/mnt/myDisk. Métadonnées supplémentaires nécessaires pour lier le disque persistant installé

  4. Cliquez sur Enregistrer.

  5. Pour mettre à jour ces modifications, réinitialisez l'instance de notebook. Réinitialiser une instance de notebook

  6. Après la réinitialisation, cliquez sur Open JupyterLab (Ouvrir JupyterLab). L'UI de JupyterLab peut prendre un certain temps à s'afficher. Une fois l'UI affichée, ouvrez un terminal et exécutez la commande suivante : ls -al /mnt. Le répertoire /mnt/myDisk doit être listé. Répertorier les volumes associés

Vous pouvez maintenant enregistrer votre travail dans le répertoire /mnt/myDisk. Même si l'instance de notebook est supprimée, le disque persistant existe dans votre projet. Vous pouvez ensuite associer ce disque persistant à d'autres instances de notebook.

Effectuer un nettoyage

Une fois que vous avez fini d'utiliser votre instance de notebook Apache Beam, arrêtez l'instance de notebook pour nettoyer les ressources que vous avez créées sur Google Cloud.

Étapes suivantes