Usar contenedores personalizados con bibliotecas de C++


En este tutorial, crearás una canalización que usa contenedores personalizados con bibliotecas de C++ para ejecutar un flujo de trabajo de HPC de Dataflow altamente paralelo. En este tutorial se explica cómo usar Dataflow y Apache Beam para ejecutar aplicaciones de computación distribuida que requieren que los datos se distribuyan a funciones que se ejecutan en muchos núcleos.

En el tutorial se muestra cómo ejecutar la canalización primero con Direct Runner y, después, con Dataflow Runner. Si ejecutas la canalización de forma local, puedes probarla antes de desplegarla.

En este ejemplo se usan enlaces de Cython y funciones de la biblioteca GMP. Independientemente de la biblioteca o la herramienta de vinculación que utilices, puedes aplicar los mismos principios a tu canal.

El código de ejemplo está disponible en GitHub.

Objetivos

  • Crea una canalización que use contenedores personalizados con bibliotecas de C++.

  • Crea una imagen de contenedor Docker con un Dockerfile.

  • Empaqueta el código y las dependencias en un contenedor Docker.

  • Ejecuta la canalización de forma local para probarla.

  • Ejecuta la canalización en un entorno distribuido.

Costes

En este documento, se usan los siguientes componentes facturables de Google Cloud Platform:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  12. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  13. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Crea una cuenta de servicio de trabajador gestionada por el usuario para tu nueva canalización y asigna los roles necesarios a la cuenta de servicio.

    1. Para crear la cuenta de servicio, ejecuta el comando gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Asigna roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Sustituye SERVICE_ACCOUNT_ROLE por cada rol individual.

    3. Asigna a tu cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Descargar el código de muestra y cambiar de directorio

Descarga el código de muestra y, a continuación, cambia de directorio. Los ejemplos de código del repositorio de GitHub proporcionan todo el código que necesitas para ejecutar esta canalización. Cuando quieras crear tu propia canalización, puedes usar este código de ejemplo como plantilla.

Clona el repositorio beam-cpp-example.

  1. Usa el comando git clone para clonar el repositorio de GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Cambia al directorio de la aplicación:

    cd dataflow-sample-applications/beam-cpp-example
    

Código de flujo de procesamiento

Puedes personalizar el código de la canalización de este tutorial. Este flujo de trabajo completa las siguientes tareas:

  • Genera de forma dinámica todos los números enteros de un intervalo de entrada.
  • Ejecuta los números enteros a través de una función de C++ y filtra los valores incorrectos.
  • Escribe los valores incorrectos en un canal secundario.
  • Cuenta las veces que se produce cada tiempo de parada y normaliza los resultados.
  • Imprime la salida, da formato y escribe los resultados en un archivo de texto.
  • Crea un PCollection con un solo elemento.
  • Procesa el elemento único con una función map y pasa la frecuencia PCollection como entrada secundaria.
  • Procesa el PCollection y genera un único resultado.

El archivo de inicio tiene el siguiente aspecto:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configurar un entorno de desarrollo

  1. Usa el SDK de Apache Beam para Python.

  2. Instala la biblioteca de GMP:

    apt-get install libgmp3-dev
    
  3. Para instalar las dependencias, usa el archivo requirements.txt.

    pip install -r requirements.txt
    
  4. Para compilar las vinculaciones de Python, ejecuta el siguiente comando.

    python setup.py build_ext --inplace
    

Puedes personalizar el archivo requirements.txt de este tutorial. El archivo inicial incluye las siguientes dependencias:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Ejecutar el flujo de procesamiento de forma local

Ejecutar la canalización de forma local es útil para hacer pruebas. Al ejecutar la canalización de forma local, puedes confirmar que se ejecuta y se comporta como esperabas antes de desplegarla en un entorno distribuido.

Puedes ejecutar la canalización de forma local con el siguiente comando. Este comando genera una imagen llamada out.png.

python pipeline.py

Crear los recursos de Google Cloud Platform

En esta sección se explica cómo crear los siguientes recursos:

  • Un segmento de Cloud Storage que se usará como ubicación de almacenamiento temporal y como ubicación de salida.
  • Un contenedor Docker para empaquetar el código y las dependencias de la canalización.

Crea un segmento de Cloud Storage

