Dataproc Flink-Komponente

Sie können zusätzliche Komponenten installieren, wenn Sie einen Dataproc-Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird die Flink-Komponente beschrieben.

Die Dataproc Flink-Komponente installiert Apache Flink auf einem Dataproc-Cluster.

Komponente installieren

Installieren Sie die Komponente, wenn Sie einen Dataproc-Cluster erstellen. Die Dataproc Flink-Komponente kann auf Clustern installiert werden, die mit Dataproc-Image Version 1.5 oder höher erstellt wurden.

DieCluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente (siehe die Apache Flink-Komponentenversionen für die letzten und vorherigen vier 2.0.x-Image-Releaseversionen). Informationen zu den Komponentenversionen, die im jeweiligen Dataproc-Image-Release enthalten sind, finden Sie im Abschnitt Unterstützte Dataproc-Versionen.

gcloud-Befehl

Zum Erstellen eines Dataproc-Clusters, der die Flink-Komponente enthält, verwenden Sie den Befehl gcloud dataproc clusters create cluster-name mit dem Flag --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

Hinweis: Da eine Flink YARN-Sitzung erhebliche YARN-Ressourcen verbraucht, startet Dataproc standardmäßig keine Flink-Sitzung beim Start des Dataproc-Clusters. Sie können eine Sitzung starten, wenn Sie Ihren Flink-Cluster starten, indem Sie dem Befehl gcloud dataproc clusters create das Flag --metadata flink-start-yarn-session=true hinzufügen.

REST API

Die Flink-Komponente kann über die Dataproc API mit SoftwareConfig.Component als Teil einer clusters.create-Anfrage angegeben werden. Im Feld SoftwareConfig.imageVersion wird die Image-Version des Clusters festgelegt.

Console

  1. Aktivieren Sie das Komponenten- und Komponentengateway.
    • Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen. Der Bereich „Cluster einrichten“ ist ausgewählt.
    • Bestätigen oder ändern Sie im Abschnitt "Versionsverwaltung" den Image-Typ und die Image-Version.
    • Im Abschnitt „Komponenten“:
      • Wählen Sie unter Component Gateway „Component Gateway aktivieren“ aus (siehe Component Gateway-URLs ansehen und aufrufen).
      • Wählen Sie unter „Optionale Komponenten“ Flink und andere optionale Komponenten aus, die auf Ihrem Cluster installiert werden sollen.

Nachdem ein Dataproc-Cluster mit Flink gestartet wurde, stellen Sie eine SSH-Verbindung zum Master-Knoten des Dataproc-Clusters her und führen Sie dann Flink-Jobs aus.

Example:

Führen Sie einen einzelnen Flink-Job aus. Nach der Annahme des Jobs startet Flink einen Jobmanager und Slots für den Job in YARN. Der Flink-Job wird im YARN-Cluster ausgeführt, bis er abgeschlossen ist. Der Jobmanager wird nach Abschluss des Jobs heruntergefahren. Joblogs sind in YARN-Logs verfügbar.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

Example:

Starten Sie eine lang andauernde Flink-YARN-Sitzung und führen Sie dann einen Job aus.

Starten Sie die Sitzung auf dem Masterknoten des Dataproc-Clusters. Alternativ können Sie eine Flink YARN-Sitzung starten, wenn Sie den Flink-Cluster mit dem Flag gcloud dataproc clusters create --metadata flink-start-yarn-session=true erstellen.

. /usr/bin/flink-yarn-daemon

Notieren Sie sich den Host und den Port in FLINK_MASTER_URL, nachdem die Sitzung erfolgreich gestartet wurde. Ersetzen Sie im folgenden Befehl JOB_MANAGER_HOSTNAME und REST_API_PORT durch diese Elemente: Führen Sie den Job aus:

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

Zum Beenden der Sitzung ersetzen Sie APPLICATION_ID durch die Anwendungs-ID, die der Flink YARN-Sitzung in der Flink Job Manager-UI oder aus der Ausgabe von yarn application -list zugeordnet ist. Führen Sie dann diesen Befehl aus:

yarn application -kill APPLICATION_ID

Apache Beam-Jobs ausführen

Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner ausführen.

Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:

  1. Java Beam-Jobs
  2. Portierbare Beam-Jobs

Java Beam-Jobs

Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.

Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.
    • --scopes: Aktiviert den API-Zugriff auf GCP-Dienste im selben Projekt.
  2. SSH-Verbindung zum Master-Knoten des Clusters herstellen

  3. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.

    flink --version
    
  4. Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.

    Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag <flink.artifact.name> angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Beispiel für das Verpacken der Wortzahl.

    mvn package -Pflink-runner
    
  6. Laden Sie die Uber-JAR-Paketdatei, word-count-beam-bundled-0.1.jar, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie können gsutil cp für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.

    1. Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Portierbare Beam-Jobs

Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner und PortableRunner verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).

Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink und Docker
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.
    • --scopes: Aktiviert den API-Zugriff auf GCP-Dienste im selben Projekt.
  2. SSH-Verbindung zum Master-Knoten des Clusters herstellen

  3. Cloud Storage-Bucket erstellen

    gsutil mb BUCKET_NAME
    
  4. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters und speichern Sie die Flink Master-URL nach Beginn der Sitzung.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.

    flink --version
    
  5. Installieren Sie die erforderlichen Python-Bibliotheken für den Job auf dem Masterknoten des Dataproc-Clusters.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Dataproc-Clusters aus.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner (erforderlich): FlinkRunner.
    • --flink_version (erforderlich): Flink-Version
    • --flink_master (erforderlich): Adresse des Flink-Masters, auf dem der Job ausgeführt wird.
    • --flink_submit_uber_jar (erforderlich): Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.
    • --output (erforderlich): Hier wird die Ausgabe geschrieben.
  7. Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Ticket 7 Tage lang gültig.

Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. Sie können die Flink Job Manager-UI über den Application Master der Flink-Anwendung in YARN öffnen.

So aktivieren Sie den Zugriff auf die Benutzeroberfläche und verwenden ihn:

  1. Erstellen Sie den Dataproc-Cluster mit aktiviertem Component Gateway.
  2. Klicken Sie nach dem Erstellen des Clusters in der Google Cloud Console auf der Seite Clusterdetails auf den Link Component Gateway YARN ResourceManager.
  3. Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
  4. Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.