Ative outros componentes, como o Flink, ao criar um Dataproc cluster usando o Componentes opcionais . Nesta página, mostramos como criar um cluster do Dataproc com o serviço Apache Flink componente opcional ativado (um cluster Flink) e, em seguida, executar jobs Flink no cluster.
Você pode usar o cluster Flink para:
Executar jobs do Flink usando o recurso
Jobs
do Dataproc usando o console do Google Cloud, a Google Cloud CLI ou a API Dataproc.Execute jobs do Flink usando a CLI
flink
em execução no nó mestre do cluster Flink.Execute o Flink em um cluster do Kerberos.
Criar um cluster Flink do Dataproc
Use o console do Google Cloud, a Google Cloud CLI ou o Dataproc API para criar um cluster do Dataproc que tem o componente Flink ativados no cluster.
Recomendação: use um cluster de VM padrão de um mestre com o componente Flink. Clusters do modo de alta disponibilidade do Dataproc (com três VMs mestres) não oferecem suporte Modo de alta disponibilidade Flink.
Console
Para criar um cluster Flink do Dataproc usando o console do Google Cloud, siga estas etapas:
Abra o Dataproc Página Criar um cluster do Dataproc no Compute Engine.
- O painel Configurar cluster está selecionado.
- Na seção Controle de versão, confirme ou altere o
Tipo de imagem e versão. A versão da imagem do cluster determina
do componente Flink instalado no cluster.
- A versão da imagem precisa ser 1.5 ou mais recente para ativar o componente Flink no cluster. Consulte Versões compatíveis do Dataproc para visualizar as listagens das versões dos componentes incluídas em cada versão de imagem do Dataproc).
- A versão da imagem precisa ser [TBD] ou superior para executar jobs do Flink pela API Dataproc Jobs (consulte Execute jobs Flink do Dataproc.
- Na seção Componentes:
- Em Gateway de componentes, selecione Ative o gateway de componente. É necessário ativar o Gateway de componentes para ativar o link do Gateway de componentes para a interface do servidor de histórico Flink. Ativar o Gateway de componentes também Acesso à interface da Web do Flink Job Manager em execução no cluster do Flink.
- Em Componentes opcionais, selecione Flink e outras opções opcionais componentes a serem ativados no cluster.
- Na seção Controle de versão, confirme ou altere o
Tipo de imagem e versão. A versão da imagem do cluster determina
do componente Flink instalado no cluster.
Clique no painel Personalizar cluster (opcional).
Na seção Propriedades do cluster, clique em Adicionar propriedades para cada um deles opcional propriedade do cluster para adicionar ao cluster. É possível adicionar
flink
propriedades prefixadas para configurar propriedades do Flink no/etc/flink/conf/flink-conf.yaml
que atuará como padrão para os aplicativos Flink executados no cluster.Exemplos:
- Definir
flink:historyserver.archive.fs.dir
para especificar o local do Cloud Storage para gravar o histórico de jobs do Flink (esse local será usado pelo Flink History Server executado no aglomerado Flink). - Defina slots de tarefas do Flink com
flink:taskmanager.numberOfTaskSlots=n
.
- Definir
Na seção Metadados de cluster personalizados, clique em Adicionar metadados para incluir metadados opcionais. Por exemplo, adicione
flink-start-yarn-session
true
para executar o daemon Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no mestre do cluster para iniciar uma sessão Flink YARN (consulte Modo de sessão Flink).
Se você usa a imagem do Dataproc versão 2.0 ou anterior, Clique no painel Gerenciar segurança (opcional) e, em Acesso ao projeto, selecione
Enables the cloud-platform scope for this cluster
. O escopocloud-platform
é ativado por padrão quando você cria um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
- O painel Configurar cluster está selecionado.
Clique em Criar para gerar o cluster.
gcloud
Para criar um cluster Flink do Dataproc usando a CLI gcloud, execute o seguinte gcloud dataproc clusters create localmente em uma janela de terminal ou Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Observações:
- CLUSTER_NAME: especifica o nome do cluster.
- REGION: especifique uma região do Compute Engine onde o cluster vai ficar.
DATAPROC_IMAGE_VERSION: como opção, especifique a versão da imagem para usar no cluster. A versão da imagem do cluster determina do componente Flink instalado no cluster.
A versão da imagem precisa ser 1.5 ou mais recente para ativar o componente Flink no cluster. Consulte Versões compatíveis do Dataproc para visualizar as listagens das versões dos componentes incluídas em cada versão de imagem do Dataproc).
A versão da imagem precisa ser [TBD] ou superior para executar jobs do Flink por meio da API Dataproc Jobs (consulte Executar jobs Flink do Dataproc).
--optional-components
: é necessário especificar o componenteFLINK
para executar o Flink. jobs e o serviço da Web Flink HistoryServer no cluster.--enable-component-gateway
: é preciso ativar o Gateway de Componentes para ativar o link do Gateway de componentes para a interface do servidor de histórico Flink. A ativação do Gateway de Componentes também permite acesso ao Interface da Web do Gerenciador de jobs do Flink em execução na Aglomerado Flink.PROPERTIES: É possível especificar um ou mais propriedades do cluster.
Ao criar clusters do Dataproc com para as versões de imagem
2.0.67
+ e2.1.15
+, é possível usar a flag--properties
para para configurar propriedades do Flink no/etc/flink/conf/flink-conf.yaml
que vão atuam como padrão para os aplicativos Flink executados no cluster.Você pode definir
flink:historyserver.archive.fs.dir
para especificar o local do Cloud Storage para gravar o histórico de jobs do Flink (esse local será usado pelo Flink History Server executado no aglomerado Flink).Exemplo de várias propriedades:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Outras sinalizações:
- É possível adicionar o
--metadata flink-start-yarn-session=true
opcional flag para executar o daemon Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no nó mestre do cluster para iniciar uma sessão do Flink YARN Consulte Modo de sessão Flink.
- É possível adicionar o
Ao usar versões de imagem 2.0 ou anteriores, você pode adicionar o
--scopes=https://www.googleapis.com/auth/cloud-platform
para ativar o acesso do cluster às APIs do Google Cloud Consulte Prática recomendada para escopos. O escopocloud-platform
é ativado por padrão quando você cria um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
API
Para criar um cluster Flink do Dataproc usando a API Dataproc, envie um clusters.create da seguinte forma:
Observações:
Defina o SoftwareConfig.Component para
FLINK
.Também é possível definir
SoftwareConfig.imageVersion
para especificar a versão da imagem a ser usada no cluster. A versão da imagem do cluster determina do componente Flink instalado no cluster.A versão da imagem precisa ser 1.5 ou mais recente para ativar o componente Flink no cluster. Consulte Versões compatíveis do Dataproc para visualizar as listagens das versões dos componentes incluídas em cada versão de imagem do Dataproc).
A versão da imagem precisa ser [TBD] ou superior para executar jobs do Flink por meio da API Dataproc Jobs (consulte Executar jobs Flink do Dataproc).
Definir EndpointConfig.enableHttpPortAccess para
true
para ativar o gateway de componentes para a interface do servidor de histórico Flink. A ativação do Gateway de Componentes também permite acesso ao Interface da Web do Gerenciador de jobs do Flink em execução na Aglomerado Flink.Também é possível definir
SoftwareConfig.properties
para especificar um ou mais propriedades do cluster.- Você pode especificar propriedades do Flink que atuarão
padrões para aplicativos Flink executados no cluster. Por exemplo:
é possível definir o
flink:historyserver.archive.fs.dir
para especificar o local do Cloud Storage para gravar o histórico de jobs do Flink (esse local será usado pelo Flink History Server executado no aglomerado Flink).
- Você pode especificar propriedades do Flink que atuarão
padrões para aplicativos Flink executados no cluster. Por exemplo:
é possível definir o
Também é possível definir:
GceClusterConfig.metadata
. por exemplo, para especificarflink-start-yarn-session
true
para executar o daemon Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no mestre do cluster para iniciar uma sessão Flink YARN (consulte Modo de sessão Flink).- GceClusterConfig.serviceAccountScopes
para
https://www.googleapis.com/auth/cloud-platform
(escopo decloud-platform
) ao usar versões de imagem 2.0 ou anteriores para permitir o acesso ao Google Cloud APIs pelo seu cluster (consulte a Prática recomendada de escopos). O escopocloud-platform
é ativado por padrão quando você cria um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
Depois de criar um cluster do Flink
- Use o link
Flink History Server
na Gateway de componentes para visualizar o Servidor de histórico Flink em execução no cluster Flink. - Usar o
YARN ResourceManager link
no gateway de componentes para visualizar a interface da Web do Flink Job Manager em execução no cluster do Flink . - Crie um servidor de histórico permanente do Dataproc para visualizar os arquivos de histórico de jobs do Flink gravados por clusters Flink atuais e excluídos.
Executar jobs do Flink usando o recurso Jobs
do Dataproc
É possível executar jobs do Flink usando o recurso Jobs
do Dataproc do
console do Google Cloud, Google Cloud CLI ou API Dataproc.
Console
Para enviar um exemplo de job de contagem de palavras do Flink usando o console:
Abra o Dataproc página Envie um job no console do Google Cloud no seu navegador.
Preencha os campos na página Enviar um job:
- Selecione o nome do Cluster na lista de clusters.
- Defina o Tipo de job como
Flink
. - Defina Classe principal ou jar como
org.apache.flink.examples.java.wordcount.WordCount
. - Defina Arquivos Jar como
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
indica um arquivo localizado no cluster. Dataproc instalou oWordCount.jar
quando criou o cluster do Flink;- Esse campo também aceita um caminho do Cloud Storage
(
gs://BUCKET/JARFILE
) ou um Caminho do sistema de arquivos distribuídos do Hadoop (HDFS) (hdfs://PATH_TO_JAR
).
Clique em Enviar.
- A saída do driver do job é exibida na página Detalhes do job.
- Os jobs Flink estão listados no Página Jobs do Dataproc no console do Google Cloud.
- Clique em Interromper ou Excluir na página Jobs ou Detalhes do job. para interromper ou excluir um job.
gcloud
Para enviar um job do Flink a um cluster do Dataproc Flink, execute a CLI gcloud gcloud dataproc jobs submit localmente em uma janela de terminal ou Cloud Shell:
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Observações:
- CLUSTER_NAME: especifique o nome do Flink do Dataproc. cluster para enviar o job.
- REGION: especifique uma região do Compute Engine onde o cluster está localizado.
- MAIN_CLASS: especifique a classe
main
do seu Aplicativo Flink, como:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: especifica o arquivo jar do aplicativo Flink. É possível especificar:
- um arquivo jar instalado no cluster, usando o
file:///` prefixo:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Um arquivo jar no Cloud Storage:
gs://BUCKET/JARFILE
- Um arquivo jar no HDFS:
hdfs://PATH_TO_JAR
- um arquivo jar instalado no cluster, usando o
JOB_ARGS: opcionalmente, adicione argumentos do job após o travessão duplo (
--
).Depois de enviar o job, a saída do driver do job será exibida no terminal local ou do Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Nesta seção, mostramos como enviar um job do Flink para um Dataproc Flink cluster usando o Dataproc jobs.submit.
Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:
- PROJECT_ID: ID do projeto do Google Cloud
- REGION: região do cluster
- CLUSTER_NAME: especifique o nome do cluster Flink do Dataproc para enviar o job
Método HTTP e URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Corpo JSON da solicitação:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Para enviar a solicitação, expanda uma destas opções:
Você receberá uma resposta JSON semelhante a esta:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Os jobs Flink estão listados no Página Jobs do Dataproc no console do Google Cloud.
- Clique em Parar ou Excluir na página Jobs ou Detalhes do job no console do Google Cloud para interromper ou excluir um job.
Executar jobs do Flink usando a CLI flink
Em vez de
executar jobs do Flink usando o recurso Jobs
do Dataproc;
É possível executar jobs do Flink no nó mestre do cluster do Flink usando a CLI flink
.
As seções a seguir descrevem diferentes maneiras de executar um job da CLI flink
em
seu cluster Flink do Dataproc.
SSH no nó mestre:use o SSH para abrir uma janela de terminal na VM mestre do cluster.
Defina o caminho de classe: inicialize o caminho de classe do Hadoop pela janela do terminal SSH no VM do mestre do cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Executar jobs do Flink:é possível executar jobs do Flink em diferentes modos de implantação no YARN: aplicativo, por job e modo de sessão.
Modo de aplicativo:o modo de aplicativo Flink é compatível com a versão 2.0 e posteriores de imagem do Dataproc. Esse modo executa o método
main()
do job no YARN Job Manager. O cluster é encerrado após a conclusão do job.Exemplo de envio de job:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Liste os jobs em execução:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Cancele um job em execução:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modo por job: esse modo Flink executa o método
main()
do job no no lado do cliente.Exemplo de envio de job:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Modo de sessão:inicie uma sessão de longa duração do Flink YARN e envie o arquivo. um ou mais jobs à sessão.
Iniciar uma sessão:você pode iniciar uma sessão Flink em um dos da seguinte maneira:
Crie um cluster Flink, adicionando o a sinalização
--metadata flink-start-yarn-session=true
aogcloud dataproc clusters create
(consulte Criar um cluster Flink do Dataproc). Com esta sinalização ativado, depois da criação do cluster, o Dataproc executa/usr/bin/flink-yarn-daemon
para iniciar uma sessão Flink no cluster.O ID do aplicativo YARN da sessão é salvo em
/tmp/.yarn-properties-${USER}
. É possível listar o ID com o comandoyarn application -list
.Executar o Flink
yarn-session.sh
, que vem pré-instalado na VM do mestre do cluster, com configurações personalizadas:Exemplo com configurações personalizadas:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Execute o script de wrapper Flink
/usr/bin/flink-yarn-daemon
com configurações padrão:. /usr/bin/flink-yarn-daemon
Envie um job para uma sessão: execute o seguinte comando para enviar um Flink do job para a sessão.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: o URL, incluindo o host
e a porta da VM mestre Flink em que os jobs são executados.
Remova
http:// prefix
de o URL. Esse URL é listado na resposta ao comando quando você inicia Sessão Flink. Execute o comando a seguir para listar este URL No campoTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: o URL, incluindo o host
e a porta da VM mestre Flink em que os jobs são executados.
Remova
Listar jobs em uma sessão:para listar jobs do Flink em uma sessão, siga um destes procedimentos. o seguinte:
Execute
flink list
sem argumentos. O comando procura a ID do aplicativo YARN da sessão em/tmp/.yarn-properties-${USER}
.Consiga o ID do aplicativo YARN da sessão.
/tmp/.yarn-properties-${USER}
ou a saída deyarn application -list
, e depois executar<code>
flink list -yid YARN_APPLICATION_ID.Execute
flink list -m FLINK_MASTER_URL
.
Interromper uma sessão:para interromper a sessão, consiga o ID do aplicativo YARN. da sessão de
/tmp/.yarn-properties-${USER}
ou a saída deyarn application -list
e execute um dos seguintes comandos:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Executar jobs do Apache Beam no Flink
É possível executar jobs do Apache Beam no Dataproc usando o FlinkRunner
.
É possível executar jobs do Beam no Flink das seguintes maneiras:
- Jobs do Beam em Java
- Jobs do Beam portáteis
Jobs do Beam em Java
Empacotar os jobs do Beam em um arquivo JAR. Forneça o arquivo JAR empacotado com as dependências necessárias para executar o job.
O exemplo a seguir executa um job do Java Beam a partir do nó mestre do cluster do Dataproc.
Crie um cluster do Dataproc com o componente Flink ativado.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: a versão de imagem do cluster, que determina a versão do Flink instalada no cluster. Por exemplo, consulte as versões do componente Flink do Apache listadas para a versão mais recente e quatro anteriores versões de lançamento de imagem 2.0.x.--region
: uma região compatível do Dataproc.--enable-component-gateway
: ativa o acesso à IU do Flink Job Manager.--scopes
: ativa o acesso do cluster às APIs do Google Cloud Consulte Prática recomendada para escopos. O escopocloud-platform
é ativado por padrão. Você não precisa incluir essa configuração de sinalização) quando você cria um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
Usar o utilitário SSH para abrir uma janela de terminal no nó mestre do cluster Flink.
Iniciar uma sessão do Flink YARN no mestre do cluster do Dataproc nó.
. /usr/bin/flink-yarn-daemon
Anote a versão do Flink no cluster do Dataproc.
flink --version
Na máquina local, gere o exemplo de contagem de palavras canônica do Beam em Java.
Escolha uma versão do Beam compatível com a versão do Flink no cluster do Dataproc. Consulte a Compatibilidade da versão do Flink que lista a compatibilidade da versão Beam-Flink.
Abra o arquivo POM gerado. Verifique a versão do executor do Flink do Beam especificada pela tag
<flink.artifact.name>
. Se o executor do Flink do Beam no nome do artefato do Flink não corresponder à versão do Flink no cluster, atualize o número da versão para corresponder.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Crie um exemplo de contagem de palavras.
mvn package -Pflink-runner
Faça upload do arquivo JAR uber empacotado (
word-count-beam-bundled-0.1.jar
, cerca de 135 MB) no nó mestre do cluster do Dataproc. É possível usargcloud storage cp
para transferências de arquivos mais rápidas para o cluster do Dataproc do Cloud Storage.No terminal local, crie um bucket do Cloud Storage e faça o upload do JAR uber.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
No nó mestre do Dataproc, faça o download do JAR uber.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Execute o job Java Beam no nó mestre do cluster do Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Verifique se os resultados foram gravados no seu bucket do Cloud Storage.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Interrompa a sessão YARN do Flink.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Jobs do Beam portáteis
Para executar jobs do Beam escritos em Python, Go e outras linguagens compatíveis, é possível
use o FlinkRunner
e o PortableRunner
, conforme descrito no
Flink runner (link em inglês)
(consulte também Roteiro do framework de portabilidade).
No exemplo a seguir, um job portátil do Beam é executado no Python a partir do nó mestre do cluster do Dataproc.
Crie um cluster do Dataproc com os componentes Flink e Docker ativados.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Observações:
--optional-components
: Flink e Docker.--image-version
: a versão de imagem do cluster; que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões do componente Apache Flink listadas para as versões mais recentes e anteriores quatro versões de lançamento da imagem 2.0.x).--region
: uma região disponível do Dataproc.--enable-component-gateway
: ativa o acesso à interface do Gerenciador de jobs do Flink.--scopes
: ativar o acesso do cluster às APIs do Google Cloud Consulte Prática recomendada para escopos. O escopocloud-platform
é ativado por padrão. Você não precisa incluir essa configuração de sinalização) quando você cria um cluster que usa a imagem do Dataproc versão 2.1 ou posterior.
Use a CLI gcloud localmente ou no Cloud Shell para criar do bucket do Cloud Storage. Especifique o BUCKET_NAME quando executar um exemplo de programa de contagem de palavras.
gcloud storage buckets create BUCKET_NAME
Em uma janela do terminal na VM do cluster, inicie uma sessão Flink YARN. Anote o URL do mestre do Flink, o endereço do mestre do Flink onde os jobs são executados. Você vai especificar o FLINK_MASTER_URL ao executar um exemplo de programa de contagem de palavras.
. /usr/bin/flink-yarn-daemon
Exiba e observe a versão do Flink que executa o Dataproc. aglomerado. Você vai especificar o FLINK_VERSION ao executar um exemplo de programa de contagem de palavras.
flink --version
Instale as bibliotecas Python necessárias para o job no nó mestre do cluster.
Instale um Versão do Beam compatível com a versão do Flink no cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Execute o exemplo de contagem de palavras no mestre do cluster nó.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Observações:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, anotado anteriormente.--flink_master
: FLINK_MASTER_URL, anotado anteriormente.--flink_submit_uber_jar
: use o JAR uber para executar o job do Beam.--output
: BUCKET_NAME, criado anteriormente.
Verifique se os resultados foram gravados no bucket.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Interrompa a sessão YARN do Flink.
- Consiga o ID do aplicativo.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Executar o Flink em um cluster do Kerberos
O componente Flink do Dataproc é compatível com clusters do Kerberos. Um tíquete do Kerberos válido é necessário para enviar e manter um job do Flink ou para iniciar um aglomerado Flink. Por padrão, um tíquete do Kerberos permanece válido por sete dias.
Acessar a interface do Flink Job Manager
A interface da Web do Flink Job Manager está disponível durante a execução de um job de Flink ou um cluster de sessão do Flink. Para usar a interface da Web:
- Crie um cluster Flink do Dataproc.
- Após a criação do cluster, clique em Gateway de Componentes. Link do YARN ResourceManager na guia "Interface da Web" da página Detalhes do cluster no console do Google Cloud.
- Na IU do YARN Resource Manager, identifique a entrada do aplicativo de cluster
Flink. Dependendo do status de conclusão de um job, um ApplicationMaster
ou Histórico será listado.
- Para um job de streaming de longa duração, clique no link ApplicationManager para abrir o painel do Flink. Para um job concluído, clique no link Histórico para visualizar os detalhes dele.