複数の BigQuery ジョブを並列実行する


BigQuery は、クエリ対象として一般公開できるいくつかの一般公開データセットをホストしています。このチュートリアルでは、複数の BigQuery クエリジョブを同時に実行するワークフローを作成して、ジョブを順次実行する場合に比べてパフォーマンスが向上していることを示します。

目標

このチュートリアルの内容は次のとおりです。

  1. Wikipedia 一般公開データセットに対してクエリを実行し、特定の月における閲覧回数が最も多いタイトルを特定します。
  2. 複数の BigQuery クエリジョブを順次実行するワークフローをデプロイして実行します。
  3. 並列イテレーションを使用して BigQuery ジョブを実行するワークフローをデプロイして実行し、通常の for ループが並列実行されます。

Google Cloud コンソールで次のコマンドを実行するか、ターミナルまたは Cloud Shell で Google Cloud CLI を使用できます。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

組織で定義されているセキュリティの制約により、次の手順を完了できない場合があります。トラブルシューティング情報については、制約のある Google Cloud 環境でアプリケーションを開発するをご覧ください。

コンソール

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Workflows API を有効にします。

    API を有効にする

  5. サービス アカウントを作成します。

    1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. サービス アカウントに次のロールを付与します: BigQuery > BigQuery Job User, Logging > Logs Writer.

      ロールを付与するには、[ロールを選択] リストを見つけてロールを選択します。

      追加のロールを付与するには、 [別のロールを追加] をクリックして各ロールを追加します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

  6. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  7. Google Cloud プロジェクトで課金が有効になっていることを確認します

  8. Workflows API を有効にします。

    API を有効にする

  9. サービス アカウントを作成します。

    1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. サービス アカウントに次のロールを付与します: BigQuery > BigQuery Job User, Logging > Logs Writer.

      ロールを付与するには、[ロールを選択] リストを見つけてロールを選択します。

      追加のロールを付与するには、 [別のロールを追加] をクリックして各ロールを追加します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

gcloud

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Workflows API を有効にします。

    gcloud services enable workflows.googleapis.com
  7. 認証を設定します。

    1. サービス アカウントを作成します。

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME をサービス アカウントの名前に置き換えます。

    2. サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。roles/bigquery.jobUser, roles/logging.logWriter

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      次のように置き換えます。

      • SERVICE_ACCOUNT_NAME: サービス アカウントの名前
      • PROJECT_ID: サービス アカウントを作成したプロジェクト ID
      • ROLE: 付与するロール
  8. Google Cloud CLI をインストールします。
  9. gcloud CLI を初期化するには:

    gcloud init
  10. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  11. Google Cloud プロジェクトで課金が有効になっていることを確認します

  12. Workflows API を有効にします。

    gcloud services enable workflows.googleapis.com
  13. 認証を設定します。

    1. サービス アカウントを作成します。

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME をサービス アカウントの名前に置き換えます。

    2. サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。roles/bigquery.jobUser, roles/logging.logWriter

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      次のように置き換えます。

      • SERVICE_ACCOUNT_NAME: サービス アカウントの名前
      • PROJECT_ID: サービス アカウントを作成したプロジェクト ID
      • ROLE: 付与するロール

BigQuery クエリジョブを実行する

BigQuery では、インタラクティブ(オンデマンド)クエリジョブを実行できます。詳細については、インタラクティブ クエリとバッチクエリのジョブの実行をご覧ください。

コンソール

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    BigQuery に移動

  2. [クエリエディタ] テキスト領域に、次の BigQuery SQL クエリを入力します。

    SELECT TITLE, SUM(views)
    FROM `bigquery-samples.wikipedia_pageviews.201207h`
    GROUP BY TITLE
    ORDER BY SUM(views) DESC
    LIMIT 100
    
  3. [実行] をクリックします。

bq

ターミナルで、次の bq query コマンドを入力して、標準 SQL 構文を使用してインタラクティブ クエリを実行します。

bq query \
--use_legacy_sql=false \
'SELECT
  TITLE, SUM(views)
FROM
  `bigquery-samples.wikipedia_pageviews.201207h`
GROUP BY
  TITLE
ORDER BY
  SUM(views) DESC
LIMIT 100'

特定の月のビューが最も多い上位 100 件の Wikipedia タイトルを返し、出力を一時テーブルに書き込むクエリを実行します。

クエリの実行に要した時間をメモします。

複数のクエリを順次実行するワークフローをデプロイする

ワークフロー定義は、ワークフロー構文を使用して説明した一連のステップで構成されています。ワークフローを作成したら、デプロイして実行できるようにします。デプロイの手順では、ソースファイルを実行できることも検証されます。

次のワークフローでは、Workflows の BigQuery コネクタを使用して、クエリを実行する 5 つのテーブルのリストを定義しています。クエリは順次実行され、各テーブルで最も閲覧されたタイトルが結果マップに保存されます。

コンソール

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: workflow-serial-bqjobs)。

  4. 適切なリージョンを選択します(例: us-central1)。

  5. 先ほど作成したサービス アカウントを選択します。

    BigQuery > BigQuery ジョブユーザーLogging > ログ書き込み IAM ロールの両方がすでにサービス アカウントに付与されている必要があります。

  6. [Next] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. [Deploy] をクリックします。

