Pub/Sub ストリーミング ソースから読み取る

Cloud Data Fusion は、ストリーミング データ パイプライン内の Pub/Sub ソースをサポートしています。

準備

ロールと権限

Pub/Sub ストリーミング ソースからの読み取りに必要な権限を取得するには、Pub/Sub サブスクリプションにアクセスする際に必要なサービス アカウントに対する Pub/Sub 編集者roles/pubsub.editor)IAM ロールの付与を管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

この事前定義ロールには、Pub/Sub ストリーミング ソースからの読み取りに必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Pub/Sub ストリーミング ソースから読み取るには、次の権限が必要です。

  • pubsub.snapshots.create
  • pubsub.snapshots.delete
  • pubsub.snapshots.seek
  • pubsub.subscriptions.consume
  • pubsub.topics.attachSubscription

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Pub/Sub にアクセスするには、プラグイン プロパティで指定したサービス アカウントに対するロールを付与します。何も指定されていない場合は、Dataproc サービス アカウントに対するロールを付与します。

ロールの付与の詳細については、アクセスの管理をご覧ください。

Pub/Sub ソースをストリーミング データ パイプラインに追加する

  1. インスタンスに移動します:

    1. Google Cloud コンソールで、Cloud Data Fusion のページに移動します。

    2. Cloud Data Fusion ウェブ インターフェースでインスタンスを開くには、[Instances] をクリックしてから、[View instance] をクリックします。

      [インスタンス] に移動

  2. Cloud Data Fusion ウェブ インターフェースで、[Studio] をクリックします。

  3. [Data Pipeline - Realtime] を選択します。

  4. [ソース] メニューで、[Pub/Sub] を選択します。Pub/Sub ストリーミング ソースノードがパイプラインに表示されます。

  5. Pub/Sub ノードで [Properties] をクリックして、ソースを構成します。詳細については、Pub/Sub ストリーミング ソースをご覧ください。

Windower プラグインのない単一の Pub/Sub ソースのサポート

Cloud Data Fusion バージョン 6.9.1 は、単一の Pub/Sub ストリーミング ソースを持ち、Windower プラグインを持たないリアルタイム パイプラインをサポートしています。

  • Pub/Sub ストリーミング ソースには組み込みサポートがあり、データは少なくとも 1 回処理されます。Spark チェックポインティングを有効にする必要はありません。
  • Pub/Sub ストリーミング ソースは、各バッチの先頭で Pub/Sub スナップショットを作成し、各バッチの最後で削除します。
  • Pub/Sub スナップショットの作成には、費用が関連付けられます。詳細については、Pub/Sub の料金をご覧ください。
  • スナップショットの作成は Cloud Audit Logs でモニタリングできます。

Pub/Sub ストリーミング ソースを使用したパイプラインをアップグレードする

Cloud Data Fusion は、6.9.1 以降で作成された Pub/Sub ストリーミング ソースを使用するストリーミング パイプラインの直接アプリケーション アップグレードをサポートしています。

Cloud Data Fusion は、バージョン 6.9.0 以前の Pub/Sub ストリーミング ソースを使用するデータ パイプラインのアップグレードをサポートしていません。代わりに、これらのパイプラインを 6.9.1 にアップグレードします。

  1. インスタンスのアップグレードが計画されたときに、トピックへのデータの公開を停止します。
  2. パイプラインが公開データの処理を終了するまで待ちます。
  3. データの処理が完了したら、パイプラインを停止します。
  4. インスタンスをアップグレードします。
  5. 既存のパイプラインを複製し、最新のプラグインに更新します。
  6. パイプラインをデプロイします。
  7. 新しいパイプラインを実行してデータを読み取ります。

    新しいバージョンでは、Spark チェックポインティングの代わりに、スナップショットを自動的に使用します。

  8. 古いパイプラインを削除します。

次のステップ