Caricare i dati da Cloud Storage in BigQuery utilizzando i flussi di lavoro

Last reviewed 2021-05-12 UTC

Questo tutorial mostra come eseguire in modo affidabile flussi di lavoro serverless utilizzando Flussi di lavoro, Funzioni di Cloud Run, e Firestore per caricare dati non elaborati, come i log eventi, Cloud Storage a BigQuery. Le piattaforme di analisi di solito dispongono di uno strumento di orchestrazione per caricare periodicamente i dati BigQuery con Job BigQuery, e poi trasformare i dati per fornire metriche aziendali utilizzando SQL estratti conto aggiuntivi, tra cui Istruzioni del linguaggio procedurale di BigQuery. Questo tutorial è rivolto a sviluppatori e architetti che vogliono creare pipeline di elaborazione dei dati basate su eventi serverless. Il tutorial presuppone che tu hanno familiarità con YAML, SQL e Python.

Architettura

Il seguente diagramma mostra l'architettura di alto livello di un ambiente serverless estrazione, caricamento e trasformazione (ELT) pipeline utilizzando Flussi di lavoro.

Estrai, carica e trasforma la pipeline.

Nel diagramma precedente, prendi in considerazione una piattaforma di vendita al dettaglio che raccoglie periodicamente gli eventi di vendita sotto forma di file da vari negozi e poi li scrive in un bucket Cloud Storage. Gli eventi vengono utilizzati per fornire metriche aziendali tramite l'importazione e l'elaborazione in BigQuery. Questa architettura offre un sistema di orchestrazione serverless e affidabile per l'importazione dei file in BigQuery ed è suddiviso nei seguenti due moduli:

  • Elenco file: mantiene l'elenco dei file non elaborati aggiunti a un Bucket Cloud Storage in una raccolta Firestore. Questo modulo funziona tramite una funzione Cloud Run attivata da un evento di archiviazione Object Finalize, generato quando un nuovo file viene aggiunto al bucket Cloud Storage. Il nome file viene aggiunto all'array files della raccolta denominata new in Firestore.
  • Flusso di lavoro: esegue i flussi di lavoro pianificati. Cloud Scheduler attiva un flusso di lavoro che esegue una serie di passaggi in base Sintassi basata su YAML orchestrare il caricamento e poi trasformare i dati in a BigQuery chiamando Funzioni di Cloud Run. I passaggi nel flusso di lavoro chiamano le funzioni di Cloud Run per eseguire le seguenti attività:

    • Crea e avvia un job di caricamento BigQuery.
    • Esegui un sondaggio sullo stato del job di caricamento.
    • Crea e avvia il job di query di trasformazione.
    • Esegui un sondaggio sullo stato del job di trasformazione.

Utilizzare le transazioni per gestire l'elenco dei nuovi file in Firestore aiuta ad assicurare che nessun file venga perso quando un flusso di lavoro e li importa in BigQuery. Le esecuzioni separate del flusso di lavoro vengono rese idempotenti memorizzando i metadati e lo stato del job in Firestore.

Obiettivi

  • Crea un database Firestore.
  • Configura un trigger della funzione Cloud Run per monitorare i file aggiunti al bucket Cloud Storage in Firestore.
  • Esegui il deployment delle funzioni Cloud Run per eseguire e monitorare i job BigQuery.
  • Esegui il deployment ed esegui un flusso di lavoro per automatizzare il processo.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Al termine delle attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la sezione Pulizia.

Prima di iniziare

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Vai alla pagina Ti diamo il benvenuto e annota l'ID progetto da utilizzare in un passaggio successivo.

    Vai alla pagina di benvenuto

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

prepara l'ambiente

