Composant Flink facultatif Dataproc

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Vous pouvez installer des composants supplémentaires tels que Flink lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page décrit le composant Flink.

Le composant Dataproc Flink installe Apache Flink sur un cluster Dataproc.

Installer le composant

Installez le composant lorsque vous créez un cluster Dataproc. Le composant Flink Dataproc peut être installé sur les clusters créés avec Dataproc image version 1.5 ou ultérieure.

La version de l'image de cluster détermine la version du composant Flink installé sur le cluster. Consultez la section Versions Dataproc compatibles pour afficher la liste des versions des composants incluses dans chaque version d'image Dataproc.

Recommandation : Utilisez un cluster de VM à 1 instance maître standard avec le composant Flink. Les clusters en mode haute disponibilité Dataproc (avec trois VM maîtres) ne sont pas compatibles avec le mode haute disponibilité Flink.

Commande gcloud

Pour créer un cluster Dataproc incluant le composant Flink, exécutez la commande gcloud dataproc clusters create cluster-name avec l'option --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    --scopes=https://www.googleapis.com/auth/cloud-platform  \
    ... other flags
Remarques :
  • L'option --enable-component-gateway permet d'accéder à l'interface utilisateur de Flink Job Manager.
  • L'option --scopes=https://www.googleapis.com/auth/cloud-platform permet à l'API d'accéder aux services Cloud Platform du projet.
  • Vous pouvez ajouter le fichier --metadata flink-start-yarn-session=true facultatif pour exécuter le daemon Flink YARN (/usr/bin/flink-yarn-daemon) en arrière-plan sur le cluster et démarrer une session Flink YARN (consultez la section Mode session Flink).
    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK \
        --region=region \
        --enable-component-gateway \
        --image-version=DATAPROC_IMAGE_VERSION \
        --scopes=https://www.googleapis.com/auth/cloud-platform  \
        --metadata flink-start-yarn-session=true \
        ... other flags
    

API REST

Le composant Flink peut être spécifié via l'API Dataproc à l'aide de la propriété SoftwareConfig.Component dans le cadre d'une requête clusters.create.

Définissez EndpointConfig.enableHttpPortAccess sur true pour permettre la connexion à l'interface utilisateur de Flink Job Manager.

Console

  1. Activez le composant et la passerelle des composants.
    • Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster Dataproc sur Compute Engine. Le panneau Configurer le cluster est sélectionné.
    • Dans la section Gestion des versions, confirmez ou modifiez le type et la version de l'image.
    • Dans la section Composants :
      • Sous Passerelle des composants, sélectionnez Activer la passerelle des composants.
      • Sous Composants facultatifs, sélectionnez Flink et les autres composants facultatifs à installer sur votre cluster.

Pour définir les propriétés de Flink:

  1. Utilisez une action d'initialisation pour mettre à jour les propriétés Flink par défaut dans /etc/flink/conf/flink-conf.yaml.

  2. Spécifiez les propriétés Flink avec des indicateurs de ligne de commande lorsque vous envoyez une tâche Flink ou démarrez une session Flink.

Définir le chemin d'accès des classes

  1. Initialisez le paramètre de classe Hadoop à partir d'une fenêtre de terminal SSH sur la VM maître du cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    

Vous pouvez exécuter Flink dans différents modes de déploiement sur YARN : application, par tâche ou par session.

Mode d'application

Le mode Application Flink est compatible avec les versions d'image 2.0 et ultérieures de Dataproc. Ce mode exécute la méthode main() de la tâche sur le gestionnaire de tâches YARN. Le cluster s'arrête une fois la tâche terminée.

Exemple d'envoi de tâche:

flink run-application \
    -t yarn-application \
   -Djobmanager.memory.process.size=1024m \
   -Dtaskmanager.memory.process.size=2048m \
   -Djobmanager.heap.mb=820 \
   -Dtaskmanager.heap.mb=1640 \
   -Dtaskmanager.numberOfTaskSlots=2 \
   -Dparallelism.default=4 \
   /usr/lib/flink/examples/batch/WordCount.jar

Répertoriez la tâche en cours d'exécution :

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

Annulez la tâche en cours d'exécution :

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

Mode par tâche

Ce mode Flink exécute la méthode main() de la tâche côté client.

Exemple d'envoi de tâche :

flink run \
    -m yarn-cluster \
    -p 4 \
    -ys 2 \
    -yjm 1024m \
    -ytm 2048m \
    /usr/lib/flink/examples/batch/WordCount.jar

Mode session

Démarrez une session Flink YARN de longue durée, puis envoyez une ou plusieurs tâches à la session.

Vous pouvez démarrer une session Flink de l'une des manières suivantes:

  1. Une fois le cluster Flink créé, exécutez le script Flink yarn-session.sh, qui est préinstallé sur la VM maître du cluster, avec des paramètres personnalisés:

    Exemple avec des paramètres personnalisés:

    /usr/lib/flink/bin/yarn-session.sh \
      -s 1 \
      -jm 1024m \
      -tm 2048m \
      -nm flink-dataproc \
      --detached
     ```
    
  2. Une fois le cluster Flink créé, exécutez Flink le script wrapper /usr/bin/flink-yarn-daemon avec les paramètres par défaut:

    . /usr/bin/flink-yarn-daemon
    
  3. Créez un cluster Flink en ajoutant l'option --metadata flink-start-yarn-session=true à la commande gcloud dataproc clusters create. Lorsque cette option est activée, Dataproc exécute /usr/bin/flink-yarn-daemon une fois le cluster créé pour démarrer une session Flink sur le cluster.

L'ID d'application YARN de la session est enregistré dans /tmp/.yarn-properties-${USER}. Vous pouvez répertorier l'ID à l'aide de la commande yarn application -list.

Envoyer une tâche à la session

Lorsque vous démarrez une session Flink, le résultat de la commande liste l'URL (y compris l'hôte et le port) de la VM maître Flink où les tâches sont exécutées.

Une autre façon d'afficher l'URL maître Flink : la sortie de commande suivante répertorie l'URL maître Flink dans le champ Tracking-URL:

yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`

