Creare una pipeline Dataflow usando Python
In questa guida rapida, imparerai a utilizzare l'SDK Apache Beam per Python per creare un programma che definisce una pipeline. Quindi, esegui la pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow.
Per indicazioni dettagliate su questa attività direttamente in Cloud Console, fai clic su Guida:
Nelle sezioni seguenti puoi seguire la stessa procedura utilizzata per fare clic su Procedura guidata.
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
-
Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.
-
Crea un account di servizio:
-
In Cloud Console, vai alla pagina Crea account di servizio.
Vai a Crea account di servizio - Seleziona il progetto.
-
Inserisci un nome nel campo Nome account di servizio. Cloud Console compila il campo ID account di servizio in base a questo nome.
Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio,
Service account for quickstart
. - Fai clic su Crea e continua.
-
Per fornire l'accesso al tuo progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > proprietario.
Nell'elenco Seleziona un ruolo, scegli un ruolo.
Per ulteriori ruoli, fai clic su
Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo. - Fai clic su Continua.
-
Fai clic su Fine per completare la creazione dell'account di servizio.
Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.
-
-
Crea una chiave dell'account di servizio:
- In Cloud Console, fai clic sull'indirizzo email dell'account di servizio che hai creato.
- Fai clic su Chiavi.
- Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
- Fai clic su Crea. Il file di una chiave JSON viene scaricato sul computer.
- Fai clic su Chiudi.
-
Imposta la variabile di ambiente
GOOGLE_APPLICATION_CREDENTIALS
sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo. -
Nella pagina del selettore dei progetti in Google Cloud Console, seleziona o crea un progetto Google Cloud.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata su un progetto.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.
-
Crea un account di servizio:
-
In Cloud Console, vai alla pagina Crea account di servizio.
Vai a Crea account di servizio - Seleziona il progetto.
-
Inserisci un nome nel campo Nome account di servizio. Cloud Console compila il campo ID account di servizio in base a questo nome.
Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio,
Service account for quickstart
. - Fai clic su Crea e continua.
-
Per fornire l'accesso al tuo progetto, concedi i seguenti ruoli al tuo account di servizio: Progetto > proprietario.
Nell'elenco Seleziona un ruolo, scegli un ruolo.
Per ulteriori ruoli, fai clic su
Aggiungi un altro ruolo e aggiungi ogni ruolo aggiuntivo. - Fai clic su Continua.
-
Fai clic su Fine per completare la creazione dell'account di servizio.
Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.
-
-
Crea una chiave dell'account di servizio:
- In Cloud Console, fai clic sull'indirizzo email dell'account di servizio che hai creato.
- Fai clic su Chiavi.
- Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
- Fai clic su Crea. Il file di una chiave JSON viene scaricato sul computer.
- Fai clic su Chiudi.
-
Imposta la variabile di ambiente
GOOGLE_APPLICATION_CREDENTIALS
sul percorso del file JSON che contiene la chiave dell'account di servizio. Questa variabile si applica solo alla sessione shell corrente, quindi se apri una nuova sessione, impostala di nuovo. - Crea un bucket Cloud Storage:
- In Cloud Console, vai alla pagina Browser di Cloud Storage.
- Fai clic su Crea bucket.
- Nella pagina Crea un bucket, inserisci le informazioni sul bucket. Per andare al passaggio successivo, fai clic su Continua.
- In Assegna un nome al bucket, inserisci un nome univoco per il bucket. Non includere informazioni sensibili nel nome del bucket, poiché lo spazio dei nomi del bucket è globale e visibile pubblicamente.
-
Per scegliere dove archiviare i dati, segui questi passaggi:
- Seleziona un'opzione Tipo di località.
- Seleziona un'opzione Località.
- Per Scegli una classe di archiviazione predefinita per i tuoi dati, seleziona quanto segue: Standard.
- In Scegli come controllare l'accesso agli oggetti, seleziona un'opzione di Controllo dell'accesso.
- (Facoltativo) Per le Impostazioni avanzate, specifica un metodo di crittografia, un criterio di conservazione o etichette di bucket.
- Fai clic su Crea.
- Copia l'ID progetto di Google Cloud e il nome del bucket Cloud Storage. Questi valori sono necessari in un secondo momento in questo documento.
Configura l'ambiente
In questa sezione, utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Questo processo consente di isolare le dipendenze di un progetto dalle dipendenze di altri progetti.
Se non hai un prompt dei comandi prontamente disponibile, puoi utilizzare Cloud Shell. Cloud Shell ha già installato il gestore di pacchetti per Python 3, quindi puoi passare alla creazione di un ambiente virtuale.
Per installare Python e creare un ambiente virtuale, segui questi passaggi:
- Verifica di avere in esecuzione Python 3 e
pip
nel tuo sistema:python --version python -m pip --version
- Se necessario, installa Python 3 e quindi configura un ambiente virtuale Python: segui le istruzioni fornite nelle sezioni Installazione di Python e Configurazione di venv della pagina Configurazione di un ambiente di sviluppo Python.
Dopo aver completato la guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate
.
Scarica l'SDK Apache Beam
L'SDK Apache Beam è un modello di programmazione open source per le pipeline di dati. Devi definire una pipeline con un programma Apache Beam e scegliere un runner, ad esempio Dataflow, per eseguire la pipeline.
Per scaricare e installare l'SDK Apache Beam, segui questi passaggi:
- Verifica di trovarti nell'ambiente virtuale Python creato nella sezione precedente.
Assicurati che la richiesta inizi con
<env_name>
, doveenv_name
è il nome dell'ambiente virtuale. - Installa lo standard di confezionamento Python wheel:
pip install wheel
- Installa la versione più recente dell'SDK Apache Beam per Python:
pip install 'apache-beam[gcp]'
A seconda della connessione, l'installazione potrebbe richiedere del tempo.
Esegui la pipeline in locale
Per vedere come viene eseguita una pipeline in locale, utilizza un modulo Python già pronto per l'esempio
wordcount
incluso nel pacchetto apache_beam
.
L'esempio di pipeline wordcount
esegue quanto segue:
Applica come input un file di testo.
Questo file di testo si trova in un bucket Cloud Storage con il nome della risorsa
gs://dataflow-samples/shakespeare/kinglear.txt
.- Analizza ogni riga in parole.
- Esegue un conteggio della frequenza sulle parole tokenizzate.
Per mettere in locale la pipeline wordcount
in locale, segui questi passaggi:
- Dal terminale locale, esegui l'esempio
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Visualizza l'output della pipeline:
more outputs*
- Per uscire, premi Q.
wordcount.py
su Apache Beam GitHub.
Esegui la pipeline sul servizio Dataflow
In questa sezione, esegui la pipeline di esempiowordcount
dal pacchetto apache_beam
sul servizio Dataflow. In questo
esempio specifica DataflowRunner
come parametro per
--runner
.
- Esegui la pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://STORAGE_BUCKET/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del job Dataflow, ad esempioeurope-west1
Il flag
--region
sostituisce l'area geografica predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.STORAGE_BUCKET
: il nome di Cloud Storage che hai copiato in precedenzaPROJECT_ID
: l'ID progetto di Google Cloud che hai copiato in precedenza
Visualizza i tuoi risultati
Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando Cloud Console o il terminale locale.
Cloud Console
Per visualizzare i risultati in Cloud Console, segui questi passaggi:
- In Cloud Console, vai alla pagina Job di Dataflow.
Nella pagina Job vengono visualizzati i dettagli del job
wordcount
, incluso lo stato In esecuzione all'inizio, quindi Riuscito. - Vai alla pagina del browser Cloud Storage.
Dall'elenco dei bucket nel progetto, fai clic sul bucket di archiviazione creato in precedenza.
Nella directory
wordcount
vengono visualizzati i file di output creati dal job.
Terminale locale
Per visualizzare i risultati dal tuo terminale, utilizza lo strumento gsutil
.
Puoi anche eseguire i comandi da Cloud Shell.
- Elenca i file di output:
gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"
- Visualizza i risultati nei file di output:
gsutil cat "gs://STORAGE_BUCKET/results/outputs*"
Sostituisci STORAGE_BUCKET
con il nome del bucket Cloud Storage utilizzato
nel programma della pipeline.
Modifica il codice della pipeline
La pipelinewordcount
negli esempi precedenti distingue tra parole maiuscole e minuscole.
I seguenti passaggi mostrano come modificare la pipeline in modo che la pipeline wordcount
non faccia distinzione tra maiuscole e minuscole.
- Sul computer locale, scarica l'ultima copia del
codice
wordcount
dal repository GitHub di Apache Beam. - Dal terminale locale, esegui la pipeline:
python wordcount.py --output outputs
- Visualizza i risultati:
more outputs*
- Per uscire, premi Q.
- In un editor a tua scelta, apri il file
wordcount.py
. - All'interno della funzione
run
, esamina i passaggi della pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Dopo il giorno
split
, le righe sono divise in parole come stringhe. - Per contenere le stringhe in minuscolo, modifica la riga dopo
split
:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Questa modifica mappa la funzionestr.lower
su ogni parola. Questa riga equivale abeam.Map(lambda word: str.lower(word))
. - Salva il file ed esegui il job
wordcount
modificato:python wordcount.py --output outputs
- Visualizza i risultati della pipeline modificata:
more outputs*
- Per uscire, premi Q.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.
- In Cloud Console, vai alla pagina Browser di Cloud Storage.
- Fai clic sulla casella di controllo del bucket da eliminare.
- Per eliminare il bucket, fai clic su Elimina e segui le istruzioni.
Passaggi successivi
- Scopri di più sul modello di programmazione Apache Beam.
- Sviluppa in modo interattivo una pipeline utilizzando un blocco note Apache Beam.
- Scopri come progettare e creare la tua pipeline.
- Esamina gli esempi di WordCount e Mobile Gaming.