Workflows を使用した Cloud Storage から BigQuery へのデータの読み込み

Last reviewed 2021-05-12 UTC

このチュートリアルでは、Cloud Storage から BigQuery にイベントログなどの元データを読み込むために、WorkflowsCloud FunctionsFirestore を使用してサーバーレス ワークフローを確実に実行する方法を説明します。分析プラットフォームには通常、BigQuery ジョブを使用して BigQuery にデータを定期的に読み込むオーケストレーション ツールがあり、BigQuery の手続き型言語ステートメントを含む、SQL ステートメントを使用してデータを変換して、ビジネス指標を提供します。このチュートリアルは、サーバーレスのイベント ドリブン型データ処理パイプラインを構築するデベロッパーとアーキテクトを対象としています。このチュートリアルでは、ユーザーが YAML、SQL、Python に精通していることを前提としています。

アーキテクチャ

次の図は、Workflows を使用したサーバーレスの抽出、読み込み、変換(ELT)のハイレベル アーキテクチャを示しています。

パイプラインの抽出、読み込み、変換を行う。

上の図では、さまざまな店舗からセールス イベントを定期的にファイルとして収集し、Cloud Storage バケットにそのファイルを書き込む小売業のプラットフォームについて検討します。イベントは、BigQuery でインポートおよび処理を行うことにより、ビジネス指標を提供するために使用されます。このアーキテクチャには、ファイルを BigQuery にインポートする、信頼性が高いサーバーレスのオーケストレーション システムが用意されており、次の 2 つのモジュールに分けられます。

  • ファイルリスト: Firestore コレクションの Cloud Storage バケットに追加された、未処理のファイルのリストを保持します。このモジュールは、Cloud Storage バケットに新しいファイルが追加されると生成される Object Finalize ストレージ イベントによってトリガーされる Cloud Functions を介して動作します。ファイル名は、Firestore で new という名前のコレクションの files 配列に追加されます。
  • ワークフロー: スケジュール設定されたワークフローを実行します。Cloud Scheduler では、YAML ベースの構文に従って一連の手順を実行するワークフローがトリガーされ、読み込みがオーケストレートされて、Cloud Functions を呼び出して BigQuery 内のデータが変換されます。ワークフローの手順では、Cloud Functions を呼び出して次のタスクを実行します。

    • BigQuery の読み込みジョブを作成して開始します。
    • 読み込みジョブのステータスをポーリングします。
    • 変換クエリジョブを作成して開始します。
    • 変換ジョブのステータスをポーリングします。

トランザクションを使用して Firestore で新しいファイルのリストを保持すると、ワークフローによってファイルが BigQuery にインポートされるときに、ファイルの見落しが確実になくなります。ワークフローの個別の実行は、ジョブのメタデータとステータスを Firestore に保存することで、べき等になります。

目標

  • Firestore データベースを作成します。
  • Firestore の Cloud Storage バケットに追加されたファイルを追跡するようにその Cloud Functions の関数のトリガーを設定します。
  • Cloud Functions をデプロイして、BigQuery ジョブを実行、モニタリングします。
  • ワークフローをデプロイして実行し、プロセスを自動化します。

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトで課金が有効になっていることを確認します

  3. Cloud Build, Cloud Functions, Identity and Access Management, Resource Manager, and Workflows API を有効にします。

    API を有効にする

  4. [ようこそ] ページに移動し、後のステップで使用するためにプロジェクト ID をメモしておきます。

    [ようこそ] ページに移動

  5. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

環境を準備する

