Esegui un job Dataflow in un contenitore personalizzato

Questo documento descrive come eseguire una pipeline Dataflow utilizzando un contenuto personalizzato.

Per informazioni sulla creazione dell'immagine container, consulta Creare immagini container personalizzate per Dataflow.

Quando esegui la pipeline, lanciala utilizzando l'SDK Apache Beam con la stessa versione e la stessa versione del linguaggio dell'SDK nell'immagine del contenitore personalizzato. Questo passaggio evita errori imprevisti dovuti a dipendenze incompatibili tramite SDK.

Eseguire il test localmente

Prima di eseguire la pipeline in Dataflow, è buona norma testare in locale l'immagine container, il che consente test e debug più rapidi.

Per ulteriori informazioni sull'utilizzo specifico di Apache Beam, consulta la guida di Apache Beam per Esecuzione di pipeline con immagini container personalizzate.

Test di base con PortableRunner

Per verificare che le immagini container remote possano essere estratte ed eseguire una semplice pipeline, utilizza Apache Beam PortableRunner. Quando utilizzi PortableRunner, l'invio del job avviene nell'ambiente locale e l'esecuzione di DoFn nell'ambiente Docker.

Quando utilizzi le GPU, il contenitore Docker potrebbe non avere accesso alle GPU. A testare il container con le GPU, utilizzare il direct runner e segui i passaggi per testare un'immagine container su una VM autonoma con GPU Eseguire il debug con una VM autonoma "Usa GPU" .

Di seguito viene eseguita una pipeline di esempio:

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Vai

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Sostituisci quanto segue:

  • REGION: la regione del servizio di job da utilizzare, sotto forma di indirizzo e porta. Ad esempio: localhost:3000. Utilizza embed per eseguire un servizio di job in esecuzione.
  • IMAGE_URI: l'URI dell'immagine del container personalizzato.
  • INPUT_FILE: un file di input che può essere letto come file di testo. Questo file deve essere accessibile dall'immagine del container
    dell'harness dell'SDK, precaricato nell'immagine del container o in un file remoto.
  • OUTPUT_FILE: un percorso in cui scrivere l'output. Questo è un percorso remoto o un percorso locale sul container.

Una volta completata la pipeline, esamina i log della console per verificare che la pipeline sia stata completata correttamente e che l'immagine remota, specificata IMAGE_URI.

Dopo l'esecuzione della pipeline, i file salvati nel contenitore non sono nel file system locale e il contenitore viene interrotto. Puoi copiare file da il file system del container interrotto, docker cp

In alternativa:

  • Fornisci output a un file system remoto come Cloud Storage. Potresti dover configurare manualmente l'accesso a scopo di test. inclusi i file delle credenziali Credenziali predefinite dell'applicazione.
  • Per un debug rapido, aggiungi il logging temporaneo.

Uso di Direct Runner

Per test locali più approfonditi dell'immagine container e della pipeline, utilizza Direct Runner di Apache Beam.

Puoi verificare la pipeline separatamente dal container eseguendo test in una che corrisponda all'immagine container o avviando la pipeline su un container in esecuzione.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Vai

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

Sostituisci IMAGE_URI con l'URI dell'immagine container personalizzata.

Gli esempi presuppongono che tutti i file della pipeline, inclusa la pipeline stessa, si trovino nel contenitore personalizzato, siano stati montati da un file system locale o siano remoti e accessibili da Apache Beam e dal contenitore. Ad esempio, per utilizzare Maven (mvn) per eseguire l'esempio Java precedente, Maven e le relative dipendenze devono essere inseriti nel container. Per ulteriori informazioni, consulta Storage e docker run nella documentazione di Docker.

L'obiettivo dei test su Direct Runner è testare la pipeline nell'ambiente container personalizzato, non per testare l'esecuzione con il valore predefinito ENTRYPOINT. Modifica il valore ENTRYPOINT (ad esempio, docker run --entrypoint ...) per eseguire direttamente la pipeline o per consentire manualmente i comandi sul container.

Se utilizzi una configurazione specifica basata sull'esecuzione del contenitore su Compute Engine, puoi eseguire il contenitore direttamente su una VM Compute Engine. Per ulteriori informazioni, consulta Contenitori su Compute Engine.

Avvia il job Dataflow

Quando avvii la pipeline Apache Beam su Dataflow, specifica il percorso dell'immagine container. Non utilizzare il tag :latest con le immagini personalizzate. Codifica le build con una data o un identificatore univoco. Se qualcosa va errato, l'utilizzo di questo tipo di tag potrebbe consentire di ripristinare la pipeline configurazione di lavoro nota in precedenza e consente un controllo delle modifiche.

Java

Utilizza --sdkContainerImage per specificare un'immagine container SDK per il runtime Java.

Utilizza --experiments=use_runner_v2 per abilitare Runner v2.

Python

Se utilizzi l'SDK versione 2.30.0 o successive, utilizza l'opzione pipeline --sdk_container_image per specificare un'immagine container SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.

I contenitori personalizzati sono supportati solo per Dataflow Runner v2. Se stai lanciando una pipeline Python batch, imposta il flag --experiments=use_runner_v2. Se stai lanciando una pipeline Python in streaming, non è necessario specificare l'esperimento, perché le pipeline Python in streaming utilizzano Runner v2 per impostazione predefinita.

Vai

Se utilizzi l'SDK versione 2.40.0 o successive, utilizza l'opzione pipeline --sdk_container_image per specificare un'immagine container SDK.

Per le versioni precedenti dell'SDK, utilizza l'opzione della pipeline --worker_harness_container_image per specificare la posizione dell'immagine del contenitore da utilizzare per il cablaggio del worker.

I contenitori personalizzati sono supportati in tutte le versioni dell'SDK Go perché per impostazione predefinita utilizzano Dataflow Runner v2.

L'esempio seguente mostra come avviare l'esempio di batch WordCount con un contenitore personalizzato.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Utilizza la versione 2.30.0 o successiva dell'SDK Apache Beam per Python:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Vai

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

Sostituisci quanto segue:

  • INPUT_FILE: il percorso di input di Cloud Storage letti da Dataflow durante l'esecuzione dell'esempio.
  • OUTPUT_FILE: il percorso di output di Cloud Storage in cui scrive la pipeline di esempio. Questo file contiene i conteggi delle parole.
  • PROJECT_ID: l'ID del tuo progetto Google Cloud.
  • REGION: la regione in cui eseguire il deployment del job Dataflow.
  • TEMP_LOCATION: il percorso Cloud Storage per Dataflow per organizzare i file di job temporanei creati durante dell'esecuzione della pipeline.
  • DISK_SIZE_GB: facoltativo. Se il contenitore è di grandi dimensioni, valuta la possibilità di aumentare le dimensioni del disco di avvio predefinite per evitare di esaurire lo spazio su disco.
  • IMAGE_URI: l'URI dell'immagine del container personalizzato dell'SDK. Utilizza sempre un tag o un'algoritmo SHA o tag del contenitore con controllo delle versioni. Non utilizzare il tag :latest o un tag modificabile.