Erweiterte Features von Apache Beam-Notebooks verwenden

Wenn Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, können Sie Pipelines iterativ entwickeln, Ihre Pipelinegrafik prüfen und einzelne PCollections in einem REPL-Workflow (Read-Eval-Print-Loop) analysieren. Ein Tutorial in dem gezeigt wird, wie Sie den interaktiven Apache Beam-Runner mit JupyterLab-Notebooks verwenden, finden Sie unter Mit Apache Beam-Notebooks entwickeln

Diese Seite enthält Details zu erweiterten Features, die Sie mit Ihrem Apache Beam-Notebook verwenden können.

Interaktiver FlinkRunner in mit dem Notebook verwalteten Clustern

Wenn Sie interaktiv mit Daten in Produktionsgröße vom Notebook aus arbeiten möchten, können Sie FlinkRunner mit einigen allgemeinen Pipelineoptionen verwenden, um der Notebooksitzung mitzuteilen, einen langlebigen Dataproc-Cluster zu verwalteten und Ihre Beam-Pipelines verteilt auszuführen.

Vorbereitung

So verwenden Sie dieses Feature:

  • die Dataproc API aktivieren
  • Weisen Sie dem Dienstkonto, mit dem die Notebookinstanz für Dataproc ausgeführt wird, eine Administrator- oder Bearbeiterrolle zu.
  • Verwenden Sie einen Notebook-Kernel mit der Apache Beam SDK-Version 2.40.0 oder höher.

Konfiguration

Sie müssen mindestens die folgende Konfiguration haben:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<BUCKET_NAME>/flink'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = 'PROJECT_ID'

Explizite Bestimmung (optional)

Sie können die folgenden Optionen hinzufügen.

# Change this if the pipeline needs to run in a different region
# than the default, 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 40 workers to run the pipeline.
worker_options.num_workers=40
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n1-highmem-8'

# When working with non-official Apache Beam releases, such as Apache Beam built from source
# code, configure the environment to use a compatible released SDK container.
# If needed, build a custom container and use it. For more information, see:
# https://beam.apache.org/documentation/runtime/environments/
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.41.0 or LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'

Nutzung

# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.
options.view_as(FlinkRunnerOptions).parallelism = 10

p_word_count = beam.Pipeline(interactive_flink_runner, options=options)
word_counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())
# The notebook session automatically starts and manages a cluster to run
# your pipelines with the FlinkRunner.
ib.show(word_counts)

# Interactively adjust the parallelism.
options.view_as(FlinkRunnerOptions).parallelism = 150
# The BigQuery read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)
delays_by_airline = (
    p_bq
    | 'Read Dataset from BigQuery' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
# This step reuses the existing cluster.
ib.collect(delays_by_airline)

# Describe the cluster running the pipelines.
# You can access the Flink dashboard from the printed link.
ib.clusters.describe()

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)

Mit dem Notebook verwaltete Cluster

  • Wenn Sie keine Pipeline-Optionen angeben, verwendet Interactive Apache Beam standardmäßig immer den zuletzt verwendeten Cluster, um eine Pipeline mit FlinkRunner auszuführen.
    • Führen Sie ib.clusters.set_default_cluster(None) aus, um dieses Verhalten zu vermeiden, z. B. wenn Sie eine andere Pipeline in derselben Notebook-Sitzung mit einem FlinkRunner ausführen möchten, der nicht vom Notebook gehostet wird.
  • Beim Instanziieren einer neuen Pipeline, die ein Projekt, eine Region und eine Bereitstellungskonfiguration verwendet, die einem vorhandenen Dataproc-Cluster zugeordnet sind, verwendet Dataflow auch den Cluster wieder. Dies ist jedoch möglicherweise nicht der zuletzt verwendete Cluster.
  • Wenn jedoch eine Änderung der Bereitstellung angegeben wird, z. B. beim Ändern der Größe eines Clusters, wird ein neuer Cluster erstellt, um die gewünschte Änderung zu übernehmen. Wenn Sie die Größe eines Clusters ändern möchten, um zu verhindern, dass die Cloud-Ressourcen erschöpft werden, bereinigen Sie überflüssige Cluster mithilfe von ib.clusters.cleanup(pipeline).
  • Wenn eine Flink-master_url angegeben ist und zu einem Cluster gehört, der von der Notebooksitzung verwaltet wird, verwendet Dataflow den verwalteten Cluster wieder.
    • Wenn master_url in der Notebooksitzung unbekannt ist, bedeutet dies, dass ein vom Nutzer selbst gehosteter FlinkRunner gewünscht wird. Im Notebook wird nichts implizit ausgeführt.