Empieza creando un segmento de Cloud Storage con la CLI de Google Cloud. Este segmento se usa como ubicación de almacenamiento temporal en la canalización de Dataflow.

Para crear el segmento, usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Haz los cambios siguientes:

Crear y compilar una imagen de contenedor

Puedes personalizar el Dockerfile de este tutorial. El archivo de inicio tiene el siguiente aspecto:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Este archivo Dockerfile contiene los comandos FROM, COPY y RUN, que puedes consultar en la referencia de Dockerfile.

  1. Para subir artefactos, crea un repositorio de Artifact Registry. Cada repositorio puede contener artefactos de un solo formato admitido.

    Todo el contenido del repositorio se cifra con claves de cifrado gestionadas por el cliente o con Google-owned and Google-managed encryption keys . Artifact Registry usaGoogle-owned and Google-managed encryption keys de forma predeterminada y no es necesario configurar nada para usar esta opción.

    Debe tener al menos acceso de escritura a Artifact Registry al repositorio.

    Ejecuta el siguiente comando para crear un repositorio. El comando usa la marca --async y se devuelve inmediatamente, sin esperar a que se complete la operación en curso.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Sustituye REPOSITORY por el nombre que quieras darle al repositorio. Los nombres de los repositorios deben ser únicos en cada ubicación de repositorio de un proyecto.

  2. Crea el Dockerfile.

    Para que los paquetes formen parte del contenedor de Apache Beam, debes especificarlos como parte del archivo requirements.txt. Asegúrate de no especificar apache-beam como parte del archivo requirements.txt. El contenedor de Apache Beam ya tiene apache-beam.

  3. Antes de poder enviar o extraer imágenes, configura Docker para autenticar las solicitudes de Artifact Registry. Para configurar la autenticación en los repositorios de Docker, ejecuta el siguiente comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    El comando actualiza tu configuración de Docker. Ahora puede conectarse a Artifact Registry en su Google Cloud proyecto para enviar imágenes.

  4. Compila la imagen Docker con tu Dockerfile con Cloud Build.

    Actualiza la ruta del siguiente comando para que coincida con el archivo Dockerfile que has creado. Este comando compila el archivo y lo envía a tu repositorio de Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Empaqueta el código y las dependencias en un contenedor Docker

  1. Para ejecutar esta canalización en un entorno distribuido, empaqueta el código y las dependencias en un contenedor Docker.

    docker build . -t cpp_beam_container
    
  2. Una vez que hayas empaquetado el código y las dependencias, podrás ejecutar la canalización de forma local para probarla.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Este comando escribe la salida en la imagen de Docker. Para ver el resultado, ejecuta la canalización con --output y escribe el resultado en un segmento de Cloud Storage. Por ejemplo, ejecuta el siguiente comando.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Ejecutar el flujo de procesamiento

Ahora puedes ejecutar el flujo de procesamiento de Apache Beam en Dataflow haciendo referencia al archivo con el código del flujo de procesamiento y transfiriendo los parámetros que requiere el flujo de procesamiento.

En tu shell o terminal, ejecuta el flujo de procesamiento con Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Después de ejecutar el comando para ejecutar la canalización, Dataflow devuelve un ID de trabajo con el estado Queued (En cola). Puede que tardes varios minutos en que el estado de la tarea sea En curso y puedas acceder al gráfico de la tarea.

Ver los resultados

Ver los datos escritos en tu segmento de Cloud Storage. Usa el comando gcloud storage ls para enumerar el contenido del nivel superior de tu segmento:

gcloud storage ls gs://BUCKET_NAME

Si la acción se realiza correctamente, el comando devuelve un mensaje similar al siguiente:

gs://BUCKET_NAME/out.png

Limpieza

Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

Eliminar el proyecto

La forma más fácil de evitar que te cobren es eliminar el Google Cloud proyecto que has creado para el tutorial.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Eliminar los recursos concretos

Si quieres reutilizar el proyecto, elimina los recursos que has creado para el tutorial.

Limpiar los recursos del proyecto de Google Cloud Platform

  1. Elimina el repositorio de Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Elimina el segmento de Cloud Storage y sus objetos. Este solo no conlleva ningún cargo.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocar credenciales

  1. Revoca los roles que hayas concedido a la cuenta de servicio de trabajador gestionada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Siguientes pasos