データ分析

BigQuery からメールへのエクスポートの自動化

Google Cloud BigQuery.jpg

※この投稿は米国時間 2020 年 5 月 9 日に、Google Cloud blog に投稿されたものの抄訳です。

データ アクセシビリティとデータ分析は、データから価値ある情報を得るために欠かせません。BigQuery ではデータの表示方法がさまざまありますが、一般的な方法の 1 つが、スケジュールに基づいてクエリ結果をメールとしてエクスポートすることです。これにより、エンドユーザーは最新のクエリ結果へのリンクが記載されたメールを受け取ることができます。ビジネス プロセスに関する日次の統計情報、サイトに関する月次のデータの概要、週次のビジネス レビューを求めている方など、どなたにも最適なソリューションです。クエリが何であれ、情報を必要とする利害関係者は、関連する分析情報を得るためにメールからデータに簡単にアクセスできます。

この投稿では、BigQuery からメールに結果をエクスポートする操作を簡単に自動化する方法について説明します。

結果メールの設計上の検討事項

設計上の重要な検討事項は、データのサイズと複雑さです。メールの添付ファイルや BigQuery 内からの大規模クエリのエクスポートにはサイズ制限があることにご注意ください。クエリ結果が 1 GB を超える場合、BigQuery は結果を複数のテーブルに出力します。その結果、複数の CSV 添付ファイルをメールで送信する必要があります。

G Suite へのアクセス権を持っている場合は、スケジュール設定された Apps Script を使用してこの送信を行うことができます。Apps Script は BigQuery API を使用してクエリを実行し、結果を Google スプレッドシートにエクスポートします。スクリプトの時間ベースのトリガーは、スケジュール設定された間隔でデータを更新します。これで、Gmail サービスを利用して、スプレッドシートへのリンクが記載されたメールを簡単に送信できます。

これは G Suite を使用した方法です。より一般的なソリューションについては、BigQuery からメールへのエクスポートを自動化する主要なソリューションとして、Google Cloud を使用することをおすすめします。この方法では、Google Cloud のいくつかのプロダクトと、メールを送信するための SendGrid API を使用します。手順は次のとおりです。

BigQuery の結果をメールにエクスポートするための自動プロセス

BigQuery の結果をメールにエクスポートするための自動プロセスを作成する方法を、手順とアーキテクチャ図からご説明します。

1. Cloud Functions コードを実行するトリガーとなる Pub/Sub トピックを作成します。

2. エクスポート用の BigQuery データセットと Cloud Storage バケットを設定します。

3. クエリの実行、結果のエクスポート、メールの送信を行うコードを使用して Cloud 関数を作成します。

4. Pub/Sub トピックに関連付けられた Cloud Scheduler ジョブを作成し、スケジュールに基づいて関数を自動的に実行します。

このプロセスのアーキテクチャは次のとおりです。

このアーキテクチャでは次のことが確認できます。

  • Cloud Scheduler: Cloud Scheduler ジョブは、Pub/Sub トピックを呼び出して、メールの定期的なエクスポートをスケジューリングします。  
  • Pub/Sub: Pub/Sub トピックが Cloud 関数をトリガーします。
  • Cloud Functions: Cloud Functions は Pub/Sub トピックをサブスクライブし、BigQuery と Cloud Storage API を呼び出すコードを実行します。
  • BigQuery: BigQuery API はクエリ結果を生成してテーブルに保存し、結果を CSV として Cloud Storage にエクスポートします。
  • Cloud Storage: Cloud Storage バケットは CSV ファイルを保存します。Cloud Storage API は、ユーザーにメールで送信される CSV の署名付き URL を生成します。  

最後に、SendGrid API が、署名付き URL へのリンクを含むメールを指定の宛先に送信します。

メールのエクスポートを開始する

