Composant Flink facultatif de Dataproc

Vous pouvez activer des composants supplémentaires tels que Flink lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page vous explique comment créer un cluster Dataproc avec le composant facultatif Apache Flink activé (un cluster Flink), puis exécuter des tâches Flink sur le cluster.

Vous pouvez utiliser votre cluster Flink pour:

  1. Exécutez des tâches Flink à l'aide de la ressource Jobs Dataproc depuis la console Google Cloud, la Google Cloud CLI ou l'API Dataproc.

  2. Exécutez des tâches Flink à l'aide de la CLI flink exécutée sur le nœud maître du cluster Flink.

  3. Exécuter des tâches Apache Beam sur Flink

  4. Exécuter Flink sur un cluster kerberisé

Vous pouvez utiliser la console Google Cloud, la Google Cloud CLI ou l'API Dataproc pour créer un cluster Dataproc sur lequel le composant Flink est activé.

Recommandation:Utilisez un cluster de VM à un maître standard avec le composant Flink. Les clusters Dataproc en mode haute disponibilité (avec trois VM maîtres) ne sont pas compatibles avec le mode haute disponibilité de Flink.

Pour créer un cluster Dataproc Flink à l'aide de la console Google Cloud, procédez comme suit:

  1. Ouvrez la page Dataproc Créer un cluster Dataproc sur Compute Engine.

    1. Le panneau Configurer un cluster est sélectionné.
      1. Dans la section Versioning (Gestion des versions), confirmez ou modifiez le type et la version de l'image. La version de l'image du cluster détermine la version du composant Flink installé sur le cluster.
        • La version de l'image doit être 1.5 ou ultérieure pour activer le composant Flink sur le cluster (consultez la section Versions Dataproc compatibles pour afficher la liste des versions de composant incluses dans chaque version d'image Dataproc).
        • La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API Dataproc Jobs (voir Exécuter des tâches Flink Dataproc).
      2. Dans la section Composants :
        1. Sous Passerelle des composants, sélectionnez Activer la passerelle des composants. Vous devez activer la passerelle des composants pour activer le lien de la passerelle des composants vers l'interface utilisateur du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.
        2. Sous Composants facultatifs, sélectionnez Flink et les autres composants facultatifs à activer sur votre cluster.
    2. Cliquez sur le panneau Personnaliser le cluster (facultatif).

      1. Dans la section Propriétés du cluster, cliquez sur Ajouter des propriétés pour chaque propriété de cluster facultative à ajouter à votre cluster. Vous pouvez ajouter des propriétés avec un préfixe flink pour configurer des propriétés Flink dans /etc/flink/conf/flink-conf.yaml qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster.

        Exemples :

        • Définissez flink:historyserver.archive.fs.dir pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink).
        • Définissez les emplacements de tâche Flink avec flink:taskmanager.numberOfTaskSlots=n.
      2. Dans la section Métadonnées de cluster personnalisées, cliquez sur Ajouter des métadonnées pour ajouter des métadonnées facultatives. Par exemple, ajoutez flink-start-yarn-session true pour exécuter le daemon YARN Flink (/usr/bin/flink-yarn-daemon) en arrière-plan sur le nœud maître du cluster afin de démarrer une session YARN Flink (voir Mode de session Flink).

    3. Si vous utilisez une image Dataproc version 2.0 ou antérieure, cliquez sur le panneau Gérer la sécurité (facultatif), puis sous Accès au projet, sélectionnez Enables the cloud-platform scope for this cluster. Le champ d'application cloud-platform est activé par défaut lorsque vous créez un cluster qui utilise la version 2.1 ou ultérieure de l'image Dataproc.

  2. Cliquez sur Créer pour créer le cluster.

Pour créer un cluster Dataproc Flink à l'aide de la CLI gcloud, exécutez la commande gcloud dataproc clusters create suivante en local dans une fenêtre de terminal ou dans Cloud Shell:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=DATAPROC_IMAGE_VERSION \
    --optional-components=FLINK \
    --enable-component-gateway \
    --properties=PROPERTIES
    ... other flags

