Benutzerdefinierte Container mit C++-Bibliotheken verwenden


In dieser Anleitung erstellen Sie eine Pipeline, die benutzerdefinierte Container mit C++-Bibliotheken verwendet, um einen hochparallelen Dataflow-HPC-Workflow auszuführen. In dieser Anleitung erfahren Sie, wie Sie mit Dataflow und Apache Beam Grid-Computing-Anwendungen ausführen, bei denen Daten an Funktionen verteilt werden müssen, die auf vielen Kernen ausgeführt werden.

In dieser Anleitung wird gezeigt, wie Sie die Pipeline zuerst mit der Methode Direct Runner und dann mithilfe der Funktion Dataflow-Runner ausführen. Wenn Sie die Pipeline lokal ausführen, können Sie sie vor der Bereitstellung testen.

In diesem Beispiel werden Cython-Bindungen und -Funktionen aus der GMP-Bibliothek verwendet. Unabhängig von der verwendeten Bibliothek oder dem Bindungstool können Sie auf Ihre Pipeline dieselben Prinzipien anwenden.

Der Beispielcode ist auf GitHub verfügbar.

Lernziele

  • Eine Pipeline erstellen, die benutzerdefinierte Container mit C++-Bibliotheken verwendet.

  • Erstellen Sie ein Docker-Container-Image mit einem Dockerfile.

  • Verpacken Sie den Code und die Abhängigkeiten in einen Docker-Container.

  • Führen Sie die Pipeline lokal aus, um sie zu testen

  • Führen Sie die Pipeline in einer verteilten Umgebung aus.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Artifact Registry
  • Cloud Build
  • Cloud Storage
  • Compute Engine
  • Dataflow

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  13. Enable the Cloud Storage, Cloud Storage JSON, Compute Engine, Dataflow, Resource Manager, Artifact Registry, and Cloud Build APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com storage_component storage_api cloudresourcemanager.googleapis.com artifactregistry.googleapis.com cloudbuild.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Erstellen Sie ein nutzerverwaltetes Worker-Dienstkonto für Ihre neue Pipeline und weisen Sie dem Dienstkonto die erforderlichen Rollen zu.

    1. Führen Sie den Befehl gcloud iam service-accounts create aus, um das Dienstkonto zu erstellen.

      gcloud iam service-accounts create parallelpipeline \
          --description="Highly parallel pipeline worker service account" \
          --display-name="Highly parallel data pipeline access"
    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/storage.objectAdmin
      • roles/artifactregistry.reader
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Ersetzen Sie SERVICE_ACCOUNT_ROLE durch jede einzelne Rolle.

    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie Zugriffstokens für das Dienstkonto erstellen können:

      gcloud iam service-accounts add-iam-policy-binding parallelpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator

Codebeispiel herunterladen und Verzeichnis wechseln

Laden Sie das Codebeispiel herunter und wechseln Sie dann das Verzeichnis. Die Codebeispiele im GitHub-Repository enthalten den gesamten Code, den Sie zum Ausführen dieser Pipeline benötigen. Wenn Sie Ihre eigene Pipeline erstellen möchten, können Sie diesen Beispielcode als Vorlage verwenden.

Klonen Sie das Repository beam-cpp-example.

  1. Klonen Sie das GitHub-Repository mit dem Befehl git clone:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Wechseln Sie in das Anwendungsverzeichnis:

    cd dataflow-sample-applications/beam-cpp-example
    

Pipelinecode

Sie können den Pipelinecode aus dieser Anleitung anpassen. Diese Pipeline führt die folgenden Aufgaben aus:

  • Erzeugt alle Ganzzahlen in einem Eingabebereich dynamisch.
  • Führt die Ganzzahlen über eine C++-Funktion aus und filtert fehlerhafte Werte.
  • Schreibt fehlerhafte Werte in einen Nebenkanal.
  • Zählt das Vorkommen jeder Stoppzeit und normalisiert die Ergebnisse.
  • Druckt die Ausgabe, formatiert und schreibt die Ergebnisse in eine Textdatei.
  • Erstellt eine PCollection mit einem einzelnen Element.
  • Verarbeitet das einzelne Element mit einer map-Funktion und übergibt die Häufigkeit PCollection als Nebeneingabe.
  • Verarbeitet den PCollection und erzeugt eine einzige Ausgabe.

