Vous pouvez activer des composants supplémentaires tels que Flink lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page explique comment créer un cluster Dataproc avec le composant facultatif Apache Flink activé (cluster Flink), puis exécuter des tâches Flink sur le cluster.
Vous pouvez utiliser votre cluster Flink pour:
Exécutez des tâches Flink à l'aide de la ressource
Jobs
Dataproc à partir de la console Google Cloud, de Google Cloud CLI ou de l'API Dataproc.Exécutez des tâches Flink à l'aide de la CLI
flink
s'exécutant sur le nœud maître du cluster Flink.Exécuter Flink sur un cluster kerberisé
Créer un cluster Flink Dataproc
Vous pouvez utiliser la console Google Cloud, Google Cloud CLI ou l'API Dataproc pour créer un cluster Dataproc sur lequel le composant Flink est activé.
Recommandation:Utilisez un cluster de VM standard à un maître avec le composant Flink. Les clusters en mode haute disponibilité Dataproc (avec trois VM maîtres) ne sont pas compatibles avec le mode haute disponibilité Flink.
Console
Pour créer un cluster Dataproc Flink à l'aide de la console Google Cloud, procédez comme suit:
Ouvrez la page Dataproc Créer un cluster Dataproc sur Compute Engine.
- Le panneau Configurer le cluster est sélectionné.
- Dans la section Gestion des versions, confirmez ou modifiez le type et la version de l'image. La version de l'image du cluster détermine la version du composant Flink installée sur le cluster.
- Pour que le composant Flink puisse être activé sur le cluster, la version de l'image doit être 1.5 ou ultérieure (consultez la section Versions de Dataproc compatibles pour afficher la liste des versions de composants incluses dans chaque version d'image Dataproc).
- La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API de tâches Dataproc (consultez la section Exécuter des tâches Flink Dataproc).
- Dans la section Composants :
- Sous Passerelle des composants, sélectionnez Activer la passerelle des composants. Vous devez activer la passerelle des composants pour activer le lien de la passerelle des composants vers l'interface utilisateur du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.
- Sous Composants facultatifs, sélectionnez Flink et les autres composants facultatifs à activer sur votre cluster.
- Dans la section Gestion des versions, confirmez ou modifiez le type et la version de l'image. La version de l'image du cluster détermine la version du composant Flink installée sur le cluster.
Cliquez sur le panneau Personnaliser le cluster (facultatif).
Dans la section Propriétés du cluster, cliquez sur Ajouter des propriétés pour chaque propriété de cluster facultative à ajouter à votre cluster. Vous pouvez ajouter des propriétés avec le préfixe
flink
pour configurer des propriétés Flink dans/etc/flink/conf/flink-conf.yaml
, qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster.Exemples :
- Définissez
flink:historyserver.archive.fs.dir
pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink). - Définissez des emplacements de tâches Flink avec
flink:taskmanager.numberOfTaskSlots=n
.
- Définissez
Dans la section Métadonnées de cluster personnalisées, cliquez sur Ajouter des métadonnées pour ajouter des métadonnées facultatives. Par exemple, ajoutez
flink-start-yarn-session
true
pour exécuter le daemon Flink YARN (/usr/bin/flink-yarn-daemon
) en arrière-plan sur le nœud maître du cluster pour démarrer une session Flink YARN (voir Mode de session Flink).
Si vous utilisez la version 2.0 de l'image Dataproc ou une version antérieure, cliquez sur le panneau Gérer la sécurité (facultatif), puis, sous Accès au projet, sélectionnez
Enables the cloud-platform scope for this cluster
. Le champ d'applicationcloud-platform
est activé par défaut lorsque vous créez un cluster qui utilise l'image Dataproc version 2.1 ou ultérieure.
- Le panneau Configurer le cluster est sélectionné.
Cliquez sur Create (Créer) pour créer le cluster.
gcloud
Pour créer un cluster Dataproc Flink à l'aide de la gcloud CLI, exécutez la commande gcloud Dataproc clusters create suivante, en local dans une fenêtre de terminal ou dans Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Remarques :
- CLUSTER_NAME: spécifiez le nom du cluster.
- REGION: spécifiez la région Compute Engine dans laquelle le cluster sera situé.
DATAPROC_IMAGE_VERSION: vous pouvez spécifier la version d'image à utiliser sur le cluster. La version de l'image du cluster détermine la version du composant Flink installée sur le cluster.
Pour que le composant Flink puisse être activé sur le cluster, la version de l'image doit être 1.5 ou ultérieure (consultez la section Versions de Dataproc compatibles pour afficher la liste des versions de composants incluses dans chaque version d'image Dataproc).
La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API Jobs Dataproc (consultez la section Exécuter des tâches Flink Dataproc).
--optional-components
: vous devez spécifier le composantFLINK
pour exécuter les tâches Flink et le service Web Flink HistoryServer sur le cluster.--enable-component-gateway
: vous devez activer la passerelle des composants pour activer le lien de la passerelle des composants vers l'interface utilisateur du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.PROPERTIES. Vous pouvez également spécifier une ou plusieurs propriétés de cluster.
Lorsque vous créez des clusters Dataproc avec les versions d'image
2.0.67
+ et2.1.15
+, vous pouvez utiliser l'option--properties
pour configurer les propriétés Flink dans/etc/flink/conf/flink-conf.yaml
. Elles serviront de valeurs par défaut pour les applications Flink exécutées sur le cluster.Vous pouvez définir
flink:historyserver.archive.fs.dir
pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink).Exemple de propriétés multiples:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Autres indicateurs:
- Vous pouvez ajouter l'indicateur facultatif
--metadata flink-start-yarn-session=true
pour exécuter le daemon Flink YARN (/usr/bin/flink-yarn-daemon
) en arrière-plan sur le nœud maître du cluster pour démarrer une session Flink YARN (voir Mode de session Flink).
- Vous pouvez ajouter l'indicateur facultatif
Lorsque vous utilisez des versions d'image 2.0 ou antérieures, vous pouvez ajouter l'option
--scopes=https://www.googleapis.com/auth/cloud-platform
pour permettre à votre cluster d'accéder aux API Google Cloud (consultez la section Bonnes pratiques concernant les champs d'application). Le champ d'applicationcloud-platform
est activé par défaut lorsque vous créez un cluster qui utilise l'image Dataproc version 2.1 ou ultérieure.
API
Pour créer un cluster Dataproc Flink à l'aide de l'API Dataproc, envoyez une requête clusters.create, comme suit:
Remarques :
Définissez SoftwareConfig.Component sur
FLINK
.Vous pouvez éventuellement définir
SoftwareConfig.imageVersion
pour spécifier la version de l'image à utiliser sur le cluster. La version de l'image du cluster détermine la version du composant Flink installée sur le cluster.Pour que le composant Flink puisse être activé sur le cluster, la version de l'image doit être 1.5 ou ultérieure (consultez la section Versions de Dataproc compatibles pour afficher la liste des versions de composants incluses dans chaque version d'image Dataproc).
La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API Jobs Dataproc (consultez la section Exécuter des tâches Flink Dataproc).
Définissez EndpointConfig.enableHttpPortAccess sur
true
pour activer le lien de la passerelle des composants vers l'interface utilisateur du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.Vous pouvez éventuellement définir
SoftwareConfig.properties
pour spécifier une ou plusieurs propriétés de cluster.- Vous pouvez spécifier des propriétés Flink qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster. Par exemple, vous pouvez définir
flink:historyserver.archive.fs.dir
pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink).
- Vous pouvez spécifier des propriétés Flink qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster. Par exemple, vous pouvez définir
Vous pouvez également définir:
GceClusterConfig.metadata
, par exemple pour spécifierflink-start-yarn-session
true
afin d'exécuter le daemon Flink YARN (/usr/bin/flink-yarn-daemon
) en arrière-plan sur le nœud maître du cluster afin de démarrer une session Flink YARN (voir Mode de session Flink).- GceClusterConfig.serviceAccountScopes sur
https://www.googleapis.com/auth/cloud-platform
(champ d'applicationcloud-platform
) si vous utilisez des versions d'image 2.0 ou antérieures pour permettre à votre cluster d'accéder aux API Google Cloud (voir Bonnes pratiques concernant les champs d'application). Le champ d'applicationcloud-platform
est activé par défaut lorsque vous créez un cluster qui utilise l'image Dataproc version 2.1 ou ultérieure.
Après avoir créé un cluster Flink
- Utilisez le lien
Flink History Server
dans la passerelle des composants pour afficher le serveur d'historique Flink en cours d'exécution sur le cluster Flink. - Utilisez
YARN ResourceManager link
dans la passerelle des composants pour afficher l'interface Web du gestionnaire de tâches Flink en cours d'exécution sur le cluster Flink . - Créez un serveur d'historique persistant Dataproc pour afficher les fichiers d'historique des tâches Flink écrits par des clusters Flink existants et supprimés.
Exécuter des jobs Flink à l'aide de la ressource Dataproc Jobs
Vous pouvez exécuter des tâches Flink à l'aide de la ressource Dataproc Jobs
depuis la console Google Cloud, Google Cloud CLI ou l'API Dataproc.
Console
Pour envoyer un exemple de tâche Flink "wordcount" depuis la console:
Ouvrez la page Dataproc Envoyer une tâche dans la console Google Cloud de votre navigateur.
Remplissez les champs de la page Submit a job (Envoyer une tâche) :
- Sélectionnez le nom du cluster dans la liste des clusters.
- Définissez le Type de tâche sur
Flink
. - Définissez le champ Main class or jar (Classe principale ou fichier JAR) sur
org.apache.flink.examples.java.wordcount.WordCount
. - Définissez Fichiers JAR sur
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
indique un fichier situé dans le cluster. Dataproc a installéWordCount.jar
lors de la création du cluster Flink.- Ce champ accepte également un chemin d'accès Cloud Storage (
gs://BUCKET/JARFILE
) ou un chemin d'accès HDFS (Hadoop Distributed File System) (hdfs://PATH_TO_JAR
).
Cliquez sur Submit (Envoyer).
- Les résultats du pilote de tâches sont affichés sur la page Informations sur la tâche.
- Les tâches Flink sont répertoriées sur la page Tâches Dataproc dans la console Google Cloud.
- Cliquez sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche pour arrêter ou supprimer une tâche.
gcloud
Pour envoyer une tâche Flink à un cluster Dataproc Flink, exécutez la commande gcloud Dataproc jobssubmit de la gcloud CLI, en local dans une fenêtre de terminal ou dans Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Remarques :
- CLUSTER_NAME: spécifiez le nom du cluster Dataproc Flink auquel envoyer la tâche.
- REGION: spécifiez la région Compute Engine dans laquelle se trouve le cluster.
- MAIN_CLASS: spécifiez la classe
main
de votre application Flink, par exemple :org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: spécifier le fichier JAR de l'application Flink. Vous pouvez spécifier les éléments suivants :
- Un fichier JAR installé sur le cluster, à l'aide du préfixe
file:///
- :
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Un fichier JAR dans Cloud Storage :
gs://BUCKET/JARFILE
- Un fichier JAR dans HDFS :
hdfs://PATH_TO_JAR
- Un fichier JAR installé sur le cluster, à l'aide du préfixe
JOB_ARGS: vous pouvez éventuellement ajouter des arguments de tâche après le double tiret (
--
).Une fois la tâche envoyée, le résultat du pilote de tâche s'affiche dans le terminal local ou Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Cette section explique comment envoyer une tâche Flink à un cluster Dataproc Flink à l'aide de l'API Dataproc jobs.submit.
Avant d'utiliser les données de requête, effectuez les remplacements suivants:
- PROJECT_ID: ID du projet Google Cloud
- REGION : région du cluster
- CLUSTER_NAME: spécifiez le nom du cluster Flink Dataproc auquel envoyer la tâche.
Méthode HTTP et URL :
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Corps JSON de la requête :
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Pour envoyer votre requête, développez l'une des options suivantes :
Vous devriez recevoir une réponse JSON de ce type :
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Les tâches Flink sont répertoriées sur la page Tâches Dataproc dans la console Google Cloud.
- Vous pouvez cliquer sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche de la console Google Cloud pour arrêter ou supprimer une tâche.
Exécuter des tâches Flink à l'aide de la CLI flink
Au lieu d'exécuter des tâches Flink à l'aide de la ressource Dataproc Jobs
, vous pouvez exécuter des tâches Flink sur le nœud maître de votre cluster Flink à l'aide de la CLI flink
.
Les sections suivantes décrivent différentes manières d'exécuter une tâche de CLI flink
sur votre cluster Dataproc Flink.
SSH sur le nœud maître:utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur la VM maître du cluster.
Définissez le classpath:initialisez le classpath Hadoop à partir de la fenêtre de terminal SSH sur la VM maître du cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Exécuter des tâches Flink:vous pouvez exécuter des tâches Flink dans différents modes de déploiement sur YAML: application, tâche par tâche et mode de session.
Mode application:le mode application Flink est compatible avec les versions d'image 2.0 et ultérieures de Dataproc. Ce mode exécute la méthode
main()
de la tâche sur le gestionnaire de tâches YAML. Le cluster s'arrête une fois la tâche terminée.Exemple d'envoi de tâche:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Répertoriez les tâches en cours d'exécution:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Annuler une tâche en cours d'exécution:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Mode par tâche:ce mode Flink exécute la méthode
main()
de la tâche côté client.Exemple d'envoi de tâche:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Mode session:démarrez une session Flink YARN de longue durée, puis envoyez une ou plusieurs tâches à la session.
Démarrer une session:vous pouvez démarrer une session Flink de l'une des manières suivantes:
Créez un cluster Flink en ajoutant l'option
--metadata flink-start-yarn-session=true
à la commandegcloud dataproc clusters create
(consultez la section Créer un cluster Flink Dataproc). Lorsque cette option est activée, Dataproc exécute/usr/bin/flink-yarn-daemon
pour démarrer une session Flink sur le cluster une fois le cluster créé.L'ID d'application YARN de la session est enregistré dans
/tmp/.yarn-properties-${USER}
. Vous pouvez lister les ID à l'aide de la commandeyarn application -list
.Exécutez le script Flink
yarn-session.sh
, préinstallé sur la VM maître du cluster, avec des paramètres personnalisés:Exemple avec des paramètres personnalisés:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Exécutez Flink le script du wrapper
/usr/bin/flink-yarn-daemon
avec les paramètres par défaut:. /usr/bin/flink-yarn-daemon
Envoyer une tâche à une session:exécutez la commande suivante pour envoyer une tâche Flink à la session.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: URL, y compris l'hôte et le port, de la VM maître Flink où les tâches sont exécutées.
Supprimez le
http:// prefix
de l'URL. Cette URL est répertoriée dans le résultat de la commande lorsque vous démarrez une session Flink. Vous pouvez exécuter la commande suivante pour répertorier cette URL dans le champTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: URL, y compris l'hôte et le port, de la VM maître Flink où les tâches sont exécutées.
Supprimez le
Répertorier les tâches dans une session:pour répertorier les tâches Flink dans une session, effectuez l'une des opérations suivantes:
Exécutez
flink list
sans arguments. La commande recherche l'ID d'application YARN de la session dans/tmp/.yarn-properties-${USER}
.Obtenez l'ID application YARN de la session à partir de
/tmp/.yarn-properties-${USER}
ou dans la sortie deyarn application -list
, puis exécutez<code>
flink list -yid YARN_APPLICATION_ID.Exécutez
flink list -m FLINK_MASTER_URL
.
Arrêtez une session:pour arrêter la session, obtenez l'ID application YARN de la session à partir de
/tmp/.yarn-properties-${USER}
ou dans le résultat deyarn application -list
, puis exécutez l'une des commandes suivantes:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Exécuter des tâches Apache Beam sur Flink
Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner
.
Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:
- Tâches Java Beam
- Tâches portables Beam
Tâches Java Beam
Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.
L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.
Créez un cluster Dataproc avec le composant Flink activé.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink--image-version
: La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).--region
: région Dataproc compatible--enable-component-gateway
: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.--scopes
: activez l'accès aux API Google Cloud pour votre cluster (consultez la section Bonnes pratiques concernant les champs d'application). Le champ d'applicationcloud-platform
est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre d'option) lorsque vous créez un cluster qui utilise l'image Dataproc version 2.1 ou ultérieure.
Utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur le nœud maître du cluster Flink.
Démarrez une session Flink YARN sur le nœud maître du cluster Dataproc.
. /usr/bin/flink-yarn-daemon
Notez la version de Flink sur votre cluster Dataproc.
flink --version
Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.
Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.
Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag
<flink.artifact.name>
. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Empaqueter l'exemple de décompte de mots.
mvn package -Pflink-runner
Importez le fichier uber JAR empaqueté
word-count-beam-bundled-0.1.jar
(~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utilisergsutil cp
pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Arrêtez la session YARN Flink.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Tâches Beam portables
Pour exécuter des tâches Beam écrites en Python, Go et d'autres langages compatibles, vous pouvez utiliser FlinkRunner
et PortableRunner
, comme décrit dans l'exécuteur Flink de Beam (voir aussi Feuille de route de la portabilité).
L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.
Créez un cluster Dataproc avec les composants Flink et Docker activés.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Remarques :
--optional-components
: Flink et Docker.--image-version
: version d'image du cluster, qui détermine la version de Flink installée sur le cluster (par exemple, consultez les versions du composant Apache Flink listées pour la dernière et les quatre versions précédentes de l'image 2.0.x).--region
: région Dataproc disponible--enable-component-gateway
: active l'accès à l'interface utilisateur du gestionnaire de tâches Flink.--scopes
: activez l'accès aux API Google Cloud pour votre cluster (consultez la section Bonnes pratiques concernant les champs d'application). Le champ d'applicationcloud-platform
est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre d'option) lorsque vous créez un cluster qui utilise l'image Dataproc version 2.1 ou ultérieure.
Utilisez
gsutil
en local ou dans Cloud Shell pour créer un bucket Cloud Storage. Vous spécifierez BUCKET_NAME lorsque vous exécuterez un exemple de programme "wordcount".gsutil mb BUCKET_NAME
Dans une fenêtre de terminal sur la VM du cluster, démarrez une session Flink YARN. Notez l'URL maître Flink, qui correspond à l'adresse du maître Flink où les tâches sont exécutées. Vous spécifierez FLINK_MASTER_URL lorsque vous exécuterez un exemple de programme "wordcount".
. /usr/bin/flink-yarn-daemon
Affichez et notez la version de Flink qui exécute le cluster Dataproc. Vous spécifierez FLINK_VERSION lorsque vous exécuterez un exemple de programme "wordcount".
flink --version
Installez les bibliothèques Python nécessaires à la tâche sur le nœud maître du cluster.
Installez une version de Beam compatible avec la version de Flink sur le cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Exécutez l'exemple de nombre de mots sur le nœud maître du cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Remarques :
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, noté précédemment.--flink_master
: FLINK_MASTER_URL, noté précédemment.--flink_submit_uber_jar
: utilisez le fichier Uber JAR pour exécuter la tâche Beam.--output
: BUCKET_NAME, créé précédemment.
Vérifiez que les résultats ont été écrits dans votre bucket.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Arrêtez la session YARN Flink.
- Permet d'obtenir l'ID application.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Exécuter Flink sur un cluster kerberisé
Le composant Dataproc Flink est compatible avec les clusters kerberisés. Un ticket Kerberos valide est nécessaire pour envoyer et conserver une tâche Flink ou pour démarrer un cluster Flink. Par défaut, un ticket Kerberos reste valide pendant sept jours.
Accéder à l'interface utilisateur du gestionnaire de tâches Flink
L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Pour utiliser l'interface Web:
- Créez un cluster Dataproc Flink.
- Après avoir créé le cluster, cliquez sur le lien Passerelle des composants YARN ResourceManager dans l'onglet "Interface Web" de la page Détails du cluster dans la console Google Cloud.
- Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. En fonction de l'état d'avancement d'une tâche, un lien ApplicationMaster ou History s'affiche.
- Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.