Remarques :

  • CLUSTER_NAME: spécifiez le nom du cluster.
  • REGION: spécifiez une région Compute Engine dans laquelle se trouvera le cluster.
  • DATAPROC_IMAGE_VERSION: spécifiez éventuellement la version de l'image à utiliser sur le cluster. La version de l'image du cluster détermine la version du composant Flink installé sur le cluster.

    • La version de l'image doit être 1.5 ou ultérieure pour activer le composant Flink sur le cluster (consultez la section Versions Dataproc compatibles pour afficher la liste des versions de composant incluses dans chaque version d'image Dataproc).

    • La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API Dataproc Jobs (voir Exécuter des tâches Flink Dataproc).

  • --optional-components: vous devez spécifier le composant FLINK pour exécuter les jobs Flink et le service Web HistoryServer Flink sur le cluster.

  • --enable-component-gateway: vous devez activer la passerelle des composants pour activer le lien de la passerelle des composants vers l'interface utilisateur du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.

  • PROPERTIES. Vous pouvez également spécifier une ou plusieurs propriétés de cluster.

    • Lorsque vous créez des clusters Dataproc avec les versions d'image 2.0.67 et 2.1.15, vous pouvez utiliser l'indicateur --properties pour configurer des propriétés Flink dans /etc/flink/conf/flink-conf.yaml qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster.

    • Vous pouvez définir flink:historyserver.archive.fs.dir pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink).

    • Exemple pour plusieurs établissements:

    --properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
    
  • Autres options:

    • Vous pouvez ajouter l'option --metadata flink-start-yarn-session=true facultative pour exécuter le daemon YARN Flink (/usr/bin/flink-yarn-daemon) en arrière-plan sur le nœud maître du cluster afin de démarrer une session YARN Flink (voir la section Mode de session Flink).
  • Lorsque vous utilisez des versions d'image 2.0 ou antérieures, vous pouvez ajouter l'option --scopes=https://www.googleapis.com/auth/cloud-platform pour activer l'accès aux API Google Cloud par votre cluster (voir la section Bonnes pratiques concernant les champs d'application). Le champ d'application cloud-platform est activé par défaut lorsque vous créez un cluster qui utilise la version 2.1 ou ultérieure de l'image Dataproc.

Pour créer un cluster Dataproc Flink à l'aide de l'API Dataproc, envoyez une requête clusters.create, comme suit:

Remarques :

  • Définissez SoftwareConfig.Component sur FLINK.

  • Vous pouvez éventuellement définir SoftwareConfig.imageVersion pour spécifier la version d'image à utiliser sur le cluster. La version de l'image du cluster détermine la version du composant Flink installé sur le cluster.

    • La version de l'image doit être 1.5 ou ultérieure pour activer le composant Flink sur le cluster (consultez la section Versions Dataproc compatibles pour afficher la liste des versions de composant incluses dans chaque version d'image Dataproc).

    • La version de l'image doit être [TBD] ou ultérieure pour exécuter des tâches Flink via l'API Dataproc Jobs (voir Exécuter des tâches Flink Dataproc).

  • Définissez EndpointConfig.enableHttpPortAccess sur true pour activer le lien de la passerelle des composants vers l'UI du serveur d'historique Flink. L'activation de la passerelle des composants permet également d'accéder à l'interface Web du gestionnaire de tâches Flink exécutée sur le cluster Flink.

  • Vous pouvez éventuellement définir SoftwareConfig.properties pour spécifier une ou plusieurs propriétés de cluster.

    • Vous pouvez spécifier des propriétés Flink qui serviront de valeurs par défaut pour les applications Flink que vous exécutez sur le cluster. Par exemple, vous pouvez définir flink:historyserver.archive.fs.dir pour spécifier l'emplacement Cloud Storage dans lequel écrire les fichiers d'historique des tâches Flink (cet emplacement sera utilisé par le serveur d'historique Flink exécuté sur le cluster Flink).
  • Vous pouvez éventuellement définir les éléments suivants:

    • GceClusterConfig.metadata. Par exemple, pour spécifier flink-start-yarn-session true afin d'exécuter le daemon YARN Flink (/usr/bin/flink-yarn-daemon) en arrière-plan sur le nœud maître du cluster afin de démarrer une session YARN Flink (voir Mode de session Flink).
    • GceClusterConfig.serviceAccountScopes à https://www.googleapis.com/auth/cloud-platform (champ d'application cloud-platform) lorsque vous utilisez des versions d'image 2.0 ou antérieures pour autoriser votre cluster à accéder aux API (voir Bonnes pratiques concernant les champs d'application). Google Cloud Le champ d'application cloud-platform est activé par défaut lorsque vous créez un cluster qui utilise la version 2.1 ou ultérieure de l'image Dataproc.

Vous pouvez exécuter des jobs Flink à l'aide de la ressource Jobs Dataproc depuis la console Google Cloud, la Google Cloud CLI ou l'API Dataproc.

Pour envoyer un exemple de tâche de calcul du nombre de mots Flink à partir de la console:

  1. Ouvrez la page Dataproc Submit a job (Envoyer une tâche) dans la console Google Cloud de votre navigateur.

  2. Renseignez les champs de la page Envoyer une tâche:

    1. Sélectionnez le nom du cluster dans la liste des clusters.
    2. Définissez le champ Job type (Type de tâche) sur Flink.
    3. Définissez le champ Main class or jar (Classe principale ou fichier JAR) sur org.apache.flink.examples.java.wordcount.WordCount.
    4. Définissez Fichiers JAR sur file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// désigne un fichier situé sur le cluster. Dataproc a installé WordCount.jar lors de la création du cluster Flink.
      • Ce champ accepte également un chemin d'accès à Cloud Storage (gs://BUCKET/JARFILE) ou un chemin d'accès au système de fichiers distribué Hadoop (HDFS, Hadoop Distributed File System) (hdfs://PATH_TO_JAR).
  3. Cliquez sur Envoyer.

    • Le résultat du pilote de tâches s'affiche sur la page Informations sur la tâche.
    • Les tâches Flink sont répertoriées sur la page Jobs (Tâches) de Dataproc dans la console Google Cloud.
    • Cliquez sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche pour arrêter ou supprimer une tâche.

Pour envoyer une tâche Flink à un cluster Flink Dataproc, exécutez la commande de gcloud CLI gcloud dataproc jobs submit en local dans une fenêtre de terminal ou dans Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Remarques :

  • CLUSTER_NAME: spécifiez le nom du cluster Dataproc Flink auquel envoyer la tâche.
  • REGION: spécifiez une région Compute Engine dans laquelle se trouve le cluster.
  • MAIN_CLASS: spécifiez la classe main de votre application Flink, par exemple :
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: spécifiez le fichier JAR de l'application Flink. Vous pouvez spécifier les éléments suivants :
    • Un fichier JAR installé sur le cluster, à l'aide du préfixe file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Fichier JAR dans Cloud Storage : gs://BUCKET/JARFILE
    • Fichier JAR dans HDFS : hdfs://PATH_TO_JAR
  • JOB_ARGS: vous pouvez ajouter des arguments de tâche après le double tiret (--).

  • Une fois la tâche envoyée, le résultat du pilote de tâches s'affiche dans le terminal local ou Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

Cette section explique comment envoyer une tâche Flink à un cluster Flink Dataproc à l'aide de l'API jobs.submit de Dataproc.

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • PROJECT_ID: Google Cloud ID du projet
  • REGION : région du cluster
  • CLUSTER_NAME: spécifiez le nom du cluster Dataproc Flink auquel envoyer la tâche.

Méthode HTTP et URL :

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corps JSON de la requête :

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Pour envoyer votre requête, développez l'une des options suivantes :

Enregistrez le corps de la requête dans un fichier nommé request.json, puis exécutez la commande suivante :

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit"

Enregistrez le corps de la requête dans un fichier nommé request.json, puis exécutez la commande suivante :

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit" | Select-Object -Expand Content

Vous devriez recevoir une réponse JSON de ce type :

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Les tâches Flink sont répertoriées sur la page Jobs (Tâches) de Dataproc dans la console Google Cloud.
  • Vous pouvez cliquer sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche de la console Google Cloud pour arrêter ou supprimer une tâche.

Au lieu d'exécuter des tâches Flink à l'aide de la ressource Jobs Dataproc, vous pouvez exécuter des tâches Flink sur le nœud maître de votre cluster Flink à l'aide de la CLI flink.

Les sections suivantes décrivent différentes manières d'exécuter une tâche de CLI flink sur votre cluster Dataproc Flink.

  1. Se connecter en SSH au nœud maître:utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur la VM maître du cluster.

  2. Définir le chemin d'accès aux classes:initialisez le chemin d'accès aux classes Hadoop à partir de la fenêtre du terminal SSH sur la VM maître du cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Exécuter des tâches Flink:vous pouvez exécuter des tâches Flink dans différents modes de déploiement sur YARN: application, par tâche et session.

    1. Mode application:le mode application Flink est compatible avec la version 2.0 de l'image Dataproc et les versions ultérieures. Ce mode exécute la méthode main() de la tâche sur le gestionnaire de tâches YARN. Le cluster s'arrête une fois la tâche terminée.

      Exemple d'envoi d'une tâche:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Répertorier les tâches en cours d'exécution:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Pour annuler une tâche en cours d'exécution:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Mode par tâche:ce mode Flink exécute la méthode main() de la tâche côté client.

      Exemple d'envoi d'une tâche:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Mode session:démarrez une session Flink YARN de longue durée, puis envoyez-y une ou plusieurs tâches.

      1. Démarrer une session:vous pouvez démarrer une session Flink de l'une des manières suivantes:

        1. Créez un cluster Flink en ajoutant l'option --metadata flink-start-yarn-session=true à la commande gcloud dataproc clusters create (voir Créer un cluster Flink Dataproc). Lorsque cet indicateur est activé, une fois le cluster créé, Dataproc exécute /usr/bin/flink-yarn-daemon pour démarrer une session Flink sur le cluster.

          L'ID d'application YARN de la session est enregistré dans /tmp/.yarn-properties-${USER}. Vous pouvez lister l'ID à l'aide de la commande yarn application -list.

        2. Exécutez le script Flink yarn-session.sh, qui est préinstallé sur la VM maître du cluster, avec des paramètres personnalisés:

          Exemple avec des paramètres personnalisés:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Exécutez le script de wrapper /usr/bin/flink-yarn-daemon Flink avec les paramètres par défaut:

          . /usr/bin/flink-yarn-daemon
          
      2. Envoyer une tâche à une session:exécutez la commande suivante pour envoyer une tâche Flink à la session.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: URL, y compris l'hôte et le port, de la VM maître Flink où les tâches sont exécutées. Supprimez http:// prefix de l'URL. Cette URL est listée dans la résultat de la commande lorsque vous démarrez une session Flink. Vous pouvez exécuter la commande suivante pour lister cette URL dans le champ Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Lister les tâches dans une session:pour lister les tâches Flink dans une session, procédez comme suit:

        • Exécutez flink list sans arguments. La commande recherche l'ID d'application YARN de la session dans /tmp/.yarn-properties-${USER}.

        • Obtenez l'ID application YARN de la session à partir de /tmp/.yarn-properties-${USER} ou de la sortie de yarn application -list, puis exécutez <code>flink list -yid YARN_APPLICATION_ID.

        • Exécutez flink list -m FLINK_MASTER_URL.

      4. Arrêter une session:pour arrêter la session, obtenez l'ID d'application YARN de la session à partir de /tmp/.yarn-properties-${USER} ou de la sortie de yarn application -list, puis exécutez l'une des commandes suivantes:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner.

Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:

  1. Tâches Java Beam
  2. Tâches portables Beam

Tâches Java Beam

Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.

L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec le composant Flink activé.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components : Flink
    • --image-version : La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc compatible
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes: activez l'accès aux API Google Cloud par votre cluster (voir la section Bonnes pratiques concernant les champs d'application). Le champ d'application cloud-platform est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre d'indicateur) lorsque vous créez un cluster qui utilise la version d'image Dataproc 2.1 ou ultérieure.
  2. Utilisez l'utilitaire SSH pour ouvrir une fenêtre de terminal sur le nœud maître du cluster Flink.

  3. Démarrez une session YARN Flink sur le nœud maître du cluster Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Notez la version de Flink sur votre cluster Dataproc.

    flink --version
    
  4. Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.

    Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Consultez le tableau Compatibilité avec les versions de Flink qui répertorie la compatibilité des versions de Beam-Flink.

    Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag <flink.artifact.name>. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueter l'exemple de décompte de mots.

    mvn package -Pflink-runner
    
  6. Importez le fichier uber JAR empaqueté word-count-beam-bundled-0.1.jar (~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utiliser gcloud storage cp pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.

    1. Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Arrêtez la session YARN Flink.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Tâches Beam portables

Pour exécuter des tâches Beam écrites en Python, Go et d'autres langages compatibles, vous pouvez utiliser FlinkRunner et PortableRunner, comme décrit dans l'exécuteur Flink de Beam (voir aussi Feuille de route de la portabilité).

L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec les composants Flink et Docker activés.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Remarques :

    • --optional-components: Flink et Docker.
    • --image-version: version de l'image du cluster, qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc disponible.
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes: activez l'accès aux API Google Cloud par votre cluster (voir la bonne pratique concernant les champs d'application). Le champ d'application cloud-platform est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre d'indicateur) lorsque vous créez un cluster qui utilise la version d'image Dataproc 2.1 ou ultérieure.
  2. Utilisez gcloud CLI localement ou dans Cloud Shell pour créer un bucket Cloud Storage. Vous spécifierez BUCKET_NAME lorsque vous exécuterez un exemple de programme de comptage de mots.

    gcloud storage buckets create BUCKET_NAME
    
  3. Dans une fenêtre de terminal sur la VM du cluster, démarrez une session YARN Flink. Notez l'URL du maître Flink, l'adresse du maître Flink où les tâches sont exécutées. Vous spécifierez FLINK_MASTER_URL lorsque vous exécuterez un exemple de programme de comptage de mots.

    . /usr/bin/flink-yarn-daemon
    

    Affichez et notez la version de Flink qui exécute le cluster Dataproc. Vous spécifierez FLINK_VERSION lorsque vous exécuterez un exemple de programme de comptage de mots.

    flink --version
    
  4. Installez les bibliothèques Python nécessaires pour exécuter la tâche sur le nœud maître du cluster.

  5. Installez une version de Beam compatible avec la version de Flink sur le cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Exécutez l'exemple de décompte de mots sur le nœud maître du cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Remarques :

    • --runner : FlinkRunner.
    • --flink_version: FLINK_VERSION, comme indiqué précédemment.
    • --flink_master: FLINK_MASTER_URL, comme indiqué précédemment.
    • --flink_submit_uber_jar: utilisez le fichier Uber JAR pour exécuter la tâche Beam.
    • --output: BUCKET_NAME, créé précédemment.
  7. Vérifiez que les résultats ont bien été écrits dans votre bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Arrêtez la session YARN Flink.

    1. Obtenez l'ID application.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Le composant Dataproc Flink est compatible avec les clusters kerberisés. Une demande Kerberos valide est nécessaire pour envoyer et conserver une tâche Flink, ou pour démarrer un cluster Flink. Par défaut, un ticket Kerberos reste valide pendant sept jours.

L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Pour utiliser l'interface Web:

  1. Créez un cluster Dataproc Flink.
  2. Après la création du cluster, cliquez sur le lien du ResourceManager YARN de la passerelle des composants dans l'onglet "Interface Web" de la page Détails du cluster de Google Cloud Console.
  3. Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. En fonction de l'état d'exécution d'une tâche, un lien ApplicationMaster ou History (Historique) est répertorié.
  4. Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.