Utiliser des conteneurs personnalisés avec des bibliothèques C++


Dans ce tutoriel, vous allez créer un pipeline qui utilise des conteneurs personnalisés avec des bibliothèques C++ pour exécuter un workflow Dataflow HPC hautement parallèle. Utilisez ce tutoriel afin d'apprendre à utiliser Dataflow et Apache Beam pour exécuter des applications d'informatique en grille qui nécessitent la distribution des données à des fonctions s'exécutant sur de nombreux cœurs.

Ce tutoriel explique comment exécuter le pipeline en utilisant tout d'abord l'exécuteur Direct Runner, puis l'exécuteur Dataflow Runner. En exécutant le pipeline localement, vous pouvez tester le pipeline avant de le déployer.

Cet exemple utilise les liaisons et les fonctions Cython de la bibliothèque GMP. Quelle que soit la bibliothèque ou l'outil de liaison que vous utilisez, vous pouvez appliquer les mêmes principes à votre pipeline.

L'exemple de code est disponible sur GitHub.

Objectifs

  • Créer un pipeline utilisant des conteneurs personnalisés avec des bibliothèques C++.

  • Créer une image de conteneur Docker à l'aide d'un fichier Dockerfile.

  • Empaqueter le code et les dépendances dans un conteneur Docker.

  • Exécuter le pipeline en local pour le tester.

  • Exécuter le pipeline dans un environnement distribué.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Créez un compte de service de nœud de calcul géré par l'utilisateur pour votre nouveau pipeline et attribuez à ce compte les rôles nécessaires.

    1. Pour créer le compte de service, exécutez la commande gcloud iam service-accounts create :

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Attribuez des rôles au compte de service. Exécutez la commande ci-dessous une fois pour chacun des rôles IAM suivants :

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Remplacez SERVICE_ACCOUNT_ROLE par chaque rôle individuel.

    3. Attribuez à votre compte Google un rôle qui vous permet de créer des jetons d'accès pour le compte de service :

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Téléchargez l'exemple de code, puis modifiez les répertoires.

Téléchargez l'exemple de code, puis modifiez les répertoires. Les exemples de code dans le dépôt GitHub fournissent tout le code dont vous avez besoin pour exécuter ce pipeline. Lorsque vous êtes prêt à créer votre propre pipeline, vous pouvez utiliser cet exemple de code comme modèle.

Clonez le dépôt beam-cpp-example.

  1. Exécutez la commande git clone pour cloner le dépôt GitHub :

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Accédez au répertoire de l'application :

    cd dataflow-sample-applications/beam-cpp-example
    

Code du pipeline

Vous pouvez personnaliser le code du pipeline à partir de ce tutoriel. Ce pipeline effectue les tâches suivantes :

  • Génère de manière dynamique tous les entiers dans une plage d'entrée.
  • Exécute les entiers via une fonction C++ et filtre les valeurs incorrectes.
  • Écrit des valeurs incorrectes dans un canal secondaire.
  • Compte l'occurrence de chaque temps d'arrêt et normalise les résultats.
  • Imprime la sortie, en formatant et en écrivant les résultats dans un fichier texte.
  • Crée une PCollection avec un seul élément.
  • Traite l'élément unique avec une fonction map et transmet la fréquence PCollection en tant qu'entrée secondaire.
  • Traite la PCollection et génère une seule sortie.

Le fichier de démarrage se présente comme suit :

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configurer l'environnement de développement

  1. Utilisez le SDK Apache Beam pour Python.

  2. Installez la bibliothèque GMP :

    apt-get install libgmp3-dev
    
  3. Pour installer les dépendances, utilisez le fichier requirements.txt.

    pip install -r requirements.txt
    
  4. Pour créer les liaisons Python, exécutez la commande suivante.

    python setup.py build_ext --inplace
    

Vous pouvez personnaliser le fichier requirements.txt à l'aide de ce tutoriel. Le fichier de démarrage inclut les dépendances suivantes :

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Exécuter le pipeline en local

L'exécution du pipeline en local est utile à des fins de test. En exécutant le pipeline localement, vous pouvez vérifier qu'il s'exécute et se comporte comme prévu avant de le déployer dans un environnement distribué.

Vous pouvez exécuter le pipeline en local à l'aide de la commande suivante. Cette commande génère une image nommée out.png.

python pipeline.py

Créer les ressources Google Cloud

Cette section explique comment créer les ressources suivantes :

  • Un bucket Cloud Storage à utiliser comme emplacement de stockage temporaire et un emplacement de sortie.
  • Un conteneur Docker pour empaqueter le code du pipeline et les dépendances.

