Esecuzione di modelli classici

Dopo aver creato e graduale il modello Dataflow, eseguilo con la console Google Cloud, l'API REST o Google Cloud CLI. Puoi eseguire il deployment dei modelli di job Dataflow da numerosi ambienti, tra cui l'ambiente standard di App Engine, Cloud Functions e altri ambienti vincolati.

Utilizzare la console Google Cloud

Puoi utilizzare la console Google Cloud per eseguire modelli Dataflow personalizzati e forniti da Google.

Modelli forniti da Google

Per eseguire un modello fornito da Google:

  1. Vai alla pagina Dataflow nella console Google Cloud.
  2. Vai alla pagina Dataflow
  3. Fai clic su CREA JOB DA MODELLO.
  4. Pulsante Crea job da modello nella console Google Cloud
  5. Seleziona il modello fornito da Google che vuoi eseguire dal menu a discesa Modello Dataflow.
  6. Modulo di esecuzione del modello per il conteggio di parole
  7. Inserisci un nome per il job nel campo Nome job.
  8. Inserisci i valori dei parametri nei campi dei parametri forniti. Non è necessaria la sezione Parametri aggiuntivi quando utilizzi un modello fornito da Google.
  9. Fai clic su Esegui job.

Modelli personalizzati

Per eseguire un modello personalizzato:

  1. Vai alla pagina Dataflow nella console Google Cloud.
  2. Vai alla pagina Dataflow
  3. Fai clic su CREA JOB DA MODELLO.
  4. Pulsante Crea job da modello nella console Google Cloud
  5. Seleziona Modello personalizzato dal menu a discesa Modello Dataflow.
  6. Modulo di esecuzione modello personalizzato
  7. Inserisci un nome per il job nel campo Nome job.
  8. Inserisci il percorso Cloud Storage del file del modello nel campo Percorso Cloud Storage del modello.
  9. Se il modello richiede parametri, fai clic su AGGIUNGI PARAMETRO nella sezione Parametri aggiuntivi. Inserisci Nome e Valore del parametro. Ripeti questo passaggio per ogni parametro necessario.
  10. Fai clic su Esegui job.

Utilizzo dell'API REST

Per eseguire un modello con una richiesta API REST, invia una richiesta POST HTTP con il tuo ID progetto. Questa richiesta richiede un'autorizzazione.

Consulta il riferimento dell'API REST per projects.locations.templates.launch per scoprire di più sui parametri disponibili.

Crea un job batch di modelli personalizzati

Questa richiesta projects.locations.templates.launch di esempio crea un job batch da un modello che legge un file di testo e scrive un file di testo di output. Se la richiesta ha esito positivo, il corpo della risposta contiene un'istanza di LaunchTemplateResponse.

Modifica i seguenti valori:

  • Sostituisci YOUR_PROJECT_ID con l'ID progetto.
  • Sostituisci LOCATION con l'regione Dataflow che preferisci.
  • Sostituisci JOB_NAME con un nome job a tua scelta.
  • Sostituisci YOUR_BUCKET_NAME con il nome del tuo bucket Cloud Storage.
  • Imposta gcsPath sul percorso di Cloud Storage del file del modello.
  • Imposta parameters sul tuo elenco di coppie chiave-valore.
  • Imposta tempLocation su una posizione per la quale disponi dell'autorizzazione di scrittura. Questo valore è obbligatorio per eseguire i modelli forniti da Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Crea un job di inserimento di flussi di modello personalizzato

Questa richiesta di esempio projects.locations.templates.launch crea un job di inserimento di flussi da un modello classico che legge da una sottoscrizione Pub/Sub e scrive in una tabella BigQuery. Se vuoi avviare un modello flessibile, utilizza invece projects.locations.flexTemplates.launch. Il modello di esempio è fornito da Google. Puoi modificare il percorso nel modello in modo che punti a un modello personalizzato. La stessa logica viene utilizzata per avviare i modelli forniti e personalizzati da Google. In questo esempio, la tabella BigQuery deve esistere già con lo schema appropriato. In caso di esito positivo, il corpo della risposta contiene un'istanza di LaunchTemplateResponse.

Modifica i seguenti valori:

  • Sostituisci YOUR_PROJECT_ID con l'ID progetto.
  • Sostituisci LOCATION con l'regione Dataflow che preferisci.
  • Sostituisci JOB_NAME con un nome job a tua scelta.
  • Sostituisci YOUR_BUCKET_NAME con il nome del tuo bucket Cloud Storage.
  • Sostituisci GCS_PATH con il percorso di Cloud Storage del file del modello. La località deve iniziare con gs://
  • Imposta parameters sul tuo elenco di coppie chiave-valore. I parametri elencati sono specifici per questo esempio di modello. Se utilizzi un modello personalizzato, modifica i parametri in base alle esigenze. Se utilizzi il modello di esempio, sostituisci le seguenti variabili.
    • Sostituisci YOUR_SUBSCRIPTION_NAME con il nome della tua sottoscrizione Pub/Sub.
    • Sostituisci YOUR_DATASET con il tuo set di dati BigQuery e YOUR_TABLE_NAME con il nome della tua tabella BigQuery.
  • Imposta tempLocation su una posizione per la quale disponi dell'autorizzazione di scrittura. Questo valore è obbligatorio per eseguire i modelli forniti da Google.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

