Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina descrive come utilizzare Cloud Composer 2 per eseguire i carichi di lavoro Dataproc Serverless suGoogle Cloud.
Gli esempi nelle sezioni seguenti mostrano come utilizzare gli operatori per la gestione dei carichi di lavoro batch Dataproc Serverless. Utilizza questi operatori nei DAG che creano, eliminano, elencano e recuperano un carico di lavoro batch Spark Dataproc Serverless:
Crea DAG per gli operatori che funzionano con i carichi di lavoro batch di Dataproc Serverless:
Crea DAG che utilizzano container personalizzati e Dataproc Metastore.
Configura il server di cronologia permanente per questi DAG.
Prima di iniziare
Abilita l'API Dataproc:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Seleziona la posizione del file del carico di lavoro batch. Puoi utilizzare una delle seguenti opzioni:
- Crea un bucket Cloud Storage che immagazzina questo file.
- Utilizza il bucket del tuo ambiente. Poiché non devi sincronizzare questo file con Airflow, puoi creare una sottocartella separata al di fuori delle cartelle
/dags
o/data
. Ad esempio:/batches
. - Utilizza un bucket esistente.
Configura i file e le variabili Airflow
Questa sezione mostra come impostare i file e configurare le variabili Airflow per questo tutorial.
Carica un file del carico di lavoro Spark ML di Dataproc Serverless in un bucket
Il carico di lavoro in questo tutorial esegue uno script pyspark:
Salva qualsiasi script pyspark in un file locale denominato
spark-job.py
. Ad esempio, puoi utilizzare lo script pyspark di esempio.Carica il file nella posizione selezionata in Prima di iniziare.
Imposta le variabili Airflow
Gli esempi nelle sezioni seguenti utilizzano le variabili Airflow. Imposti i valori per queste variabili in Airflow, in modo che il codice DAG possa accedervi.
Gli esempi in questo tutorial utilizzano le seguenti variabili Airflow. Puoi impostarli come necessario, a seconda dell'esempio che utilizzi.
Imposta le seguenti variabili Airflow da utilizzare nel codice DAG:
project_id
: ID progetto.bucket_name
: URI di un bucket in cui si trova il file Python principale del workload (spark-job.py
). Hai selezionato questa località in Prima di iniziare.phs_cluster
: il nome del cluster del server di cronologia permanente. Imposti questa variabile quando crei un server di cronologia permanente.image_name
: nome e tag dell'immagine del contenitore personalizzato (image:tag
). Imposti questa variabile quando utilizzi l'immagine del contenitore personalizzato con DataprocCreateBatchOperator.metastore_cluster
: nome del servizio Dataproc Metastore. Imposti questa variabile quando utilizzi il servizio Dataproc Metastore con DataprocCreateBatchOperator.region_name
: regione in cui si trova il servizio Dataproc Metastore. Imposti questa variabile quando utilizzi il servizio Dataproc Metastore con DataprocCreateBatchOperator.
Utilizza la console Google Cloud e l'interfaccia utente di Airflow per impostare ogni variabile Airflow
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul link Airflow per il tuo ambiente. Si apre l'interfaccia utente di Airflow.
Nell'interfaccia utente di Airflow, seleziona Amministrazione > Voci.
Fai clic su Aggiungi un nuovo record.
Specifica il nome della variabile nel campo Key e imposta il relativo valore nel campo Val.
Fai clic su Salva.
Crea un server di cronologia permanente
Utilizza un server di cronologia permanente (PHS) per visualizzare i file di cronologia Spark dei carichi di lavoro in batch:
- Crea un server di cronologia permanente.
- Assicurati di aver specificato il nome del cluster PHS nella
phs_cluster
variabile Airflow.
DataprocCreateBatchOperator
Il seguente DAG avvia un carico di lavoro batch Dataproc Serverless.
Per ulteriori informazioni sugli argomenti DataprocCreateBatchOperator
, consulta il codice sorgente dell'operatore.
Per ulteriori informazioni sugli attributi che puoi passare nel parametro batch
di DataprocCreateBatchOperator
, consulta la
descrizione della classe Batch.
Utilizzare l'immagine del container personalizzato con DataprocCreateBatchOperator
L'esempio seguente mostra come utilizzare un'immagine del contenitore personalizzata per eseguire i carichi di lavoro. Puoi utilizzare un container personalizzato, ad esempio, per aggiungere dipendenze Python non fornite dall'immagine del container predefinito.
Per utilizzare un'immagine container personalizzata:
Crea un'immagine container personalizzata e caricala su Container Registry.
Specifica l'immagine nella variabile Airflow
image_name
.Utilizza DataprocCreateBatchOperator con la tua immagine personalizzata:
Utilizzare il servizio Dataproc Metastore con DataprocCreateBatchOperator
Per utilizzare un servizio Dataproc Metastore da un DAG:
Verifica che il servizio Metastore sia già avviato.
Per informazioni su come avviare un servizio metastore, consulta Abilitare e disabilitare Dataproc Metastore.
Per informazioni dettagliate sull'operatore batch per la creazione della configurazione, consulta PeripheralsConfig.
Una volta che il servizio Metastore è attivo e funzionante, specifica il relativo nome nella variabile
metastore_cluster
e la regione nella variabile Airflowregion_name
.Utilizza il servizio metastore in DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Puoi utilizzare DataprocDeleteBatchOperator per eliminare un batch in base all'ID del batch del carico di lavoro.
DataprocListBatchesOperator
DataprocDeleteBatchOperator elenca i batch esistenti in un determinato project_id e nella regione.
DataprocGetBatchOperator
DataprocGetBatchOperator recupera un determinato carico di lavoro batch.