Crea una pipeline Dataflow utilizzando Python
In questa guida rapida, imparerai a utilizzare l'SDK Apache Beam per Python per creare un programma che definisca una pipeline. Quindi, esegui la pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.
Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud, fai clic su Aiuto:
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON di Google Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, JSON di Google Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Datastore e Cloud Resource Manager.
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Concedi ruoli al tuo account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
PROJECT_NUMBER
con il numero del progetto. Per trovare il numero del progetto, consulta Identifica i progetti o utilizza il comandogcloud projects describe
. - Sostituisci
SERVICE_ACCOUNT_ROLE
con ogni singolo ruolo.
-
Crea un bucket Cloud Storage e configuralo nel seguente modo:
-
Imposta la classe di archiviazione su
S
(standard). -
Imposta la località di archiviazione come segue:
US
(Stati Uniti). -
Sostituisci
BUCKET_NAME
con un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Imposta la classe di archiviazione su
- Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Questi valori saranno necessari più avanti in questo documento.
configura l'ambiente
In questa sezione utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Questo processo consente di isolare le dipendenze di un progetto da quelle di altri progetti.
Se non hai un prompt dei comandi immediatamente disponibile, puoi utilizzare Cloud Shell. Cloud Shell ha già installato il gestore di pacchetti per Python 3, quindi puoi passare direttamente alla creazione di un ambiente virtuale.
Per installare Python e quindi creare un ambiente virtuale, segui questi passaggi:
- Verifica che nel sistema siano in esecuzione Python 3 e
pip
:python --version python -m pip --version
- Se necessario, installa Python 3, quindi configura un ambiente virtuale Python: segui le istruzioni fornite nelle sezioni Installazione di Python e Configurazione di venv della pagina Configurazione di un ambiente di sviluppo Python. Se utilizzi Python 3.10 o versioni successive, devi anche abilitare Dataflow Runner v2. Per usare Runner v1, usa Python 3.9 o versioni precedenti.
Una volta completata la guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate
.
Scarica l'SDK Apache Beam
L'SDK Apache Beam è un modello di programmazione open source per le pipeline di dati. Devi definire una pipeline con un programma Apache Beam, quindi scegliere un runner, ad esempio Dataflow, per eseguire la pipeline.
Per scaricare e installare l'SDK Apache Beam:
- Verifica di essere nell'ambiente virtuale Python che hai creato nella sezione precedente.
Assicurati che il prompt inizi con
<env_name>
, doveenv_name
è il nome dell'ambiente virtuale. - Installa lo standard di pacchetto Python wheel:
pip install wheel
- Installa la versione più recente dell'SDK Apache Beam per Python:
pip install 'apache-beam[gcp]'
Su Microsoft Windows, utilizza il seguente comando:
pip install apache-beam[gcp]
A seconda della connessione, l'installazione potrebbe richiedere del tempo.
esegui la pipeline in locale
Per vedere come viene eseguita una pipeline localmente, utilizza un modulo Python pronto per l'esempio wordcount
incluso nel pacchetto apache_beam
.
L'esempio di pipeline wordcount
esegue quanto segue:
Prende un file di testo come input.
Questo file di testo si trova in un bucket Cloud Storage con il nome della risorsa
gs://dataflow-samples/shakespeare/kinglear.txt
.- Analizza ogni riga in parole.
- Esegue un conteggio di frequenza sulle parole tokenizzate.
Per posizionare in un'area intermedia la pipeline wordcount
in locale, segui questi passaggi:
- Dal terminale locale, esegui l'esempio
wordcount
:python -m apache_beam.examples.wordcount \ --output outputs
- Visualizza l'output della pipeline:
more outputs*
- Per uscire, premi Q.
wordcount.py
su GitHub di Apache Beam.
Esegui la pipeline sul servizio Dataflow
In questa sezione, esegui la pipeline di esempiowordcount
dal pacchetto apache_beam
sul servizio Dataflow. Questo
esempio specifica DataflowRunner
come parametro per
--runner
.
- Esegui la pipeline:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempioeurope-west1
Il flag
--region
sostituisce la regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.BUCKET_NAME
: il nome del bucket Cloud Storage che hai copiato in precedenzaPROJECT_ID
: l'ID progetto Google Cloud che hai copiato in precedenza
Visualizza i tuoi risultati
Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando la console Google Cloud o il terminale locale.
Console Google Cloud
Per visualizzare i risultati nella console Google Cloud, segui questi passaggi:
- Nella console Google Cloud, vai alla pagina Job di Dataflow.
La pagina Job mostra i dettagli del job
wordcount
, tra cui lo stato In esecuzione prima del job Riuscito. - Vai alla pagina Bucket di Cloud Storage.
Dall'elenco dei bucket del progetto, fai clic sul bucket di archiviazione creato in precedenza.
Nella directory
wordcount
, vengono visualizzati i file di output creati dal job.
Terminale locale
Visualizza i risultati dal terminale o utilizzando Cloud Shell.
- Per elencare i file di output, utilizza il comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Per visualizzare i risultati nei file di output, utilizza il comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Sostituisci BUCKET_NAME
con il nome del bucket Cloud Storage utilizzato
nel programma della pipeline.
Modifica il codice della pipeline
La pipelinewordcount
negli esempi precedenti fa una distinzione tra parole maiuscole e minuscole.
I passaggi seguenti mostrano come modificare la pipeline in modo che la pipeline wordcount
non sia sensibile alle maiuscole.
- Sulla macchina locale, scarica la copia più recente del codice
wordcount
dal repository GitHub di Apache Beam. - Esegui la pipeline dal terminale locale:
python wordcount.py --output outputs
- Visualizza i risultati:
more outputs*
- Per uscire, premi Q.
- In un editor a tua scelta, apri il file
wordcount.py
. - All'interno della funzione
run
, esamina i passaggi della pipeline:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Dopo
split
, le righe vengono divise in parole come stringhe. - Per modificare le stringhe in minuscolo, modifica la riga dopo
split
:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Questa modifica mappa la funzionestr.lower
su ogni parola. Questa riga equivale abeam.Map(lambda word: str.lower(word))
. - Salva il file ed esegui il job
wordcount
modificato:python wordcount.py --output outputs
- Visualizza i risultati della pipeline modificata:
more outputs*
- Per uscire, premi Q.
- Esegui la pipeline modificata sul servizio Dataflow:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Sostituisci quanto segue:
DATAFLOW_REGION
: la regione in cui vuoi eseguire il deployment del job DataflowBUCKET_NAME
: nome del tuo bucket Cloud StoragePROJECT_ID
: il tuo ID progetto Google Cloud
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.
- Nella console Google Cloud, vai alla pagina Bucket di Cloud Storage.
- Fai clic sulla casella di controllo relativa al bucket da eliminare.
- Per eliminare il bucket, fai clic su Elimina e segui le istruzioni.
Se mantieni il progetto, revoca i ruoli che hai concesso all'account di servizio predefinito di Compute Engine. Esegui il comando seguente una volta per ognuno dei seguenti ruoli IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Facoltativo: revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.
gcloud auth application-default revoke
-
Facoltativo: revoca le credenziali dallgcloud CLI.
gcloud auth revoke