Questo documento descrive come eseguire una pipeline Dataflow utilizzando un contenuto personalizzato.
Per informazioni sulla creazione dell'immagine container, consulta Creare immagini container personalizzate per Dataflow.
Quando esegui la pipeline, lanciala utilizzando l'SDK Apache Beam con la stessa versione e la stessa versione del linguaggio dell'SDK nell'immagine del contenitore personalizzato. Questo passaggio evita errori imprevisti dovuti a dipendenze o SDK incompatibili.
Eseguire il test localmente
Prima di eseguire la pipeline in Dataflow, è buona norma testare l'immagine container localmente, in modo da velocizzare i test e il debug.
Per scoprire di più sull'utilizzo specifico di Apache Beam, consulta la guida di Apache Beam per l'esecuzione di pipeline con immagini container personalizzate.
Test di base con PortableRunner
Per verificare che le immagini container remote possano essere estratte ed eseguire una semplice
pipeline, utilizza Apache Beam PortableRunner
. Quando utilizzi PortableRunner
, l'invio del job avviene nell'ambiente locale e l'esecuzione di PortableRunner
nell'ambiente Docker.DoFn
Quando utilizzi le GPU, il contenitore Docker potrebbe non avere accesso alle GPU. Per eseguire il test del contenitore con le GPU, utilizza il runner diretto e segui i passaggi per testare un'immagine del contenitore su una VM autonoma con GPU nella sezione Eseguire il debug con una VM autonoma della pagina "Utilizzare le GPU".
Di seguito viene eseguita una pipeline di esempio:
Java
mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
-Dexec.args="--runner=PortableRunner \
--jobEndpoint=REGION \
--defaultEnvironmentType=DOCKER \
--defaultEnvironmentConfig=IMAGE_URI \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE"
Python
python path/to/my/pipeline.py \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Vai
go path/to/my/pipeline.go \
--runner=PortableRunner \
--job_endpoint=REGION \
--environment_type=DOCKER \
--environment_config=IMAGE_URI \
--input=INPUT_FILE \
--output=OUTPUT_FILE
Sostituisci quanto segue:
REGION
: la regione del servizio di job da utilizzare, sotto forma di indirizzo e porta. Ad esempio:localhost:3000
. Utilizzaembed
per eseguire un servizio di job in esecuzione.IMAGE_URI
: l'URI dell'immagine container personalizzata.INPUT_FILE
: un file di input che può essere letto come file di testo. Questo file deve essere accessibile dall'immagine del container
dell'harness dell'SDK, precaricato nell'immagine del container o in un file remoto.OUTPUT_FILE
: un percorso in cui scrivere l'output. Questo percorso è un percorso remoto o locale nel contenitore.
Al termine della pipeline, controlla i log della console per verificare che sia stata completata correttamente e che venga utilizzata l'immagine remota specificata da IMAGE_URI
.
Dopo l'esecuzione della pipeline, i file salvati nel contenitore non sono nel file system locale e il contenitore viene interrotto. Puoi copiare i file dal
sistema file del contenitore fermato utilizzando
docker cp
.
In alternativa:
- Fornisci output a un file system remoto come Cloud Storage. Potresti dover configurare manualmente l'accesso a scopo di test, anche per i file delle credenziali o le credenziali predefinite dell'applicazione.
- Per un debug rapido, aggiungi il logging temporaneo.
Utilizzare il programma Direct Runner
Per test locali più approfonditi dell'immagine del contenitore e della pipeline, utilizza il runner diretto di Apache Beam.
Puoi verificare la pipeline separatamente dal contenitore eseguendo il test in un ambiente locale corrispondente all'immagine del contenitore o avviando la pipeline su un contenitore in esecuzione.
Java
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...
Python
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# python path/to/my/pipeline.py ...
Vai
docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/# go path/to/my/pipeline.go ...
Sostituisci IMAGE_URI
con l'URI dell'immagine container personalizzata.
Gli esempi presuppongono che tutti i file della pipeline, inclusa la pipeline stessa, si trovino nel contenitore personalizzato, siano stati montati da un file system locale o siano remoti e accessibili da Apache Beam e dal contenitore. Ad esempio, per utilizzare Maven (mvn
) per eseguire l'esempio Java precedente, Maven e le relative dipendenze devono essere sottoposti a staging nel contenitore. Per ulteriori informazioni, consulta Storage e docker run
nella documentazione di Docker.
Lo scopo dei test su Direct Runner è testare la pipeline
nell'ambiente del contenitore personalizzato, non testare l'esecuzione del contenitore
con il relativo ENTRYPOINT
predefinito. Modifica ENTRYPOINT
(ad esempio
docker run --entrypoint ...
) per eseguire direttamente la pipeline o per consentire
l'esecuzione manuale di comandi sul contenitore.
Se utilizzi una configurazione specifica basata sull'esecuzione del contenitore su Compute Engine, puoi eseguire il contenitore direttamente su una VM Compute Engine. Per ulteriori informazioni, consulta Contenitori su Compute Engine.
Avvia il job Dataflow
Quando avvii la pipeline Apache Beam su Dataflow, specifica
il percorso dell'immagine container. Non utilizzare il tag :latest
con le immagini personalizzate. Tagga le build con una data o un identificatore univoco. Se si verifica un problema, l'utilizzo di questo tipo di tag potrebbe consentire di ripristinare l'esecuzione della pipeline a una configurazione funzionante nota in precedenza e consentire un'ispezione delle modifiche.
Java
Utilizza --sdkContainerImage
per specificare un'immagine container dell'SDK per il runtime Java.
Usa --experiments=use_runner_v2
per abilitare Runner v2.
Python
Se utilizzi la versione dell'SDK 2.30.0 o successive, utilizza l'opzione della pipeline --sdk_container_image
per specificare un'immagine del contenitore dell'SDK.
Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image
per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.
I contenitori personalizzati sono supportati solo per Dataflow Runner v2. Se stai lanciando una pipeline Python batch, imposta il flag --experiments=use_runner_v2
.
Se stai lanciando una pipeline Python in streaming, non è necessario specificare l'esperimento, perché le pipeline Python in streaming utilizzano Runner v2 per impostazione predefinita.
Vai
Se utilizzi la versione dell'SDK 2.40.0 o successive, utilizza l'opzione della pipeline --sdk_container_image
per specificare un'immagine del contenitore dell'SDK.
Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image
per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.
I contenitori personalizzati sono supportati in tutte le versioni dell'SDK Go perché per impostazione predefinita utilizzano Dataflow Runner v2.
L'esempio seguente mostra come lanciare l'esempio di batch
WordCount
con un contenitore personalizzato.
Java
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner \
--inputFile=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--gcpTempLocation=TEMP_LOCATION \
--diskSizeGb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdkContainerImage=IMAGE_URI"
Python
Utilizza la versione 2.30.0 o successiva dell'SDK Apache Beam per Python:
python -m apache_beam.examples.wordcount \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--disk_size_gb=DISK_SIZE_GB \
--experiments=use_runner_v2 \
--sdk_container_image=IMAGE_URI
Vai
wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--sdk_container_image=IMAGE_URI
Sostituisci quanto segue:
INPUT_FILE
: il percorso di input Cloud Storage letto da Dataflow durante l'esecuzione dell'esempio.OUTPUT_FILE
: il percorso di output di Cloud Storage in cui scrive la pipeline di esempio. Questo file contiene i conteggi delle parole.PROJECT_ID
: l'ID del tuo progetto Google Cloud.REGION
: la regione in cui eseguire il deployment del job Dataflow.TEMP_LOCATION
: il percorso Cloud Storage per la gestione in anteprima dei file temporanei dei job creati durante l'esecuzione della pipeline.DISK_SIZE_GB
: facoltativo. Se il contenitore è di grandi dimensioni, valuta la possibilità di aumentare le dimensioni del disco di avvio predefinite per evitare di esaurire lo spazio su disco.IMAGE_URI
: l'URI dell'immagine del contenitore personalizzato dell'SDK. Utilizza sempre un tag o un SHA del contenitore con versione. Non utilizzare il tag:latest
o un tag mutabile.