Fehlerbehebung

Dieser Abschnitt enthält Informationen zur Fehlerbehebung beim interaktiven FlinkRunner in mit dem Notebook verwalteten Clustern.

Der Einfachheit halber wird die Zwischenspeicherkonfiguration des Flink-Netzwerks nicht für die Konfiguration bereitgestellt.

Wenn die Jobgrafik zu kompliziert oder die Parallelität zu hoch eingestellt ist, könnte die Kardinalität der Schritte multipliziert mit der Parallelität zu groß sein und dazu führen, dass zu viele Aufgaben parallel geplant werden und die Ausführung fehlschlägt.

Beachten Sie die folgenden Tipps, um die Geschwindigkeit interaktiver Ausführungen zu verbessern:

  • Weisen Sie einer Variablen nur die PCollection zu, die Sie prüfen möchten.
  • Prüfen Sie PCollections einzeln.
  • Verwenden Sie Reshuffle nach High-Fanout-Transformationen.
  • Passen Sie die Parallelität an die Datengröße an (manchmal ist kleiner schneller).

Die Prüfung der Daten dauert zu lange

Prüfen Sie das Flink-Dashboard für den laufenden Job. Möglicherweise sehen Sie einen Schritt, bei dem Hunderte von Aufgaben abgeschlossen wurden und nur eine übrig ist, da sich In-Flight-Daten auf einer einzelnen Maschine befinden und nicht zufällig verteilt werden.

Verwenden Sie nach einer High-Fanout-Transformation immer die erneute Zufallswiedergabe, z. B. in folgenden Fällen:

  • Zeilen aus einer Datei lesen
  • Zeilen aus einer BigQuery-Tabelle lesen

Ohne Reshuffle werden Fanout-Daten immer auf demselben Worker ausgeführt und können die Vorteile der Parallelität nicht nutzen.

Wie viele Worker benötige ich?

Als Faustregel gilt, dass der Flink-Cluster etwa die Anzahl der vCPUs multipliziert mit der Anzahl der Worker-Slots hat. Wenn Sie beispielsweise 40 n1-highmem-8-Worker haben, hat der Flink-Cluster höchstens 320 Slots, also 8 multipliziert mit 40.

Im Idealfall kann der Worker einen Job verwalten, der mit Hunderten von Parallelitäten liest, zuordnet und kombiniert, wodurch Tausende von Aufgaben parallel geplant werden.

Funktioniert das auch beim Streaming?

Streamingpipelines sind derzeit nicht mit dem interaktiven Flink auf Notebook-verwalteten Cluster kompatibel.

Beam SQL und beam_sql-Magie

Mit Beam SQL können Sie begrenzte und unbegrenzte PCollections mit SQL-Anweisungen abfragen. Wenn Sie in einem Apache Beam-Notebook arbeiten, können Sie die benutzerdefinierte IPython-Magie beam_sql verwenden, um die Pipelineentwicklung zu beschleunigen.

Sie können die Nutzung der Magie beam_sql mit der Option -h oder --help prüfen:

Hilfe zu „beam_sql“ ansehen

Sie können eine PCollection aus konstanten Werten erstellen:

Eine PCollection aus konstanten Werten erstellen

Sie können mehrere PCollections zusammenführen:

Mehrere PCollections zusammenführen

Sie können einen Dataflow-Job mit der Option -r DataflowRunner oder --runner DataflowRunner starten:

Dataflow-Job mit Apache Beam SQL starten

Weitere Informationen finden Sie im Beispiel-Notebook Apache Beam SQL in Notebooks.

Mit JIT-Compiler und GPU beschleunigen

Sie können Bibliotheken wie numba und GPUs verwenden, um Ihren Python-Code und Apache Beam-Pipelines zu beschleunigen. In der Apache Beam-Notebookinstanz, die mit einer nvidia-tesla-t4-GPU erstellt wurde, kompilieren Sie zur Ausführung auf GPUs Ihren Python-Code mit numba.cuda.jit, Optional, um die Ausführung auf CPUs zu beschleunigen, kompilieren Sie Ihren Python-Code mit numba.jit oder numba.njit in Maschinencode.

Im folgenden Beispiel wird ein DoFn erstellt, das auf GPUs verarbeitet wird:

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

Die folgende Abbildung zeigt das Notebook, das auf einer GPU ausgeführt wird:

DoFn auf GPU ausführen

Weitere Details finden Sie im Beispiel-Notebook GPUs mit Apache Beam verwenden.

Einen benutzerdefinierten Container erstellen

Wenn Ihre Pipeline keine zusätzlichen Python-Abhängigkeiten oder ausführbaren Dateien benötigt, kann Apache Beam in den meisten Fällen automatisch seine offiziellen Container-Images verwenden, um Ihren benutzerdefinierten Code auszuführen. Diese Images enthalten viele gängige Python-Module, die weder erstellt noch explizit angegeben werden müssen.

In einigen Fällen haben Sie möglicherweise zusätzliche Python-Abhängigkeiten oder nicht von Python stammenden Abhängigkeiten. In diesen Szenarien können Sie einen benutzerdefinierten Container erstellen und zur Ausführung dem Flink-Cluster zur Verfügung stellen. Die folgende Liste bietet die Vorteile eines benutzerdefinierten Containers:

  • Schnellere Einrichtung für aufeinanderfolgende und interaktive Ausführungen
  • Stabile Konfigurationen und Abhängigkeiten
  • Mehr Flexibilität: Sie können mehr als Python-Abhängigkeiten einrichten

Der Container-Build-Prozess kann mühsam sein, aber Sie können alles im Notebook mit dem folgenden Nutzungsmuster erledigen.

Lokalen Arbeitsbereich erstellen

Erstellen Sie zuerst ein lokales Arbeitsverzeichnis im Jupyter-Basisverzeichnis.

!mkdir -p /home/jupyter/.flink

Python-Abhängigkeiten vorbereiten

Installieren Sie als Nächstes alle zusätzlichen Python-Abhängigkeiten, die Sie verwenden können, und exportieren Sie sie in eine Anforderungsdatei.

%pip install dep_a
%pip install dep_b
...

Sie können mithilfe der Notebookmagie %%writefile eine Anforderungsdatei explizit erstellen.

%%writefile /home/jupyter/.flink/requirements.txt
dep_a
dep_b
...

Alternativ können Sie alle lokalen Abhängigkeiten in einer Anforderungsdatei fixieren. Diese Option kann zu unbeabsichtigten Abhängigkeiten führen.

%pip freeze > /home/jupyter/.flink/requirements.txt

Nicht-Python-Abhängigkeiten vorbereiten

Kopieren Sie alle Nicht-Python-Abhängigkeiten in den Arbeitsbereich. Wenn Sie keine Nicht-Python-Abhängigkeiten haben, überspringen Sie diesen Schritt.

!cp /path/to/your-dep /home/jupyter/.flink/your-dep
...

Dockerfile erstellen

Erstellen Sie ein Dockerfile mit der Notebook-Magie %%writefile. Beispiele:

%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY  requirements.txt /tmp/requirements.txt
COPY  your_dep /tmp/your_dep
...

RUN python -m pip install -r /tmp/requirements.txt

