Utilizzo di container personalizzati con le librerie C++


In questo tutorial creerai una pipeline che utilizza container personalizzati con librerie C++ per eseguire un flusso di lavoro HPC di Dataflow a elevata parallela. Utilizza questo tutorial per scoprire come utilizzare Dataflow e Apache Beam per eseguire applicazioni di grid computing che richiedono la distribuzione dei dati a funzioni in esecuzione su molti core.

Il tutorial mostra come eseguire la pipeline utilizzando prima l'esecutore diretto e poi l'esecuzione di Dataflow. Eseguendo la pipeline in locale, puoi testarla prima di eseguirne il deployment.

Questo esempio utilizza le associazioni e le funzioni Cython della libreria GMP. Indipendentemente dalla libreria o dallo strumento di associazione che utilizzi, puoi applicare gli stessi principi alla pipeline.

Il codice di esempio è disponibile su GitHub.

Obiettivi

  • Crea una pipeline che utilizzi container personalizzati con librerie C++.

  • Creare un'immagine container Docker utilizzando un Dockerfile.

  • pacchettizza il codice e le dipendenze in un container Docker.

  • Esegui la pipeline in locale per testarla.

  • Eseguire la pipeline in un ambiente distribuito.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Abilita le API Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  8. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  9. Installa Google Cloud CLI.
  10. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  11. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  12. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  13. Abilita le API Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build.

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  15. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  16. Crea un account di servizio worker gestito dall'utente per la nuova pipeline e concedi i ruoli necessari all'account di servizio.

    1. Per creare l'account di servizio, esegui il comando gcloud iam service-accounts create:

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Concedi ruoli all'account di servizio. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.

    3. Concedi al tuo Account Google un ruolo che ti consente di creare token di accesso per l'account di servizio:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Scarica l'esempio di codice e cambia directory

Scarica l'esempio di codice e poi cambia directory. Gli esempi di codice nel repository GitHub forniscono tutto il codice necessario per eseguire questa pipeline. Quando è tutto pronto per creare la tua pipeline, puoi usare questo codice campione come modello.

Clona il repository beam-cpp-example.

  1. Usa il comando git clone per clonare il repository GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Passa alla directory delle applicazioni:

    cd dataflow-sample-applications/beam-cpp-example
    

Codice pipeline

Puoi personalizzare il codice della pipeline da questo tutorial. Questa pipeline completa le seguenti attività:

  • Produce dinamicamente tutti i numeri interi in un intervallo di input.
  • Esegue i numeri interi tramite una funzione C++ e filtra i valori non validi.
  • Scrive i valori errati su un canale secondario.
  • Conta l'occorrenza di ogni tempo di interruzione e normalizza i risultati.
  • Stampa l'output, formatta e scrivendo i risultati in un file di testo.
  • Crea un elemento PCollection con un singolo elemento.
  • Elabora il singolo elemento con una funzione map e passa la frequenza PCollection come input secondario.
  • Elabora PCollection e produce un singolo output.

Il file di base ha il seguente aspetto:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import sys

def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Configurazione dell'ambiente di sviluppo

  1. Utilizza l'SDK Apache Beam per Python.

  2. Installa la libreria di GMP:

    apt-get install libgmp3-dev
    
  3. Per installare le dipendenze, utilizza il file requirements.txt.

    pip install -r requirements.txt
    
  4. Per creare le associazioni Python, esegui questo comando.

    python setup.py build_ext --inplace
    

Puoi personalizzare il file requirements.txt da questo tutorial. Il file di base include le seguenti dipendenze:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

esegui la pipeline in locale

L'esecuzione della pipeline in locale è utile per i test. Se esegui la pipeline in locale, puoi verificare che venga eseguita e funzioni come previsto prima di eseguirne il deployment in un ambiente distribuito.

Puoi eseguire la pipeline in locale utilizzando il comando seguente. Questo comando restituisce un'immagine denominata out.png.

python pipeline.py

crea le risorse Google Cloud

Questa sezione spiega come creare le seguenti risorse:

  • Un bucket Cloud Storage da utilizzare come posizione di archiviazione temporanea e località di output.
  • Un container Docker per pacchettizzare il codice e le dipendenze della pipeline.

crea un bucket Cloud Storage

