Esegui un job Dataflow in un container personalizzato

Questo documento descrive come eseguire una pipeline Dataflow utilizzando un container personalizzato.

Per informazioni sulla creazione dell'immagine container, consulta Creare immagini container personalizzate per Dataflow.

Quando esegui la pipeline, avviala utilizzando l'SDK Apache Beam con la stessa versione e la stessa lingua dell'SDK sull'immagine container personalizzata. Questo passaggio evita errori imprevisti dovuti a dipendenze o SDK incompatibili.

Esegui test in locale

Prima di eseguire la pipeline in Dataflow, è consigliabile testare l'immagine container in locale, in modo da poter eseguire test e debug più rapidamente.

Per ulteriori informazioni sull'utilizzo specifico di Apache Beam, consulta la guida di Apache Beam per Esecuzione di pipeline con immagini container personalizzate.

Test di base con PortableRunner

Per verificare che sia possibile eseguire il pull delle immagini dei container remoti e che possano eseguire una pipeline semplice, utilizza Apache Beam PortableRunner. Quando utilizzi PortableRunner, l'invio del job avviene nell'ambiente locale e l'esecuzione di DoFn avviene nell'ambiente Docker.

Quando utilizzi le GPU, il container Docker potrebbe non avere accesso alle GPU. Per testare il container con GPU, utilizza il direct runner e segui i passaggi per testare un'immagine container su una VM autonoma con GPU nella sezione Debug con una VM autonoma della pagina "Utilizzare GPU".

Di seguito viene eseguita una pipeline di esempio:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Sostituisci quanto segue:

  • REGION: la regione del servizio job da utilizzare, sotto forma di indirizzo e porta. Ad esempio: localhost:3000. Utilizza embed per eseguire un servizio job in elaborazione.
  • IMAGE_URI: l'URI dell'immagine del container personalizzato.
  • INPUT_FILE: un file di input che può essere letto come file di testo. Questo file deve essere accessibile dall'immagine del container
    con deployment dell'SDK, precaricata sull'immagine container o su un file remoto.
  • OUTPUT_FILE: un percorso in cui scrivere l'output. Questo percorso è un percorso remoto o un percorso locale sul container.

Una volta completata la pipeline, esamina i log della console per verificare che la pipeline sia stata completata e che venga utilizzata l'immagine remota, specificata da IMAGE_URI.

Dopo aver eseguito la pipeline, i file salvati nel container non sono presenti nel file system locale e il container viene arrestato. Puoi copiare i file dal file system del container interrotto utilizzando docker cp.

In alternativa:

  • Fornisce output a un file system remoto come Cloud Storage. Potrebbe essere necessario configurare manualmente l'accesso a scopo di test, ad esempio per i file delle credenziali o le credenziali predefinite dell'applicazione.
  • Per un rapido debug, aggiungi un logging temporaneo.

Uso di Direct Runner

Per test locali più approfonditi dell'immagine container e della pipeline, utilizza Direct Runner di Apache Beam.

Per verificare la pipeline separatamente dal container, puoi eseguire un test in un ambiente locale corrispondente all'immagine del container o avviare la pipeline su un container in esecuzione.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Sostituisci IMAGE_URI con l'URI dell'immagine del container personalizzato.

Gli esempi presuppongono che tutti i file della pipeline, inclusa la pipeline stessa, si trovino nel container personalizzato, siano stati montati da un file system locale o siano remoti e accessibili da Apache Beam e dal container. Ad esempio, per utilizzare Maven (mvn) per eseguire l'esempio Java precedente, Maven e le sue dipendenze devono essere impostate temporaneamente nel container. Per maggiori informazioni, consulta Archiviazione e docker run nella documentazione Docker.

L'obiettivo dei test su Direct Runner è testare la pipeline nell'ambiente container personalizzato, non testare l'esecuzione del container con il valore predefinito di ENTRYPOINT. Modifica ENTRYPOINT (ad esempio, docker run --entrypoint ...) per eseguire direttamente la pipeline o per consentire l'esecuzione manuale dei comandi nel container.

Se ti affidi a una configurazione specifica basata sull'esecuzione del container su Compute Engine, puoi eseguire il container direttamente su una VM di Compute Engine. Per ulteriori informazioni, consulta Container su Compute Engine.

Lancia il job Dataflow

Quando avvii la pipeline Apache Beam su Dataflow, specifica il percorso dell'immagine container. Non utilizzare il tag :latest con le tue immagini personalizzate. Codifica le build con una data o un identificatore univoco. In caso di problemi, l'utilizzo di questo tipo di tag potrebbe consentire di ripristinare l'esecuzione della pipeline a una configurazione funzionante precedentemente nota e consentire un'ispezione delle modifiche.

Java

Utilizza --sdkContainerImage per specificare un'immagine container SDK per il runtime Java.

Utilizza --experiments=use_runner_v2 per abilitare Runner v2.

Python

Se utilizzi l'SDK versione 2.30.0 o successive, utilizza l'opzione pipeline --sdk_container_image per specificare un'immagine container SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione pipeline --worker_harness_container_image per specificare la posizione dell'immagine container da utilizzare per il cablaggio worker.

I container personalizzati sono supportati solo per Dataflow Runner v2. Se stai avviando una pipeline Python batch, imposta il flag --experiments=use_runner_v2. Se stai avviando una pipeline Python in modalità flusso, non è necessario specificare l'esperimento, perché le pipeline Python in modalità flusso utilizzano Runner v2 per impostazione predefinita.

Go

Se utilizzi l'SDK versione 2.40.0 o successive, utilizza l'opzione pipeline --sdk_container_image per specificare un'immagine container SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione pipeline --worker_harness_container_image per specificare la posizione dell'immagine container da utilizzare per il cablaggio worker.

I container personalizzati sono supportati in tutte le versioni dell'SDK Go perché utilizzano Dataflow Runner v2 per impostazione predefinita.

L'esempio seguente mostra come avviare l'esempio di WordCount batch con un container personalizzato.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Utilizzo dell'SDK Apache Beam per Python versione 2.30.0 o successiva:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Sostituisci quanto segue:

  • INPUT_FILE: il percorso di input di Cloud Storage letto da Dataflow durante l'esecuzione dell'esempio.
  • OUTPUT_FILE: il percorso di output di Cloud Storage scritto dalla pipeline di esempio. Questo file contiene il conteggio delle parole.
  • PROJECT_ID: l'ID del tuo progetto Google Cloud.
  • REGION: la regione in cui eseguire il deployment del job Dataflow.
  • TEMP_LOCATION: il percorso di Cloud Storage per Dataflow per organizzare temporaneamente i file di job temporanei creati durante l'esecuzione della pipeline.
  • DISK_SIZE_GB: facoltativo. Se il container è di grandi dimensioni, valuta la possibilità di aumentare le dimensioni del disco di avvio predefinite per evitare di esaurire lo spazio su disco.
  • IMAGE_URI: l'URI dell'immagine del container personalizzato dell'SDK. Utilizza sempre un tag o un'algoritmo SHA o tag del contenitore con controllo delle versioni. Non utilizzare il tag :latest o un tag modificabile.