Composant Flink facultatif de Dataproc

Vous pouvez activer des composants supplémentaires tels que Flink lorsque vous créez une instance Dataproc à l'aide de la commande Composants facultatifs . Cette page explique comment créer un cluster Dataproc avec le kit Apache Flink un composant facultatif activé (cluster Flink), puis exécuter des jobs Flink sur le cluster.

Vous pouvez utiliser votre cluster Flink pour:

  1. Exécuter des jobs Flink à l'aide de la ressource Dataproc Jobs depuis la console Google Cloud, Google Cloud CLI ou l'API Dataproc.

  2. Exécuter des jobs Flink à l'aide de la CLI flink exécuté sur le nœud maître du cluster Flink.

  3. Exécuter des jobs Apache Beam sur Flink

  4. Exécuter Flink sur un cluster kerberisé

Vous pouvez utiliser la console Google Cloud, Google Cloud CLI ou l'API Dataproc API permettant de créer un cluster Dataproc comportant le composant Flink activée sur le cluster.

Recommandation:Utilisez un cluster de VM standard à un maître avec le composant Flink. Clusters en mode haute disponibilité Dataproc (avec trois VM maîtres) ne sont pas compatibles Mode haute disponibilité Flink :

Vous pouvez exécuter des jobs Flink à l'aide de la ressource Dataproc Jobs à partir de la console Google Cloud, la Google Cloud CLI ou l'API Dataproc.

Console

Pour envoyer un exemple de job de décompte de mots Flink à partir de la console:

  1. Ouvrir Dataproc la page Envoyer une tâche dans le la console Google Cloud dans votre navigateur.

  2. Renseignez les champs de la page Envoyer une tâche:

    1. Sélectionnez le nom du cluster dans la liste des clusters.
    2. Définissez le champ Job type (Type de job) sur Flink.
    3. Définissez le champ Main class or jar (Classe principale ou fichier JAR) sur org.apache.flink.examples.java.wordcount.WordCount.
    4. Définissez Fichiers JAR sur file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// indique un fichier situé sur le cluster. Dataproc installé WordCount.jar lors de la création du cluster Flink.
      • Ce champ accepte aussi un chemin d'accès Cloud Storage (gs://BUCKET/JARFILE) ou un Chemin d'accès HDFS (Hadoop Distributed File System) (hdfs://PATH_TO_JAR).
  3. Cliquez sur Envoyer.

    • Les résultats du pilote de tâches s'affichent sur la page Informations sur la tâche.
    • Les jobs Flink sont répertoriés sur la page Page Jobs de Dataproc dans la console Google Cloud.
    • Cliquez sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche. pour arrêter ou supprimer un job.

gcloud

Pour envoyer un job Flink à un cluster Dataproc Flink, exécutez la gcloud CLI gcloud dataproc jobs submit. en local dans une fenêtre de terminal ou dans Cloud Shell :

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Remarques :

  • CLUSTER_NAME: spécifiez le nom du Flink Dataproc. cluster auquel envoyer le job.
  • REGION: spécifiez une région Compute Engine. l'emplacement du cluster.
  • MAIN_CLASS: spécifiez la classe main de votre Application Flink, par exemple: <ph type="x-smartling-placeholder">
      </ph>
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: spécifiez le fichier JAR de l'application Flink. Vous pouvez spécifier les informations suivantes: <ph type="x-smartling-placeholder">
      </ph>
    • Un fichier JAR installé sur le cluster, à l'aide de la commande file:/// préfixe:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Un fichier JAR dans Cloud Storage: gs://BUCKET/JARFILE
    • Un fichier JAR dans HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: vous pouvez éventuellement ajouter des arguments pour la tâche après le double tiret (--).

  • Après l'envoi de la tâche, les résultats du pilote s'affichent dans le local ou Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)
    

REST

Cette section explique comment envoyer un job Flink à un job Flink Dataproc à l'aide de Dataproc jobs.submit

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • PROJECT_ID : ID de projet Google Cloud
  • REGION : région du cluster
  • CLUSTER_NAME: spécifiez le nom du cluster Dataproc Flink auquel envoyer le job.

Méthode HTTP et URL :

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corps JSON de la requête :

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