Per iniziare, crea un bucket Cloud Storage utilizzando Google Cloud CLI. Questo bucket viene utilizzato come località di archiviazione temporanea dalla pipeline Dataflow.

Per creare il bucket, utilizza il comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Sostituisci quanto segue:

Crea e crea un'immagine container

Puoi personalizzare il Dockerfile da questo tutorial. Il file di base ha il seguente aspetto:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Questo Dockerfile contiene i comandi FROM, COPY e RUN, che puoi scoprire nel riferimento Dockerfile.

  1. Per caricare gli artefatti, crea un repository Artifact Registry. Ogni repository può contenere artefatti per un singolo formato supportato.

    Tutti i contenuti del repository sono criptati mediante chiavi di crittografia gestite da Google o dal cliente. Artifact Registry utilizza chiavi di crittografia gestite da Google per impostazione predefinita e per questa opzione non è richiesta alcuna configurazione.

    Devi avere almeno l'accesso come Writer ad Artifact Registry al repository.

    Esegui questo comando per creare un nuovo repository. Il comando utilizza il flag --async e restituisce immediatamente, senza attendere il completamento dell'operazione in corso.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Sostituisci REPOSITORY con un nome per il repository. Per ogni località del repository in un progetto, i nomi dei repository devono essere univoci.

  2. Creare il Dockerfile.

    Affinché i pacchetti facciano parte del container Apache Beam, devi specificarli nel file requirements.txt. Assicurati di non specificare apache-beam come parte del file requirements.txt. Il container Apache Beam ha già apache-beam.

  3. Prima di eseguire il push o il pull delle immagini, configura Docker per autenticare le richieste per Artifact Registry. Per configurare l'autenticazione nei repository Docker, esegui questo comando:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Il comando aggiorna la configurazione Docker. Ora puoi connetterti ad Artifact Registry nel tuo progetto Google Cloud per eseguire il push delle immagini.

  4. Crea l'immagine Docker utilizzando il tuo Dockerfile con Cloud Build.

    Aggiorna il percorso nel comando seguente in modo che corrisponda al Dockerfile creato. Questo comando crea il file e ne esegue il push nel repository Artifact Registry.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Pacchettizzazione del codice e delle dipendenze in un container Docker

  1. Per eseguire questa pipeline in un ambiente distribuito, pacchettizza il codice e le dipendenze in un container Docker.

    docker build . -t cpp_beam_container
    
  2. Dopo aver pacchettizzato il codice e le dipendenze, puoi eseguire la pipeline localmente per testarla.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Questo comando scrive l'output all'interno dell'immagine Docker. Per visualizzare l'output, esegui la pipeline con --output e scrivi l'output in un bucket Cloud Storage. Ad esempio, esegui il comando seguente.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

esegui la pipeline.

Ora puoi eseguire la pipeline Apache Beam in Dataflow facendo riferimento al file con il codice della pipeline e passando i parametri richiesti dalla pipeline.

Nella shell o nel terminale, esegui la pipeline con Dataflow Runner.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Dopo aver eseguito il comando per eseguire la pipeline, Dataflow restituisce un ID job con lo stato del job Queued. Potrebbero essere necessari diversi minuti prima che lo stato del job raggiunga In esecuzione e tu possa accedere al grafico del job.

Visualizza i tuoi risultati

Visualizzare i dati scritti nel bucket Cloud Storage. Utilizza il comando gcloud storage ls per elencare i contenuti al livello più alto del tuo bucket:

gcloud storage ls gs://BUCKET_NAME

Se l'esito è positivo, il comando restituisce un messaggio simile al seguente:

gs://BUCKET_NAME/out.png

Esegui la pulizia

Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per il tutorial.

  1. Nella console Google Cloud, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Elimina le singole risorse

Se vuoi riutilizzare il progetto, elimina le risorse che hai creato per il tutorial.

Esegui la pulizia delle risorse di progetto Google Cloud

  1. Elimina il repository Artifact Registry.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Eliminare il bucket Cloud Storage. Questo bucket da solo non prevede alcun addebito.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocare le credenziali

  1. Revoca i ruoli che hai concesso all'account di servizio worker gestito dall'utente. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  3. Facoltativo: revoca le credenziali dallgcloud CLI.

    gcloud auth revoke

Passaggi successivi