Utilizzare le pipeline di dati Dataflow

Panoramica

Puoi utilizzare le pipeline di dati di Dataflow per le seguenti attività:

  • Crea pianificazioni ricorrenti dei job.
  • Scopri dove vengono utilizzate le risorse su più esecuzioni di job.
  • Definisci e gestisci gli obiettivi di aggiornamento dei dati.
  • Analizza in dettaglio le singole fasi della pipeline per correggerle e ottimizzarle.

Per la documentazione dell'API, consulta la documentazione di riferimento di Data Pipelines.

Funzionalità

  • Crea una pipeline batch ricorrente per eseguire un job batch in base a una pianificazione.
  • Crea una pipeline batch incrementale ricorrente per eseguire un job batch sulla versione più recente dei dati di input.
  • Utilizza il prospetto di riepilogo della pipeline per visualizzare l'utilizzo della capacità aggregata e il consumo di risorse di una pipeline.
  • Visualizza l'aggiornamento dei dati di una pipeline in streaming. Questa metrica, che si evolve nel tempo, può essere collegata a un avviso che ti invia una notifica quando l'aggiornamento diventa inferiore a un obiettivo specificato.
  • Utilizza i grafici delle metriche della pipeline per confrontare i job della pipeline batch e trovare anomalie.

Limitazioni

  • Disponibilità a livello di regione: puoi creare pipeline di dati nelle regioni Cloud Scheduler disponibili.

  • Quota:

    • Numero predefinito di pipeline per progetto: 500
    • Numero predefinito di pipeline per organizzazione: 2500

      La quota a livello di organizzazione è disattivata per impostazione predefinita. Puoi attivare le quote a livello di organizzazione. In questo caso, per impostazione predefinita ogni organizzazione può avere al massimo 2500 pipeline.

  • Etichette: non puoi utilizzare etichette definite dall'utente per etichettare le pipeline di dati di Dataflow. Tuttavia, quando utilizzi il campo additionalUserLabels, questi valori vengono trasmessi al job Dataflow. Per saperne di più su come le etichette vengono applicate ai singoli job Dataflow, consulta Opzioni pipeline.

Tipi di pipeline di dati

Dataflow dispone di due tipi di pipeline di dati: in streaming e batch. Entrambi i tipi di pipeline eseguono job definiti nei modelli Dataflow.

Pipeline di dati in modalità flusso
Una pipeline di dati in modalità flusso esegue un job di streaming Dataflow immediatamente dopo la sua creazione.
Pipeline di dati batch

Una pipeline di dati in batch esegue un job Dataflow in batch in base a una pianificazione definita dall'utente. Il nome file dell'input della pipeline batch può essere parametro per consentire l'elaborazione incrementale della pipeline batch.

Pipeline batch incrementali

Puoi utilizzare i segnaposto data/ora per specificare un formato di file di input incrementale per una pipeline batch.

  • È possibile utilizzare i segnaposto per anno, mese, data, ora, minuto e secondo e devono seguire il formato strftime(). I segnaposto sono preceduti dal simbolo della percentuale (%).
  • La formattazione dei parametri non viene verificata durante la creazione della pipeline.
    • Esempio: se specifichi "gs://bucket/Y" come percorso di input parametro, viene valutato come "gs://bucket/Y", perché "Y" senza un "%" precedente non viene mappato al formato strftime().

A ogni ora di esecuzione pianificata della pipeline batch, la parte del segnaposto del percorso di input viene valutata in base alla data e all'ora correnti (o con spostamento nel tempo). I valori di data vengono valutati utilizzando la data corrente nel fuso orario del job pianificato. Se il percorso valutato corrisponde al percorso di un file di input, il file viene acquisito per l'elaborazione dalla pipeline batch all'ora pianificata.

  • Esempio: una pipeline batch è pianificata per ripetersi all'inizio di ogni ora PST. Se parametrizzi il percorso di input come gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, il 15 aprile 2021 alle 18:00 PST, il percorso di input viene valutato come gs://bucket-name/2021-04-15/prefix-18_00.csv.

Utilizzare i parametri di sfasamento temporale

Puoi utilizzare i parametri di sfasamento dell'ora di + o - minuti o ore. Per supportare la corrispondenza di un percorso di input con una data e ora valutata spostata prima o dopo la data e ora corrente della pianificazione della pipeline,chiudere questi parametri tra parentesi graffe. Utilizza il formato {[+|-][0-9]+[m|h]}. La pipeline batch continua a ripetersi all'ora programmata, ma il percorso di input viene valutato con l'offset temporale specificato.

  • Esempio: una pipeline batch è pianificata per ripetersi all'inizio di ogni ora PST. Se parametrizzi il percorso di input come gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, il 15 aprile 2021 alle 18:00 PST, il percorso di input viene valutato come gs://bucket-name/2021-04-15/prefix-16_00.csv.

Ruoli della pipeline di dati