このプロセスを開始する際に、データの保存とメールの送信に関連する 1 回限りの設定の手順がいくつかあります。まず、エクスポートごとに作成されたテーブルをホストする BigQuery データセットを作成します。たとえば毎日メールを受信する場合、このデータセットには「daily_export_${TIMESTAMP}」などの命名規則を使用した日次のエクスポートごとのテーブルがあります。このデータセットはサイズがすぐに大きくなる可能性があるため、デフォルトのテーブル有効期限を設定することをおすすめします。このようにすることで、古いデータを保持しているテーブルを削除できます。

次に、BigQuery からエクスポートされた CSV ファイルをホストする Cloud Storage バケットを作成します。データセットの有効期限と同様に、バケットのライフサイクル管理の構成では、「Age」条件で定義された期間後に、CSV を自動的に削除したり、ファイルを別のストレージ クラスに移動したりできます。

最後の設定手順では、SendGrid API へのアクセス権を構成します。そのためには、アカウントを作成して SendGrid API キーを生成します。これにより、Cloud Function が Email API で認証され、メールを送信できます。SendGrid の無料枠の料金は、最初の 30 日間は 1 日あたり 40,000 件のメッセージに適用され、その後は終了日なしで 1 日あたり 100 件のメッセージに適用されます(API の実装については、次のセクションでご説明します)。

実装の詳細

サービス アカウントの作成

SendGrid API を認証するには、サービス アカウントを作成する必要があります。Cloud Functions の署名済み認証情報を生成するには、サービス アカウントにサービス アカウント トークン作成者のロールが必要です。また、BigQuery と Storage のアクションを実行するためのアクセス権も必要となりますが、このために BigQuery 管理者のロールとストレージ オブジェクト管理者のロールを追加します。次のサンプルコードは、前述のロールを持つサービス アカウントを作成します。

  SA_NAME="bq-exports-svc"
SVC_DESC="BQ exports service account"
PROJECT_ID="bq-automated-exports"
gcloud iam service-accounts create $SA_NAME --description "$SVC_DESC" --project "$PROJECT_ID"
export SVC_ACCOUNT_EMAIL=$(gcloud iam service-accounts list --filter="name:$SA_NAME" --format "value(email)") 
 
 
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$SVC_ACCOUNT_EMAIL" --role='roles/bigquery.admin'
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$SVC_ACCOUNT_EMAIL" --role='roles/storage.objectAdmin' 
gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:$SVC_ACCOUNT_EMAIL" --role='roles/iam.serviceAccountTokenCreator'

Cloud Functions コードの記述

