Usa contenedores personalizados con bibliotecas de C++


En este instructivo, crearás una canalización que use contenedores personalizados con bibliotecas de C++ para ejecutar un flujo de trabajo altamente paralelo de HPC de Dataflow. Usa este instructivo para aprender a usar Dataflow y Apache Beam a fin de ejecutar aplicaciones de computación en red que requieran que los datos se distribuyan a funciones que se ejecutan en varios núcleos.

En el instructivo, primero se muestra cómo ejecutar la canalización mediante el ejecutor directo y, luego, mediante el ejecutor de Dataflow. Si ejecutas la canalización de forma local, puedes probarla antes de implementarla.

En este ejemplo, se usan vinculaciones y funciones de Cython de la biblioteca de GMP. Sin importar la biblioteca o la herramienta de vinculación que uses, puedes aplicar los mismos principios a tu canalización.

El código está disponible en GitHub.

Objetivos

  • Crear una canalización que use contenedores personalizados con bibliotecas C++.

  • Compilar una imagen de contenedor de Docker con un Dockerfile.

  • Empaquetar el código y las dependencias en un contenedor de Docker.

  • Ejecutar la canalización de forma local para probarla.

  • Ejecutar la canalización en un entorno distribuido.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Crea una cuenta de servicio de trabajador administrado por el usuario para tu canalización nueva y otórgale los roles necesarios.

    1. Para crear la cuenta de servicio, ejecuta el comando gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Reemplaza SERVICE_ACCOUNT_ROLE por cada rol individual.

    3. Otorga a tu Cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Descarga la muestra de código y, luego, cambia de directorio

Descarga la muestra de código y, luego, cambia de directorio. Las muestras de código en el repositorio de GitHub proporcionan todo el código que necesitas para ejecutar esta canalización. Cuando estés listo para compilar tu propia canalización, puedes usar este código de muestra como una plantilla.

Clona el repositorio beam-cpp-example.

  1. Usa el comando git clone para clonar el repositorio de GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Cambia al directorio de la aplicación:

    cd dataflow-sample-applications/beam-cpp-example
    

Código de canalización

Usa este instructivo para personalizar el código de este instructivo. Esta canalización completa las siguientes tareas:

  • Produce de forma dinámica todos los números enteros en un rango de entrada.
  • Ejecuta los números enteros a través de una función de C++ y filtra los valores incorrectos.
  • Escribe los valores incorrectos en un canal lateral.
  • Cuenta el caso de cada hora de detención y normaliza los resultados.
  • Imprime el resultado, formatea y escribe los resultados en un archivo de texto.
  • Crea una PCollection de un solo elemento.
  • Procesa el elemento único con una función map y pasa la frecuencia PCollection como una entrada complementaria.
  • Procesa PCollection y produce un solo resultado.

El archivo inicial se ve de la siguiente manera:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configura tu entorno de desarrollo

  1. Usa el SDK de Apache Beam para Python.

  2. Instala la biblioteca de GMP:

    apt-get install libgmp3-dev
    
  3. Para instalar las dependencias, usa el archivo requirements.txt.

    pip install -r requirements.txt
    
  4. Para compilar las vinculaciones de Python, ejecuta el siguiente comando.

    python setup.py build_ext --inplace
    

Usa este instructivo para personalizar el requirements.txt. El archivo inicial incluye las siguientes dependencias:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Ejecute la canalización de forma local:

Ejecutar la canalización de manera local es útil para realizar pruebas. Si ejecutas la canalización de manera local, puedes confirmar que la canalización se ejecute y se comporte como se espera antes de implementar la canalización en un entorno distribuido.

Puedes ejecutar la canalización de manera local con el siguiente comando. Este comando genera una imagen llamada out.png.

python pipeline.py

Crea los recursos de Google Cloud

En esta sección, se explica cómo crear lo siguiente:

  • Un bucket de Cloud Storage para usar como ubicación de almacenamiento temporal y de salida.
  • Un contenedor de Docker para empaquetar el código de la canalización y las dependencias.

Crea un bucket de Cloud Storage

Primero, crea un bucket de Cloud Storage mediante Google Cloud CLI. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.

