Componente Flink opcional do Dataproc

Ative outros componentes, como o Flink, ao criar um Dataproc cluster usando o Componentes opcionais . Nesta página, mostramos como criar um cluster do Dataproc com o componente opcional do Apache Flink ativado (um cluster do Flink) e executar jobs do Flink no cluster.

Você pode usar seu cluster do Flink para:

  1. Executar jobs do Flink usando o recurso Jobs do Dataproc usando o console do Google Cloud, a Google Cloud CLI ou a API Dataproc.

  2. Execute jobs do Flink usando a CLI flink no nó mestre do cluster do Flink.

  3. Executar jobs do Apache Beam no Flink

  4. Executar o Flink em um cluster do Kerberos

Use o console do Google Cloud, a Google Cloud CLI ou o Dataproc API para criar um cluster do Dataproc que tem o componente Flink ativados no cluster.

Recomendação: use um cluster de VM de 1 mestre padrão com o componente Flink. Os clusters do modo de alta disponibilidade do Dataproc (com três VMs mestre) não têm suporte para o modo de alta disponibilidade do Flink.

É possível executar jobs do Flink usando o recurso Jobs do Dataproc no console do Google Cloud, na CLI do Google Cloud ou na API Dataproc.

Console

Para enviar um job de contagem de palavras do Flink de exemplo no console:

  1. Abra a página Enviar um job do Dataproc no Console do Google Cloud no navegador.

  2. Preencha os campos na página Enviar um job:

    1. Selecione o nome do Cluster na lista de clusters.
    2. Defina o Tipo de job como Flink.
    3. Defina Classe principal ou jar como org.apache.flink.examples.java.wordcount.WordCount.
    4. Defina Arquivos JAR como file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// indica um arquivo localizado no cluster. Dataproc. instalou o WordCount.jar quando criou o cluster do Flink;
      • Esse campo também aceita um caminho do Cloud Storage (gs://BUCKET/JARFILE) ou um Caminho do sistema de arquivos distribuídos do Hadoop (HDFS) hdfs://PATH_TO_JAR.
  3. Clique em Enviar.

    • A saída do driver do job é exibida na página Detalhes do job.
    • Os jobs Flink estão listados no Página Jobs do Dataproc no console do Google Cloud.
    • Clique em Parar ou Excluir na página Jobs ou Job details para interromper ou excluir um job.

gcloud

Para enviar um job do Flink a um cluster do Flink do Dataproc, execute o comando da CLI do gcloud gcloud dataproc jobs submit localmente em uma janela de terminal ou no Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Observações:

  • CLUSTER_NAME: especifique o nome do cluster do Dataproc Flink para enviar o job.
  • REGION: especifique uma região do Compute Engine em que o cluster está localizado.
  • MAIN_CLASS: especifique a classe main do seu aplicativo Flink, como:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: especifica o arquivo jar do aplicativo Flink. É possível especificar:
    • um arquivo jar instalado no cluster, usando o file:///` prefixo:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Um arquivo JAR no Cloud Storage: gs://BUCKET/JARFILE
    • Um arquivo jar no HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: opcionalmente, adicione argumentos do job após o travessão duplo (--).

  • Depois de enviar o job, a saída do driver do job é exibida no terminal local ou do Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

Nesta seção, mostramos como enviar um job do Flink para um Dataproc Flink cluster usando o Dataproc jobs.submit.

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • PROJECT_ID: ID do projeto do Google Cloud
  • REGION: região do cluster
  • CLUSTER_NAME: especifique o nome do cluster Flink do Dataproc para enviar o job

Método HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corpo JSON da solicitação:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Para enviar a solicitação, expanda uma destas opções:

Você receberá uma resposta JSON semelhante a esta:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Os jobs do Flink estão listados na página Jobs do Dataproc no console do Google Cloud.
  • Clique em Parar ou Excluir na página Jobs ou Detalhes do job no console do Google Cloud para interromper ou excluir um job.

Em vez de executar jobs do Flink usando o recurso Jobs do Dataproc; É possível executar jobs do Flink no nó mestre do cluster do Flink usando a CLI flink.

As seções a seguir descrevem diferentes maneiras de executar um job da CLI flink em seu cluster Flink do Dataproc.

  1. SSH no nó mestre: use o utilitário SSH para abrir uma janela de terminal na VM mestre do cluster.

  2. Definir o classpath: inicialize o classpath do Hadoop na janela do terminal SSH na VM mestre do cluster do Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Executar jobs do Flink:é possível executar jobs do Flink em diferentes modos de implantação no YARN: aplicativo, por job e modo de sessão.

    1. Modo de aplicativo:o modo de aplicativo Flink é compatível com a versão 2.0 e posteriores de imagem do Dataproc. Esse modo executa o método main() do job no YARN Job Manager. O cluster é encerrado quando o job termina.

      Exemplo de envio de job:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Liste os jobs em execução:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Cancele um job em execução:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modo por job: esse modo do Flink executa o método main() do job no lado do cliente.

      Exemplo de envio de job:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Modo de sessão: inicie uma sessão do Flink YARN de longa duração e envie um ou mais jobs para a sessão.

      1. Iniciar uma sessão: é possível iniciar uma sessão do Flink de uma das seguintes maneiras:

        1. Crie um cluster do Flink, adicionando a flag --metadata flink-start-yarn-session=true ao comando gcloud dataproc clusters create. Consulte Criar um cluster do Flink do Dataproc. Com essa flag ativada, depois que o cluster é criado, o Dataproc executa /usr/bin/flink-yarn-daemon para iniciar uma sessão do Flink no cluster.

          O ID do aplicativo YARN da sessão é salvo em /tmp/.yarn-properties-${USER}. É possível listar o ID com o comando yarn application -list.

        2. Executar o Flink yarn-session.sh pré-instalado na VM do mestre do cluster, com configurações personalizadas:

          Exemplo com configurações personalizadas:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Execute o script de wrapper Flink /usr/bin/flink-yarn-daemon com configurações padrão:

          . /usr/bin/flink-yarn-daemon
          
      2. Enviar um job para uma sessão: execute o comando a seguir para enviar um job do Flink à sessão.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: o URL, incluindo o host e a porta da VM mestre Flink em que os jobs são executados. Remova o http:// prefix do URL. Esse URL é listado na resposta ao comando quando você inicia Sessão Flink. Execute o comando a seguir para listar este URL No campo Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Listar jobs em uma sessão:para listar jobs do Flink em uma sessão, siga um destes procedimentos. o seguinte:

        • Execute flink list sem argumentos. O comando procura pela ID do aplicativo YARN da sessão em /tmp/.yarn-properties-${USER}.

        • Consiga o ID do aplicativo YARN da sessão. /tmp/.yarn-properties-${USER} ou a saída de yarn application -list, e depois execute <code>flink list -yid YARN_APPLICATION_ID.

        • Execute flink list -m FLINK_MASTER_URL.

      4. Interromper uma sessão:para interromper a sessão, consiga o ID do aplicativo YARN. da sessão de /tmp/.yarn-properties-${USER} ou a saída de yarn application -list e execute um dos seguintes comandos:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

É possível executar jobs do Apache Beam no Dataproc usando o FlinkRunner.

É possível executar jobs do Beam no Flink das seguintes maneiras:

  1. Jobs do Beam em Java
  2. Jobs do Beam portáteis

Jobs do Beam em Java

Empacotar os jobs do Beam em um arquivo JAR. Forneça o arquivo JAR empacotado com as dependências necessárias para executar o job.

O exemplo a seguir executa um job do Java Beam a partir do nó mestre do cluster do Dataproc.

  1. Crie um cluster do Dataproc com o componente Flink ativado.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: a versão de imagem do cluster, que determina a versão do Flink instalada no cluster. Por exemplo, consulte as versões do componente Flink do Apache listadas para a versão mais recente e quatro anteriores versões de lançamento de imagem 2.0.x.
    • --region: uma região compatível do Dataproc.
    • --enable-component-gateway: ativa o acesso à IU do Flink Job Manager.
    • --scopes: ativa o acesso do cluster às APIs do Google Cloud Consulte Prática recomendada para escopos. O escopo cloud-platform é ativado por padrão (não é necessário incluir essa configuração de flag) quando você cria um cluster que usa a versão 2.1 ou mais recente da imagem do Dataproc.
  2. Usar o utilitário SSH para abrir uma janela de terminal no nó mestre do cluster Flink.

  3. Iniciar uma sessão do Flink YARN no mestre do cluster do Dataproc nó.

    . /usr/bin/flink-yarn-daemon
    

    Anote a versão do Flink no cluster do Dataproc.

    flink --version
    
  4. Na máquina local, gere o exemplo de contagem de palavras canônica do Beam em Java.

    Escolha uma versão do Beam compatível com a versão do Flink no cluster do Dataproc. Consulte a Compatibilidade da versão do Flink que lista a compatibilidade da versão Beam-Flink.

    Abra o arquivo POM gerado. Verifique a versão do executor do Flink do Beam especificada pela tag <flink.artifact.name>. Se o executor do Flink do Beam no nome do artefato do Flink não corresponder à versão do Flink no cluster, atualize o número da versão para corresponder.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Crie um exemplo de contagem de palavras.

    mvn package -Pflink-runner
    
  6. Faça upload do arquivo JAR uber empacotado (word-count-beam-bundled-0.1.jar, cerca de 135 MB) no nó mestre do cluster do Dataproc. É possível usar gcloud storage cp para transferências de arquivos mais rápidas para o cluster do Dataproc do Cloud Storage.

    1. No terminal local, crie um bucket do Cloud Storage e faça o upload do JAR uber.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. No nó mestre do Dataproc, faça o download do JAR uber.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Execute o job Java Beam no nó mestre do cluster do Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Verifique se os resultados foram gravados no seu bucket do Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Interrompa a sessão YARN do Flink.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Jobs do Beam portáteis

Para executar jobs do Beam escritos em Python, Go e outras linguagens compatíveis, é possível use o FlinkRunner e o PortableRunner, conforme descrito no Flink Runner (link em inglês) (consulte também Roteiro do framework de portabilidade).

No exemplo a seguir, um job portátil do Beam é executado no Python a partir do nó mestre do cluster do Dataproc.

  1. Crie um cluster do Dataproc com os componentes Flink e Docker ativados.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Observações:

    • --optional-components: Flink e Docker.
    • --image-version: a versão de imagem do cluster; que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões do componente Apache Flink listadas para as versões mais recentes e anteriores quatro versões de lançamento da imagem 2.0.x).
    • --region: uma região do Dataproc disponível.
    • --enable-component-gateway: ativa o acesso à IU do Flink Job Manager.
    • --scopes: ative o acesso às APIs do Google Cloud pelo cluster. Consulte as práticas recomendadas de escopos. O escopo cloud-platform é ativado por padrão. Você não precisa incluir essa configuração de sinalização) ao criar um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
  2. Use a CLI gcloud localmente ou no Cloud Shell para criar do bucket do Cloud Storage. Você vai especificar o BUCKET_NAME ao executar um programa de exemplo de contagem de palavras.

    gcloud storage buckets create BUCKET_NAME
    
  3. Em uma janela de terminal na VM do cluster, inicie uma sessão do Flink YARN. Anote o URL mestre do Flink, o endereço do mestre do Flink em que os jobs são executados. Você vai especificar o FLINK_MASTER_URL ao executar um programa de contagem de palavras de exemplo.

    . /usr/bin/flink-yarn-daemon
    

    Exiba e observe a versão do Flink que executa o Dataproc. cluster. Você vai especificar o FLINK_VERSION ao executar um programa de contagem de palavras de exemplo.

    flink --version
    
  4. Instale as bibliotecas Python necessárias para o job no nó mestre do cluster.

  5. Instale uma versão do Beam compatível com a versão do Flink no cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Execute o exemplo de contagem de palavras no nó mestre do cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Observações:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, anotado anteriormente.
    • --flink_master: FLINK_MASTER_URL, anotado anteriormente.
    • --flink_submit_uber_jar: use o JAR uber para executar o job do Beam.
    • --output: BUCKET_NAME, criado anteriormente.
  7. Verifique se os resultados foram gravados no bucket.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Interrompa a sessão YARN do Flink.

    1. Consiga o ID do aplicativo.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

O componente Flink do Dataproc é compatível com clusters do Kerberos. Um tíquete do Kerberos válido é necessário para enviar e manter um job do Flink ou iniciar um cluster do Flink. Por padrão, um tíquete do Kerberos permanece válido por sete dias.

A interface da Web do Flink Job Manager está disponível durante a execução de um job de Flink ou um cluster de sessão do Flink. Para usar a interface da Web:

  1. Crie um cluster Flink do Dataproc.
  2. Após a criação do cluster, clique no link do YARN ResourceManager do Gateway de componentes na guia "Interface da Web", na página Detalhes do cluster do Console do Google Cloud.
  3. Na IU do YARN Resource Manager, identifique a entrada do aplicativo de cluster Flink. Dependendo do status de conclusão de um job, um ApplicationMaster ou Histórico será listado.
  4. Para um job de streaming de longa duração, clique no link ApplicationManager para abrir o painel do Flink. Para um job concluído, clique no link Histórico para visualizar os detalhes dele.