このソリューションを構築するには、Python クライアント ライブラリを使用して Google BigQuery と Cloud Storage API を呼び出します。適切なサービス アカウントの認証情報を使用してクライアント ライブラリをインスタンス化し、正しい認証を行って必要なタスクを実行します。メイン スクリプトが Cloud Functions で実行される場合、認証情報はデフォルトでアプリケーションのデフォルト認証情報になります。ローカルで実行される場合、認証情報は環境変数 GOOGLE_APPLICATION_CREDENTIALS によって提供されるサービス アカウントの鍵ファイルを使用します。次のサンプルコードは認証情報の作成を示しています。

  def credentials():
    # To use this file locally set $IS_LOCAL=1 and populate environment variable $GOOGLE_APPLICATION_CREDENTIALS with path to keyfile
    # Get Application Default Credentials if running in Cloud Functions
    if os.getenv("IS_LOCAL") is None:
        credentials, project = default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    else:
        credentials = service_account.Credentials.from_service_account_file(
            os.getenv("GOOGLE_APPLICATION_CREDENTIALS"), scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
    return credentials

ここで、メイン関数でクライアント ライブラリをインスタンス化します。

  bq_client = bigquery.Client(credentials=credentials())
storage_client = storage.Client(credentials=credentials())

BigQuery と Cloud Storage クライアント ライブラリを使用すると、テーブルの作成、そのテーブルへのクエリ結果の出力、Cloud Storage への CSV のテーブルデータのエクスポートができます。

次に、バケットに保存されている CSV ファイルの署名付き URL を生成します。このプロセスには、リンクにアクセスできる期間を示す有効期限の設定が含まれます。有効期限については、受信者が古いデータにアクセスできないように、メール間の時間差を設定する必要があります。認証のために、関数 ID は関数の現在の ID(関数を実行するサービス アカウント)を取得します。iam.Signer() は、そのサービス アカウントにリクエストを送信して、generate_signed_url() 関数を認証するための OAuth 認証情報を生成します。

  # Generate a v4 signed URL for downloading a blob.
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(csv_name)
    signing_credentials = None
    if os.getenv("IS_LOCAL") is None:
        signer = iam.Signer(request=requests.Request(), credentials=credentials(), service_account_email=os.getenv("FUNCTION_IDENTITY"),)
        # Create Token-based service account credentials for signing
        signing_credentials = service_account.IDTokenCredentials(
            signer=signer,
            token_uri="https://www.googleapis.com/oauth2/v4/token",
            target_audience="",
            service_account_email=os.getenv("FUNCTION_IDENTITY"),
        )
    url = blob.generate_signed_url(
        version="v4",
        # This URL is valid for 24 hours, until the next email
        expiration=datetime.timedelta(hours=24),
        # Allow GET requests using this URL.
        method="GET",
        # Signing credentials; if None falls back to client credentials
        credentials=signing_credentials,
    )

SendGrid API を使用してメールを送信するには、生成したトークンを使用してウェブ API の SendGrid 実装ガイドに沿って操作します。次のサンプルをご参照ください。

  # Send email through SendGrid with link to signed URL
   message = Mail(
       from_email="test@example.com",
       to_emails="username@example.com",
       subject="Daily BQ export",
       html_content="<p> Your daily BigQuery export from Google Cloud Platform \
           is linked <a href={}>here</a>.</p>".format(
           url
       ),
   )

   sg = SendGridAPIClient(os.getenv("SENDGRID_API_KEY"))
   response = sg.send(message)

上記のように、SendGrid API キーには Cloud Functions の環境変数としてアクセスできます。API キーをより安全に保存するために、Cloud Key Management Service でキーを暗号化して保存することもできます。

パイプラインのデプロイ

パイプラインを構築するには、Pub/Sub トピックを作成し、前のセクションのコードを使用して Cloud 関数をデプロイしたうえで、パイプラインをトリガーするように Cloud Scheduler を構成します。このサンプルコードは、「main.py」が Cloud 関数のコードを保持していると仮定して、パイプラインをローカルにデプロイする方法を示しています。

  CF_NAME="bq-email-export"
CS_NAME="bq-email-export"
TOPIC_NAME="bq-exports"
PROJECT_ID="bq-automated-exports"
SVC_ACCOUNT_EMAIL="bq-exports-svc@projectid.iam.gserviceaccount.com"
SENDGRID_API_KEY="API-key"
TOPIC_PATH="projects/{$PROJECT_ID}/topics/bq-exports"
SCHEDULE="00 00 * * *"
 
gcloud pubsub topics create "$TOPIC_NAME"
gcloud functions deploy "$CF_NAME" --entry-point=main --trigger-topic "$TOPIC_NAME" --runtime python37 --memory "512MB" --service-account "$SVC_ACCOUNT_EMAIL" --project "$PROJECT_ID" --set-env-vars SENDGRID_API_KEY="$SENDGRID_API_KEY"
gcloud scheduler jobs create pubsub "$CS_NAME" --schedule="$SCHEDULE" --topic="$TOPIC_PATH" --message-body="bq-email" --project="$PROJECT_ID"

これで、BigQuery からのエクスポートを一連のメールに直接送信する自動パイプラインが構築され、データへのアクセスと分析が簡単になります。

ここで使用されている Google Cloud ツールの詳細:

BigQuery クライアント ライブラリを使ってみる

● BigQuery でクエリをスケジュールする

● Cloud Storage で署名付き URLを使用する

Cloud Scheduler のジョブの作成方法

-By Neha Nene, Cloud Technical Resident and Ishita Shah, Cloud Technical Resident