Para crear el bucket, usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Reemplaza lo siguiente:

  • BUCKET_NAME: Es un nombre para tu bucket de Cloud Storage que cumple con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
  • LOCATION: la ubicación del bucket.

Crea y compila una imagen de contenedor

Usa este instructivo para personalizar el Dockerfile. El archivo inicial se ve de la siguiente manera:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Este Dockerfile contiene los comandos FROM, COPY y RUN, que puedes leer en la referencia de Dockerfile.

  1. Para subir artefactos, crea un repositorio de Artifact Registry. Cada repositorio puede contener artefactos para un formato compatible único.

    Todo el contenido del repositorio se encripta con claves de Google y administradas por Google o con claves de encriptación administradas por el cliente. Artifact Registry usa claves de Google y administradas por Google de forma predeterminada y no se requiere ninguna configuración para esta opción.

    Debes tener al menos el acceso de escritor de Artifact Registry al repositorio.

    Ejecuta el siguiente comando para crear un repositorio nuevo. El comando usa la marca --async y se muestra de inmediato, sin necesidad de esperar a que se complete la operación en curso.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Reemplaza REPOSITORY por un nombre para tu repositorio. Para la ubicación de cada repositorio en un proyecto, los nombres de los repositorios deben ser únicos.

  2. Crea el Dockerfile.

    Para que los paquetes formen parte del contenedor de Apache Beam, debes especificarlos como parte del archivo requirements.txt. Asegúrate de no especificar apache-beam como parte del archivo requirements.txt. El contenedor de Apache Beam ya tiene apache-beam.

  3. Antes de poder enviar o extraer imágenes, configura Docker para autenticar solicitudes de Artifact Registry. Para configurar la autenticación en los repositorios de Docker, ejecuta el siguiente comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    El comando actualiza tu configuración de Docker. Ahora puedes conectarte con Artifact Registry en tu proyecto de Google Cloud para enviar imágenes.

  4. Compila la imagen de Docker mediante Dockerfile con Cloud Build.

    Actualiza la ruta de acceso en el siguiente comando para que coincida con el Dockerfile que creaste. Este comando compila el archivo y lo envía a tu repositorio de Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Empaqueta el código y las dependencias en un contenedor de Docker

  1. Para ejecutar esta canalización en un entorno distribuido, empaqueta el código y las dependencias en un contenedor de Docker.

    docker build . -t cpp_beam_container
    
  2. Después de empaquetar el código y las dependencias, puedes ejecutar la canalización de forma local para probarla.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Con este comando, se escribe el resultado dentro de la imagen de Docker. Para ver el resultado, ejecuta la canalización con --output y escribe el resultado en un bucket de Cloud Storage. Por ejemplo, ejecuta el siguiente comando.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Ejecuta la canalización

Ahora puedes ejecutar la canalización de Apache Beam en Dataflow mediante una consulta al archivo con el código de canalización y pasar los parámetros que requiere la canalización.

En tu shell o terminal, ejecuta la canalización con el ejecutor de Dataflow.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Después de ejecutar el comando para ejecutar la plantilla, el Dataflow muestra un ID de trabajo con el estado En cola. Es posible que el estado del trabajo demore varios minutos en Ejecutarse y que pueda acceder al grafo del trabajo.

Ve los resultados

Consulta los datos escritos en tu bucket de Cloud Storage. Usa el comando gcloud storage ls para mostrar el contenido en el nivel superior del bucket:

gcloud storage ls gs://BUCKET_NAME

Si no hay errores, el comando mostrará un mensaje similar a este:

gs://BUCKET_NAME/out.png

Realiza una limpieza

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto de Google Cloud que creaste para el instructivo.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Borra los recursos individuales

Si deseas volver a usar el proyecto, borra los recursos que creaste para el instructivo.

Borra los recursos del proyecto de Google Cloud

  1. Borra el repositorio de Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Borra el bucket de Cloud Storage. El bucket por sí solo no genera cargos.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoca credenciales

  1. Revoca los roles que otorgaste a la cuenta de servicio de trabajador administrada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

¿Qué sigue?