Componente Flink facoltativo di Dataproc

Puoi attivare componenti aggiuntivi come Flink quando crei un cluster Dataproc utilizzando la funzionalità Componenti facoltativi. Questa pagina mostra come creare un cluster Dataproc con Apache Flink componente facoltativo attivato (un cluster Flink) ed eseguire job Flink sul cluster.

Puoi utilizzare il tuo cluster Flink per:

  1. Esegui job Flink utilizzando la risorsa Dataproc Jobs dalla console Google Cloud, da Google Cloud CLI o dall'API Dataproc.

  2. Esegui i job Flink utilizzando l'interfaccia a riga di comando flink in esecuzione sul nodo master del cluster Flink.

  3. Esegui job Apache Beam su Flink

  4. Esegui Flink su un cluster Kerberized

Puoi utilizzare la console Google Cloud, Google Cloud CLI o Dataproc API per creare un cluster Dataproc con il componente Flink sia attivata sul cluster.

Suggerimento: utilizza un cluster VM standard a 1 master con il componente Flink. I cluster Dataproc in modalità ad alta disponibilità (con 3 VM master) non supportano la modalità ad alta disponibilità di Flink.

Puoi eseguire job Flink utilizzando la risorsa Jobs Dataproc dalla console Google Cloud, da Google Cloud CLI o dall'API Dataproc.

Console

