BigQuery Data Transfer Service の実行通知

このページでは、BigQuery Data Transfer Service の実行通知の概要について説明します。

概要

BigQuery Data Transfer Service で構成できる実行通知には、次の 2 種類があります。

  • Pub/Sub 通知: 転送実行の正常完了時または失敗時に送信される、機械で読み取り可能な通知
  • メール通知: 転送実行の失敗時に送信される、人が読んで理解できる通知

それぞれの種類を個別に構成することも、Pub/Sub とメールの両方の実行通知を使用することもできます。

Pub/Sub 通知

Pub/Sub の通知では、転送実行に関する情報が Pub/Sub トピックに送信されます。転送実行が次の状態で完了すると、Pub/Sub 通知がトリガーされます。

  • SUCCEEDED
  • FAILED
  • CANCELLED

通知は、自分に必要な権限が付与されているプロジェクトの任意の Pub/Sub トピックに送信できます。Pub/Sub トピックで受信されたメッセージは、そのトピックの任意の数のサブスクライバーに送信できます。

始める前に

Pub/Sub の転送実行通知を構成する前に、以下を作業を行う必要があります。

  1. 通知を受け取るプロジェクトの Pub/Sub API を有効にします。

    API の有効化

  2. 通知を受け取るプロジェクトに対して十分な権限を持っていること。

    • 所有するプロジェクトの通知を受け取る場合は、必要な権限を保持していると考えられます。

    • 通知を受け取るトピックを作成する場合は、pubsub.topics.create 権限が必要です。

    • 新規トピックと既存トピックのいずれを使用するかにかかわりなく、pubsub.topics.setIamPolicy 権限が必要です。通常、トピックの作成者には、そのトピックに対する pubsub.topics.setIamPolicy があります。以下の IAM 事前定義ロール pubsub.admin には、pubsub.topics.setIamPolicy 権限が含まれています。詳細については、Pub/Sub アクセス制御をご覧ください。

  3. 通知の送信先とする既存の Pub/Sub トピックが存在する必要があります。

通知形式

Pub/Sub トピックに送信される通知は、次の 2 つの部分で構成されます。

  • 属性: イベントを記述する Key-Value ペアのセット
  • ペイロード: 変更されたオブジェクトのメタデータを含む文字列

属性

属性とは、BigQuery Data Transfer Service から Pub/Sub トピックに送信されるすべての通知に含まれる Key-Value ペアのことです。ペイロードに関係なく、通知には次の Key-Value ペアが含まれます。

属性名 説明
eventType TRANSFER_RUN_FINISHED 発生したイベントの種類。TRANSFER_RUN_FINISHED が唯一の有効な値です。
payloadFormat JSON_API_V1 オブジェクトのペイロードの形式。JSON_API_V1 が唯一の有効な値です。

ペイロード

ペイロードは、転送実行のメタデータを含む文字列です。ペイロードの種類は現時点では構成できず、将来の API バージョンの変更に備えて用意されています。

ペイロードの種類 説明
JSON_API_V1 ペイロードは、TransferRun のリソース表現を含む、UTF-8 のシリアル化された JSON 形式の文字列になります。

メール通知

メール通知では、転送実行が失敗したときに人が読めるメール メッセージが送信されます。これらのメッセージは、転送を設定したユーザーに送信されます。メッセージは構成できません

他のユーザーが転送実行のメール通知を受信する必要がある場合は、メッセージを配信するメール転送ルールを設定します。Gmail を使用している場合は、Gmail メッセージを別のアカウントに自動転送できます。

メール通知は BigQuery Data Transfer Service から送信され、転送構成と転送実行に関する詳細、さらに、転送実行が失敗した場合には、実行履歴へのリンクも含まれます。例:

From: bigquery-data-transfer-service-noreply@google.com
To: user_who_set_up_transfer
Title: BigQuery Data Transfer Service — Transfer Run Failure —
display_name

Transfer Configuration
Display Name: display_name
Source: data_source
Destination: project_id

Run Summary
Run: run_name
Schedule Time: schedule_time
Run Time: run_time
View Run History

Google LLC 1600 Amphitheatre Parkway, Mountain View, CA 94043

This email was sent because you indicated you are willing to receive Run
Notifications from the BigQuery Data Transfer Service. If you do not wish to
receive such emails in the future, click View Transfer Configuration and
un-check the "Send E-mail Notifications" option.

通知をオンにする

通知をオンにするには:

Console

  • BigQuery Data Transfer Service コンソールの [通知オプション] セクションで次の操作を行います。
  • 切り替えボタンをクリックしてメール通知を有効にします。このオプションを有効にすると、転送の実行が失敗した場合、転送管理者にメール通知が送信されます。
  • [Pub/Sub トピックを選択してください] で、トピック名を選択するか、[トピックを作成する] をクリックします。このオプションで、Pub/Sub の転送実行通知を構成します。

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to get run notification
public class RunNotification {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String datasetId = "MY_DATASET_ID";
    final String pubsubTopicName = "MY_TOPIC_NAME";
    final String query =
        "SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
            + "@run_date as intended_run_date, 17 as some_integer";
    Map<String, Value> params = new HashMap<>();
    params.put("query", Value.newBuilder().setStringValue(query).build());
    params.put(
        "destination_table_name_template",
        Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
    params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
    params.put("partitioning_field", Value.newBuilder().build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Scheduled Query Name")
            .setDataSourceId("scheduled_query")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .setNotificationPubsubTopic(pubsubTopicName)
            .build();
    runNotification(projectId, transferConfig);
  }

  public static void runNotification(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
      System.out.println(
          "\nScheduled query with run notification created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("\nScheduled query with run notification was not created." + ex.toString());
    }
  }
}

Python

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
pubsub_topic = "projects/PROJECT-ID/topics/TOPIC-ID"
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
transfer_config.notification_pubsub_topic = pubsub_topic
update_mask = field_mask_pb2.FieldMask(paths=["notification_pubsub_topic"])

transfer_config = transfer_client.update_transfer_config(
    {"transfer_config": transfer_config, "update_mask": update_mask}
)

print(f"Updated config: '{transfer_config.name}'")
print(f"Notification Pub/Sub topic: '{transfer_config.notification_pubsub_topic}'")

実行通知の料金

Pub/Sub の実行通知を構成すると、Pub/Sub の料金が発生します。詳細については、Pub/Sub の料金ページをご覧ください。

次のステップ