Affinché le operazioni della pipeline di dati Dataflow vengano eseguite correttamente, devi disporre dei ruoli IAM necessari, come segue:

  1. Per eseguire le operazioni, devi disporre del ruolo appropriato:

  2. L'account di servizio utilizzato da Cloud Scheduler deve disporre del ruolo roles/iam.serviceAccountUser, indipendentemente dal fatto che sia specificato dall'utente o che si tratti dell'account di servizio Compute Engine predefinito. Per ulteriori informazioni, consulta Ruoli della pipeline di dati.

  3. Devi essere in grado di agire come account di servizio utilizzato da Cloud Scheduler e Dataflow ottenendo il ruolo roles/iam.serviceAccountUser per quell'account. Se non selezioni un account di servizio per Cloud Scheduler e Dataflow, viene utilizzato il service account Compute Engine predefinito.

Crea una pipeline di dati

Puoi creare una pipeline di dati Dataflow in due modi:

  1. Importa un job oppure
  2. Crea una pipeline di dati

Pagina di configurazione delle pipeline di dati:la prima volta che accedi alla funzionalità delle pipeline di Dataflow nella console Google Cloud, si apre una pagina di configurazione. Abilita le API elencate per creare pipeline di dati.

Importare un job

Puoi importare un job batch o in streaming di Dataflow basato su un modello classico o flessibile e trasformarlo in una pipeline di dati.

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

  2. Seleziona un job completato, quindi nella pagina Dettagli job, seleziona +Importa come pipeline.

  3. Nella pagina Crea pipeline da modello, i parametri vengono compilati con le opzioni del job importato.

  4. Per un job batch, nella sezione Pianifica la pipeline, fornisci una pianificazione della ricorrenza. Fornire un indirizzo email per Cloud Scheduler, utilizzato per pianificare le esecuzioni collettive, è facoltativo. Se non viene specificato, viene utilizzato l'account di servizio Compute Engine predefinito.

Crea una pipeline di dati

  1. Nella console Google Cloud, vai alla pagina Pipeline di dati di Dataflow.

    Vai a Pipeline di dati

  2. Seleziona + Crea pipeline di dati.

  3. Nella pagina Crea pipeline da modello, fornisci un nome alla pipeline e compila gli altri campi di selezione e parametro del modello.

  4. Per un job batch, nella sezione Pianifica la pipeline, fornisci una pianificazione della ricorrenza. Fornire un indirizzo email per Cloud Scheduler, utilizzato per pianificare le esecuzioni collettive, è facoltativo. Se non viene specificato un valore, viene utilizzato l'account di servizio Compute Engine predefinito.

Creare una pipeline di dati batch

Per creare questa pipeline di dati batch di esempio, devi avere accesso alle seguenti risorse nel tuo progetto:

Questa pipeline di esempio utilizza il modello di pipeline batch Testo di Cloud Storage in BigQuery. Questo modello legge i file in formato CSV da Cloud Storage, esegue una trasformazione e inserisce i valori in una tabella BigQuery con tre colonne.

  1. Crea i seguenti file sul tuo disco locale:

    1. Un file bq_three_column_table.json contenente lo schema seguente della tabella BigQuery di destinazione.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. Un file JavaScript split_csv_3cols.js che implementa una semplice trasformazione dei dati di input prima dell'inserimento in BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. Un file CSV file01.csv con diversi record inseriti nella tabella BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Utilizza il comando gcloud storage cp per copiare i file nelle cartelle di un bucket Cloud Storage nel tuo progetto, come segue:

    1. Copia bq_three_column_table.json e split_csv_3cols.js in gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Copia file01.csv in gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. Nella console Google Cloud, vai alla pagina Bucket in Cloud Storage.

    Vai a Bucket

  4. Per creare una cartella tmp nel bucket Cloud Storage, seleziona il nome della cartella per aprire la pagina dei dettagli del bucket, quindi fai clic su Crea cartella.

    Pulsante Crea cartella nella pagina dei dettagli del bucket.

  5. Nella console Google Cloud, vai alla pagina Pipeline di dati di Dataflow.

    Vai a Pipeline di dati

  6. Seleziona Crea pipeline di dati. Inserisci o seleziona i seguenti elementi nella pagina Crea pipeline da modello:

    1. In Nome pipeline, inserisci text_to_bq_batch_data_pipeline.
    2. Per Endpoint a livello di regione, seleziona una regione Compute Engine. Le regioni di origine e di destinazione devono corrispondere. Pertanto, il bucket Cloud Storage e la tabella BigQuery devono trovarsi nella stessa regione.
    3. In Modello Dataflow, in Elabora i dati collettivamente (batch), seleziona File di testo su Cloud Storage a BigQuery.

    4. Per Pianifica la tua pipeline, seleziona una pianificazione, ad esempio Ogni ora al minuto 25, nel tuo fuso orario. Puoi modificare la pianificazione dopo aver inviato la pipeline. Fornire un indirizzo email per Cloud Scheduler, che viene utilizzato per pianificare le esecuzioni collettive, è facoltativo. Se non viene specificato, viene utilizzato l'account di servizio Compute Engine predefinito.

    5. In Parametri obbligatori, inserisci quanto segue:

      1. Per Percorso della funzione JavaScript definita dall'utente in Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. Per Percorso JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. Per Nome della funzione JavaScript definita dall'utente: transform
      4. Per Tabella di output BigQuery:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. Per Percorso di input di Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. Per la directory BigQuery temporanea:
        BUCKET_ID/tmp
        
      7. Per Posizione temporanea:
        BUCKET_ID/tmp
        
    6. Fai clic su Crea pipeline.

  7. Conferma le informazioni sulla pipeline e sul modello e visualizza la cronologia corrente e precedente dalla pagina Dettagli pipeline.

    Pagina dei dettagli della pipeline.