Aggiorna un job di inserimento di flussi di modello personalizzato

Questo esempio di richiesta projects.locations.templates.launch mostra come aggiornare un job di flussi di dati del modello. Se vuoi aggiornare un modello flessibile, utilizza invece projects.locations.flexTemplates.launch.

  1. Esegui Esempio 2: creazione di un job di inserimento di flussi di modello personalizzato per avviare un job di modello di flussi.
  2. Invia la seguente richiesta POST HTTP, con i seguenti valori modificati:
    • Sostituisci YOUR_PROJECT_ID con l'ID progetto.
    • Sostituisci LOCATION con l'regione Dataflow del job che stai aggiornando.
    • Sostituisci JOB_NAME con il nome esatto del job che vuoi aggiornare.
    • Sostituisci GCS_PATH con il percorso di Cloud Storage del file del modello. La località deve iniziare con gs://
    • Imposta parameters sul tuo elenco di coppie chiave-valore. I parametri elencati sono specifici per questo esempio di modello. Se utilizzi un modello personalizzato, modifica i parametri in base alle esigenze. Se utilizzi il modello di esempio, sostituisci le seguenti variabili.
      • Sostituisci YOUR_SUBSCRIPTION_NAME con il nome della tua sottoscrizione Pub/Sub.
      • Sostituisci YOUR_DATASET con il tuo set di dati BigQuery e YOUR_TABLE_NAME con il nome della tua tabella BigQuery.
    • Utilizza il parametro environment per modificare le impostazioni dell'ambiente, ad esempio il tipo di macchina. Questo esempio utilizza il tipo di macchina n2-highmem-2, che ha più memoria e CPU per worker rispetto al tipo di macchina predefinito.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Accedi all'interfaccia di monitoraggio di Dataflow e verifica che sia stato creato un nuovo job con lo stesso nome. Questo job ha lo stato Aggiornato.

Utilizzo delle librerie client delle API di Google

Valuta l'utilizzo delle librerie client delle API di Google per effettuare facilmente chiamate alle API REST di Dataflow. Questo script di esempio utilizza la libreria client dell'API di Google per Python.

In questo esempio, devi impostare le seguenti variabili:

  • project: imposta sul tuo ID progetto.
  • job: impostalo su un nome job univoco a tua scelta.
  • template: imposta il percorso di Cloud Storage del file modello.
  • parameters: impostalo su un dizionario con i parametri del modello.

Per impostare la regione, includi il parametro location.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

Per ulteriori informazioni sulle opzioni disponibili, consulta il metodo projects.locations.templates.launch nel riferimento dell'API REST Dataflow.

Utilizza gcloud CLI

gcloud CLI può eseguire un modello personalizzato o fornito da Google utilizzando il comando gcloud dataflow jobs run. Gli esempi di esecuzione dei modelli forniti da Google sono documentati nella pagina dei modelli forniti da Google.

Per i seguenti esempi di modelli personalizzati, imposta i seguenti valori:

  • Sostituisci JOB_NAME con un nome job a tua scelta.
  • Sostituisci YOUR_BUCKET_NAME con il nome del tuo bucket Cloud Storage.
  • Imposta --gcs-location sul percorso di Cloud Storage del file del modello.
  • Imposta --parameters sull'elenco di parametri separati da virgole da passare al job. Gli spazi tra virgole e valori non sono consentiti.
  • Per impedire alle VM di accettare chiavi SSH archiviate nei metadati del progetto, utilizza il flag additional-experiments con l'opzione di servizio block_project_ssh_keys: --additional-experiments=block_project_ssh_keys.

Crea un job batch di modelli personalizzati

In questo esempio viene creato un job batch da un modello che legge un file di testo e scrive un file di testo di output.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

La richiesta restituisce una risposta con il formato seguente.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

Crea un job di inserimento di flussi di modello personalizzato

Questo esempio crea un job di inserimento di flussi da un modello che legge da un argomento Pub/Sub e scrive in una tabella BigQuery. La tabella BigQuery deve già esistere con lo schema appropriato.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

La richiesta restituisce una risposta con il formato seguente.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

Per un elenco completo dei flag per il comando gcloud dataflow jobs run, consulta il riferimento dell'interfaccia a riga di comando gcloud.

Monitoraggio e risoluzione dei problemi

L'interfaccia di monitoraggio Dataflow consente di monitorare i job Dataflow. Se un job non va a buon fine, puoi trovare suggerimenti per la risoluzione dei problemi, strategie di debug e un catalogo di errori comuni nella guida Risoluzione dei problemi della pipeline.