Créer un bucket Cloud Storage

Commencez par créer un bucket Cloud Storage à l'aide de Google Cloud CLI. Ce bucket sert d'emplacement de stockage temporaire pour le pipeline Dataflow.

Pour créer le bucket, utilisez la commande gcloud storage buckets create :

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Remplacez les éléments suivants :

Créer une image de conteneur

Vous pouvez le personnaliser à l'aide de ce tutoriel. Le fichier de démarrage se présente comme suit :

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Ce fichier Dockerfile contient les commandes FROM, COPY et RUN, qui sont présentées dans la documentation de référence sur Dockerfile.

  1. Pour importer des artefacts, créez un dépôt Artifact Registry. Chaque dépôt peut contenir des artefacts pour un seul format compatible.

    L'ensemble du contenu du dépôt est chiffré soit à l'aide de clés appartenant à Google et gérées par Google, soit à l'aide de clés de chiffrement gérées par le client. Artifact Registry utilise par défaut les clés appartenant à Google et gérées par Google, et aucune configuration n'est requise pour cette option.

    Vous devez au moins disposer d'un accès Rédacteur Artifact Registry au dépôt.

    Exécutez la commande suivante pour créer un dépôt. La commande utilise l'option --async et affiche immédiatement le résultat, sans attendre la fin de l'opération en cours.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Remplacez REPOSITORY par le nom que vous souhaitez donner à votre dépôt. Pour chaque emplacement de dépôt d'un projet, les noms de dépôt doivent être uniques.

  2. Créez le fichier Dockerfile.

    Pour que les packages fassent partie du conteneur Apache Beam, vous devez les spécifier dans le fichier requirements.txt. Assurez-vous de ne pas spécifier apache-beam dans le fichier requirements.txt. Le conteneur Apache Beam contient déjà apache-beam.

  3. Avant de pouvoir transférer ou extraire des images, configurez Docker afin d'authentifier les requêtes envoyées à Artifact Registry. Pour configurer l'authentification auprès des dépôts Docker, exécutez la commande suivante :

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    La commande met à jour votre configuration Docker. Vous pouvez désormais vous connecter à Artifact Registry dans votre projet Google Cloud pour transférer des images.

  4. Créez l'image Docker en utilisant votre fichier Dockerfile avec Cloud Build.

    Mettez à jour le chemin d'accès dans la commande suivante pour qu'il corresponde au fichier Dockerfile que vous avez créé. Cette commande crée le fichier et le transfère vers votre dépôt Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Empaqueter le code et les dépendances dans un conteneur Docker

  1. Pour exécuter ce pipeline dans un environnement distribué, empaquetez le code et les dépendances dans un conteneur Docker.

    docker build . -t cpp_beam_container
    
  2. Après avoir empaqueté le code et les dépendances, vous pouvez exécuter le pipeline localement pour le tester.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Cette commande écrit la sortie dans l'image Docker. Pour afficher la sortie, exécutez le pipeline avec --output, et écrivez la sortie dans un bucket Cloud Storage. Par exemple, exécutez la commande suivante.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Exécuter le pipeline

Vous pouvez maintenant exécuter le pipeline Apache Beam dans Dataflow. Pour ce faire, faites référence au fichier contenant le code du pipeline et transmettez les paramètres requis par le pipeline.

Dans votre shell ou votre terminal, exécutez le pipeline avec l'exécuteur Dataflow.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Une fois que vous avez exécuté la commande pour exécuter le pipeline, Dataflow renvoie un Job ID avec l'état de job En file d'attente. Plusieurs minutes peuvent s'écouler avant que l'état du job ne devienne En cours d'exécution. Vous pouvez alors accéder au graphique du job.

Afficher les résultats

Affichez les données écrites dans votre bucket Cloud Storage. Utilisez la commande gcloud storage ls pour lister le contenu du niveau supérieur de votre bucket :

gcloud storage ls gs://BUCKET_NAME

Si la commande est bien exécutée, elle renvoie un message semblable à celui-ci :

gs://BUCKET_NAME/out.png

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé pour le tutoriel.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Supprimer les ressources individuelles

Si vous souhaitez réutiliser le projet, supprimez les ressources que vous avez créées pour le tutoriel.

Nettoyer les ressources du projet Google Cloud

  1. Supprimez le dépôt Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Supprimez le bucket Cloud Storage. Ce bucket seul ne génère aucuns frais.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Révoquer les identifiants

  1. Révoquez les rôles que vous avez accordés au compte de service de nœud de calcul géré par l'utilisateur. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Étapes suivantes