Im Beispielcontainer wird das Image des Apache Beam SDK Version 2.40.0 mit Python 3.7 als Basis verwendet, eine your_dep-Datei hinzugefügt und die zusätzlichen Python-Abhängigkeiten installiert. Verwenden Sie dieses Dockerfile als Vorlage und passen Sie es an Ihren Anwendungsfall an.

Wenn Sie in Ihren Apache Beam-Pipelines auf Abhängigkeiten verweisen, die nicht Python sind, verwenden Sie deren COPY-Ziele. Beispielsweise ist /tmp/your_dep der Dateipfad der Datei your_dep.

Mit Cloud Build ein Container-Image in Artifact Registry erstellen

  1. Aktivieren Sie die Cloud Build- und Artifact Registry-Dienste, falls noch nicht geschehen.

    !gcloud services enable cloudbuild.googleapis.com
    !gcloud services enable artifactregistry.googleapis.com
    
  2. Erstellen Sie ein Artifact Registry-Repository, damit Sie Artefakte hochladen können. Jedes Repository kann Artefakte für ein einzelnes unterstütztes Format enthalten.

    Alle Repository-Inhalte werden entweder mit Schlüsseln verschlüsselt, die Google gehören oder von Google verwaltet werden, oder mit vom Kunden verwaltete Verschlüsselungsschlüssel. Artifact Registry verwendet Schlüssel, die Google gehören und von Google verwaltet werden, sind standardmäßig und keine Konfiguration erforderlich für diese Option.

    Sie müssen für das Repository mindestens Zugriff als Artifact Registry-Autor haben.

    Führen Sie den folgenden Befehl aus, um ein neues Repository zu erstellen: Der Befehl verwendet das Flag --async und kehrt sofort zurück, ohne auf den Abschluss des Vorgangs zu warten.

    gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=LOCATION \
    --async
    

    Ersetzen Sie die folgenden Werte:

    • REPOSITORY ist ein Name für Ihr Repository. Repository-Namen können für jeden Repository-Speicherort in einem Projekt nur einmal vorkommen.
    • LOCATION: der Speicherort für Ihr Repository.
  3. Um Images per Push oder Pull übertragen zu können, konfigurieren Sie Docker für die Authentifizierung von Anfragen für Artifact Registry. Führen Sie den folgenden Befehl aus, um die Authentifizierung bei Docker-Repositories einzurichten:

    gcloud auth configure-docker LOCATION-docker.pkg.dev
    

    Mit dem Befehl wird die Docker-Konfiguration aktualisiert. Sie können jetzt eine Verbindung zu Artifact Registry in Ihrem Google Cloud-Projekt herstellen, um Images per Push zu übertragen.

  4. Mit Cloud Build das Container-Image erstellen und in Artifact Registry speichern.

    !cd /home/jupyter/.flink \
    && gcloud builds submit \
     --tag LOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest \
     --timeout=20m
    

    Ersetzen Sie dabei PROJECT_ID durch die Projekt-ID Ihres Zielprojekts.

Benutzerdefinierte Container verwenden

Je nach Runner können Sie benutzerdefinierte Container für unterschiedliche Zwecke verwenden.

Informationen zur allgemeinen Nutzung von Apache Beam-Containern finden Sie unter:

Informationen zur Verwendung des Dataflow-Containers finden Sie unter:

Deaktivieren Sie externe IP-Adressen

Deaktivieren Sie beim Erstellen einer Apache Beam-Notebookinstanz zur Erhöhung der Sicherheit externe IP-Adressen. Da Notebook-Instanzen einige öffentliche Internetressourcen wie Artifact Registry herunterladen müssen, müssen Sie zuerst ein neues VPC-Netzwerk ohne externe IP-Adresse erstellen. Erstellen Sie dann ein Cloud NAT-Gateway für dieses VPC-Netzwerk. Weitere Informationen zu Cloud NAT finden Sie in der Cloud NAT-Dokumentation. Mit dem VPC-Netzwerk und dem Cloud NAT-Gateway auf das erforderliche öffentliche Internet zugreifen ohne externe IP-Adressen zu aktivieren.