Dataproc Flink-Komponente

Sie können zusätzliche Komponenten installieren, wenn Sie einen Dataproc-Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird die Flink-Komponente beschrieben.

Die Dataproc Flink-Komponente installiert Apache Flink auf einem Dataproc-Cluster.

Komponente installieren

Installieren Sie die Komponente, wenn Sie einen Dataproc-Cluster erstellen. Die Dataproc Flink-Komponente kann auf Clustern installiert werden, die mit Dataproc-Image Version 1.5 oder höher erstellt wurden.

DieCluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente (siehe die Apache Flink-Komponentenversionen für die letzten und vorherigen vier 2.0.x-Image-Releaseversionen). Informationen zu den Komponentenversionen, die im jeweiligen Dataproc-Image-Release enthalten sind, finden Sie im Abschnitt Unterstützte Dataproc-Versionen.

gcloud-Befehl

Zum Erstellen eines Dataproc-Clusters, der die Flink-Komponente enthält, verwenden Sie den Befehl gcloud dataproc clusters create cluster-name mit dem Flag --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

Hinweis: Da eine Flink YARN-Sitzung erhebliche YARN-Ressourcen verbraucht, startet Dataproc standardmäßig keine Flink-Sitzung beim Start des Dataproc-Clusters. Sie können eine Sitzung starten, wenn Sie Ihren Flink-Cluster starten, indem Sie dem Befehl gcloud dataproc clusters create das Flag --metadata flink-start-yarn-session=true hinzufügen.

REST API

Die Flink-Komponente kann über die Dataproc API mit SoftwareConfig.Component als Teil einer clusters.create-Anfrage angegeben werden. Im Feld SoftwareConfig.imageVersion wird die Image-Version des Clusters festgelegt.

Console

  1. Aktivieren Sie die Komponente und das Komponentengateway.
    • Öffnen Sie in der Cloud Console die Dataproc-Seite Cluster erstellen. Der Bereich „Cluster einrichten“ ist ausgewählt.
    • Bestätigen oder ändern Sie im Abschnitt "Versionsverwaltung" den Image-Typ und die Image-Version.
    • Im Bereich „Komponenten“:
      • Wählen Sie unter Component Gateway „Component Gateway aktivieren“ aus (siehe Component Gateway-URLs ansehen und aufrufen).
      • Wählen Sie unter „Optionale Komponenten“ Flink und andere optionale Komponenten aus, die auf Ihrem Cluster installiert werden sollen.

Nachdem ein Dataproc-Cluster mit Flink gestartet wurde, stellen Sie eine SSH-Verbindung zum Master-Knoten des Dataproc-Clusters her und führen Sie dann Flink-Jobs aus.

Example:

Führen Sie einen einzelnen Flink-Job aus. Nach der Annahme des Jobs startet Flink einen Jobmanager und Slots für den Job in YARN. Der Flink-Job wird im YARN-Cluster ausgeführt, bis er abgeschlossen ist. Der Jobmanager wird nach Abschluss des Jobs heruntergefahren. Joblogs sind in YARN-Logs verfügbar.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

Example:

Starten Sie eine lang andauernde Flink-YARN-Sitzung und führen Sie dann einen Job aus.

Starten Sie die Sitzung auf dem Masterknoten des Dataproc-Clusters. Alternativ können Sie eine Flink YARN-Sitzung starten, wenn Sie den Flink-Cluster mit dem Flag gcloud dataproc clusters create --metadata flink-start-yarn-session=true erstellen.

. /usr/bin/flink-yarn-daemon

Notieren Sie sich den Host und den Port in FLINK_MASTER_URL, nachdem die Sitzung erfolgreich gestartet wurde. Ersetzen Sie im folgenden Befehl JOB_MANAGER_HOSTNAME und REST_API_PORT durch diese Elemente: Führen Sie den Job aus:

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

Zum Beenden der Sitzung ersetzen Sie APPLICATION_ID durch die Anwendungs-ID, die der Flink YARN-Sitzung in der Flink Job Manager-UI oder aus der Ausgabe von yarn application -list zugeordnet ist. Führen Sie dann diesen Befehl aus:

yarn application -kill APPLICATION_ID

Apache Beam-Jobs ausführen

Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner ausführen.

Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:

  1. Java Beam-Jobs
  2. Portierbare Beam-Jobs

Java Beam-Jobs

Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.

Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Zugriff auf die Benutzeroberfläche des Flink-Job-Managers gewähren.
    • --scopes: API-Zugriff auf GCP-Dienste im selben Projekt aktivieren.
  2. SSH-Verbindung zum Master-Knoten des Clusters herstellen

  3. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version Ihres Dataproc-Clusters.

    flink --version
    
  4. Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität von Flink-Versionen finden Sie die Kompatibilität der Beam-Flink-Versionen.

    Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag <flink.artifact.name> angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Beispiel für das Verpacken der Wortzahl.

    mvn package -Pflink-runner
    
  6. Laden Sie die Uber-JAR-Paketdatei, word-count-beam-bundled-0.1.jar, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie können gsutil cp für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.

    1. Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Portierbare Beam-Jobs

Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner und PortableRunner verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).

Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.

  1. Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink und Docker
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).
    • --region: eine unterstützte Dataproc-Region.
    • --enable-component-gateway: Zugriff auf die Benutzeroberfläche des Flink-Job-Managers gewähren.
    • --scopes: API-Zugriff auf GCP-Dienste im selben Projekt aktivieren.
  2. SSH-Verbindung zum Master-Knoten des Clusters herstellen

  3. Cloud Storage-Bucket erstellen

    gsutil mb BUCKET_NAME
    
  4. Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters und speichern Sie die Flink Master-URL nach Beginn der Sitzung.

    . /usr/bin/flink-yarn-daemon
    

    Notieren Sie sich die Flink-Version Ihres Dataproc-Clusters.

    flink --version
    
  5. Installieren Sie die erforderlichen Python-Bibliotheken für den Job auf dem Masterknoten des Dataproc-Clusters.

    Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität von Flink-Versionen finden Sie die Kompatibilität der Beam-Flink-Versionen.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Dataproc-Clusters aus.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner (erforderlich): FlinkRunner.
    • --flink_version (erforderlich): Flink-Version
    • --flink_master (erforderlich): Adresse des Flink-Masters, auf dem der Job ausgeführt wird.
    • --flink_submit_uber_jar (erforderlich): Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.
    • --output (erforderlich): Hier wird die Ausgabe geschrieben.
  7. Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Beenden Sie die Flink YARN-Sitzung.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Ticket 7 Tage lang gültig.

Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. Sie können die Flink Job Manager-UI über den Application Master der Flink-Anwendung in YARN öffnen.

So aktivieren Sie den Zugriff auf die Benutzeroberfläche und verwenden ihn:

  1. Erstellen Sie den Dataproc-Cluster mit aktiviertem Component Gateway.
  2. Nach der Clustererstellung klicken Sie in der Google Cloud Console auf der Seite Clusterdetails auf den Tab der Web-Benutzeroberfläche und auf das Component Gateway YARN ResourceManager-Link.
  3. Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
  4. Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.