Puoi attivare componenti aggiuntivi come Flink quando crei un cluster Dataproc utilizzando la funzionalità Componenti facoltativi. Questa pagina illustra come creare un cluster Dataproc con il componente facoltativo Apache Flink attivato (un cluster Flink) ed eseguire job Flink sul cluster.
Puoi utilizzare il tuo cluster Flink per:
Esegui job Flink utilizzando la risorsa
Jobs
Dataproc dalla console Google Cloud, da Google Cloud CLI o dall'API Dataproc.Esegui i job Flink utilizzando l'interfaccia a riga di comando
flink
in esecuzione sul nodo master del cluster Flink.
Crea un cluster Flink Dataproc
Puoi utilizzare la console Google Cloud, Google Cloud CLI o l'API Dataproc per creare un cluster Dataproc con il componente Flink attivato.
Consiglio:utilizza un cluster di VM con un master standard con il componente Flink. I cluster Dataproc in modalità ad alta disponibilità (con 3 VM master) non supportano la modalità ad alta disponibilità di Flink.
Console
Per creare un cluster Flink Dataproc utilizzando la console Google Cloud, segui questi passaggi:
Apri la pagina Dataproc Crea un cluster Dataproc su Compute Engine.
- Il riquadro Configura cluster è selezionato.
- Nella sezione Versionamento, conferma o modifica il Tipo e la versione dell'immagine. La versione dell'immagine del cluster determina la versione del componente Flink installato sul cluster.
- Per attivare il componente Flink nel cluster, la versione dell'immagine deve essere 1.5 o successiva (consulta le versioni Dataproc supportate per visualizzare le schede delle versioni dei componenti incluse in ogni release dell'immagine Dataproc).
- La versione dell'immagine deve essere [TBD] o successiva per eseguire i job Flink tramite l'API Dataproc Jobs (vedi Eseguire job Flink di Dataproc).
- Nella sezione Componenti:
- In Gateway dei componenti, seleziona Attiva gateway dei componenti. Devi attivare il gateway dei componenti per attivare il collegamento del gateway dei componenti all'interfaccia utente di Flink History Server. L'attivazione del gateway dei componenti consente inoltre di accedere all'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink.
- In Componenti facoltativi, seleziona Flink e altri componenti facoltativi da attivare sul cluster.
- Nella sezione Versionamento, conferma o modifica il Tipo e la versione dell'immagine. La versione dell'immagine del cluster determina la versione del componente Flink installato sul cluster.
Fai clic sul riquadro Personalizza cluster (facoltativo).
Nella sezione Proprietà del cluster, fai clic su Aggiungi proprietà per ogni proprietà del cluster facoltativa da aggiungere al cluster. Puoi aggiungere proprietà con prefisso
flink
per configurare le proprietà Flink in/etc/flink/conf/flink-conf.yaml
che fungeranno da predefinite per le applicazioni Flink eseguite sul cluster.Esempi:
- Imposta
flink:historyserver.archive.fs.dir
per specificare la posizione di Cloud Storage in cui scrivere i file della cronologia dei job Flink (questa posizione verrà utilizzata dal server di cronologia Flink in esecuzione sul cluster Flink). - Imposta gli slot di attività Flink con
flink:taskmanager.numberOfTaskSlots=n
.
- Imposta
Nella sezione Metadati cluster personalizzati, fai clic su Aggiungi metadati per aggiungere i metadati facoltativi. Ad esempio, aggiungi
flink-start-yarn-session
true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul nodo master del cluster per avviare una sessione Flink YARN (vedi Modalità di sessione Flink).
Se utilizzi la versione 2.0 o precedente dell'immagine Dataproc, fai clic sul riquadro Gestisci la sicurezza (facoltativo), quindi, in Accesso al progetto, seleziona
Enables the cloud-platform scope for this cluster
. L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
- Il riquadro Configura cluster è selezionato.
Fai clic su Crea per creare il cluster.
gcloud
Per creare un cluster Flink Dataproc utilizzando gcloud CLI, esegui il seguente comando gcloud dataproc clusters create localmente in una finestra del terminale o in Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Note:
- CLUSTER_NAME: specifica il nome del cluster.
- REGION: specifica una regione Compute Engine in cui verrà posizionato il cluster.
DATAPROC_IMAGE_VERSION: facoltativamente, specifica la versione dell'immagine da utilizzare sul cluster. La versione dell'immagine del cluster determina la versione del componente Flink installato sul cluster.
La versione dell'immagine deve essere 1.5 o successiva per attivare il componente Flink nel cluster (consulta le versioni Dataproc supportate per visualizzare le schede delle versioni dei componenti incluse in ogni release dell'immagine Dataproc).
La versione dell'immagine deve essere [TBD] o successiva per eseguire i job Flink tramite l'API Dataproc Jobs (vedi Eseguire job Flink di Dataproc).
--optional-components
: devi specificare il componenteFLINK
per eseguire i job Flink e il servizio web Flink HistoryServer sul cluster.--enable-component-gateway
: devi attivare il Gateway dei componenti per attivare il collegamento del Gateway dei componenti all'interfaccia utente di Flink History Server. L'attivazione del gateway dei componenti consente anche di accedere all'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink.PROPERTIES. Se vuoi, specifica una o più proprietà del cluster.
Quando crei cluster Dataproc con le versioni dell'immagine
2.0.67
e2.1.15
e versioni successive, puoi utilizzare il flag--properties
per configurare le proprietà Flink in/etc/flink/conf/flink-conf.yaml
che fungeranno da predefinite per le applicazioni Flink eseguite sul cluster.Puoi impostare
flink:historyserver.archive.fs.dir
per specificare la posizione di Cloud Storage in cui scrivere i file della cronologia dei job Flink (questa posizione verrà utilizzata dal server di cronologia Flink in esecuzione sul cluster Flink).Esempio di più proprietà:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Altri flag:
- Puoi aggiungere il flag facoltativo
--metadata flink-start-yarn-session=true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul nodo master del cluster per avviare una sessione Flink YARN (vedi Modalità di sessione Flink).
- Puoi aggiungere il flag facoltativo
Quando utilizzi le versioni delle immagini 2.0 o precedenti, puoi aggiungere il flag
--scopes=https://www.googleapis.com/auth/cloud-platform
per consentire al tuo cluster di accedere alle API Google Cloud (consulta le best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
API
Per creare un cluster Flink Dataproc utilizzando l'API Dataproc, invia una richiesta clusters.create come segue:
Note:
Imposta SoftwareConfig.Component su
FLINK
.Facoltativamente, puoi impostare
SoftwareConfig.imageVersion
per specificare la versione dell'immagine da utilizzare nel cluster. La versione dell'immagine del cluster determina la versione del componente Flink installato sul cluster.La versione dell'immagine deve essere 1.5 o successiva per attivare il componente Flink nel cluster (consulta le versioni Dataproc supportate per visualizzare le schede delle versioni dei componenti incluse in ogni release dell'immagine Dataproc).
La versione dell'immagine deve essere [TBD] o successiva per eseguire i job Flink tramite l'API Dataproc Jobs (vedi Eseguire job Flink di Dataproc).
Imposta EndpointConfig.enableHttpPortAccess su
true
per abilitare il collegamento di Component Gateway all'interfaccia utente del server di cronologia Flink. L'attivazione del gateway dei componenti consente anche di accedere all'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink.Facoltativamente, puoi impostare
SoftwareConfig.properties
per specificare una o più proprietà cluster.- Puoi specificare proprietà Flink che fungeranno da predefinite per le applicazioni Flink eseguite nel cluster. Ad esempio,
puoi impostare
flink:historyserver.archive.fs.dir
per specificare la posizione di Cloud Storage in cui scrivere i file della cronologia dei job Flink (questa posizione verrà utilizzata dal server di cronologia Flink in esecuzione sul cluster Flink).
- Puoi specificare proprietà Flink che fungeranno da predefinite per le applicazioni Flink eseguite nel cluster. Ad esempio,
puoi impostare
Se vuoi, puoi impostare:
GceClusterConfig.metadata
. ad esempio, per specificareflink-start-yarn-session
true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul nodo master del cluster per avviare una sessione Flink YARN (vedi Modalità di sessione Flink).- GceClusterConfig.serviceAccountScopes
a
https://www.googleapis.com/auth/cloud-platform
(ambitocloud-platform
) quando utilizzi versioni delle immagini 2.0 o precedenti per consentire al tuo cluster di accedere alle API Google Cloud (consulta la best practice sugli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
Dopo aver creato un cluster Flink
- Utilizza il link
Flink History Server
in Component Gateway per visualizzare il server Flink History in esecuzione sul cluster Flink. - Utilizza
YARN ResourceManager link
in Component Gateway per visualizzare l'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink . - Crea un server di cronologia permanente Dataproc per visualizzare i file della cronologia dei job Flink scritti da cluster Flink esistenti ed eliminati.
Esegui job Flink utilizzando la risorsa Jobs
Dataproc
Puoi eseguire job Flink utilizzando la risorsa Jobs
Dataproc dalla console Google Cloud, da Google Cloud CLI o dall'API Dataproc.
Console
Per inviare un job di conteggio delle parole Flink di esempio dalla console:
Apri la pagina Dataproc Invia un job nella console Google Cloud nel browser.
Compila i campi nella pagina Invia un job:
- Seleziona il nome del cluster dall'elenco dei cluster.
- Imposta Tipo di job su
Flink
. - Imposta Classe principale o jar su
org.apache.flink.examples.java.wordcount.WordCount
. - Imposta File jar su
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
indica un file situato nel cluster. Dataproc ha installatoWordCount.jar
durante la creazione del cluster Flink.- Questo campo accetta anche un percorso Cloud Storage
(
gs://BUCKET/JARFILE
) o un percorso Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR
).
Fai clic su Invia.
- L'output del driver del job viene visualizzato nella pagina Dettagli job.
- I job Flink sono elencati nella pagina Job di Dataproc nella console Google Cloud.
- Fai clic su Interrompi o Elimina nella pagina Job o Dettagli job per interrompere o eliminare un job.
gcloud
Per inviare un job Flink a un cluster Flink Dataproc, esegui il comando gcloud CLI gcloud dataproc jobs submit localmente in una finestra del terminale o in Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Note:
- CLUSTER_NAME: specifica il nome del cluster Flink Dataproc a cui inviare il job.
- REGION: specifica una regione Compute Engine in cui si trova il cluster.
- MAIN_CLASS: specifica la classe
main
della tua applicazione Flink, ad esempio:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: specifica il file jar dell'applicazione Flink. Puoi specificare:
- Un file jar installato nel cluster, utilizzando il prefisso
file:///`:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Un file JAR in Cloud Storage:
gs://BUCKET/JARFILE
- Un file jar in HDFS:
hdfs://PATH_TO_JAR
- Un file jar installato nel cluster, utilizzando il prefisso
JOB_ARGS: facoltativo, aggiungi gli argomenti del job dopo il doppio trattino (
--
).Dopo l'invio del job, l'output del driver del job viene visualizzato nel terminal locale o Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Questa sezione mostra come inviare un job Flink a un cluster Flink Dataproc utilizzando l'API Dataproc jobs.submit.
Prima di utilizzare i dati della richiesta, apporta le seguenti sostituzioni:
- PROJECT_ID: ID progetto Google Cloud
- REGION: regione del cluster
- CLUSTER_NAME: specifica il nome del cluster Flink Dataproc a cui inviare il job
Metodo HTTP e URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Corpo JSON della richiesta:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Per inviare la richiesta, espandi una di queste opzioni:
Dovresti ricevere una risposta JSON simile alla seguente:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- I job Flink sono elencati nella pagina Job di Dataproc nella console Google Cloud.
- Puoi fare clic su Interrompi o Elimina nella pagina Job o Dettagli job nella console Google Cloud per interrompere o eliminare un job.
Esegui job Flink utilizzando l'interfaccia a riga di comando flink
Anziché
eseguire job Flink utilizzando la risorsa Jobs
Dataproc,
puoi eseguire job Flink sul nodo principale del tuo cluster Flink utilizzando l'interfaccia a riga di comando flink
.
Le seguenti sezioni descrivono diversi modi per eseguire un job flink
CLI sul tuo cluster Flink di Dataproc.
Accedi al nodo principale tramite SSH:utilizza l'utilità SSH per aprire una finestra del terminale sulla VM master del cluster.
Imposta il percorso di classe: inizializza il percorso di classe Hadoop dalla finestra del terminale SSH sulla VM master del cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Esegui job Flink:puoi eseguire job Flink in diverse modalità di deployment su YARN: applicazione, per job e sessione.
Modalità applicazione: la modalità applicazione Flink è supportata dalla versione 2.0 e successive dell'immagine Dataproc. Questa modalità esegue il metodo
main()
del job sul Job Manager YARN. Il cluster si arresta al termine del job.Esempio di invio del job:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Elenca i job in esecuzione:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Per annullare un job in esecuzione:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modalità per job:questa modalità Flink esegue il metodo
main()
del job lato client.Esempio di invio del job:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Modalità sessione:avvia una sessione Flink YARN di lunga durata, quindi invia uno o più job alla sessione.
Avvia una sessione:puoi avviare una sessione Flink in uno dei seguenti modi:
Crea un cluster Flink aggiungendo il flag
--metadata flink-start-yarn-session=true
al comandogcloud dataproc clusters create
(consulta Creare un cluster Flink Dataproc). Con questo flag attivo, dopo la creazione del cluster, Dataproc esegue/usr/bin/flink-yarn-daemon
per avviare una sessione Flink sul cluster.L'ID applicazione YARN della sessione viene salvato in
/tmp/.yarn-properties-${USER}
. Puoi elencare l'ID con il comandoyarn application -list
.Esegui lo script Flink
yarn-session.sh
preinstallato sulla VM master del cluster con impostazioni personalizzate:Esempio con impostazioni personalizzate:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Esegui lo script wrapper Flink
/usr/bin/flink-yarn-daemon
con le impostazioni predefinite:. /usr/bin/flink-yarn-daemon
Invia un job a una sessione:esegui il seguente comando per inviare un job Flink alla sessione.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: l'URL, inclusi host
e porta, della VM master Flink in cui vengono eseguiti i job.
Rimuovi
http:// prefix
dall'URL. Questo URL è elencato nell'output del comando quando avvii una sessione Flink. Puoi eseguire il seguente comando per elencare questo URL nel campoTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: l'URL, inclusi host
e porta, della VM master Flink in cui vengono eseguiti i job.
Rimuovi
Elenca i job in una sessione:per elencare i job Flink in una sessione, esegui una delle seguenti operazioni:
Esegui
flink list
senza argomenti. Il comando cerca l'ID applicazione YARN della sessione in/tmp/.yarn-properties-${USER}
.Ottieni l'ID applicazione YARN della sessione da
/tmp/.yarn-properties-${USER}
o dall'output diyarn application -list
, quindi esegui<code>
flink list -yid YARN_APPLICATION_ID.Esegui
flink list -m FLINK_MASTER_URL
.
Interrompi una sessione: per interrompere la sessione, ottieni l'ID applicazione YARN della sessione da
/tmp/.yarn-properties-${USER}
o dall'output diyarn application -list
, quindi esegui uno dei seguenti comandi:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Eseguire job Apache Beam su Flink
Puoi eseguire job Apache Beam su Dataproc utilizzando FlinkRunner
.
Puoi eseguire i job Beam su Flink nei seguenti modi:
- Job Java Beam
- Job Beam portatili
Job Java Beam
Pacchettizza i job Beam in un file JAR. Fornisci il file JAR incluso con le dipendenze necessarie per eseguire il job.
L'esempio seguente esegue un job Java Beam dal nodo master del cluster Dataproc.
Crea un cluster Dataproc con il componente Flink abilitato.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: la versione dell'immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta le versioni dei componenti Apache Flink elencate per le quattro versioni di release dell'immagine 2.0.x più recenti e precedenti).--region
: una regione Dataproc supportata.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta le Best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
Utilizza l'utilità SSH per aprire una finestra del terminale sul nodo master del cluster Flink.
Avvia una sessione Flink YARN sul nodo master del cluster Dataproc.
. /usr/bin/flink-yarn-daemon
Prendi nota della versione di Flink nel tuo cluster Dataproc.
flink --version
Sulla tua macchina locale, genera l'esempio canonico di conteggio delle parole di Beam in Java.
Scegli una versione di Beam compatibile con la versione di Flink nel cluster Dataproc. Consulta la tabella Compatibilità delle versioni di Flink che elenca la compatibilità delle versioni di Beam-Flink.
Apri il file POM generato. Controlla la versione del runner Beam Flink specificata dal tag
<flink.artifact.name>
. Se la versione del runner Beam Flink nel nome dell'elemento Flink non corrisponde alla versione di Flink sul tuo cluster, aggiorna il numero di versione in modo che corrisponda.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Impacchetta l'esempio di conteggio parole.
mvn package -Pflink-runner
Carica il file JAR uber pacchettizzato,
word-count-beam-bundled-0.1.jar
(~135 MB), sul nodo principale del tuo cluster Dataproc. Puoi utilizzaregcloud storage cp
per trasferimenti di file più rapidi al tuo cluster Dataproc da Cloud Storage.Sul terminale locale, crea un bucket Cloud Storage e carica l'uber JAR.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Sul nodo principale di Dataproc, scarica il file JAR uber.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Esegui il job Java Beam sul nodo master del cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Verifica che i risultati siano stati scritti nel bucket Cloud Storage.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Interrompi la sessione YARN di Flink.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Job di Beam portatili
Per eseguire job Beam scritti in Python, Go e altri linguaggi supportati, puoi utilizzare FlinkRunner
e PortableRunner
come descritto nella pagina del runner Flink di Beam (vedi anche la roadmap del framework di portabilità).
L'esempio seguente esegue un job Beam portatile in Python dal nodo master del cluster Dataproc.
Crea un cluster Dataproc con i componenti Flink e Docker abilitati.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Note:
--optional-components
: Flink e Docker.--image-version
: la versione dell'immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta le versioni dei componenti Apache Flink elencate per le quattro versioni di release dell'immagine 2.0.x più recenti e precedenti).--region
: una regione Dataproc disponibile.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta le Best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza la versione 2.1 o successive dell'immagine Dataproc.
Utilizza gcloud CLI in locale o in Cloud Shell per creare un bucket Cloud Storage. Specifica BUCKET_NAME quando esegui un programma di conteggio delle parole di esempio.
gcloud storage buckets create BUCKET_NAME
In una finestra del terminale sulla VM del cluster, avvia una sessione Flink YARN. Prendi nota dell'URL del master Flink, ovvero l'indirizzo del master Flink dove vengono eseguiti i job. Specificherai FLINK_MASTER_URL quando eseguirai un programma di conteggio delle parole di esempio.
. /usr/bin/flink-yarn-daemon
Visualizza e prendi nota della versione di Flink in esecuzione nel cluster Dataproc. Specificherai FLINK_VERSION quando eseguirai un programma di conteggio delle parole di esempio.
flink --version
Installa le librerie Python necessarie per il job sul node master del cluster.
Installa una versione di Beam compatibile con la versione di Flink sul cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Esegui l'esempio di conteggio delle parole sul nodo master del cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Note:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, come indicato in precedenza.--flink_master
: FLINK_MASTER_URL, come indicato in precedenza.--flink_submit_uber_jar
: utilizza il file JAR uber per eseguire il job Beam.--output
: BUCKET_NAME, creato in precedenza.
Verifica che i risultati siano stati scritti nel bucket.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Interrompi la sessione YARN di Flink.
- Recupera l'ID applicazione.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Esegui Flink su un cluster Kerberized
Il componente Flink di Dataproc supporta i cluster Kerberized. Per inviare e mantenere un job Flink o avviare un cluster Flink è necessario un ticket Kerberos valido. Per impostazione predefinita, un ticket Kerberos rimane valido per sette giorni.
Accedere all'interfaccia utente di Flink Job Manager
L'interfaccia web di Flink Job Manager è disponibile durante l'esecuzione di un job Flink o di un cluster di sessioni Flink. Per utilizzare l'interfaccia web:
- Crea un cluster Flink Dataproc.
- Dopo aver creato il cluster, fai clic su Component Gateway Link a ResourceManager YARN nella scheda Interfaccia web della pagina Dettagli cluster nella console Google Cloud.
- Nell'interfaccia utente di YARN Resource Manager, identifica la voce dell'applicazione del cluster Flink. A seconda dello stato di completamento di un job, verrà visualizzato un link ApplicationMaster o History.
- Per un job di streaming di lunga durata, fai clic sul link ApplicationManager per aprire la dashboard di Flink. Per un job completato, fai clic sul link Cronologia per visualizzarne i dettagli.