Sie können zusätzliche Komponenten installieren, wenn Sie einen Dataproc-Cluster mit dem Feature Optionale Komponenten erstellen. Auf dieser Seite wird die Flink-Komponente beschrieben.
Die Dataproc Flink-Komponente installiert Apache Flink auf einem Dataproc-Cluster.
Komponente installieren
Installieren Sie die Komponente, wenn Sie einen Dataproc-Cluster erstellen. Die Dataproc Flink-Komponente kann auf Clustern installiert werden, die mit Dataproc-Image Version 1.5 oder höher erstellt wurden.
DieCluster-Image-Version bestimmt die Version der im Cluster installierten Flink-Komponente (siehe die Apache Flink-Komponentenversionen für die letzten und vorherigen vier 2.0.x-Image-Releaseversionen). Informationen zu den Komponentenversionen, die im jeweiligen Dataproc-Image-Release enthalten sind, finden Sie im Abschnitt Unterstützte Dataproc-Versionen.
gcloud-Befehl
Zum Erstellen eines Dataproc-Clusters, der die Flink-Komponente enthält, verwenden Sie den Befehl gcloud dataproc clusters create cluster-name mit dem Flag --optional-components
.
gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ ... other flags
Hinweis: Da eine Flink YARN-Sitzung erhebliche YARN-Ressourcen verbraucht, startet Dataproc standardmäßig keine Flink-Sitzung beim Start des Dataproc-Clusters.
Sie können eine Sitzung starten, wenn Sie Ihren Flink-Cluster starten, indem Sie dem Befehl gcloud dataproc clusters create
das Flag --metadata flink-start-yarn-session=true
hinzufügen.
REST API
Die Flink-Komponente kann über die Dataproc API mit SoftwareConfig.Component als Teil einer clusters.create-Anfrage angegeben werden. Im Feld SoftwareConfig.imageVersion wird die Image-Version des Clusters festgelegt.
Console
- Aktivieren Sie das Komponenten- und Komponentengateway.
- Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen. Der Bereich „Cluster einrichten“ ist ausgewählt.
- Bestätigen oder ändern Sie im Abschnitt "Versionsverwaltung" den Image-Typ und die Image-Version.
- Im Abschnitt „Komponenten“:
- Wählen Sie unter Component Gateway „Component Gateway aktivieren“ aus (siehe Component Gateway-URLs ansehen und aufrufen).
- Wählen Sie unter „Optionale Komponenten“ Flink und andere optionale Komponenten aus, die auf Ihrem Cluster installiert werden sollen.
Flink-Jobs ausführen
Nachdem ein Dataproc-Cluster mit Flink gestartet wurde, stellen Sie eine SSH-Verbindung zum Master-Knoten des Dataproc-Clusters her und führen Sie dann Flink-Jobs aus.
Example:
Führen Sie einen einzelnen Flink-Job aus. Nach der Annahme des Jobs startet Flink einen Jobmanager und Slots für den Job in YARN. Der Flink-Job wird im YARN-Cluster ausgeführt, bis er abgeschlossen ist. Der Jobmanager wird nach Abschluss des Jobs heruntergefahren. Joblogs sind in YARN-Logs verfügbar.
flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
Example:
Starten Sie eine lang andauernde Flink-YARN-Sitzung und führen Sie dann einen Job aus.
Starten Sie die Sitzung auf dem Masterknoten des Dataproc-Clusters.
Alternativ können Sie eine Flink YARN-Sitzung starten, wenn Sie den Flink-Cluster mit dem Flag gcloud dataproc clusters create --metadata flink-start-yarn-session=true
erstellen.
. /usr/bin/flink-yarn-daemon
Notieren Sie sich den Host und den Port in FLINK_MASTER_URL
, nachdem die Sitzung erfolgreich gestartet wurde. Ersetzen Sie im folgenden Befehl JOB_MANAGER_HOSTNAME
und REST_API_PORT
durch diese Elemente: Führen Sie den Job aus:
HADOOP_CLASSPATH=`hadoop classpath`
flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar
Zum Beenden der Sitzung ersetzen Sie APPLICATION_ID
durch die Anwendungs-ID, die der Flink YARN-Sitzung in der Flink Job Manager-UI oder aus der Ausgabe von yarn application -list
zugeordnet ist. Führen Sie dann diesen Befehl aus:
yarn application -kill APPLICATION_ID
Apache Beam-Jobs ausführen
Sie können Apache Beam-Jobs in Dataproc mit dem FlinkRunner
ausführen.
Sie können Beam-Jobs auf Flink auf folgende Weise ausführen:
- Java Beam-Jobs
- Portierbare Beam-Jobs
Java Beam-Jobs
Verpacken Sie Ihre Beam-Jobs in eine JAR-Datei. Geben Sie die gebündelte JAR-Datei mit den Abhängigkeiten an, die zum Ausführen des Jobs erforderlich sind.
Im folgenden Beispiel wird ein Java Beam-Job vom Masterknoten des Dataproc-Clusters ausgeführt.
Erstellen Sie einen Dataproc-Cluster mit aktivierter Flink-Komponente.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).--region
: eine unterstützte Dataproc-Region.--enable-component-gateway
: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.--scopes
: Aktiviert den API-Zugriff auf GCP-Dienste im selben Projekt.
SSH-Verbindung zum Master-Knoten des Clusters herstellen
Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters.
. /usr/bin/flink-yarn-daemon
Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.
flink --version
Erstellen Sie auf Ihrem lokalen Computer das Beispiel für die kanonische Beam-Zählung in Java.
Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.
Öffnen Sie die generierte POM-Datei. Prüfen Sie die Version des Beam-Flink-Runners, die mit dem Tag
<flink.artifact.name>
angegeben wird. Wenn die Beam-Flink-Runner-Version im Flink-Artefaktnamen nicht mit der Flink-Version auf Ihrem Cluster übereinstimmt, aktualisieren Sie die Versionsnummer entsprechend.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Beispiel für das Verpacken der Wortzahl.
mvn package -Pflink-runner
Laden Sie die Uber-JAR-Paketdatei,
word-count-beam-bundled-0.1.jar
, (ca. 135 MB) auf den Masterknoten des Dataproc-Clusters hoch. Sie könnengsutil cp
für eine schnellere Dateiübertragung von Cloud Storage zu Ihrem Dataproc-Cluster verwenden.Erstellen Sie auf Ihrem lokalen Terminal einen Cloud Storage-Bucket und laden Sie die Uber-JAR-Datei hoch.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Laden Sie auf dem Masterknoten von Dataproc die Uber-JAR-Datei herunter.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Führen Sie den Java Beam-Job auf dem Masterknoten des Dataproc-Clusters aus.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Prüfen Sie, ob die Ergebnisse in Ihren Cloud Storage-Bucket geschrieben wurden.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Beenden Sie die Flink YARN-Sitzung.
yarn application -list
yarn application -kill APPLICATION_ID
Portierbare Beam-Jobs
Zum Ausführen von in Python, Go und anderen unterstützten Sprachen geschriebenen Beam-Jobs können Sie FlinkRunner
und PortableRunner
verwenden, wie im Flink-Runner des Beams beschrieben (siehe auch Portability Framework Roadmap).
Im folgenden Beispiel wird ein portabler Beam-Job in Python aus dem Masterknoten des Dataproc-Clusters ausgeführt.
Erstellen Sie einen Dataproc-Cluster, bei dem sowohl die Flink- als auch die Docker-Komponenten aktiviert sind.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink und Docker--image-version
: Die Image-Version des Clusters, die die auf dem Cluster installierte Flink-Version bestimmt (Beispiel: Siehe die aufgelisteten Apache Flink-Komponentenversionen für die neueste und vorherige Version von vier 2.0.x-Image-Releaseversionen).--region
: eine unterstützte Dataproc-Region.--enable-component-gateway
: Aktiviert den Zugriff auf die Benutzeroberfläche des Flink Job Manager.--scopes
: Aktiviert den API-Zugriff auf GCP-Dienste im selben Projekt.
SSH-Verbindung zum Master-Knoten des Clusters herstellen
Cloud Storage-Bucket erstellen
gsutil mb BUCKET_NAME
Starten Sie eine Flink YARN-Sitzung auf dem Masterknoten des Dataproc-Clusters und speichern Sie die Flink Master-URL nach Beginn der Sitzung.
. /usr/bin/flink-yarn-daemon
Notieren Sie sich die Flink-Version auf Ihrem Dataproc-Cluster.
flink --version
Installieren Sie die erforderlichen Python-Bibliotheken für den Job auf dem Masterknoten des Dataproc-Clusters.
Wählen Sie eine Beam-Version aus, die mit der Flink-Version auf Ihrem Dataproc-Cluster kompatibel ist. In der Tabelle Kompatibilität mit der Flink-Version sind die Versionen der Beam-Flink-Version aufgeführt.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Führen Sie das Beispiel für die Wortzählung auf dem Masterknoten des Dataproc-Clusters aus.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
--runner
(erforderlich):FlinkRunner
.--flink_version
(erforderlich): Flink-Version--flink_master
(erforderlich): Adresse des Flink-Masters, auf dem der Job ausgeführt wird.--flink_submit_uber_jar
(erforderlich): Verwenden Sie die Uber-JAR-Datei, um den Beam-Job auszuführen.--output
(erforderlich): Hier wird die Ausgabe geschrieben.
Prüfen Sie, ob die Ergebnisse in Ihren Bucket geschrieben wurden.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Beenden Sie die Flink YARN-Sitzung.
yarn application -list
yarn application -kill APPLICATION_ID
Flink auf einem kerberisierten Cluster ausführen
Die Dataproc Flink-Komponente unterstützt mit Kerberos gesicherte Cluster. Ein Kerberos-Ticket ist erforderlich, um einen Flink-Job zu senden und beizubehalten oder einen Flink-Cluster zu starten. Standardmäßig bleibt ein Ticket 7 Tage lang gültig.
Auf die Benutzeroberfläche von Flink Job Manager zugreifen
Die Flink Job Manager-Weboberfläche ist verfügbar, während ein Flink-Job oder ein Flink-Sitzungscluster ausgeführt wird. Sie können die Flink Job Manager-UI über den Application Master der Flink-Anwendung in YARN öffnen.
So aktivieren Sie den Zugriff auf die Benutzeroberfläche und verwenden ihn:
- Erstellen Sie den Dataproc-Cluster mit aktiviertem Component Gateway.
- Klicken Sie nach dem Erstellen des Clusters in der Google Cloud Console auf der Seite Clusterdetails auf den Link Component Gateway YARN ResourceManager.
- Ermitteln Sie auf der Benutzeroberfläche von YARN Resource Manager den Anwendungseintrag des Flink-Clusters. Je nach Abschlussstatus des Jobs wird ein Link zu ApplicationMaster oder History angezeigt.
- Bei einem Streamingjob mit langer Ausführungszeit klicken Sie auf den Link ApplicationManager, um das Flink-Dashboard zu öffnen. Klicken Sie für einen abgeschlossenen Job auf den Link History (Verlauf), um Jobdetails aufzurufen.