<ph type="x-smartling-placeholder">
</ph>
{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Les jobs Flink sont répertoriés sur la page Page Jobs de Dataproc dans la console Google Cloud.
  • Vous pouvez cliquer sur Arrêter ou Supprimer sur la page Tâches ou Informations sur la tâche. dans la console Google Cloud pour arrêter ou supprimer un job.

Au lieu de Exécuter des jobs Flink à l'aide de la ressource Dataproc Jobs vous pouvez exécuter des jobs Flink sur le nœud maître de votre cluster Flink à l'aide de la CLI flink.

Les sections suivantes décrivent différentes manières d'exécuter une tâche de CLI flink sur votre cluster Dataproc Flink.

  1. Connectez-vous en SSH au nœud maître:utilisez la méthode SSH pour ouvrir une fenêtre de terminal sur la VM maître du cluster.

  2. Définissez le chemin d'accès des classes:initialisez le chemin de classe Hadoop à partir de la fenêtre du terminal SSH sur le VM maître du cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Exécuter des tâches Flink:vous pouvez exécuter des tâches Flink dans différents Modes de déploiement sur YARN: mode application, par tâche et session.

    1. Mode application:le mode Application Flink est compatible avec les images Dataproc version 2.0 et ultérieures. Ce mode exécute la méthode main() de la tâche sur le gestionnaire de tâches YARN. Le cluster s'arrête une fois le job terminé.

      Exemple d'envoi de job:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Répertoriez les jobs en cours d'exécution:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Annulez une tâche en cours d'exécution:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Mode par tâche:ce mode Flink exécute la méthode main() du job sur le côté client.

      Exemple d'envoi de job:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Mode session:démarrez une session Flink YARN de longue durée, puis envoyez une ou plusieurs tâches à la session.

      1. Démarrer une session:vous pouvez démarrer une session Flink dans l'un des de différentes manières:

        1. Créez un cluster Flink en ajoutant le --metadata flink-start-yarn-session=true au paramètre gcloud dataproc clusters create (voir Créer un cluster Dataproc Flink). Avec ce drapeau une fois le cluster créé, Dataproc exécute /usr/bin/flink-yarn-daemon pour démarrer une session Flink sur le cluster.

          L'ID application YARN de la session est enregistré dans /tmp/.yarn-properties-${USER}. Vous pouvez répertorier l'ID à l'aide de la commande yarn application -list.

        2. Exécuter Flink yarn-session.sh pré-installé sur la VM maître du cluster, avec des paramètres personnalisés:

          Exemple avec des paramètres personnalisés:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Exécutez le script wrapper Flink de /usr/bin/flink-yarn-daemon avec paramètres par défaut:

          . /usr/bin/flink-yarn-daemon
          
      2. Envoyer un job à une session:exécutez la commande suivante pour envoyer un Appliquez un clin d'œil à la session.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: URL, y compris l'hôte et le port de la VM maître Flink où les jobs sont exécutés. Supprimer http:// prefix de l'URL. Cette URL est répertoriée dans le résultat de la commande lorsque vous démarrez une Session Flink. Vous pouvez exécuter la commande suivante pour répertorier cette URL dans le champ Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Répertorier les tâches au cours d'une session : pour répertorier les tâches Flink d'une session, effectuez l'une des opérations suivantes : les éléments suivants:

        • Exécutez flink list sans arguments. La commande recherche l'ID application YARN de la session dans /tmp/.yarn-properties-${USER}.

        • Obtenez l'ID application YARN de la session à partir de /tmp/.yarn-properties-${USER} ou la sortie de yarn application -list, puis exécutez <code>flink list -yid YARN_APPLICATION_ID.

        • Exécutez flink list -m FLINK_MASTER_URL.

      4. Stop a session (Arrêter une session) : pour arrêter la session, obtenez l'ID application YARN. de la session de /tmp/.yarn-properties-${USER} ou la sortie de yarn application -list, puis exécutez l'une des commandes suivantes:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Vous pouvez exécuter des tâches Apache Beam sur Dataproc à l'aide de la commande FlinkRunner.

Vous pouvez exécuter des tâches Beam sur Flink de différentes manières:

  1. Tâches Java Beam
  2. Tâches portables Beam

Tâches Java Beam

Empaquetez vos tâches Beam dans un fichier JAR. Fournissez le fichier JAR groupé avec les dépendances nécessaires à l'exécution de la tâche.

L'exemple suivant exécute une tâche Java Beam à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec le composant Flink activé.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components : Flink
    • --image-version : La version de l'image du cluster qui détermine la version Flink installée sur le cluster (par exemple, consultez les versions des composants Apache Flink répertoriées pour les quatre versions suivantes et précédentes des Versions 2.0.x de l'image).
    • --region: région Dataproc compatible
    • --enable-component-gateway: permet d'accéder à l'interface utilisateur du gestionnaire de tâches Flink.
    • --scopes: permet à votre cluster d'accéder aux API Google Cloud. (voir Bonnes pratiques concernant les niveaux d'accès). Le niveau d'accès "cloud-platform" est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre) lorsque vous créez un cluster qui utilise la version d'image 2.1 ou ultérieure de Dataproc.
  2. Utilisez l'utilitaire SSH. pour ouvrir une fenêtre de terminal sur le nœud maître du cluster Flink.

  3. Démarrer une session Flink YARN sur le maître de cluster Dataproc d'un nœud.

    . /usr/bin/flink-yarn-daemon
    

    Notez la version de Flink sur votre cluster Dataproc.

    flink --version
    
  4. Sur votre ordinateur local, générez l'exemple canonique de nombre de mots Beam en Java.

    Choisissez une version de Beam compatible avec la version de Flink de votre cluster Dataproc. Voir la page Compatibilité des versions Flink qui liste la compatibilité des versions Beam-Flink.

    Ouvrez le fichier POM généré. Vérifiez la version de l'exécuteur Beam Flink spécifiée par le tag <flink.artifact.name>. Si la version de l'exécuteur Beam Flink dans le nom de l'artefact Flink ne correspond pas à la version Flink de votre cluster, mettez à jour le numéro de version.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Empaqueter l'exemple de décompte de mots.

    mvn package -Pflink-runner
    
  6. Importez le fichier uber JAR empaqueté word-count-beam-bundled-0.1.jar (~135 Mo) sur le nœud maître de votre cluster Dataproc. Vous pouvez utiliser gcloud storage cp pour accélérer les transferts de fichiers vers votre cluster Dataproc à partir de Cloud Storage.

    1. Sur votre terminal local, créez un bucket Cloud Storage et importez le fichier Uber JAR.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sur le nœud maître de Dataproc, téléchargez le fichier Uber JAR.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Exécutez la tâche Java Beam sur le nœud maître du cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Vérifiez que les résultats ont bien été écrits dans votre bucket Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Arrêtez la session YARN Flink.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Tâches Beam portables

