Puoi attivare componenti aggiuntivi come Flink quando crei un progetto Dataproc utilizzando il cluster Componenti facoltativi funzionalità. Questa pagina mostra come creare un cluster Dataproc con Apache Flink componente facoltativo attivato (un cluster Flink) ed eseguire job Flink sul cluster.
Puoi utilizzare il cluster Flink per:
Esegui job Flink utilizzando la risorsa Dataproc
Jobs
dalla console Google Cloud, da Google Cloud CLI o dall'API Dataproc.Esegui job Flink utilizzando l'interfaccia a riga di comando
flink
in esecuzione sul nodo master del cluster Flink.
Crea un cluster Dataproc Flink
Puoi utilizzare la console Google Cloud, Google Cloud CLI o Dataproc API per creare un cluster Dataproc con il componente Flink sia attivata sul cluster.
Suggerimento: utilizza un cluster VM standard a 1 master con il componente Flink. Cluster in modalità Dataproc ad alta disponibilità (con 3 VM master) non supportano Flink della modalità ad alta disponibilità.
Console
Per creare un cluster Dataproc Flink utilizzando la console Google Cloud, segui questi passaggi:
Apri Dataproc Pagina Crea un cluster Dataproc su Compute Engine.
- Il riquadro Configura cluster è selezionato.
- Nella sezione Controllo delle versioni, conferma o modifica
Tipo e versione dell'immagine. La versione dell'immagine del cluster determina
del componente Flink installato sul cluster.
- La versione dell'immagine deve essere 1.5 o successiva per attivare il componente Flink sul cluster (Vedi Versioni di Dataproc supportate per visualizzare l'elenco delle versioni dei componenti incluse in ogni release immagine Dataproc).
- La versione dell'immagine deve essere [da definire] o successiva per eseguire job Flink tramite l'API Dataproc Jobs (vedi Esegui job Flink Dataproc).
- Nella sezione Componenti:
- .
- In Gateway dei componenti, seleziona Attiva gateway dei componenti. Devi attivare il parametro Gateway dei componenti per attivare il link Gateway dei componenti all'interfaccia utente del server di cronologia Flink. L'attivazione del gateway dei componenti abilita anche Accedere all'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink.
- In Componenti facoltativi, seleziona Flink e altri facoltativi per l'attivazione sul cluster.
- Nella sezione Controllo delle versioni, conferma o modifica
Tipo e versione dell'immagine. La versione dell'immagine del cluster determina
del componente Flink installato sul cluster.
Fai clic sul riquadro Personalizza cluster (facoltativo).
Nella sezione Proprietà del cluster, fai clic su Aggiungi proprietà per ognuna facoltativa proprietà cluster da aggiungere al cluster. Puoi aggiungere
flink
proprietà con prefisso per configurare le proprietà Flink in/etc/flink/conf/flink-conf.yaml
che agiranno come predefiniti per le applicazioni Flink in esecuzione sul cluster.Esempi:
- Imposta
flink:historyserver.archive.fs.dir
per specificare il percorso di Cloud Storage in cui scrivere la cronologia dei job Flink (la posizione sarà utilizzata dal server di cronologia Flink in esecuzione sull'ammasso Flink). - Imposta gli slot attività Flink con
flink:taskmanager.numberOfTaskSlots=n
.
- Imposta
Nella sezione Metadati cluster personalizzati, fai clic su Aggiungi metadati per aggiungerli metadati facoltativi. Ad esempio, aggiungi
flink-start-yarn-session
true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul master del cluster per avviare una sessione Flink YARN (vedi Modalità sessione Flink).
Se utilizzi l'immagine Dataproc versione 2.0 o precedente, Fai clic sul riquadro Gestisci sicurezza (facoltativo) e poi, in Accesso al progetto, seleziona
Enables the cloud-platform scope for this cluster
. L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
- Il riquadro Configura cluster è selezionato.
Fai clic su Crea per creare il cluster.
gcloud
Per creare un cluster Dataproc Flink utilizzando gcloud CLI, esegui questo comando gcloud dataproc clusters create localmente in una finestra del terminale o Cloud Shell
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Note:
- CLUSTER_NAME: specifica il nome del cluster.
- REGION: specifica una regione di Compute Engine in cui si troverà il cluster.
DATAPROC_IMAGE_VERSION: specifica la versione dell'immagine (facoltativo) da utilizzare sul cluster. La versione dell'immagine del cluster determina del componente Flink installato sul cluster.
La versione dell'immagine deve essere 1.5 o successiva per attivare il componente Flink sul cluster (Vedi Versioni di Dataproc supportate per visualizzare l'elenco delle versioni dei componenti incluse in ogni release immagine Dataproc).
La versione dell'immagine deve essere [da definire] o successiva per eseguire job Flink tramite l'API Dataproc Jobs (vedi Eseguire job Flink Dataproc).
--optional-components
: devi specificare il componenteFLINK
per eseguire Flink e il servizio web Flink HistoryServer sul cluster.--enable-component-gateway
: devi attivare il Gateway dei componenti per attiva il link Gateway dei componenti all'interfaccia utente del server di cronologia Flink. L'attivazione del gateway dei componenti consente anche l'accesso ai L'interfaccia web di Flink Job Manager in esecuzione sulla Ammasso Flink.PROPERTIES. (Facoltativo) Puoi specificare uno o più proprietà del cluster.
Quando crei cluster Dataproc con versioni immagine
2.0.67
+ e2.1.15
+, puoi usare il flag--properties
per per configurare le proprietà Flink in/etc/flink/conf/flink-conf.yaml
che agiranno da valori predefiniti per le applicazioni Flink in esecuzione sul cluster.Puoi impostare
flink:historyserver.archive.fs.dir
per specificare il percorso di Cloud Storage in cui scrivere la cronologia dei job Flink (la posizione sarà utilizzata dal server di cronologia Flink in esecuzione sull'ammasso Flink).Esempio di più proprietà:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Altri flag:
- Puoi aggiungere l'elemento facoltativo
--metadata flink-start-yarn-session=true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul nodo master del cluster per avviare una sessione Flink YARN. (vedi Modalità sessione Flink).
- Puoi aggiungere l'elemento facoltativo
Quando utilizzi versioni immagine 2.0 o precedenti, puoi aggiungere il parametro
--scopes=https://www.googleapis.com/auth/cloud-platform
flag per abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta la best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
API
Per creare un cluster Dataproc Flink utilizzando l'API Dataproc, invia un clusters.create come segue:
Note:
Imposta il parametro SoftwareConfig.Component a
FLINK
.Se vuoi, puoi impostare
SoftwareConfig.imageVersion
per specificare la versione dell'immagine da utilizzare sul cluster. La versione dell'immagine del cluster determina del componente Flink installato sul cluster.La versione dell'immagine deve essere 1.5 o successiva per attivare il componente Flink sul cluster (Vedi Versioni di Dataproc supportate per visualizzare l'elenco delle versioni dei componenti incluse in ogni release immagine Dataproc).
La versione dell'immagine deve essere [da definire] o successiva per eseguire job Flink tramite l'API Dataproc Jobs (vedi Eseguire job Flink Dataproc).
Imposta EndpointConfig.enableHttpPortAccess a
true
per abilitare il gateway dei componenti link all'interfaccia utente del server di cronologia Flink. L'attivazione del gateway dei componenti consente anche l'accesso ai L'interfaccia web di Flink Job Manager in esecuzione sulla Ammasso Flink.Se vuoi, puoi impostare
SoftwareConfig.properties
per specificare uno o più proprietà del cluster.- Puoi specificare le proprietà Flink che verranno utilizzate
i valori predefiniti per le applicazioni Flink in esecuzione sul cluster. Ad esempio:
puoi impostare
flink:historyserver.archive.fs.dir
per specificare il percorso di Cloud Storage in cui scrivere la cronologia dei job Flink (la posizione sarà utilizzata dal server di cronologia Flink in esecuzione sull'ammasso Flink).
- Puoi specificare le proprietà Flink che verranno utilizzate
i valori predefiniti per le applicazioni Flink in esecuzione sul cluster. Ad esempio:
puoi impostare
Se vuoi, puoi impostare:
GceClusterConfig.metadata
. ad esempio, per specificareflink-start-yarn-session
true
per eseguire il daemon Flink YARN (/usr/bin/flink-yarn-daemon
) in background sul master del cluster per avviare una sessione Flink YARN (vedi Modalità sessione Flink).- GceClusterConfig.serviceAccountScopes
a
https://www.googleapis.com/auth/cloud-platform
(ambitocloud-platform
) se utilizzi versioni dell'immagine 2.0 o precedenti per consentire l'accesso a Google Cloud API per il tuo cluster (consulta Best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
Dopo aver creato un cluster Flink
- Usa il link
Flink History Server
nella Gateway dei componenti per visualizzare il server di cronologia Flink in esecuzione sul cluster Flink. - Utilizza
YARN ResourceManager link
nel gateway dei componenti per visualizzare l'interfaccia web di Flink Job Manager in esecuzione sul cluster Flink . - Crea un server di cronologia permanente di Dataproc per visualizzare i file di cronologia dei job Flink scritti da cluster Flink esistenti ed eliminati.
Esegui job Flink utilizzando la risorsa Dataproc Jobs
Puoi eseguire job Flink utilizzando la risorsa Dataproc Jobs
dal
Console Google Cloud, Google Cloud CLI o API Dataproc.
Console
Per inviare un job di conteggio parole Flink di esempio dalla console:
Apri Dataproc Nella pagina Invia un'offerta di lavoro nella console Google Cloud nel tuo browser.
Compila i campi nella pagina Invia un job:
- Seleziona il nome del tuo cluster dall'elenco dei cluster.
- Imposta Tipo di job su
Flink
. - Imposta Classe principale o jar su
org.apache.flink.examples.java.wordcount.WordCount
. - Imposta File jar su
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
indica un file che si trova nel cluster. Dataproc ha installatoWordCount.jar
durante la creazione del cluster Flink.- Questo campo accetta anche un percorso Cloud Storage
(
gs://BUCKET/JARFILE
) o un Percorso HDFS (Hadoop Distributed File System) (hdfs://PATH_TO_JAR
).
Fai clic su Invia.
- L'output del driver del job viene visualizzato nella pagina Dettagli job.
- I job Flink sono elencati nella Pagina Job di Dataproc nella console Google Cloud.
- Fai clic su Arresta o Elimina dalla pagina Job o Dettagli job. per arrestare o eliminare un job.
gcloud
Per inviare un job Flink a un cluster Dataproc Flink, esegui gcloud CLI gcloud dataproc job send localmente in una finestra del terminale o Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Note:
- CLUSTER_NAME: specifica il nome del Flink Dataproc a cui inviare il job.
- REGION: specifica una regione di Compute Engine in cui si trova il cluster.
- MAIN_CLASS: specifica la classe
main
del tuo Applicazione Flink, ad esempio:- .
org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: specifica il file jar dell'applicazione Flink. Puoi specificare:
- Un file jar installato nel cluster, utilizzando
file:///` prefisso:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Un file jar in Cloud Storage:
gs://BUCKET/JARFILE
- Un file jar in HDFS:
hdfs://PATH_TO_JAR
- Un file jar installato nel cluster, utilizzando
JOB_ARGS: se vuoi, aggiungi argomenti del job dopo il doppio trattino (
--
).Dopo aver inviato il job, l'output del driver del job viene visualizzato locale o nel terminale Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Questa sezione mostra come inviare un job Flink a un Flink Dataproc. con Dataproc utilizzando jobs.submit.
Prima di utilizzare i dati della richiesta, effettua le seguenti sostituzioni:
- PROJECT_ID: ID progetto Google Cloud
- REGION: regione del cluster
- CLUSTER_NAME: specifica il nome del cluster Dataproc Flink a cui inviare il job
Metodo HTTP e URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Corpo JSON della richiesta:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Per inviare la richiesta, espandi una delle seguenti opzioni:
Dovresti ricevere una risposta JSON simile alla seguente:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- I job Flink sono elencati nella Pagina Job di Dataproc nella console Google Cloud.
- Puoi fare clic su Arresta o Elimina dalla pagina Job o Dettagli job. nella console Google Cloud per arrestare o eliminare un job.
Esegui job Flink utilizzando l'interfaccia a riga di comando flink
Invece di
l'esecuzione di job Flink utilizzando la risorsa Dataproc Jobs
,
puoi eseguire job Flink sul nodo master del tuo cluster Flink utilizzando l'interfaccia a riga di comando flink
.
Le sezioni seguenti descrivono i diversi modi in cui puoi eseguire un job dell'interfaccia a riga di comando flink
su
del tuo cluster Dataproc Flink.
Accedi tramite SSH al nodo master: utilizza il protocollo SSH. per aprire una finestra del terminale sulla VM master del cluster.
Imposta il classpath: inizializza il classpath di Hadoop dalla finestra del terminale SSH sul VM master del cluster Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Esegui job Flink: puoi eseguire job Flink in diverse modalità di deployment su YARN: modalità applicazione, per job e sessione.
Modalità applicazione: La modalità applicazione Flink è supportata da Dataproc Image 2.0 e versioni successive. Questa modalità esegue il metodo
main()
del job nel gestore job YARN. Il cluster si arresta una volta terminato il job.Esempio di invio di un job:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Elenca i job in esecuzione:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Annulla un job in esecuzione:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modalità per job: questa modalità Flink esegue il metodo
main()
del job sulla lato client.Esempio di invio di un job:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Modalità sessione:avvia una sessione Flink YARN di lunga durata, quindi invia a uno o più job per la sessione.
Avvia una sessione:puoi avviare una sessione Flink in una delle nei seguenti modi:
Crea un cluster Flink, aggiungendo
--metadata flink-start-yarn-session=true
per Comandogcloud dataproc clusters create
(vedi Crea un cluster Dataproc Flink). Con questa bandiera abilitato, una volta creato il cluster, Dataproc esegue/usr/bin/flink-yarn-daemon
per avviare una sessione Flink sul cluster.L'ID applicazione YARN della sessione viene salvato in
/tmp/.yarn-properties-${USER}
. Puoi elencare l'ID con il comandoyarn application -list
.Fai sballo
yarn-session.sh
preinstallato sulla VM master del cluster, con impostazioni personalizzate:Esempio con impostazioni personalizzate:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Esegui lo script wrapper
/usr/bin/flink-yarn-daemon
di Flink con impostazioni predefinite:. /usr/bin/flink-yarn-daemon
Invia un job a una sessione: esegui questo comando per inviare un Fai clic su "Flink" per il job alla sessione.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: l'URL, incluso l'host
e la porta della VM master Flink in cui vengono eseguiti i job.
Rimuovi
http:// prefix
da l'URL. Questo URL viene elencato nell'output comando quando avvii un Sessione Flink. Per elencare questo URL puoi eseguire questo comando nel campoTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: l'URL, incluso l'host
e la porta della VM master Flink in cui vengono eseguiti i job.
Rimuovi
Elenco di lavori in una sessione: per elencare i job Flink in una sessione, esegui una delle seguenti le seguenti:
Esegui
flink list
senza argomenti. Il comando cerca l'ID applicazione YARN della sessione in/tmp/.yarn-properties-${USER}
.Recupera l'ID applicazione YARN della sessione da
/tmp/.yarn-properties-${USER}
o l'output diyarn application -list
, quindi esegui<code>
flink list -yid YARN_APPLICATION_ID.Esegui
flink list -m FLINK_MASTER_URL
.
Interrompere una sessione: per interrompere la sessione, ottieni l'ID applicazione YARN della sessione da
/tmp/.yarn-properties-${USER}
o l'outputyarn application -list
, quindi esegui uno dei seguenti comandi:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Esegui job Apache Beam su Flink
Puoi eseguire job Apache Beam su
Dataproc utilizzando
FlinkRunner
.
Puoi eseguire job Beam su Flink nei seguenti modi:
- Job Java Beam
- Job Beam portatili
Job Java Beam
Pacchettizza i tuoi job Beam in un file JAR. Fornisci il file JAR in bundle con le dipendenze necessarie per eseguire il job.
L'esempio seguente esegue un job Java Beam da Dataproc nel nodo master del cluster.
Crea un cluster Dataproc con Componente Flink attivato.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: versione immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta l'elenco delle versioni del componente Apache Flink per le versioni più recenti e precedenti quattro versioni di rilascio delle immagini 2.0.x).--region
: una regione Dataproc supportata.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta la best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
Utilizza l'utilità SSH per aprire una finestra del terminale sul nodo master del cluster Flink.
Avvia una sessione Flink YARN sul master del cluster Dataproc nodo.
. /usr/bin/flink-yarn-daemon
Prendi nota della versione di Flink sul tuo cluster Dataproc.
flink --version
Sulla tua macchina locale, genera l'esempio canonico di conteggio delle parole di Beam in Java.
Scegli una versione Beam compatibile con la versione Flink del tuo di un cluster Dataproc. Consulta la Compatibilità delle versioni di Flink . che elenca la compatibilità delle versioni Beam-Flink.
Apri il file POM generato. Controlla la versione runner Beam Flink specificata da il tag
<flink.artifact.name>
. Se la versione runner Beam Flink nello Il nome dell'artefatto Flink non corrisponde alla versione di Flink sul tuo cluster, aggiorna il numero di versione che corrisponda.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Inserisci l'esempio di conteggio delle parole.
mvn package -Pflink-runner
Carica il file JAR uber in pacchetto,
word-count-beam-bundled-0.1.jar
(~135 MB) al nodo master del cluster Dataproc. Puoi utilizzaregcloud storage cp
per trasferimenti di file più rapidi nel cluster Dataproc di archiviazione ideale in Cloud Storage.Sul terminale locale, crea un bucket Cloud Storage e carica l'uber JAR.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Sul nodo master di Dataproc, scarica l'uber JAR.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Esegui il job Java Beam sul nodo master del cluster Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Verifica che i risultati siano stati scritti nel tuo bucket Cloud Storage.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Interrompi la sessione Flink YARN.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Job Beam portatili
Per eseguire job Beam scritti in Python, Go e altri linguaggi supportati, puoi
utilizza FlinkRunner
e PortableRunner
come descritto sullo schermo
Flink Runner
(vedi anche Roadmap del framework per la portabilità).
L'esempio seguente esegue un job Beam portatile in Python dall' il nodo master del cluster Dataproc.
Crea un cluster Dataproc con Flink e Docker abilitati.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Note:
--optional-components
: Flink e Docker.--image-version
: la versione immagine del cluster, che determina la versione di Flink installata sul cluster (ad esempio, consulta l'elenco delle versioni del componente Apache Flink per le versioni più recenti e precedenti quattro versioni di rilascio delle immagini 2.0.x).--region
: un'area geografica Dataproc disponibile.--enable-component-gateway
: abilita l'accesso all'interfaccia utente di Flink Job Manager.--scopes
: abilita l'accesso alle API Google Cloud da parte del tuo cluster (consulta la best practice per gli ambiti). L'ambitocloud-platform
è abilitato per impostazione predefinita (non è necessario includere questa impostazione del flag) quando crei un cluster che utilizza l'immagine Dataproc versione 2.1 o successiva.
Utilizza gcloud CLI in locale o Cloud Shell per creare nel bucket Cloud Storage. Dovrai specificare BUCKET_NAME quando esegui un programma di conteggio parole di esempio.
gcloud storage buckets create BUCKET_NAME
In una finestra del terminale sulla VM del cluster, avvia una sessione Flink YARN. Prendi nota dell'URL master Flink, l'indirizzo del master Flink in cui vengono eseguiti i job. Dovrai specificare FLINK_MASTER_URL quando esegui un programma di conteggio parole di esempio.
. /usr/bin/flink-yarn-daemon
Visualizza e osserva la versione di Flink che esegue Dataproc in un cluster Kubernetes. Dovrai specificare FLINK_VERSION quando esegui un programma di conteggio parole di esempio.
flink --version
Installa le librerie Python necessarie per il job nel nodo master del cluster.
Installa un Versione Beam che sia compatibile con la versione di Flink sul cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Esegui l'esempio di conteggio parole sul master del cluster nodo.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Note:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, indicato in precedenza.--flink_master
: FLINK_MASTER_URL, indicato in precedenza.--flink_submit_uber_jar
: utilizza l'uber JAR per eseguire il job Beam.--output
: BUCKET_NAME, creata in precedenza.
Verifica che i risultati siano stati scritti nel tuo bucket.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Interrompi la sessione Flink YARN.
- Recupera l'ID applicazione.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Esegui Flink su un cluster Kerberized
Il componente Dataproc Flink supporta Cluster Kerberizzati. È necessario un ticket Kerberos valido per inviare e mantenere un job Flink o per avviare in un ammasso Flink. Per impostazione predefinita, un ticket Kerberos rimane valido per sette giorni.
Accedi all'interfaccia utente di Flink Job Manager
L'interfaccia web di Flink Job Manager è disponibile durante un job Flink o Flink è in esecuzione un cluster di sessione. Per utilizzare l'interfaccia web:
- Crea un cluster Dataproc Flink.
- Dopo la creazione del cluster, fai clic sul Gateway dei componenti. Link YARN ResourceManager nella scheda Interfaccia web della pagina Dettagli cluster nella console Google Cloud.
- Nella UI di YARN Resource Manager, identifica l'applicazione del cluster Flink
. A seconda dello stato di completamento di un job, viene restituito un ApplicationMaster
o Cronologia.
- Per un job di flussi di dati a lunga esecuzione, fai clic sul link ApplicationManager per
aprire la dashboard di Flink; Per un job completato, fai clic sul link Cronologia
per visualizzare i dettagli del job.