Die Startdatei sieht so aus:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import argparse
import logging
import os
import sys


def run(argv):
  # Import here to avoid __main__ session pickling issues.
  import io
  import itertools
  import matplotlib.pyplot as plt
  import collatz

  import apache_beam as beam
  from apache_beam.io import restriction_trackers
  from apache_beam.options.pipeline_options import PipelineOptions

  class RangeSdf(beam.DoFn, beam.RestrictionProvider):
    """An SDF producing all the integers in the input range.

    This is preferable to beam.Create(range(...)) as it produces the integers
    dynamically rather than materializing them up front.  It is an SDF to do
    so with perfect dynamic sharding.
    """
    def initial_restriction(self, desired_range):
      start, stop = desired_range
      return restriction_trackers.OffsetRange(start, stop)

    def restriction_size(self, _, restriction):
      return restriction.size()

    def create_tracker(self, restriction):
      return restriction_trackers.OffsetRestrictionTracker(restriction)

    def process(self, _, active_range=beam.DoFn.RestrictionParam()):
      for i in itertools.count(active_range.current_restriction().start):
        if active_range.try_claim(i):
          yield i
        else:
          break

  class GenerateIntegers(beam.PTransform):
    def __init__(self, start, stop):
      self._start = start
      self._stop = stop

    def expand(self, p):
      return (
          p
          | beam.Create([(self._start, self._stop + 1)])
          | beam.ParDo(RangeSdf()))

  parser = argparse.ArgumentParser()
  parser.add_argument('--start', dest='start', type=int, default=1)
  parser.add_argument('--stop', dest='stop', type=int, default=10000)
  parser.add_argument('--output', default='./out.png')

  known_args, pipeline_args = parser.parse_known_args(argv)
  # Store this as a local to avoid capturing the full known_args.
  output_path = known_args.output

  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:

    # Generate the integers from start to stop (inclusive).
    integers = p | GenerateIntegers(known_args.start, known_args.stop)

    # Run them through our C++ function, filtering bad records.
    # Requires apache beam 2.34 or later.
    stopping_times, bad_values = (
        integers
        | beam.Map(collatz.total_stopping_time).with_exception_handling(
            use_subprocess=True))

    # Write the bad values to a side channel.
    bad_values | 'WriteBadValues' >> beam.io.WriteToText(
        os.path.splitext(output_path)[0] + '-bad.txt')

    # Count the occurrence of each stopping time and normalize.
    total = known_args.stop - known_args.start + 1
    frequencies = (
        stopping_times
        | 'Aggregate' >> (beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
        | 'Normalize' >> beam.MapTuple(lambda x, count: (x, count / total)))

    if known_args.stop <= 10:
      # Print out the results for debugging.
      frequencies | beam.Map(print)
    else:
      # Format and write them to a text file.
      (
          frequencies
          | 'Format' >> beam.MapTuple(lambda count, freq: f'{count}, {freq}')
          | beam.io.WriteToText(os.path.splitext(output_path)[0] + '.txt'))

    # Define some helper functions.
    def make_scatter_plot(xy):
      x, y = zip(*xy)
      plt.plot(x, y, '.')
      png_bytes = io.BytesIO()
      plt.savefig(png_bytes, format='png')
      png_bytes.seek(0)
      return png_bytes.read()

    def write_to_path(path, content):
      """Most Beam IOs write multiple elements to some kind of a container
      file (e.g. strings to lines of a text file, avro records to an avro file,
      etc.)  This function writes each element to its own file, given by path.
      """
      # Write to a temporary path and to a rename for fault tolerence.
      tmp_path = path + '.tmp'
      fs = beam.io.filesystems.FileSystems.get_filesystem(path)
      with fs.create(tmp_path) as fout:
        fout.write(content)
      fs.rename([tmp_path], [path])

    (
        p
        # Create a PCollection with a single element.
        | 'CreateSingleton' >> beam.Create([None])
        # Process the single element with a Map function, passing the frequency
        # PCollection as a side input.
        # This will cause the normally distributed frequency PCollection to be
        # colocated and processed as a single unit, producing a single output.
        | 'MakePlot' >> beam.Map(
            lambda _,
            data: make_scatter_plot(data),
            data=beam.pvalue.AsList(frequencies))
        # Pair this with the desired filename.
        |
        'PairWithFilename' >> beam.Map(lambda content: (output_path, content))
        # And actually write it out, using MapTuple to split the tuple into args.
        | 'WriteToOutput' >> beam.MapTuple(write_to_path))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv)

