Vous pouvez installer des composants supplémentaires tels que Flink lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page décrit le composant Flink.
Le composant Dataproc Flink installe Apache Flink sur un cluster Dataproc.
Installer le composant
Installez le composant lorsque vous créez un cluster Dataproc. Le composant Flink Dataproc peut être installé sur les clusters créés avec Dataproc image version 1.5 ou ultérieure.
La version de l'image de cluster détermine la version du composant Flink installé sur le cluster. Consultez la section Versions Dataproc compatibles pour afficher la liste des versions des composants incluses dans chaque version d'image Dataproc.
Recommandation : Utilisez un cluster de VM à 1 instance maître standard avec le composant Flink. Les clusters en mode haute disponibilité Dataproc (avec trois VM maîtres) ne sont pas compatibles avec le mode haute disponibilité Flink.
Commande gcloud
Pour créer un cluster Dataproc incluant le composant Flink, exécutez la commande gcloud dataproc clusters create cluster-name avec l'option --optional-components
.
gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ ... other flagsRemarques :
- L'option
--enable-component-gateway
permet d'accéder à l'interface utilisateur de Flink Job Manager. - L'option
--scopes=https://www.googleapis.com/auth/cloud-platform
permet à l'API d'accéder aux services Cloud Platform du projet. - Vous pouvez ajouter le fichier
--metadata flink-start-yarn-session=true
facultatif pour exécuter le daemon Flink YARN (/usr/bin/flink-yarn-daemon
) en arrière-plan sur le cluster et démarrer une session Flink YARN (consultez la section Mode session Flink).gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --metadata flink-start-yarn-session=true \ ... other flags
API REST
Le composant Flink peut être spécifié via l'API Dataproc à l'aide de la propriété SoftwareConfig.Component dans le cadre d'une requête clusters.create.
Définissez EndpointConfig.enableHttpPortAccess sur true
pour permettre la connexion à l'interface utilisateur de Flink Job Manager.
Console
- Activez le composant et la passerelle des composants.
- Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster Dataproc sur Compute Engine. Le panneau Configurer le cluster est sélectionné.
- Dans la section Gestion des versions, confirmez ou modifiez le type et la version de l'image.
- Dans la section Composants :
- Sous Passerelle des composants, sélectionnez Activer la passerelle des composants.
- Sous Composants facultatifs, sélectionnez Flink et les autres composants facultatifs à installer sur votre cluster.
Configurer Flink
Pour définir les propriétés de Flink:
Utilisez une action d'initialisation pour mettre à jour les propriétés Flink par défaut dans
/etc/flink/conf/flink-conf.yaml
.Spécifiez les propriétés Flink avec des indicateurs de ligne de commande lorsque vous envoyez une tâche Flink ou démarrez une session Flink.
Définir le chemin d'accès des classes
Initialisez le paramètre de classe Hadoop à partir d'une fenêtre de terminal SSH sur la VM maître du cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Exécuter des tâches Flink
Vous pouvez exécuter Flink dans différents modes de déploiement sur YARN : application, par tâche ou par session.
Mode d'application
Le mode Application Flink est compatible avec les versions d'image 2.0 et ultérieures de Dataproc.
Ce mode exécute la méthode main()
de la tâche sur le gestionnaire de tâches YARN. Le cluster s'arrête une fois la tâche terminée.
Exemple d'envoi de tâche:
flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Djobmanager.heap.mb=820 \
-Dtaskmanager.heap.mb=1640 \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=4 \
/usr/lib/flink/examples/batch/WordCount.jar
Répertoriez la tâche en cours d'exécution :
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Annulez la tâche en cours d'exécution :
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Mode par tâche
Ce mode Flink exécute la méthode main()
de la tâche côté client.
Exemple d'envoi de tâche :
flink run \
-m yarn-cluster \
-p 4 \
-ys 2 \
-yjm 1024m \
-ytm 2048m \
/usr/lib/flink/examples/batch/WordCount.jar
Mode session
Démarrez une session Flink YARN de longue durée, puis envoyez une ou plusieurs tâches à la session.
Vous pouvez démarrer une session Flink de l'une des manières suivantes:
Une fois le cluster Flink créé, exécutez le script Flink
yarn-session.sh
, qui est préinstallé sur la VM maître du cluster, avec des paramètres personnalisés:Exemple avec des paramètres personnalisés:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached ```
Une fois le cluster Flink créé, exécutez Flink le script wrapper
/usr/bin/flink-yarn-daemon
avec les paramètres par défaut:. /usr/bin/flink-yarn-daemon
Créez un cluster Flink en ajoutant l'option
--metadata flink-start-yarn-session=true
à la commandegcloud dataproc clusters create
. Lorsque cette option est activée, Dataproc exécute/usr/bin/flink-yarn-daemon
une fois le cluster créé pour démarrer une session Flink sur le cluster.
L'ID d'application YARN de la session est enregistré dans /tmp/.yarn-properties-${USER}
.
Vous pouvez répertorier l'ID à l'aide de la commande yarn application -list
.
Envoyer une tâche à la session
Lorsque vous démarrez une session Flink, le résultat de la commande liste l'URL (y compris l'hôte et le port) de la VM maître Flink où les tâches sont exécutées.
Une autre façon d'afficher l'URL maître Flink : la sortie de commande suivante répertorie l'URL maître Flink dans le champ Tracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`
Exécutez la commande suivante pour envoyer une tâche Flink à la session.
Remplacez FLINK_MASTER_URL par l'URL maître Flink après avoir supprimé http:// prefix
.
flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar
Répertorier les tâches dans une session
Pour répertorier les tâches Flink dans une session, effectuez l'une des opérations suivantes:
Exécutez
flink list
sans arguments. La commande recherche l'ID d'application YARN de la session dans/tmp/.yarn-properties-${USER}
.Exécutez
flink list -yid YARN_APPLICATION_ID
. Exécutez/tmp/.yarn-properties-${USER}
ouyarn application -list
pour obtenir l'ID d'application YARN.Exécutez
flink list -m FLINK_MASTER_URL
.
Arrêter une session
Pour arrêter la session, obtenez l'ID d'application YARN de la session auprès de /tmp/.yarn-properties-${USER}
ou la sortie de yarn application -list
, puis exécutez l'une des commandes suivantes:
echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Exécuter des tâches Apache Beam
Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner
.
Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:
- Tâches Java Beam
- Tâches portables Beam
Tâches Java Beam
Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.
L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.
Créez un cluster Dataproc avec le composant Flink activé.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink--image-version
: La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).--region
: région Dataproc compatible--enable-component-gateway
: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.--scopes
: permet d'activer l'accès aux API pour les services Cloud Platform du projet.
Utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur le nœud maître du cluster FLink.
Démarrez une session Flink YARN sur le nœud maître du cluster Dataproc.
. /usr/bin/flink-yarn-daemon
Notez la version de Flink sur votre cluster Dataproc.
flink --version
Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.
Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.
Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag
<flink.artifact.name>
. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Empaqueter l'exemple de décompte de mots.
mvn package -Pflink-runner
Importez le fichier uber JAR empaqueté
word-count-beam-bundled-0.1.jar
(~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utilisergsutil cp
pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Arrêtez la session YARN Flink.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Tâches Beam portables
Pour exécuter des tâches Beam écrites en Python, Go et d'autres langages compatibles, vous pouvez utiliser FlinkRunner
et PortableRunner
, comme décrit dans l'exécuteur Flink de Beam (voir aussi Feuille de route de la portabilité).
L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.
Créez un cluster Dataproc avec les composants Flink et Docker activés.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Remarques :
--optional-components
: Flink et Docker.--image-version
: version de l'image du cluster, qui détermine la version de Flink installée sur le cluster (par exemple, les versions du composant Apache Flink répertoriées pour les dernières versions de versions d'image 2.0.x)--region
: région Dataproc disponible.--enable-component-gateway
: permet d'activer l'accès à l'interface utilisateur de Flink Job Manager.--scopes
: permet l'accès API aux services Cloud Platform du projet.
Utilisez
gsutil
en local ou dans Cloud Shell pour créer un bucket Cloud Storage. Vous allez spécifier BUCKET_NAME lorsque vous exécuterez un exemple de programme de décompte de mots.gsutil mb BUCKET_NAME
Dans une fenêtre de terminal de la VM du cluster, démarrez une session Flink YARN. Notez l'URL maître Flink, l'adresse du maître Flink où les tâches sont exécutées. Spécifiez FLINK_MASTER_URL lorsque vous exécutez un exemple de programme de décompte de mots.
. /usr/bin/flink-yarn-daemon
Affichez et notez la version de Flink qui exécute le cluster Dataproc. Spécifiez FLINK_VERSION lorsque vous exécutez un exemple de programme de décompte de mots.
flink --version
Installez les bibliothèques Python requises pour la tâche sur le nœud maître du cluster.
Installez une version Beam compatible avec la version Flink du cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Exécutez l'exemple de comptage de mots sur le nœud maître du cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Remarques :
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, noté plus tôt.--flink_master
: FLINK_MASTER_URL, noté plus tôt.--flink_submit_uber_jar
: utilisez le fichier Uber JAR pour exécuter la tâche Beam.--output
BUCKET_NAME, créé plus tôt.
Vérifiez que les résultats ont bien été écrits dans votre bucket.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Arrêtez la session YARN Flink.
- Obtenez l'ID application.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Exécuter Flink sur un cluster kerberisé
Le composant Dataproc Flink est compatible avec les clusters kerberisés. Un ticket Kerberos valide est nécessaire pour envoyer et conserver une tâche Flink, ou pour démarrer un cluster Flink. Par défaut, un ticket Kerberos reste valide pendant sept jours.
Accéder à l'interface utilisateur de Flink Job Manager
L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Pour utiliser l'interface Web :
- Créez un cluster Dataproc avec la passerelle des composants activée.
- Une fois le cluster créé, cliquez sur le lien ResourceManagerYARN de la passerelle des composants dans l'onglet "Interface Web" de la page Détails du cluster dans la console Google Cloud.
- Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. Selon l'état d'avancement d'une tâche, un lien ApplicationMaster ou History est affiché.
- Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.