Composant Dataproc Flink

Vous pouvez installer des composants supplémentaires lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page décrit le composant Flink.

Le composant Dataproc Flink installe Apache Flink sur un cluster Dataproc.

Installer le composant

Installez le composant lorsque vous créez un cluster Dataproc. Le composant Dataproc Flink peut être installé sur les clusters créés avec la version 1.5 ou ultérieure de l'image Dataproc.

La version de l'image du cluster détermine la version du composant Flink installé sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes desVersions 2.0.x de l'image). Consultez la section Versions Dataproc compatibles pour obtenir la version de composant incluse dans chaque version d'image Dataproc.

Commande gcloud

Pour créer un cluster Dataproc incluant le composant Flink, exécutez la commande gcloud dataproc clusters create cluster-name avec l'option --optional-components.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

Remarque : Étant donné qu'une session YARN Flink consomme un volume important de ressources YARN, par défaut, Dataproc ne démarre pas de session Flink au démarrage du cluster Dataproc. Vous pouvez commencer une session lorsque vous démarrez votre cluster Flink en ajoutant l'option --metadata flink-start-yarn-session=true à la commande gcloud dataproc clusters create.

API REST

Le composant Flink peut être spécifié via l'API Dataproc à l'aide de la propriété SoftwareConfig.Component dans le cadre d'une requête clusters.create. Le champ SoftwareConfig.imageVersion permet de définir la version de l'image du cluster.

Console

  1. Activez le composant et la passerelle des composants.
    • Dans Cloud Console, ouvrez la page Dataproc Créer un cluster. Le panneau "Configurer un cluster" est sélectionné.
    • Dans la section "Gestion des versions", confirmez ou modifiez le type et la version de l'image.
    • Dans la section Composants :

Après avoir démarré un cluster Dataproc avec Flink, connectez-vous en SSH au nœud maître du cluster Dataproc, puis exécutez des tâches Flink.

Exemple :

Exécutez une seule tâche Flink. Une fois la tâche acceptée, Flink démarre un gestionnaire de tâches et des emplacements pour la tâche dans YARN. La tâche Flink sera exécutée dans le cluster YARN jusqu'à ce que l'opération soit terminée. Le gestionnaire de tâches est arrêté une fois la tâche terminée. Les journaux des tâches sont disponibles dans les journaux YARN.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

Exemple :

Démarrez une session Flink YARN de longue durée, puis exécutez une tâche.

Démarrez la session sur le nœud maître du cluster Dataproc. Vous pouvez également démarrer une session YARN Flink lorsque vous créez le cluster Flink à l'aide de l'option gcloud dataproc clusters create --metadata flink-start-yarn-session=true.

. /usr/bin/flink-yarn-daemon

Notez l'hôte et le port dans FLINK_MASTER_URL une fois la session démarrée avec succès. Dans la commande suivante, remplacez JOB_MANAGER_HOSTNAME et REST_API_PORT par ces éléments. Exécutez la tâche :

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

Pour arrêter la session, remplacez APPLICATION_ID par l'ID de l'application associé à la session Flink YARN disponible dans l'interface utilisateur du gestionnaire de tâches Flink ou à partir du résultat de yarn application -list. Exécutez ensuite la commande ci-dessous:

yarn application -kill APPLICATION_ID

Exécuter des tâches Apache Beam

Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner.

Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:

  1. Tâches Java Beam
  2. Tâches portables Beam

Tâches Java Beam

Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.

L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec le composant Flink activé.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components : Flink
    • --image-version : La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc compatible
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes: active l'accès API aux services GCP dans le même projet.
  2. Se connecter en SSH au nœud maître du cluster Dataproc

  3. Démarrez une session YARN Flink sur le nœud maître du cluster Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Notez la version de Flink sur votre cluster Dataproc.

    flink --version
    
  4. Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.

    Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.

    Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag <flink.artifact.name>. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueter l'exemple de décompte de mots.

    mvn package -Pflink-runner
    
  6. Importez le fichier uber JAR empaqueté word-count-beam-bundled-0.1.jar (~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utiliser gsutil cp pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.

    1. Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Arrêtez la session YARN Flink.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Tâches Beam portables

Pour exécuter des tâches Beam écrites en Python, Go et d'autres langages compatibles, vous pouvez utiliser FlinkRunner et PortableRunner, comme décrit dans l'exécuteur Flink de Beam (voir aussi Feuille de route de la portabilité).

L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec les composants Flink et Docker activés.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink et Docker.
    • --image-version : La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc compatible
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes: active l'accès API aux services GCP dans le même projet.
  2. Se connecter en SSH au nœud maître du cluster Dataproc

  3. Créez un bucket Cloud Storage.

    gsutil mb BUCKET_NAME
    
  4. Démarrez une session YARN Flink sur le nœud maître du cluster Dataproc et enregistrez l'URL du maître Flink au démarrage de la session.

    . /usr/bin/flink-yarn-daemon
    

    Notez la version de Flink sur votre cluster Dataproc.

    flink --version
    
  5. Installez les bibliothèques Python nécessaires pour exécuter la tâche sur le nœud maître du cluster Dataproc.

    Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Exécutez l'exemple de comptage de mots sur le nœud maître du cluster Dataproc.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner (Obligatoire) : FlinkRunner.
    • --flink_version (obligatoire): version de Flink.
    • --flink_master (obligatoire): adresse du maître Flink dans lequel la tâche sera exécutée.
    • --flink_submit_uber_jar (obligatoire): utilisez le fichier Uber JAR pour exécuter la tâche Beam.
    • --output (obligatoire): où la sortie doit être écrite.
  7. Vérifiez que les résultats ont bien été écrits dans votre bucket.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Arrêtez la session YARN Flink.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Le composant Dataproc Flink est compatible avec les clusters kerberisés. Une demande Kerberos valide est nécessaire pour envoyer et conserver une tâche Flink, ou pour démarrer un cluster Flink. Par défaut, une demande reste valide pendant sept jours.

L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Vous pouvez ouvrir l'interface utilisateur du gestionnaire de tâches Flink à partir de l'application maître de l'application Flink dans YARN.

Pour activer et utiliser l'accès à l'interface utilisateur, procédez comme suit :

  1. Créez le cluster Dataproc avec la passerelle des composants activée.
  2. Après la création du cluster, cliquez sur le lien du ResourceManager YARN de la passerelle des composants dans l'onglet "Interface Web" de la page Détails du cluster de Google Cloud Console.
  3. Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. En fonction de l'état d'exécution de votre tâche, un lien ApplicationMaster ou History (Historique) est répertorié.
  4. Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.