Esegui un job Dataflow in un contenitore personalizzato

Questo documento descrive come eseguire una pipeline Dataflow utilizzando un contenuto personalizzato.

Per informazioni sulla creazione dell'immagine container, consulta Creare immagini container personalizzate per Dataflow.

Quando esegui la pipeline, lanciala utilizzando l'SDK Apache Beam con la stessa versione e la stessa versione del linguaggio dell'SDK nell'immagine del contenitore personalizzato. Questo passaggio evita errori imprevisti dovuti a dipendenze o SDK incompatibili.

Eseguire il test localmente

Prima di eseguire la pipeline in Dataflow, è buona norma testare l'immagine container localmente, in modo da velocizzare i test e il debug.

Per scoprire di più sull'utilizzo specifico di Apache Beam, consulta la guida di Apache Beam per la gestione di pipeline con immagini container personalizzate.

Test di base con PortableRunner

Per verificare che le immagini container remote possano essere estratte ed eseguire una semplice pipeline, utilizza Apache Beam PortableRunner. Quando utilizzi PortableRunner, l'invio del job avviene nell'ambiente locale e l'esecuzione di PortableRunner nell'ambiente Docker.DoFn

Quando utilizzi le GPU, il contenitore Docker potrebbe non avere accesso alle GPU. Per eseguire il test del contenitore con le GPU, utilizza il runner diretto e segui i passaggi per testare un'immagine del contenitore su una VM autonoma con GPU nella sezione Eseguire il debug con una VM autonoma della pagina "Utilizzare le GPU".

Di seguito viene eseguita una pipeline di esempio:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Vai

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Sostituisci quanto segue:

  • REGION: la regione del servizio di job da utilizzare, sotto forma di indirizzo e porta. Ad esempio: localhost:3000. Utilizza embed per eseguire un servizio di job in esecuzione.
  • IMAGE_URI: l'URI dell'immagine container personalizzata.
  • INPUT_FILE: un file di input che può essere letto come file di testo. Questo file deve essere accessibile dall'immagine container
    dell'harness dell'SDK, precaricato nell'immagine container o in un file remoto.
  • OUTPUT_FILE: un percorso in cui scrivere l'output. Questo percorso è un percorso remoto o locale nel contenitore.

Al termine della pipeline, controlla i log della console per verificare che sia stata completata correttamente e che venga utilizzata l'immagine remota specificata da IMAGE_URI.

Dopo l'esecuzione della pipeline, i file salvati nel contenitore non sono nel file system locale e il contenitore viene interrotto. Puoi copiare i file dal filesystem del contenitore fermato utilizzando docker cp.

In alternativa:

  • Fornisci output a un file system remoto come Cloud Storage. Potresti dover configurare manualmente l'accesso a scopo di test, anche per i file delle credenziali o le credenziali predefinite dell'applicazione.
  • Per un debug rapido, aggiungi il logging temporaneo.

Utilizzare il programma Direct Runner

Per test locali più approfonditi dell'immagine del contenitore e della pipeline, utilizza il runner diretto di Apache Beam.

Puoi verificare la pipeline separatamente dal contenitore eseguendo il test in un ambiente locale corrispondente all'immagine del contenitore o avviando la pipeline su un contenitore in esecuzione.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Vai

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Sostituisci IMAGE_URI con l'URI dell'immagine container personalizzata.

Gli esempi presuppongono che tutti i file della pipeline, inclusa la pipeline stessa, si trovino nel contenitore personalizzato, siano stati montati da un file system locale o siano remoti e accessibili da Apache Beam e dal contenitore. Ad esempio, per utilizzare Maven (mvn) per eseguire l'esempio Java precedente, Maven e le relative dipendenze devono essere inseriti nel container. Per ulteriori informazioni, consulta Storage e docker run nella documentazione di Docker.

Lo scopo dei test su Direct Runner è testare la pipeline nell'ambiente del contenitore personalizzato, non testare l'esecuzione del contenitore con il relativo ENTRYPOINT predefinito. Modifica ENTRYPOINT (ad esempio docker run --entrypoint ...) per eseguire direttamente la pipeline o per consentire l'esecuzione manuale di comandi sul contenitore.

Se utilizzi una configurazione specifica basata sull'esecuzione del contenitore su Compute Engine, puoi eseguire il contenitore direttamente su una VM Compute Engine. Per ulteriori informazioni, consulta Contenitori su Compute Engine.

Avvia il job Dataflow

Quando avvii la pipeline Apache Beam su Dataflow, specifica il percorso dell'immagine del contenitore. Non utilizzare il tag :latest con le immagini personalizzate. Tagga le build con una data o un identificatore univoco. Se si verifica un problema, l'utilizzo di questo tipo di tag potrebbe consentire di ripristinare l'esecuzione della pipeline a una configurazione funzionante nota in precedenza e consentire un'ispezione delle modifiche.

Java

Utilizza --sdkContainerImage per specificare un'immagine container dell'SDK per il runtime Java.

Usa --experiments=use_runner_v2 per abilitare Runner v2.

Python

Se utilizzi la versione dell'SDK 2.30.0 o successive, utilizza l'opzione della pipeline --sdk_container_image per specificare un'immagine del contenitore dell'SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.

I contenitori personalizzati sono supportati solo per Dataflow Runner v2. Se stai lanciando una pipeline Python batch, imposta il flag --experiments=use_runner_v2. Se stai lanciando una pipeline Python in streaming, non è necessario specificare l'esperimento, perché le pipeline Python in streaming utilizzano Runner v2 per impostazione predefinita.

Vai

Se utilizzi la versione dell'SDK 2.40.0 o successive, utilizza l'opzione della pipeline --sdk_container_image per specificare un'immagine del contenitore dell'SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.

I contenitori personalizzati sono supportati in tutte le versioni dell'SDK Go perché per impostazione predefinita utilizzano Dataflow Runner v2.

L'esempio seguente mostra come lanciare l'esempio di batch WordCount con un contenitore personalizzato.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Utilizza la versione 2.30.0 o successiva dell'SDK Apache Beam per Python:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Vai

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Sostituisci quanto segue:

  • INPUT_FILE: il percorso di input Cloud Storage letto da Dataflow durante l'esecuzione dell'esempio.
  • OUTPUT_FILE: il percorso di output di Cloud Storage in cui scrive la pipeline di esempio. Questo file contiene i conteggi delle parole.
  • PROJECT_ID: l'ID del tuo progetto Google Cloud.
  • REGION: la regione in cui eseguire il deployment del job Dataflow.
  • TEMP_LOCATION: il percorso Cloud Storage per la gestione in anteprima dei file temporanei dei job creati durante l'esecuzione della pipeline.
  • DISK_SIZE_GB: facoltativo. Se il contenitore è di grandi dimensioni, valuta la possibilità di aumentare le dimensioni del disco di avvio predefinite per evitare di esaurire lo spazio su disco.
  • IMAGE_URI: l'URI dell'immagine del contenitore personalizzato dell'SDK. Utilizza sempre un tag o un SHA del contenitore con versione. Non utilizzare il tag :latest o un tag mutabile.