Per inviare un job di conteggio delle parole Flink di esempio dalla console:

  1. Apri Dataproc Nella pagina Invia un'offerta di lavoro nella console Google Cloud nel tuo browser.

  2. Compila i campi nella pagina Invia un job:

    1. Seleziona il nome del tuo cluster dall'elenco dei cluster.
    2. Imposta Tipo di job su Flink.
    3. Imposta Classe principale o jar su org.apache.flink.examples.java.wordcount.WordCount.
    4. Imposta File jar su file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// indica un file situato nel cluster. Dataproc ha installato WordCount.jar durante la creazione del cluster Flink.
      • Questo campo accetta anche un percorso Cloud Storage (gs://BUCKET/JARFILE) o un percorso Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR).
  3. Fai clic su Invia.

    • L'output del driver del job viene visualizzato nella pagina Dettagli job.
    • I job Flink sono elencati nella Pagina Job di Dataproc nella console Google Cloud.
    • Fai clic su Arresta o Elimina dalla pagina Job o Dettagli job. per arrestare o eliminare un job.

gcloud

Per inviare un job Flink a un cluster Flink Dataproc, esegui il comando gcloud CLI gcloud dataproc jobs submit localmente in una finestra del terminale o in Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Note:

  • CLUSTER_NAME: specifica il nome del cluster Flink Dataproc a cui inviare il job.
  • REGION: specifica una regione di Compute Engine in cui si trova il cluster.
  • MAIN_CLASS: specifica la classe main della tua applicazione Flink, ad esempio:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: specifica il file jar dell'applicazione Flink. Puoi specificare:
    • Un file jar installato nel cluster, utilizzando il prefisso file:///`:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Un file jar in Cloud Storage: gs://BUCKET/JARFILE
    • Un file jar in HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: se vuoi, aggiungi argomenti del job dopo il doppio trattino (--).

  • Dopo aver inviato il job, l'output del driver del job viene visualizzato locale o nel terminale Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

Questa sezione mostra come inviare un job Flink a un Flink Dataproc. con Dataproc utilizzando jobs.submit.

Prima di utilizzare i dati della richiesta, effettua le seguenti sostituzioni:

  • PROJECT_ID: ID progetto Google Cloud
  • REGION: regione del cluster
  • CLUSTER_NAME: specifica il nome del cluster Dataproc Flink a cui inviare il job

Metodo HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corpo JSON della richiesta:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Per inviare la richiesta, espandi una di queste opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • I job Flink sono elencati nella Pagina Job di Dataproc nella console Google Cloud.
  • Puoi fare clic su Arresta o Elimina dalla pagina Job o Dettagli job. nella console Google Cloud per arrestare o eliminare un job.

Invece di l'esecuzione di job Flink utilizzando la risorsa Dataproc Jobs, puoi eseguire job Flink sul nodo master del tuo cluster Flink utilizzando l'interfaccia a riga di comando flink.

Le sezioni seguenti descrivono diversi modi per eseguire un job flink CLI sul tuo cluster Flink di Dataproc.

  1. Accedi al nodo principale tramite SSH: utilizza l'utilità SSH per aprire una finestra del terminale sulla VM principale del cluster.

  2. Imposta il percorso di classe: inizializza il percorso di classe Hadoop dalla finestra del terminale SSH sulla VM master del cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Esegui job Flink: puoi eseguire job Flink in diverse modalità di deployment su YARN: modalità applicazione, per job e sessione.

    1. Modalità applicazione: La modalità applicazione Flink è supportata da Dataproc Image 2.0 e versioni successive. Questa modalità esegue il metodo main() del job nel gestore job YARN. Il cluster si arresta al termine del job.

      Esempio di invio del job:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Elenca i job in esecuzione:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Annulla un job in esecuzione:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modalità per job: questa modalità Flink esegue il metodo main() del job lato client.

      Esempio di invio di un job:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Modalità sessione: avvia una sessione Flink YARN di lunga durata, quindi invia uno o più job alla sessione.

      1. Avvia una sessione:puoi avviare una sessione Flink in una delle nei seguenti modi:

        1. Crea un cluster Flink aggiungendo il flag --metadata flink-start-yarn-session=true al comando gcloud dataproc clusters create (consulta Creare un cluster Flink Dataproc). Con questo flag attivo, dopo la creazione del cluster, Dataproc esegue /usr/bin/flink-yarn-daemon per avviare una sessione Flink sul cluster.

          L'ID applicazione YARN della sessione viene salvato in /tmp/.yarn-properties-${USER}. Puoi elencare l'ID con il comando yarn application -list.

        2. Fai battere il cuore yarn-session.sh preinstallato sulla VM master del cluster, con impostazioni personalizzate:

          Esempio con impostazioni personalizzate:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Esegui lo script Flink del wrapper /usr/bin/flink-yarn-daemon con impostazioni predefinite:

          . /usr/bin/flink-yarn-daemon
          
      2. Invia un job a una sessione: esegui questo comando per inviare un Fai clic su "Flink" per il job alla sessione.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: l'URL, inclusi host e porta, della VM master Flink in cui vengono eseguiti i job. Rimuovi http:// prefix da l'URL. Questo URL è elencato nell'output del comando quando avvii una sessione Flink. Puoi eseguire il seguente comando per elencare questo URL nel campo Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Elenco di lavori in una sessione: per elencare i job Flink in una sessione, esegui una delle seguenti le seguenti:

        • Esegui flink list senza argomenti. Il comando cerca l'ID applicazione YARN della sessione in /tmp/.yarn-properties-${USER}.

        • Ottieni l'ID applicazione YARN della sessione da /tmp/.yarn-properties-${USER} o dall'output di yarn application -list, quindi esegui <code>flink list -yid YARN_APPLICATION_ID.

        • Esegui flink list -m FLINK_MASTER_URL.

      4. Interrompi una sessione: per interrompere la sessione, ottieni l'ID applicazione YARN della sessione da /tmp/.yarn-properties-${USER} o dall'output di yarn application -list, quindi esegui uno dei seguenti comandi:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Puoi eseguire job Apache Beam su Dataproc utilizzando FlinkRunner

Puoi eseguire job Beam su Flink nei seguenti modi:

  1. Job Java Beam
  2. Job Beam portatili

Job Java Beam

Pacchettizza i tuoi job Beam in un file JAR. Fornisci il file JAR in bundle con le dipendenze necessarie per eseguire il job.

L'esempio seguente esegue un job Java Beam dal nodo master del cluster Dataproc.

  1. Crea un cluster Dataproc con Componente Flink attivato.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: versione immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta l'elenco delle versioni del componente Apache Flink per le versioni più recenti e precedenti quattro versioni di rilascio delle immagini 2.0.x).
    • --region: una regione Dataproc supportata.
    • --enable-component-gateway: abilita l'accesso all'interfaccia utente di Flink Job Manager.
    • --scopes: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta la best practice per gli ambiti). L'ambito cloud-platform è abilitato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
  2. Utilizza l'utilità SSH per aprire una finestra del terminale sul nodo master del cluster Flink.

  3. Avvia una sessione Flink YARN sul master del cluster Dataproc nodo.

    . /usr/bin/flink-yarn-daemon
    

    Prendi nota della versione di Flink nel tuo cluster Dataproc.

    flink --version
    
  4. Sulla tua macchina locale, genera l'esempio canonico di conteggio delle parole di Beam in Java.

    Scegli una versione di Beam compatibile con la versione di Flink nel cluster Dataproc. Consulta la Compatibilità delle versioni di Flink che elenca la compatibilità delle versioni Beam-Flink.

    Apri il file POM generato. Controlla la versione runner Beam Flink specificata da il tag <flink.artifact.name>. Se la versione runner Beam Flink nello Il nome dell'artefatto Flink non corrisponde alla versione di Flink sul tuo cluster, aggiorna il numero di versione che corrisponda.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Inserisci l'esempio di conteggio delle parole.

    mvn package -Pflink-runner
    
  6. Carica il file JAR uber pacchettizzato, word-count-beam-bundled-0.1.jar (~135 MB), sul nodo principale del tuo cluster Dataproc. Puoi utilizzare gcloud storage cp per trasferimenti di file più rapidi al tuo cluster Dataproc da Cloud Storage.

    1. Sul terminale locale, crea un bucket Cloud Storage e carica l'uber JAR.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Sul nodo principale di Dataproc, scarica il file JAR uber.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Esegui il job Java Beam sul nodo master del cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Verifica che i risultati siano stati scritti nel bucket Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Interrompi la sessione Flink YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Job Beam portatili

