Puoi installare componenti aggiuntivi come Flink quando crei un cluster Dataproc utilizzando la funzionalità Componenti facoltativi. In questa pagina viene descritto il componente Flink.
Il componente Dataproc Flink installa Apache Flink su un cluster Dataproc.
Installa il componente
Installa il componente quando crei un cluster Dataproc. Il componente Flink Dataproc può essere installato sui cluster creati con immagine versione 1.5 o successiva.
La versione dell'immagine del cluster determina la versione del componente Flink installato sul cluster. Consulta la pagina relativa alle versioni di Dataproc supportate per visualizzare un elenco delle versioni dei componenti incluse in ogni release dell'immagine Dataproc.
Consiglio: usa un cluster VM standard a 1 master con il componente Flink, I cluster della modalità ad alta disponibilità di Dataproc (con 3 VM master) non supportano la modalità ad alta disponibilità di Flink.
Comando gcloud
Per creare un cluster Dataproc che include il componente Flink, utilizza il comando gcloud dataproc clusters create cluster-name con il flag --optional-components
.
gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ ... other flagsNote:
- Il flag
--enable-component-gateway
consente di accedere all'interfaccia utente di Flink Job Manager. - Il flag
--scopes=https://www.googleapis.com/auth/cloud-platform
consente l'accesso API ai servizi Cloud Platform nel progetto. - Puoi aggiungere
--metadata flink-start-yarn-session=true
facoltativo per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul cluster per avviare una sessione Flink YARN (vedi modalità sessione Flink).gcloud dataproc clusters create cluster-name \ --optional-components=FLINK \ --region=region \ --enable-component-gateway \ --image-version=DATAPROC_IMAGE_VERSION \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --metadata flink-start-yarn-session=true \ ... other flags
API REST
Il componente Flink può essere specificato tramite l'API Dataproc utilizzando SoftwareConfig.Component come parte di una richiesta clusters.create.
Imposta EndpointConfig.enableHttpPortAccess
su true
per abilitare
la connessione all'UI di Flink Job Manager.
Console
- Attiva il componente e il gateway del componente.
- Nella console Google Cloud, apri la pagina Crea un cluster Dataproc su Compute Engine di Dataproc. È selezionato il riquadro Configura cluster.
- Nella sezione Controllo delle versioni, conferma o modifica Tipo di immagine e versione.
- Nella sezione Componenti:
- In Gateway dei componenti, seleziona Abilita gateway dei componenti.
- In Componenti facoltativi, seleziona Flink e altri componenti facoltativi da installare sul tuo cluster.
Configura Flink
Per impostare le proprietà Flink:
Utilizza un'azione di inizializzazione per aggiornare le proprietà Flink predefinite in
/etc/flink/conf/flink-conf.yaml
oppureSpecifica le proprietà Flink con flag della riga di comando quando invii un job Flink o avvii una sessione Flink.
Imposta un percorso di classe
Inizializza il classclass Hadoop da una finestra del terminale SSH sulla VM master del cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Esegui job Flink
Puoi eseguire Flink in diverse modalità di deployment su YARN: in modalità applicazione, per job o sessione.
Modalità applicazione
La modalità applicazione Flink è supportata dall'immagine Dataproc 2.0 e versioni successive.
Questa modalità esegue il metodo main()
del job su Job Manager YARN. Il cluster si arresta al termine del job.
Esempio di invio del job:
flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Djobmanager.heap.mb=820 \
-Dtaskmanager.heap.mb=1640 \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=4 \
/usr/lib/flink/examples/batch/WordCount.jar
Elenca il job in esecuzione:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Annulla il job in esecuzione:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modalità per lavoro
Questa modalità Flink esegue il metodo main()
del job sul lato client.
Esempio di invio del job:
flink run \
-m yarn-cluster \
-p 4 \
-ys 2 \
-yjm 1024m \
-ytm 2048m \
/usr/lib/flink/examples/batch/WordCount.jar
Modalità sessione
Avvia una sessione Flink YARN a lunga esecuzione, quindi invia uno o più job alla sessione.
Puoi avviare una sessione Flink in uno dei seguenti modi:
Dopo aver creato il cluster Flink, esegui lo script
yarn-session.sh
Flink, preinstallato sulla VM master del cluster, con impostazioni personalizzate:Esempio con impostazioni personalizzate:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached ```
Dopo aver creato il cluster Flink, esegui lo script del wrapper Flink
/usr/bin/flink-yarn-daemon
con le impostazioni predefinite:. /usr/bin/flink-yarn-daemon
Crea un cluster Flink, aggiungendo il flag
--metadata flink-start-yarn-session=true
al comandogcloud dataproc clusters create
. Con questo flag abilitato, dopo la creazione del cluster, Dataproc esegue/usr/bin/flink-yarn-daemon
per avviare una sessione Flink sul cluster.
L'ID applicazione YARN della sessione viene salvato in /tmp/.yarn-properties-${USER}
.
Puoi elencare l'ID con il comando yarn application -list
.
Invia un job alla sessione
Quando avvii una sessione Flink, l'output comando elenca l'URL (inclusi host e porta) della VM master Flink in cui vengono eseguiti i job.
Un altro modo per visualizzare l'URL principale di Flink: il seguente output di comando elenca l'URL principale di Flink nel campo Tracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`
Esegui il comando seguente per inviare un job Flink alla sessione.
Sostituisci FLINK_MASTER_URL con l'URL principale di Flink
dopo aver rimosso l'http:// prefix
.
flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar
Elencare i job in una sessione
Per elencare i job Flink in una sessione, esegui una delle seguenti operazioni:
Esegui
flink list
senza argomenti. Il comando cerca l'ID applicazione YARN della sessione in/tmp/.yarn-properties-${USER}
.Esegui
flink list -yid YARN_APPLICATION_ID
. Esegui/tmp/.yarn-properties-${USER}
oyarn application -list
per ottenere l'ID applicazione YARN.Esegui
flink list -m FLINK_MASTER_URL
.
Interrompere una sessione
Per arrestare la sessione, ottieni l'ID applicazione YARN della sessione da
/tmp/.yarn-properties-${USER}
o dall'output di yarn application -list
,
quindi esegui uno dei seguenti comandi:
echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Eseguire job Apache Beam
Puoi eseguire job Apache Beam su Dataproc utilizzando FlinkRunner
.
Puoi eseguire job Beam su Flink nei seguenti modi:
- Job Java Beam
- Job Portable portatili
Job Java Beam
Pacchettizza i tuoi job Beam in un file JAR. Fornisci al file JAR in bundle le dipendenze necessarie per eseguire il job.
L'esempio seguente esegue un job Java Beam dal nodo master del cluster Dataproc.
Crea un cluster Dataproc con il componente Link abilitato.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: la versione immagine del cluster, che determina la versione Flink installata sul cluster (ad esempio, consulta le versioni dei componenti Apache Flink elencate per le quattro e le versioni di immagine 2.0.x più recenti e precedenti).--region
: un'area geografica Dataproc supportata.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso API ai servizi Cloud Platform nel progetto.
Utilizza l'utilità SSH per aprire una finestra del terminale sul nodo master del cluster FLink.
Avvia una sessione Flink YARN sul nodo master del cluster Dataproc.
. /usr/bin/flink-yarn-daemon
Prendi nota della versione Flink nel tuo cluster Dataproc.
flink --version
Sul tuo computer locale, genera l'esempio di conteggio canonico di Beam in Java.
Scegli una versione di Beam compatibile con la versione Flink sul tuo cluster Dataproc. Consulta la tabella Compatibilità delle versioni di Flink che elenca la compatibilità della versione di Beam.
Apri il file POM generato. Controlla la versione del corridore Beam Flink specificata dal tag
<flink.artifact.name>
. Se la versione del runner Flink Beam nel nome dell'elemento Flink non corrisponde alla versione Flink del tuo cluster, aggiorna il numero di versione in modo che corrisponda.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Inserisci un esempio di conteggio delle parole.
mvn package -Pflink-runner
Carica il file JAR JAR pacchettizzato,
word-count-beam-bundled-0.1.jar
(~135 MB) nel nodo master del cluster Dataproc. Puoi utilizzaregsutil cp
per velocizzare i trasferimenti di file al tuo cluster Dataproc da Cloud Storage.Sul terminale locale, crea un bucket Cloud Storage e carica la JAR Uber.
gsutil mb BUCKET_NAME
gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Sul nodo master di Dataproc, scarica la JAR uber.
gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Eseguire il job Java Beam sul nodo master del cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Verifica che i risultati siano stati scritti nel bucket Cloud Storage.
gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Interrompi la sessione Flink YARN.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Job Beam portatili
Per eseguire i job Beam scritti in Python, Go e altri linguaggi supportati, puoi utilizzare FlinkRunner
e PortableRunner
, come descritto nella pagina Flink Runner di Beam (consulta anche Portability Framework Roadmap).
L'esempio seguente esegue un job portatile di Beam in Python del nodo master del cluster Dataproc.
Creare un cluster Dataproc con entrambi i componenti Flink e Docker abilitati.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Note:
--optional-components
: Flink e Docker.--image-version
: la versione immagine del cluster, che determina la versione Flink installata sul cluster (ad esempio, consulta le versioni dei componenti Apache Flink elencate per le quattro e le versioni di immagine 2.0.x più recenti e precedenti).--region
: è disponibile una regione Dataproc.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso API ai servizi Cloud Platform nel progetto.
Utilizza
gsutil
localmente o in Cloud Shell per creare un bucket Cloud Storage. Devi specificare BUCKET_NAME quando esegui un programma di conteggio delle parole di esempio.gsutil mb BUCKET_NAME
In una finestra del terminale sulla VM del cluster, avvia una sessione Flink YARN. Prendi nota dell'URL del master Flink, dell'indirizzo del master Flink in cui vengono eseguiti i job. Devi specificare FLINK_MASTER_URL quando esegui un programma di conteggio delle parole di esempio.
. /usr/bin/flink-yarn-daemon
Visualizza e indica la versione di Flink che esegue il cluster Dataproc. Devi specificare FLINK_VERSION quando esegui un programma di conteggio delle parole di esempio.
flink --version
Installare le librerie Python necessarie per il job sul nodo master del cluster.
Installa una versione Beam compatibile con la versione Flink del cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Eseguire l'esempio di conteggio parole sul nodo master del cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Note:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, annotato in precedenza.--flink_master
: FLINK_MASTER_URL, annotato in precedenza.--flink_submit_uber_jar
: usa uber JAR per eseguire il job Beam.--output
: BUCKET_NAME, creata prima.
Verifica che i risultati siano stati scritti nel tuo bucket.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Interrompi la sessione Flink YARN.
- Recupera l'ID applicazione.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Esegui Flink su un cluster Kerberized
Il componente Dataproc Flink supporta i cluster Kubernetes. Per inviare e mantenere un job Flink o per avviare un cluster Flink, è necessario un ticket Kerberos valido. Per impostazione predefinita, un ticket Kerberos rimane valido per sette giorni.
Accedere all'interfaccia utente di Flink Job Manager
L'interfaccia web di Flink Job Manager è disponibile mentre è in esecuzione un cluster di job Flink o di sessione Flink. Per utilizzare l'interfaccia web:
- Crea un cluster Dataproc con Component Gateway abilitato.
- Dopo la creazione del cluster, fai clic su Gateway dei componenti Link YARN ResourceManager nella scheda Interfaccia web della pagina Dettagli cluster nella console Google Cloud.
- Nell'interfaccia utente di YARN Resource Manager, identifica la voce dell'applicazione cluster Flink. In base allo stato di completamento di un job, viene visualizzato il link ApplicationMaster o Cronologia.
- Per un job di streaming a lunga esecuzione, fai clic sul link ApplicationManager per aprire la dashboard Flink; per un job completato, fai clic sul link Cronologia per visualizzare i dettagli del job.