Utilizzare container personalizzati con le librerie C++


In questo tutorial crei una pipeline che utilizza i contenitori personalizzati con librerie C++ per eseguire un flusso di lavoro HPC Dataflow altamente parallelo. Utilizza questo tutorial per scoprire come utilizzare Dataflow e Apache Beam per eseguire applicazioni di calcolo distribuito su rete che richiedono la distribuzione dei dati alle funzioni in esecuzione su molti core.

Il tutorial mostra come eseguire prima la pipeline utilizzando il metodo Corsa diretta e quindi utilizzando Esecutore Dataflow. Puoi testarla eseguendo la pipeline in locale prima di eseguirne il deployment.

Questo esempio utilizza le funzioni e le associazioni Cython della libreria GMP. Indipendentemente dalla libreria o dallo strumento di associazione che utilizzi, puoi applicare lo stesso principi per la tua pipeline.

Il codice di esempio è disponibile su GitHub.

Obiettivi

  • Crea una pipeline che utilizzi container personalizzati con librerie C++.

  • Crea un'immagine container Docker utilizzando un Dockerfile.

  • Pacchettizza il codice e le dipendenze in un container Docker.

  • Esegui la pipeline in locale per testarla.

  • Esegui la pipeline in un ambiente distribuito.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Crea un account di servizio worker gestito dall'utente per la nuova pipeline e concedi le necessarie ruoli all'account di servizio.

    1. Per creare l'account di servizio, esegui il comando gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Concedi i ruoli all'account di servizio. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.

    3. Concedi al tuo Account Google un ruolo che ti consenta di creare token di accesso per l'account di servizio:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Scarica l'esempio di codice e cambia le directory

Scarica l'esempio di codice e poi cambia le directory. Gli esempi di codice nel repository GitHub forniscono tutto il codice necessario per eseguire questa pipeline. Quando è tutto pronto per creare la tua pipeline, puoi utilizzare questo codice campione come modello.

Clona il repository beam-cpp-example.

  1. Usa il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Passa alla directory dell'applicazione:

    cd dataflow-sample-applications/beam-cpp-example
    

Codice pipeline

Puoi personalizzare il codice della pipeline da questo tutorial. Questa pipeline completa le seguenti attività:

  • Produce dinamicamente tutti i numeri interi in un intervallo di input.
  • Esegue i numeri interi attraverso una funzione C++ e filtra i valori errati.
  • Scrive i valori errati in un canale laterale.
  • Conta l'occorrenza di ogni momento di interruzione e normalizza i risultati.
  • Stampa l'output, formatta e scrivendo i risultati in un file di testo.
  • Crea un PCollection con un singolo elemento.
  • Elabora il singolo elemento con una funzione map e passa la frequenza PCollection come input aggiuntivo.
  • Elabora PCollection e produce un singolo output.

Il file iniziale ha il seguente aspetto:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configurazione dell'ambiente di sviluppo

  1. Utilizza l'SDK Apache Beam per Python.

  2. Installa la libreria GMP:

    apt-get install libgmp3-dev
    
  3. Per installare le dipendenze, utilizza il file requirements.txt.

    pip install -r requirements.txt
    
  4. Per compilare le associazioni Python, esegui il seguente comando.

    python setup.py build_ext --inplace
    

Puoi personalizzare il file requirements.txt da questo tutorial. Il file di avvio include le seguenti dipendenze:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Esegui la pipeline in locale

L'esecuzione della pipeline in locale è utile per i test. Se esegui la pipeline in locale, puoi verificare che funzioni e si comporti come previsto prima di eseguirne il deployment in un ambiente distribuito.

Puoi eseguire la pipeline in locale utilizzando il comando seguente. Questo comando genera un'immagine denominata out.png.

python pipeline.py

Crea le risorse Google Cloud

Questa sezione spiega come creare le risorse seguenti:

  • un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea un percorso di output.
  • Un container Docker per pacchettizzare il codice e le dipendenze della pipeline.

Crea un bucket Cloud Storage