Puoi modificare la pianificazione della pipeline di dati dal riquadro Informazioni sulla pipeline nella pagina Dettagli pipeline.

Pulsante Modifica accanto alla pianificazione della pipeline.

Puoi anche eseguire una pipeline batch su richiesta utilizzando il pulsante Esegui nella console delle pipeline Dataflow.

Creare una pipeline di dati in streaming di esempio

Puoi creare una pipeline di dati in streaming di esempio seguendo le istruzioni per la pipeline batch di esempio, con le seguenti differenze:

  • Per Pianificazione della pipeline, non specificare una pianificazione per una pipeline di dati in modalità flusso. Il job di streaming Dataflow viene avviato immediatamente.
  • In Modello Dataflow, in Elabora i dati in modo continuo (flusso), seleziona File di testo su Cloud Storage a BigQuery.
  • Per il tipo di macchina di lavoro, la pipeline elabora l'insieme iniziale di file corrispondenti al pattern gs://BUCKET_ID/inputs/file01.csv e tutti i file aggiuntivi corrispondenti a questo pattern che carichi nella cartella inputs/. Se le dimensioni dei file CSV superano diversi GB, per evitare possibili errori di esaurimento della memoria, seleziona un tipo di macchina con una memoria superiore al tipo di macchina n1-standard-4 predefinito, ad esempio n1-highmem-8.

Risoluzione dei problemi

Questa sezione mostra come risolvere i problemi relativi alle pipeline di dati Dataflow.

Impossibile avviare il job della pipeline di dati

Quando utilizzi le pipeline di dati per creare una pianificazione dei job ricorrenti, il job Dataflow potrebbe non essere avviato e nei file di log di Cloud Scheduler viene visualizzato un errore di stato 503.

Questo problema si verifica quando Dataflow non è temporaneamente in grado di eseguire il job.

Per risolvere il problema, configura Cloud Scheduler in modo che riprovi a eseguire il job. Poiché il problema è temporaneo, il nuovo tentativo di esecuzione del job potrebbe andare a buon fine. Per maggiori informazioni sull'impostazione dei valori di ripetizione in Cloud Scheduler, consulta Creare un job.

Esaminare le violazioni degli obiettivi della pipeline

Le sezioni seguenti descrivono come esaminare le pipeline che non soddisfano gli obiettivi di rendimento.

Pipeline batch ricorrenti

Per un'analisi iniziale dell'integrità della pipeline, nella pagina Informazioni sulla pipeline della console Google Cloud, utilizza i grafici Stato dei singoli job e Tempo thread per passaggio. Questi grafici si trovano nel riquadro dello stato della pipeline.

Indagine di esempio:

  1. Hai una pipeline batch ricorrente che viene eseguita ogni ora a 3 minuti dall'ora. In genere ogni job viene eseguito per circa 9 minuti. Hai un obiettivo per il completamento di tutti i job in meno di 10 minuti.

  2. Il grafico dello stato del job mostra che un job è stato eseguito per più di 10 minuti.

  3. Nella tabella della cronologia Aggiornamento/Esecuzione, individua il job eseguito durante l'ora di interesse. Fai clic per aprire la pagina dei dettagli del job Dataflow. In questa pagina, individua la fase in esecuzione più lunga, quindi cerca possibili errori nei log per determinare la causa del ritardo.

Pipeline in modalità flusso

Per un'analisi iniziale dell'integrità della pipeline, nella pagina Dettagli pipeline, nella scheda Informazioni sulla pipeline, utilizza il grafico Aggiornamento dei dati. Questo grafico si trova nel riquadro dello stato della pipeline.

Indagine di esempio:

  1. Hai una pipeline di streaming che in genere produce un output con una aggiornamento dei dati di 20 secondi.

  2. Hai impostato un obiettivo di garanzia dell'aggiornamento dei dati ogni 30 secondi. Quando esamini il grafico dell'aggiornamento dei dati, noti che tra le 9 e le 10:00, l'aggiornamento dei dati è aumentato a quasi 40 secondi.

    Grafico sull'aggiornamento dei dati che mostra un aumento del numero di minuti di aggiornamento dei dati.

  3. Passa alla scheda Metriche pipeline, quindi visualizza i grafici Utilizzo della CPU e Utilizzo della memoria per ulteriori analisi.

Errore: l'ID pipeline esiste già all'interno del progetto

Se provi a creare una nuova pipeline con un nome che esiste già nel progetto, viene visualizzato il seguente messaggio di errore: Pipeline Id already exist within the project. Per evitare questo problema, scegli sempre nomi univoci per le pipeline.