Pour exécuter des jobs Beam écrits en Python, Go et dans d'autres langages compatibles, vous pouvez : utilisez FlinkRunner et PortableRunner comme décrit sur le schéma Flink Runner (consultez également la page Feuille de route du framework de portabilité).

L'exemple suivant exécute une tâche Beam portable en Python à partir du nœud maître du cluster Dataproc.

  1. Créez un cluster Dataproc avec les composants Flink et Docker activés.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Remarques :

    • --optional-components: Flink et Docker.
    • --image-version: version de l'image du cluster qui détermine la version de Flink installée sur le cluster (par exemple, consultez la liste des versions des composants Apache Flink pour les versions les plus récentes et les plus récentes quatre versions d'image 2.0.x).
    • --region: région Dataproc disponible.
    • --enable-component-gateway: active l'accès à l'interface utilisateur de Flink Job Manager.
    • --scopes: permet à votre cluster d'accéder aux API Google Cloud (voir Bonnes pratiques concernant les niveaux d'accès). Le niveau d'accès "cloud-platform" est activé par défaut (vous n'avez pas besoin d'inclure ce paramètre) lorsque vous créez un cluster qui utilise la version d'image 2.1 ou ultérieure de Dataproc.
  2. Utiliser la gcloud CLI en local ou dans Cloud Shell pour créer bucket Cloud Storage. Vous devez spécifier l'élément BUCKET_NAME. lorsque vous exécutez un exemple de programme de décompte de mots.

    gcloud storage buckets create BUCKET_NAME
    
  3. Dans une fenêtre de terminal de la VM du cluster, démarrez une session Flink YARN. Notez l'URL du maître Flink, qui correspond à l'adresse du maître Flink. où les jobs sont exécutés. Vous spécifierez FLINK_MASTER_URL lorsque vous exécuter un exemple de programme de décompte de mots.

    . /usr/bin/flink-yarn-daemon
    

    Affichez et notez la version de Flink exécutant Dataproc. cluster. Vous spécifierez FLINK_VERSION lorsque vous exécuter un exemple de programme de décompte de mots.

    flink --version
    
  4. Installez les bibliothèques Python nécessaires à ce job sur le nœud maître du cluster.

  5. Installez un Version du Beam compatible avec la version de Flink du cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Exécuter l'exemple de nombre de mots sur le maître de cluster d'un nœud.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Remarques :

    • --runner : FlinkRunner.
    • --flink_version: FLINK_VERSION, noté précédemment.
    • --flink_master: FLINK_MASTER_URL, noté précédemment.
    • --flink_submit_uber_jar: utilisez l'uber JAR pour exécuter le job Beam.
    • --output: BUCKET_NAME, créé précédemment.
  7. Vérifiez que les résultats ont bien été écrits dans votre bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Arrêtez la session YARN Flink.

    1. Permet d'obtenir l'ID application.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Le composant Dataproc Flink est compatible avec les clusters kerberisés. Un ticket Kerberos valide est nécessaire pour envoyer et conserver un job Flink, ou pour démarrer un cluster Flink. Par défaut, un ticket Kerberos reste valide pendant sept jours.

L'interface Web du gestionnaire de tâches Flink est disponible lorsqu'une tâche Flink ou un cluster de session Flink est en cours d'exécution. Pour utiliser l'interface Web:

  1. Créez un cluster Dataproc Flink.
  2. Une fois le cluster créé, cliquez sur la passerelle des composants. Lien ResourceManager YAML dans l'onglet "Interface Web" de la page Détails du cluster dans la console Google Cloud.
  3. Dans l'interface utilisateur YARN Resource Manager, identifiez l'entrée de l'application de cluster Flink. Selon l'état d'avancement d'une tâche, un objet ApplicationMaster ou Historique s'affiche.
  4. Pour une tâche de streaming de longue durée, cliquez sur le lien ApplicationManager pour ouvrir le tableau de bord Flink. Pour une tâche terminée, cliquez sur le lien Historique pour afficher les détails de la tâche.