Per preparare l'ambiente, crea un database Firestore, clona gli esempi di codice dal repository GitHub, crea risorse utilizzando Terraform, modifica il file YAML di Workflows e installa i requisiti per il generatore di file.

  1. Per creare un database Firestore:

    1. Nella console Google Cloud, vai alla pagina Firestore.

      Vai a Firestore

    2. Fai clic su Seleziona modalità nativa.

    3. Nel menu Seleziona una località, scegli la regione in cui vuoi ospitare il database Firestore. Ti consigliamo di scegliere una regione vicina alla tua posizione fisica.

    4. Fai clic su Crea database.

  2. In Cloud Shell, clona il repository di codice sorgente:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. In Cloud Shell, crea le risorse seguenti utilizzando Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto Google Cloud
    • REGION: un account Google Cloud specifico posizione geografica per ospitare le tue risorse, ad esempio us-central1
    • ZONE: una località all'interno di una regione in cui ospitare le tue risorse, ad esempio us-central1-b

    Dovresti visualizzare un messaggio simile al seguente: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform può aiutarti a creare, modificare e eseguire l'upgrade dell'infrastruttura su larga scala in modo sicuro e prevedibile. Nel progetto vengono create le seguenti risorse:

    • Account di servizio con i privilegi richiesti per garantire l'accesso sicuro alle risorse.
    • Un set di dati BigQuery denominato serverless_elt_dataset e una tabella denominata word_count per caricare i file in arrivo.
    • Un bucket Cloud Storage denominato ${project_id}-ordersbucket per la gestione temporanea dei file di input.
    • Ecco le cinque funzioni di Cloud Run:
      • file_add_handler aggiunge il nome dei file che sono aggiunte al bucket Cloud Storage Firestore.
      • create_job crea un nuovo caricamento BigQuery e associa i file nella raccolta Firebase al job.
      • create_query crea un nuovo job di query BigQuery.
      • poll_bigquery_job ottiene lo stato di del job BigQuery.
      • run_bigquery_job avvia un job BigQuery.
  4. Recupera gli URL per create_job, create_query, poll_job e run_bigquery_job funzioni Cloud Run di cui hai eseguito il deployment nella precedente passaggio.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    L'output è simile al seguente:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Prendi nota di questi URL perché sono necessari quando esegui il deployment del flusso di lavoro.

Crea ed esegui il deployment di un flusso di lavoro

  1. In Cloud Shell, apri il file di origine per il flusso di lavoro, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Sostituisci quanto segue:

    • CREATE_JOB_URL: l'URL della funzione a crea un nuovo job
    • POLL_BIGQUERY_JOB_URL: l'URL del funzione per eseguire il polling dello stato di un job in esecuzione
    • RUN_BIGQUERY_JOB_URL: l'URL della funzione per avviare un job di caricamento BigQuery
    • CREATE_QUERY_URL: l'URL della funzione per avviare un job di query BigQuery
    • BQ_REGION: il Regione BigQuery in cui vengono archiviati i dati, ad esempio US
    • BQ_DATASET_TABLE_NAME: il nome della tabella del set di dati BigQuery nel formato PROJECT_ID.serverless_elt_dataset.word_count
  2. Esegui il deployment del file workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Sostituisci quanto segue:

    • WORKFLOW_NAME: il nome univoco del flusso di lavoro
    • WORKFLOW_REGION: la regione in cui è stato eseguito il deployment del flusso di lavoro, ad esempio us-central1
    • WORKFLOW_DESCRIPTION: la descrizione del flusso di lavoro
  3. Crea un ambiente virtuale Python 3 e installa i requisiti per generatore di file:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Genera i file da importare

Lo script Python gen.py genera contenuti casuali in Formato Avro. Lo schema è uguale a la tabella word_count di BigQuery. Questi file Avro vengono copiati nel bucket Cloud Storage specificato.

In Cloud Shell, genera i file:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Sostituisci quanto segue:

  • RECORDS_PER_FILE: il numero di record in un singolo file
  • NUM_FILES: il numero totale di file da caricare
  • FILE_PREFIX: il prefisso dei nomi del file generati

Visualizza le voci di file in Firestore

Quando i file vengono copiati in Cloud Storage, viene attivata la funzionehandle_new_file Cloud Run. Questa funzione aggiunge l'elenco dei file all'array dell'elenco dei file nel documento new nella raccolta jobs di Firestore.

Per visualizzare l'elenco dei file, nella console Google Cloud vai alla Pagina Dati di Firestore.

Vai a Dati

Elenco dei file aggiunti alla raccolta.

Attiva il flusso di lavoro