gcloud

  1. ターミナルを開いて、ワークフローのソースコード ファイルを作成します。

    touch workflow-serial-bqjobs.yaml
    
  2. 次のワークフローをソースコード ファイルにコピーします。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            for:
                value: table
                in: ${tables}
                steps:
                - logTable:
                    call: sys.log
                    args:
                        text: ${"Running query for table " + table}
                - runQuery:
                    call: googleapis.bigquery.v2.jobs.query
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            useLegacySql: false
                            useQueryCache: false
                            timeoutMs: 30000
                            # Find top 100 titles with most views on Wikipedia
                            query: ${
                                "SELECT TITLE, SUM(views)
                                FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                WHERE LENGTH(TITLE) > 10
                                GROUP BY TITLE
                                ORDER BY SUM(VIEWS) DESC
                                LIMIT 100"
                                }
                    result: queryResult
                - returnResult:
                    assign:
                        # Return the top title from each table
                        - results[table]: {}
                        - results[table].title: ${queryResult.rows[0].f[0].v}
                        - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy workflow-serial-bqjobs \
       --source=workflow-serial-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM は、先ほど作成したサービス アカウントのメールアドレスに置き換えます。

    すでにサービス アカウントに roles/bigquery.jobUserroles/logging.logWriter の両方の IAM ロールを付与している必要があります。

ワークフローを実行し、複数のクエリを順次実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。

コンソール

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [Workflows] ページで、[workflow-serial-bqjobs] ワークフローを選択して、詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

  5. ワークフローの結果が [出力] ペインに表示されます。

gcloud

  1. ターミナルを開きます。

  2. ワークフローを実行します。

     gcloud workflows run workflow-serial-bqjob

ワークフローの実行には、約 1 分すなわち前回の実行時間の 5 倍ほどの時間を要します。結果には、各テーブルが含まれ、次のようになります。

{
  "201201h": {
    "title": "Special:Search",
    "views": "14591339"
  },
  "201202h": {
    "title": "Special:Search",
    "views": "132765420"
  },
  "201203h": {
    "title": "Special:Search",
    "views": "123316818"
  },
  "201204h": {
    "title": "Special:Search",
    "views": "116830614"
  },
  "201205h": {
    "title": "Special:Search",
    "views": "131357063"
  }
}

複数のクエリを並列に実行するワークフローをデプロイして実行する

5 つのクエリを順次実行する代わりに、いくつかの変更を行うことで並行してクエリを実行できます。

 - runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}
  • parallel ステップを使用すると、for ループの各イテレーションを並行して実行できます。
  • results 変数は shared として宣言されます。これにより、ブランチによる書き込みが可能になり、各ブランチの結果を追加できます。

コンソール

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: workflow-parallel-bqjobs)。

  4. 適切なリージョンを選択します(例: us-central1)。

  5. 先ほど作成したサービス アカウントを選択します。

  6. [Next] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  8. [Deploy] をクリックします。

  9. [ワークフローの詳細] ページで [ 実行] を選択します。

  10. もう一度 [Execute] をクリックします。

  11. ワークフローの結果が [出力] ペインに表示されます。

gcloud

  1. ターミナルを開いて、ワークフローのソースコード ファイルを作成します。

    touch workflow-parallel-bqjobs.yaml
    
  2. 次のワークフローをソースコード ファイルにコピーします。

    main:
        steps:
        - init:
            assign:
                - results : {} # result from each iteration keyed by table name
                - tables:
                    - 201201h
                    - 201202h
                    - 201203h
                    - 201204h
                    - 201205h
        - runQueries:
            parallel:
                shared: [results]
                for:
                    value: table
                    in: ${tables}
                    steps:
                    - logTable:
                        call: sys.log
                        args:
                            text: ${"Running query for table " + table}
                    - runQuery:
                        call: googleapis.bigquery.v2.jobs.query
                        args:
                            projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                            body:
                                useLegacySql: false
                                useQueryCache: false
                                timeoutMs: 30000
                                # Find top 100 titles with most views on Wikipedia
                                query: ${
                                    "SELECT TITLE, SUM(views)
                                    FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                                    WHERE LENGTH(TITLE) > 10
                                    GROUP BY TITLE
                                    ORDER BY SUM(VIEWS) DESC
                                    LIMIT 100"
                                    }
                        result: queryResult
                    - returnResult:
                        assign:
                            # Return the top title from each table
                            - results[table]: {}
                            - results[table].title: ${queryResult.rows[0].f[0].v}
                            - results[table].views: ${queryResult.rows[0].f[1].v}
        - returnResults:
            return: ${results}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy workflow-parallell-bqjobs \
       --source=workflow-parallel-bqjobs.yaml \
       --service-account=MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM
    

    MY_SERVICE_ACCOUNT@MY_PROJECT.IAM.GSERVICEACCOUNT.COM は、先ほど作成したサービス アカウントのメールアドレスに置き換えます。

  4. ワークフローを実行します。

     gcloud workflows run workflow-serial-bqjob

結果は先ほどの出力と類似していますが、ワークフローの実行は約 20 秒以内に完了します。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用し、このチュートリアルで変更を加えずに残す場合は、チュートリアル用に作成したリソースを削除します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

チュートリアル リソースの削除

このチュートリアルで作成したワークフローを削除します。

gcloud workflows delete WORKFLOW_NAME

次のステップ