Entwicklungsumgebung einrichten

  1. Verwenden Sie das Apache Beam SDK für Python.

  2. Installieren Sie die GMP-Bibliothek:

    apt-get install libgmp3-dev
    
  3. Verwenden Sie die Datei requirements.txt, um die Abhängigkeiten zu installieren.

    pip install -r requirements.txt
    
  4. Führen Sie den folgenden Befehl aus, um die Python-Bindungen zu erstellen.

    python setup.py build_ext --inplace
    

Sie können die requirements.txt-Datei aus dieser Anleitung anpassen. Die Startdatei enthält die folgenden Abhängigkeiten:

#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

apache-beam[gcp]==2.46.0
cython==0.29.24
pyparsing==2.4.2
matplotlib==3.4.3

Pipeline lokal ausführen

Die lokale Ausführung der Pipeline ist zu Testzwecken nützlich. Durch lokale Ausführung der Pipeline können Sie prüfen, ob die Pipeline ausgeführt wird und sich wie erwartet verhält, bevor Sie die Pipeline in einer verteilten Umgebung bereitstellen.

Mit dem folgenden Befehl können Sie die Pipeline lokal ausführen. Dieser Befehl gibt ein Image mit dem Namen out.png aus.

python pipeline.py

Google Cloud-Ressourcen erstellen

In diesem Abschnitt werden die folgenden Ressourcen erstellt:

  • Ein Cloud Storage-Bucket, der als temporärer Speicherort und als Ausgabespeicherort verwendet werden soll.
  • Einen Docker-Container zum Verpacken des Pipelinecodes und der Abhängigkeiten.

Cloud Storage-Bucket erstellen

Erstellen Sie zuerst einen Cloud Storage-Bucket mit der Google Cloud CLI. Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.

Verwenden Sie den Befehl gcloud storage buckets create, um den Bucket zu erstellen:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ersetzen Sie Folgendes:

Container-Image erstellen

Sie können das Dockerfile aus dieser Anleitung anpassen. Die Startdatei sieht so aus:

FROM apache/beam_python3.9_sdk:2.46.0

# Install a C++ library.
RUN apt-get update
RUN apt-get install -y libgmp3-dev

# Install Python dependencies.
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Install the code and some python bindings.
COPY pipeline.py pipeline.py
COPY collatz.pyx collatz.pyx
COPY setup.py setup.py
RUN python setup.py install

Dieses Dockerfile enthält die Befehle FROM, COPY und RUN. Informationen dazu finden Sie in der Dockerfile-Referenz.

  1. Erstellen Sie ein Artifact Registry-Repository, um Artefakte hochzuladen. Jedes Repository kann Artefakte für ein einzelnes unterstütztes Format enthalten.

    Alle Repository-Inhalte werden entweder mit Schlüsseln verschlüsselt, die Google gehören oder von Google verwaltet werden, oder mit vom Kunden verwaltete Verschlüsselungsschlüssel. Artifact Registry verwendet Schlüssel, die Google gehören und von Google verwaltet werden, sind standardmäßig und keine Konfiguration erforderlich für diese Option.

    Sie müssen für das Repository mindestens Zugriff als Artifact Registry-Autor haben.

    Führen Sie den folgenden Befehl aus, um ein neues Repository zu erstellen: Der Befehl verwendet das Flag --async und kehrt sofort zurück, ohne auf den Abschluss des Vorgangs zu warten.

    gcloud artifacts repositories create REPOSITORY \
       --repository-format=docker \
       --location=LOCATION \
       --async
    

    Ersetzen Sie REPOSITORY durch einen Namen für das Repository. Repository-Namen können für jeden Repository-Speicherort in einem Projekt nur einmal vorkommen.

  2. Erstellen Sie das Dockerfile.

    Pakete, die Teil des Apache Beam-Containers sein sollen, müssen als Teil der Datei requirements.txt angegeben werden. Achten Sie darauf, nicht apache-beam als Teil der Datei requirements.txt anzugeben. Der Apache Beam-Container hat bereits apache-beam.

  3. Um Images per Push oder Pull übertragen zu können, konfigurieren Sie Docker für die Authentifizierung von Anfragen für Artifact Registry. Führen Sie den folgenden Befehl aus, um die Authentifizierung bei Docker-Repositories einzurichten:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Mit dem Befehl wird die Docker-Konfiguration aktualisiert. Sie können jetzt eine Verbindung zu Artifact Registry in Ihrem Google Cloud-Projekt herstellen, um Images per Push zu übertragen.

  4. Erstellen Sie mit Cloud Build das Docker-Image mithilfe eines Dockerfile

    Aktualisieren Sie den Pfad im folgenden Befehl, damit er dem von Ihnen erstellten Dockerfile entspricht. Mit diesem Befehl wird die Datei erstellt und in Ihr Artifact Registry-Repository übertragen.

    gcloud builds submit --tag LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest .
    