Inizia creando un bucket Cloud Storage utilizzando Google Cloud CLI. Questo bucket viene utilizzato come posizione di archiviazione temporanea Dataflow.

Per creare il bucket, utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

Crea e crea un'immagine container

Puoi personalizzare il Dockerfile di questo tutorial. Il file di base è simile al seguente:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Questo Dockerfile contiene i comandi FROM, COPY e RUN, che puoi trovare nel riferimento Dockerfile.

  1. Per caricare gli artefatti, crea un repository Artifact Registry. Ogni repository può contenere elementi per un singolo formato supportato.

    Tutti i contenuti del repository vengono criptati utilizzando chiavi di proprietà di Google e gestite da Google o chiavi di crittografia gestite dal cliente. Per impostazione predefinita, Artifact Registry utilizza chiavi di proprietà di Google e gestite da Google e non è richiesta alcuna configurazione per questa opzione.

    Devi disporre almeno dell'accesso come autore di Artifact Registry al repository.

    Esegui questo comando per creare un nuovo repository. Il comando utilizza il flag --async e restituisce immediatamente il risultato, senza attendere il completamento dell'operazione in corso.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Sostituisci REPOSITORY con un nome per il tuo repository. Per ogni posizione del repository in un progetto, i nomi dei repository devono essere univoci.

  2. Crea il Dockerfile.

    Affinché i pacchetti facciano parte del container Apache Beam, devi specificarli come parte del file requirements.txt. Assicurati di non specifica apache-beam nell'ambito di requirements.txt . Il contenitore Apache Beam contiene già apache-beam.

  3. Prima di poter eseguire il push o il pull delle immagini, configura Docker per autenticare le richieste per Artifact Registry. Per configurare l'autenticazione nei repository Docker, esegui il seguente comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Il comando aggiorna la configurazione Docker. Ora puoi connetterti con Artifact Registry nel tuo progetto Google Cloud per eseguire il push delle immagini.

  4. Crea il Docker utilizzando Dockerfile con Cloud Build.

    Aggiorna il percorso nel comando seguente in modo che corrisponda al Dockerfile che che hai creato. Questo comando crea il file ed esegue il push al repository Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Pacchettizzazione del codice e delle dipendenze in un container Docker

  1. Per eseguire questa pipeline in un ambiente distribuito, pacchettizza il codice e le dipendenze in un container Docker.

    docker build . -t cpp_beam_container
    
  2. Dopo aver pacchettizzato il codice e le dipendenze, puoi eseguire la pipeline localmente per testarla.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Questo comando scrive l'output all'interno dell'immagine Docker. Per visualizzare l'output, eseguire la pipeline con --output e scrivere l'output in un di sincronizzare la directory di una VM con un bucket. Ad esempio, esegui il seguente comando.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

esegui la pipeline.

Ora puoi eseguire la pipeline Apache Beam in Dataflow facendo riferimento al file con il codice della pipeline e passando parametri come richiesto dalla pipeline.

Nella shell o nel terminale, esegui la pipeline con il comando Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Dopo aver eseguito il comando per eseguire la pipeline, Dataflow restituisce un ID job con lo stato In coda. Potrebbe ci vorranno diversi minuti prima che lo stato del job raggiunga In esecuzione e potrai accedere il grafico del job.

Visualizza i tuoi risultati

Visualizzare i dati scritti nel bucket Cloud Storage. Utilizza la Comando gcloud storage ls per elencare i contenuti al livello superiore del bucket:

gcloud storage ls gs://BUCKET_NAME

Se l'esito è positivo, il comando restituisce un messaggio simile al seguente:

gs://BUCKET_NAME/out.png

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per il tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina le singole risorse

Se vuoi riutilizzare il progetto, elimina le risorse che hai creato per il tutorial.

Ripulire le risorse del progetto Google Cloud

  1. Elimina il repository Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Elimina il bucket Cloud Storage. Questo bucket da solo non prevede addebiti.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocare le credenziali

  1. Revoca i ruoli che hai concesso all'account di servizio worker gestito dall'utente. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Passaggi successivi