Workflows collega una serie di attività serverless Google Cloud e i servizi API. I singoli passaggi di questo flusso di lavoro vengono eseguiti Le funzioni di Cloud Run e lo stato è archiviato in Firestore. Tutte le chiamate alle funzioni Cloud Run vengono autenticate utilizzando l'account di servizio del flusso di lavoro.

In Cloud Shell, esegui il flusso di lavoro:

gcloud workflows execute WORKFLOW_NAME

Il seguente diagramma mostra i passaggi utilizzati nel flusso di lavoro:

Passaggi utilizzati nel flusso di lavoro principale e secondario.

Il flusso di lavoro è suddiviso in due parti: il flusso di lavoro principale e il flusso di lavoro secondario. Il flusso di lavoro principale gestisce la creazione dei job e l'esecuzione condizionale, mentre un flusso di lavoro esegue Job BigQuery. Il flusso di lavoro esegue le seguenti operazioni:

  • La funzione Cloud Run create_job crea un nuovo oggetto job, ottiene l'elenco dei file aggiunti a Cloud Storage dalla documento Firestore e associa i file al caricamento lavoro. Se non ci sono file da caricare, la funzione non crea un nuovo job.
  • La funzione Cloud Run create_query prende la query che deve da eseguire insieme alla regione BigQuery a cui deve essere eseguito. La funzione crea il job in Firestore e restituisce l'ID job.
  • La funzione Cloud Run run_bigquery_job ottiene l'ID del un job da eseguire, quindi chiama l'API BigQuery per inviare il job.
  • Invece di aspettare il completamento del lavoro nel funzione Cloud Run, puoi eseguire periodicamente il polling dello stato del job.
    • La funzione Cloud Run poll_bigquery_job fornisce stato del lavoro. Viene chiamato ripetutamente fino al completamento del job.
    • Per aggiungere un ritardo tra le chiamate alla funzione poll_bigquery_job Cloud Run, viene chiamata una sleep routine da Workflows.

Visualizza lo stato del job

Puoi visualizzare l'elenco dei file e lo stato del job.

  1. Nella console Google Cloud, vai alla pagina Dati di Firestore.

    Vai a Dati

  2. Per ogni job viene generato un identificatore univoco (UUID). Per visualizzare job_type e status, fai clic sull'ID job. Ogni job potrebbe avere uno dei i seguenti tipi e stati:

    • job_type: il tipo di job eseguito dal flusso di lavoro con uno dei seguenti valori:

      • 0: carica i dati in BigQuery.
      • 1: esegui una query in BigQuery.
    • status: lo stato attuale del job con uno dei seguenti valori:

      • 0: il job è stato creato, ma non è stato avviato.
      • 1: il job è in esecuzione.
      • 2: l'esecuzione del job è riuscita.
      • 3: si è verificato un errore e il job non è stato completato correttamente.

    L'oggetto job contiene anche attributi dei metadati, come la regione del set di dati BigQuery, il nome della tabella BigQuery e, se si tratta di un job di query, la stringa di query in esecuzione.

Elenco di file con lo stato del job evidenziato.

Visualizzare i dati in BigQuery

Per verificare che il job ELT sia andato a buon fine, controlla che i dati vengano visualizzati nella tabella.

  1. Nella console Google Cloud, vai a BigQuery Pagina Editor.

    Vai all'editor

  2. Fai clic sulla tabella serverless_elt_dataset.word_count.

  3. Fai clic sulla scheda Anteprima.

    Scheda Anteprima che mostra i dati nella tabella.

Pianifica il flusso di lavoro

Per eseguire periodicamente il flusso di lavoro in base a una pianificazione, puoi utilizzare Cloud Scheduler.

Esegui la pulizia

Il modo più semplice per eliminare la fatturazione è quello di eliminare il progetto Google Cloud creato per il tutorial. In alternativa, puoi eliminare il singolo Google Cloud.

Elimina le singole risorse

  1. In Cloud Shell, rimuovi tutte le risorse create utilizzando Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Nella console Google Cloud, vai alla pagina Dati di Firestore.

    Vai a Dati

  3. Accanto a Job, fai clic su Menu e seleziona Elimina.

    Percorso del menu per eliminare una raccolta.

Elimina il progetto

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Passaggi successivi