Code und Abhängigkeiten in einem Docker-Container verpacken

  1. Packen Sie den Code und die Abhängigkeiten in einen Docker-Container, um diese Pipeline in einer verteilten Umgebung auszuführen.

    docker build . -t cpp_beam_container
    
  2. Nachdem Sie den Code und die Abhängigkeiten gepackt haben, können Sie die Pipeline lokal ausführen um es zu testen.

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container"
    

    Mit diesem Befehl wird die Ausgabe in das Docker-Image geschrieben. Führen Sie die Pipeline mit --output aus, um die Ausgabe aufzurufen, und schreiben Sie die Ausgabe in einen Cloud Storage-Bucket. Führen Sie dazu beispielsweise den folgenden Befehl aus:

    python pipeline.py \
       --runner=PortableRunner \
       --job_endpoint=embed \
       --environment_type=DOCKER \
       --environment_config="docker.io/library/cpp_beam_container" \
       --output=gs://BUCKET_NAME/out.png
    

Pipeline ausführen

Sie können die Apache Beam-Pipeline nun in Dataflow ausführen. Verweisen Sie dazu auf die Datei mit dem Pipelinecode und übergeben Sie die für die Pipeline erforderlichen Parameter.

Führen Sie die Pipeline in Ihrer Shell oder im Terminal mit dem Dataflow-Runner aus.

python pipeline.py \
    --runner=DataflowRunner \
    --project=PROJECT_ID \
    --region=REGION \
    --temp_location=gs://BUCKET_NAME/tmp \
    --sdk_container_image="LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/cpp_beam_container:latest" \
    --experiment=use_runner_v2 \
    --output=gs://BUCKET_NAME/out.png

Nachdem Sie den Befehl zum Ausführen der Pipeline ausgeführt haben, gibt Dataflow eine Job-ID mit dem Auftragsstatus In der Warteschlange zurück. Es kann einige Minuten dauern, bis der Jobstatus Läuft erreicht und Sie auf die Jobgrafik zugreifen können.

Ergebnisse ansehen

Sehen Sie sich die Daten an, die in Ihren Cloud Storage-Bucket geschrieben wurden. Verwenden Sie den Befehl gcloud storage ls, um den Inhalt auf oberster Ebene des Buckets aufzulisten:

gcloud storage ls gs://BUCKET_NAME

Wenn der Vorgang erfolgreich durchgeführt wurde, wird eine Meldung folgender Art angezeigt:

gs://BUCKET_NAME/out.png

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Einzelne Ressourcen löschen

Wenn Sie das Projekt wiederverwenden möchten, löschen Sie die für die Anleitung erstellten Ressourcen.

Google Cloud-Projektressourcen bereinigen

  1. Löschen Sie das Artifact Registry-Repository.

    gcloud artifacts repositories delete REPOSITORY \
       --location=LOCATION --async
    
  2. Löschen Sie den Cloud Storage-Bucket. Für den Bucket fallen keine Gebühren an.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Anmeldedaten entfernen

  1. Widerrufen Sie die Rollen, die Sie dem nutzerverwalteten Worker-Dienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    • roles/artifactregistry.reader
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:parallelpipeline@PROJECT_ID.iam.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Nächste Schritte