BigQuery Data Transfer Service の実行通知

このページでは、BigQuery Data Transfer Service の実行通知の概要について説明します。

概要

BigQuery Data Transfer Service で構成できる実行通知には、次の 2 種類があります。

  • Pub/Sub 通知: 転送実行の正常完了時または失敗時に送信される、機械で読み取り可能な通知
  • メール通知: 転送実行の失敗時に送信される、人が読んで理解できる通知

それぞれの種類を個別に構成することも、Pub/Sub とメールの両方の実行通知を使用することもできます。

Pub/Sub 通知

Pub/Sub の通知では、転送実行に関する情報が Pub/Sub トピックに送信されます。転送実行が次の状態で完了すると、Pub/Sub 通知がトリガーされます。

  • SUCCEEDED
  • FAILED
  • CANCELLED

通知は、自分に必要な権限が付与されているプロジェクトの任意の Pub/Sub トピックに送信できます。Pub/Sub トピックで受信されたメッセージは、そのトピックの任意の数のサブスクライバーに送信できます。

始める前に

Pub/Sub の転送実行通知を構成する前に、以下の作業を行う必要があります。

  1. 通知を受け取るプロジェクトの Pub/Sub API を有効にします。

    API を有効にする

  2. 通知を受け取るプロジェクトに対して十分な権限が必要です。

    • 所有するプロジェクトの通知を受け取る場合は、必要な権限を保持していると考えられます。

    • 通知を受け取るトピックを作成する場合は、pubsub.topics.create 権限が必要です。

    • 新規トピックと既存トピックのいずれを使用するかにかかわりなく、pubsub.topics.getIamPolicy 権限および pubsub.topics.setIamPolicy 権限が必要です。通常、トピックの作成者には、そのトピックに対する権限があります。pubsub.topics.getIamPolicy 権限および pubsub.topics.setIamPolicy 権限はいずれも、事前定義された IAM のロール(pubsub.admin)に含まれています。詳細については、Pub/Sub アクセス制御をご覧ください。

  3. 通知の送信先とする既存の Pub/Sub トピックが存在する必要があります。

通知形式

Pub/Sub トピックに送信される通知は、次の 2 つの部分で構成されます。

  • 属性: イベントを記述する Key-Value ペアのセット
  • ペイロード: 変更されたオブジェクトのメタデータを含む文字列

属性

属性とは、BigQuery Data Transfer Service から Pub/Sub トピックに送信されるすべての通知に含まれる Key-Value ペアのことです。ペイロードに関係なく、通知には次の Key-Value ペアが含まれます。

属性名 説明
eventType TRANSFER_RUN_FINISHED 発生したイベントの種類。TRANSFER_RUN_FINISHED が唯一の有効な値です。
payloadFormat JSON_API_V1 オブジェクトのペイロードの形式。JSON_API_V1 が唯一の有効な値です。

ペイロード

ペイロードは、転送実行のメタデータを含む文字列です。ペイロードの種類は現時点では構成できず、将来の API バージョンの変更に備えて用意されています。

ペイロードの種類 説明
JSON_API_V1 ペイロードは、TransferRun のリソース表現を含む、UTF-8 のシリアル化された JSON 形式の文字列になります。

メール通知

メール通知では、転送実行が失敗したときに人が読めるメール メッセージが送信されます。これらのメッセージは、転送を設定したユーザーに送信されます。メッセージは構成できません

他のユーザーが転送実行のメール通知を受信する必要がある場合は、メッセージを配信するメール転送ルールを設定します。Gmail を使用している場合は、Gmail メッセージを別のアカウントに自動転送できます。

メール通知は BigQuery Data Transfer Service から送信され、転送構成と転送実行に関する詳細、さらに、転送実行が失敗した場合には、実行履歴へのリンクも含まれます。次に例を示します。

From: bigquery-data-transfer-service-noreply@google.com
To: user_who_set_up_transfer
Title: BigQuery Data Transfer Service — Transfer Run Failure —
display_name

Transfer Configuration
Display Name: display_name
Source: data_source
Destination: project_id

Run Summary
Run: run_name
Schedule Time: schedule_time
Run Time: run_time
View Run History

Google LLC 1600 Amphitheatre Parkway, Mountain View, CA 94043

This email was sent because you indicated you are willing to receive Run
Notifications from the BigQuery Data Transfer Service. If you do not wish to
receive such emails in the future, click View Transfer Configuration and
un-check the "Send E-mail Notifications" option.

通知をオンにする

通知をオンにするには:

コンソール

  1. Google Cloud コンソールの [BigQuery] ページに移動します。

    [BigQuery] ページに移動

  2. ナビゲーション メニューで [データ転送] をクリックします。

  3. 新しい転送の通知を有効にするには、 [転送の作成] をクリックします。既存の転送の通知を調整するには、転送の名前をクリックして [編集] をクリックします。

  4. [通知オプション] セクションで、有効にする通知タイプの横にある切り替えボタンをクリックします。

    • メール通知: このオプションを有効にすると、転送の実行が失敗した場合、転送管理者にメール通知が送信されます。
    • Pub/Sub 通知: このオプションを有効にする場合は、トピック名を選択するか、[トピックを作成する] をクリックします。このオプションで、Pub/Sub の転送実行通知を構成します。

Java

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートJava の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to get run notification
public class RunNotification {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String datasetId = "MY_DATASET_ID";
    final String pubsubTopicName = "MY_TOPIC_NAME";
    final String query =
        "SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
            + "@run_date as intended_run_date, 17 as some_integer";
    Map<String, Value> params = new HashMap<>();
    params.put("query", Value.newBuilder().setStringValue(query).build());
    params.put(
        "destination_table_name_template",
        Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
    params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
    params.put("partitioning_field", Value.newBuilder().build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Scheduled Query Name")
            .setDataSourceId("scheduled_query")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .setNotificationPubsubTopic(pubsubTopicName)
            .build();
    runNotification(projectId, transferConfig);
  }

  public static void runNotification(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
      System.out.println(
          "\nScheduled query with run notification created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("\nScheduled query with run notification was not created." + ex.toString());
    }
  }
}

Python

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートPython の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
pubsub_topic = "projects/PROJECT-ID/topics/TOPIC-ID"
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
transfer_config.notification_pubsub_topic = pubsub_topic
update_mask = field_mask_pb2.FieldMask(paths=["notification_pubsub_topic"])

transfer_config = transfer_client.update_transfer_config(
    {"transfer_config": transfer_config, "update_mask": update_mask}
)

print(f"Updated config: '{transfer_config.name}'")
print(f"Notification Pub/Sub topic: '{transfer_config.notification_pubsub_topic}'")

実行通知の料金

Pub/Sub の実行通知を構成すると、Pub/Sub の料金が発生します。詳細については、Pub/Sub の料金ページをご覧ください。

次のステップ