Per eseguire job Beam scritti in Python, Go e altri linguaggi supportati, puoi utilizza FlinkRunner e PortableRunner come descritto sullo schermo Flink Runner (vedi anche Roadmap del framework per la portabilità).

L'esempio seguente esegue un job Beam portatile in Python dal nodo master del cluster Dataproc.

  1. Crea un cluster Dataproc con i componenti Flink e Docker abilitati.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Note:

    • --optional-components: Flink e Docker.
    • --image-version: la versione immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta l'elenco delle versioni del componente Apache Flink per le versioni più recenti e precedenti quattro versioni di rilascio delle immagini 2.0.x).
    • --region: un'area geografica Dataproc disponibile.
    • --enable-component-gateway: abilita l'accesso all'interfaccia utente di Flink Job Manager.
    • --scopes: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta la best practice per gli ambiti). L'ambito cloud-platform è abilitato per impostazione predefinita (non è necessario includere questa impostazione di flag) quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
  2. Utilizza gcloud CLI in locale o Cloud Shell per creare nel bucket Cloud Storage. Specifica BUCKET_NAME quando esegui un programma di conteggio delle parole di esempio.

    gcloud storage buckets create BUCKET_NAME
    
  3. In una finestra del terminale sulla VM del cluster, avvia una sessione Flink YARN. Prendi nota dell'URL del master Flink, ovvero l'indirizzo del master Flink dove vengono eseguiti i job. Specificherai FLINK_MASTER_URL quando eseguirai un programma di conteggio delle parole di esempio.

    . /usr/bin/flink-yarn-daemon
    

    Visualizza e osserva la versione di Flink che esegue Dataproc in un cluster Kubernetes. Specificherai FLINK_VERSION quando eseguirai un programma di conteggio delle parole di esempio.

    flink --version
    
  4. Installa le librerie Python necessarie per il job nel nodo master del cluster.

  5. Installa una versione di Beam compatibile con la versione di Flink sul cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Esegui l'esempio di conteggio parole sul master del cluster nodo.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Note:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, come indicato in precedenza.
    • --flink_master: FLINK_MASTER_URL, indicato in precedenza.
    • --flink_submit_uber_jar: utilizza il file JAR uber per eseguire il job Beam.
    • --output: BUCKET_NAME, creata in precedenza.
  7. Verifica che i risultati siano stati scritti nel bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Interrompi la sessione Flink YARN.

    1. Recupera l'ID applicazione.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Il componente Dataproc Flink supporta Cluster Kerberizzati. È necessario un ticket Kerberos valido per inviare e mantenere un job Flink o per avviare in un ammasso Flink. Per impostazione predefinita, un ticket Kerberos rimane valido per sette giorni.

L'interfaccia web di Flink Job Manager è disponibile durante l'esecuzione di un job Flink o di un cluster di sessioni Flink. Per utilizzare l'interfaccia web:

  1. Crea un cluster Dataproc Flink.
  2. Dopo aver creato il cluster, fai clic su Component Gateway Link a ResourceManager YARN nella scheda Interfaccia web della pagina Dettagli cluster nella console Google Cloud.
  3. Nella UI di YARN Resource Manager, identifica l'applicazione del cluster Flink . A seconda dello stato di completamento di un job, verrà visualizzato un link ApplicationMaster o History.
  4. Per un job di flussi di dati a lunga esecuzione, fai clic sul link ApplicationManager per aprire la dashboard di Flink; Per un job completato, fai clic sul link Cronologia per visualizzare i dettagli del job.