Exécutez la commande suivante pour envoyer une tâche Flink à la session. Remplacez FLINK_MASTER_URL par l'URL maître Flink après avoir supprimé http:// prefix.

flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar

Répertorier les tâches dans une session

Pour répertorier les tâches Flink dans une session, effectuez l'une des opérations suivantes:

  • Exécutez flink list sans arguments. La commande recherche l'ID d'application YARN de la session dans /tmp/.yarn-properties-${USER}.

  • Exécutez flink list -yid YARN_APPLICATION_ID. Exécutez /tmp/.yarn-properties-${USER} ou yarn application -list pour obtenir l'ID d'application YARN.

  • Exécutez flink list -m FLINK_MASTER_URL.

Arrêter une session

Pour arrêter la session, obtenez l'ID d'application YARN de la session auprès de /tmp/.yarn-properties-${USER} ou la sortie de yarn application -list, puis exécutez l'une des commandes suivantes:

echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID

Exécuter des tâches Apache Beam

Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner.

Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:

  1. Tâches Java Beam
  2. Tâches portables Beam

Tâches Java Beam

Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.

L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec le composant Flink activé.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components : Flink
    • --image-version : La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc compatible
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes : permet d'activer l'accès aux API pour les services Cloud Platform du projet.
  2. Utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur le nœud maître du cluster FLink.

  3. Démarrez une session Flink YARN sur le nœud maître du cluster Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Notez la version de Flink sur votre cluster Dataproc.

    flink --version
    
  4. Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.

    Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.

    Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag <flink.artifact.name>. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueter l'exemple de décompte de mots.

    mvn package -Pflink-runner
    
  6. Importez le fichier uber JAR empaqueté word-count-beam-bundled-0.1.jar (~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utiliser gsutil cp pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.

    1. Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Arrêtez la session YARN Flink.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Tâches Beam portables

Pour exécuter des tâches Beam écrites en Python, Go et d'autres langages compatibles, vous pouvez utiliser FlinkRunner et PortableRunner, comme décrit dans l'exécuteur Flink de Beam (voir aussi Feuille de route de la portabilité).

L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec les composants Flink et Docker activés.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Remarques :

    • --optional-components: Flink et Docker.
    • --image-version: version de l'image du cluster, qui détermine la version de Flink installée sur le cluster (par exemple, les versions du composant Apache Flink répertoriées pour les dernières versions de versions d'image 2.0.x)
    • --region: région Dataproc disponible.
    • --enable-component-gateway : permet d'activer l'accès à l'interface utilisateur de Flink Job Manager.
    • --scopes: permet l'accès API aux services Cloud Platform du projet.
  2. Utilisez gsutil en local ou dans Cloud Shell pour créer un bucket Cloud Storage. Vous allez spécifier BUCKET_NAME lorsque vous exécuterez un exemple de programme de décompte de mots.

    gsutil mb BUCKET_NAME
    
  3. Dans une fenêtre de terminal de la VM du cluster, démarrez une session Flink YARN. Notez l'URL maître Flink, l'adresse du maître Flink où les tâches sont exécutées. Spécifiez FLINK_MASTER_URL lorsque vous exécutez un exemple de programme de décompte de mots.

    . /usr/bin/flink-yarn-daemon
    

    Affichez et notez la version de Flink qui exécute le cluster Dataproc. Spécifiez FLINK_VERSION lorsque vous exécutez un exemple de programme de décompte de mots.

    flink --version
    
  4. Installez les bibliothèques Python requises pour la tâche sur le nœud maître du cluster.

  5. Installez une version Beam compatible avec la version Flink du cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Exécutez l'exemple de comptage de mots sur le nœud maître du cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Remarques :

    • --runner : FlinkRunner.
    • --flink_version: FLINK_VERSION, noté plus tôt.
    • --flink_master: FLINK_MASTER_URL, noté plus tôt.
    • --flink_submit_uber_jar: utilisez le fichier Uber JAR pour exécuter la tâche Beam.
    • --output BUCKET_NAME, créé plus tôt.
  7. Vérifiez que les résultats ont bien été écrits dans votre bucket.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Arrêtez la session YARN Flink.

    1. Obtenez l'ID application.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Le composant Dataproc Flink est compatible avec les clusters kerberisés. Un ticket Kerberos valide est nécessaire pour envoyer et conserver une tâche Flink, ou pour démarrer un cluster Flink. Par défaut, un ticket Kerberos reste valide pendant sept jours.

L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Pour utiliser l'interface Web :

  1. Créez un cluster Dataproc avec la passerelle des composants activée.
  2. Une fois le cluster créé, cliquez sur le lien ResourceManagerYARN de la passerelle des composants dans l'onglet "Interface Web" de la page Détails du cluster dans la console Google Cloud.
  3. Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. Selon l'état d'avancement d'une tâche, un lien ApplicationMaster ou History est affiché.
  4. Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.