環境を準備するには、Firestore データベースを作成し、GitHub リポジトリからコードサンプルをクローニングします。Terraform を使用してリソースを作成し、Workflows YAML ファイルを編集して、ファイル ジェネレータの要件をインストールします。

  1. Firestore データベースを作成する方法は次のとおりです。

    1. Google Cloud Console で、[Firestore] ページに移動します。

      Firestore に移動

    2. [ネイティブ モードを選択] をクリックします。

    3. [ロケーションを選択] メニューで、Firestore データベースをホストするリージョンを選択します。物理的なロケーションに近いリージョンを選択することをおすすめします。

    4. [データベースを作成] をクリックします。

  2. Cloud Shell で、ソース リポジトリのクローンを作成します。

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Cloud Shell で、Terraform を使用して次のリソースを作成します。

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    以下を置き換えます。

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID
    • REGION: リソースをホストする Google Cloud の特定の地理的なロケーション(例: us-central1)。
    • ZONE: リソースをホストするリージョン内のロケーション(例: us-central1-b

    次のようなメッセージが表示されます。 Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform を使用すると、大規模に安全で予想通りに、インフラストラクチャを作成、変更、アップグレードできます。プロジェクトに以下のリソースが作成されます。

    • リソースへの安全なアクセスを確保するために必要な権限を有するサービス アカウント
    • serverless_elt_dataset という名前の BigQuery データセットと、受信ファイルを読み込む word_count という名前のテーブル。
    • 入力ファイルをステージングするための ${project_id}-ordersbucket という名前の Cloud Storage バケット。
    • 次の 5 つの Cloud Functions の関数
      • file_add_handler によって、Cloud Storage バケットに追加されたファイルの名前が Firestore コレクションに追加されます。
      • create_job によって、新しい BigQuery 読み込みジョブが作成され、Firebase コレクション内のファイルがジョブに関連付けられます。
      • create_query によって、新しい BigQuery クエリジョブが作成されます。
      • poll_bigquery_job によって、BigQuery ジョブのステータスが取得されます。
      • run_bigquery_job によって、BigQuery ジョブが開始されます。
  4. 前の手順でデプロイした create_jobcreate_querypoll_jobrun_bigquery_job の各 Cloud Functions の URL を取得します。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    出力は次のようになります。

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    ワークフローをデプロイする際に必要になるため、これらの URL をメモしておきます。

ワークフローを作成してデプロイする

  1. Cloud Shell で、ワークフロー workflow.yaml のソースファイルを開きます。

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    次のように置き換えます。

    • CREATE_JOB_URL: 新しいジョブを作成する関数の URL
    • POLL_BIGQUERY_JOB_URL: 実行中のジョブのステータスをポーリングする関数の URL
    • RUN_BIGQUERY_JOB_URL: BigQuery 読み込みジョブを開始する関数の URL
    • CREATE_QUERY_URL: BigQuery クエリジョブを開始する関数の URL
    • BQ_REGION: データが保存される BigQuery リージョン(例: US
    • BQ_DATASET_TABLE_NAME: BigQuery データセット テーブル名(形式は PROJECT_ID.serverless_elt_dataset.word_count
  2. workflow ファイルをデプロイします。

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    以下を置き換えます。

    • WORKFLOW_NAME: ワークフローの固有の名前
    • WORKFLOW_REGION: ワークフローがデプロイされているリージョン(例: us-central1
    • WORKFLOW_DESCRIPTION: ワークフローの説明
  3. Python 3 仮想環境を作成し、ファイル生成ツールの要件をインストールします。

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

インポートするファイルを生成する

gen.py Python スクリプトは、Avro 形式でランダムなコンテンツを生成します。スキーマは BigQuery word_count テーブルと同じです。これらの Avro ファイルは、指定された Cloud Storage バケットにコピーされます。

Cloud Shell で、以下のファイルを生成します。

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

以下を置き換えます。

  • RECORDS_PER_FILE: 単一のファイル内のレコード数
  • NUM_FILES: アップロードされるファイルの総数
  • FILE_PREFIX: 生成されたファイルの名前の接頭辞

Firestore でファイル エントリを確認する

ファイルが Cloud Storage にコピーされると、handle_new_file Cloud Functions の関数がトリガーされます。この関数によって、Firestore jobs コレクション内の new ドキュメントのファイルリスト配列にファイルリストが追加されます。

ファイルリストを表示するには、Google Cloud コンソールで Firestore の [データ] ページに移動します。

[データ] に移動

コレクションに追加されたファイルのリスト。

ワークフローをトリガーする

Workflows によって、Google Cloud と API サービスから、一連のサーバーレス タスクが同時にリンクされます。このワークフローの個々の手順は Cloud Functions として実行され、その状態は Firestore に保存されます。Cloud Functions への呼び出しはすべて、ワークフローのサービス アカウントを使用して、認証されます。

Cloud Shell で、ワークフローを実行します。

gcloud workflows execute WORKFLOW_NAME

次の図に、このワークフローで使用される手順を示します。

メインとサブのワークフローで使用される手順。

ワークフローは、メイン ワークフローとサブ ワークフローの 2 つの部分にわかれています。メイン ワークフローはジョブの作成と条件付き実行を処理する一方で、サブワークフローは BigQuery ジョブを実行します。このワークフローでは、次の操作が行われます。

  • create_job Cloud Functions の関数によって、新しいジョブ オブジェクトが作成され、Firestore ドキュメントから Cloud Storage に追加されたファイルのリストが取得され、そのファイルが読み込みジョブに関連付けられます。読み込み対象のファイルが存在しない場合、その関数によって新しいジョブは作成されません。
  • create_query Cloud Functions の関数では、実行する必要があるクエリと、そのクエリを実行する必要がある BigQuery リージョンが一緒に受け取られます。この関数では、Firestore でジョブが作成されてジョブ ID が返されます。
  • run_bigquery_job Cloud Functions の関数によって、実行する必要があるジョブの ID が取得され、BigQuery API が呼び出されてジョブが送信されます。
  • この Cloud Functions の関数でジョブが完了するのを待機する代わりに、定期的にジョブのステータスをポーリングできます。
    • poll_bigquery_job Cloud Functions の関数によって、ジョブのステータスが提供されます。ジョブが完了するまで繰り返し呼び出されます。
    • poll_bigquery_job Cloud Functions への呼び出し間に遅延を追加するには、Workflows から sleep ルーチンを呼び出します。

ジョブのステータスを確認する

ファイルリストとジョブのステータスを表示できます。

  1. Google Cloud コンソールで、Firestore の [データ] ページに移動します。

    [データ] に移動

  2. ジョブごとに固有の識別子(UUID)が生成されます。job_typestatus を確認するには、ジョブ ID をクリックします。各ジョブには次のいずれかのタイプとステータスがあります。

    • job_type: 次のいずれかの値があるワークフローによって実行されているジョブのタイプ。

      • 0: BigQuery へデータを読み込みます。
      • 1: BigQuery でクエリを実行します。
    • status: 次のいずれかの値があるジョブの現在の状態。

      • 0: ジョブは作成されていますが、開始されていません。
      • 1: ジョブが実行中。
      • 2: ジョブの実行が正常に完了しました。
      • 3: エラーが発生し、ジョブが正常に完了しませんでした。

    ジョブ オブジェクトには、BigQuery データセットのリージョン、BigQuery テーブルの名前などのメタデータ属性も含まれます。そのジョブ オブジェクトがクエリジョブの場合は、実行中のクエリ文字列が含まれます。

ジョブのステータスがハイライト表示されたファイルのリスト。

BigQuery でデータを確認する

ELT ジョブが成功したことを確認するには、データがテーブルに表示されていることを確認します。

  1. Google Cloud コンソールで、BigQuery の [エディタ] ページに移動します。

    [エディタ] に移動する

  2. [serverless_elt_dataset.word_count] テーブルをクリックします。

  3. [プレビュー] タブをクリックします。

    テーブルにデータを表示している [プレビュー] タブ。

ワークフローをスケジュールする

スケジュールに沿ってワークフローを定期的に実行するには、Cloud Scheduler を使用します

クリーンアップ

課金を停止する最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。また、リソースを個別に削除することもできます。

個々のリソースの削除

  1. Cloud Shell で、Terraform を使用して作成したすべてのリソースを削除します。

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Google Cloud コンソールで、Firestore の [データ] ページに移動します。

    [データ] に移動

  3. [ジョブ] の横にある [メニュー] をクリックし、[削除] を選択します。

    コレクションを削除するためのメニューパス。

プロジェクトの削除

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