よくある質問

次のセクションでは、Cloud Dataflow に関してよく寄せられる質問をご紹介します。

一般的な質問

追加サポートはどこにありますか?

Google Cloud Platform(GCP)サポートにアクセスして、Cloud Dataflow を含む GCP のサポート パッケージを入手できます。

StackOverflow を使用して、質問を検索したり、新しい質問を投稿したりできます。投稿する場合は、質問に google-cloud-dataflow のタグを付けてください。Google のエンジニアは、質問に回答するためにこのグループをチェックしています。

質問、機能のリクエスト、バグや不具合の報告、その他のフィードバックを UserVoice フォーラムに投稿することもできます。

パイプライン インスタンス間でデータを共有できますか?

パイプライン間でデータや処理コンテキストを共有するための Cloud Dataflow 固有のパイプライン間通信メカニズムはありません。Cloud Storage のような永続ストレージ、または App Engine のようなメモリ内キャッシュを使用して、パイプライン インスタンス間でデータを共有できます。

特定の時間または間隔でパイプラインを実行するための組み込みのスケジュール設定メカニズムはありますか?

パイプラインの実行を自動化するには、次の操作を行います。

使用している環境でどのバージョンの Cloud Dataflow SDK がインストール / 実行されているかを確認するには、どうすればよいですか?

インストールの詳細は開発環境によって異なります。Maven を使用している場合、1 つ以上のローカル Maven リポジトリに、複数の異なるバージョンの Cloud Dataflow SDK が「インストール」されている場合もあります。

Java

特定のパイプラインで実行している Cloud Dataflow SDK のバージョンを確認するには、DataflowPipelineRunner または BlockingDataflowPipelineRunner で実行中のコンソールの出力を調べます。コンソールには、Cloud Dataflow SDK のバージョン情報を含む次のようなメッセージが表示されます。

Python

特定のパイプラインで実行している Cloud Dataflow SDK のバージョンを確認するには、DataflowRunner で実行中のコンソールの出力を調べます。コンソールには、Cloud Dataflow SDK のバージョン情報を含む次のようなメッセージが表示されます。

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Cloud Dataflow ジョブの操作

パイプラインの実行中にジョブのワーカーマシン(Compute Engine VM)にアクセスできますか?

Google Cloud Platform Console を使用して、特定のパイプラインの VM インスタンスを表示できます。そこから、SSH を使用して各インスタンスにアクセスできます。ただし、ジョブが完了または失敗すると、Cloud Dataflow サービスが自動的にシャットダウンし、VM インスタンスがクリーンアップされます。

Cloud Dataflow Monitoring Interface に、ストリーミング ジョブに予約された CPU 時間が表示されないのはなぜですか?

Cloud Dataflow サービスでは、ジョブの完了後に予約済みの CPU 時間が報告されます。バインドされないジョブの場合、これはジョブがキャンセルされるか失敗した後にのみ予約済みの CPU 時間が報告されることを意味します。

Cloud Dataflow Monitoring Interface で、最近更新されたストリーミング ジョブの状態とウォーターマーク情報を使用できないのはなぜですか?

更新オペレーションで行われる複数の変更が Cloud Dataflow Monitoring Interface に伝播されるまでには数分かかります。ジョブを更新してから 5 分後に、モニタリング インターフェースを更新してみてください。

Cloud Dataflow Monitoring Interface でカスタム複合変換が展開して表示されるのはなぜですか?

パイプラインのコードで、複合変換を次のように呼び出した可能性があります。

result = transform.apply(input);

このようにして呼び出された複合変換では予期されるネストが省略されるため、Cloud Dataflow Monitoring Interface で展開して表示されることがあります。パイプラインでは、パイプラインの実行時に安定した一意名に関する警告やエラーが生成されることもあります。

これらの問題を回避するには、推奨される形式を使用して変換を呼び出してください。

result = input.apply(transform);

以前は表示されていた、進行中のジョブに関する情報が、Cloud Dataflow Monitoring Interface で表示されなくなりました。どうしてですか?

現在、1 か月以上実行されている一部の Cloud Dataflow ジョブに影響を与える可能性のある既知の問題があります。このようなジョブは、以前は表示されていたとしても、Cloud Dataflow Monitoring Interface に読み込めなかったり、古い情報が表示されたりする可能性があります。

Cloud Dataflow Monitoring Interface または Cloud Dataflow コマンドライン インターフェースの使用中は、ジョブリストでジョブの状態を確認できます。ただし、この問題が存在する場合は、ジョブの詳細を表示できません。

Apache Beam SDK for Java を使用したプログラミング

既存の ParDo オペレーションに追加(帯域外)データを渡すことはできますか?

はい。ユースケースに応じて従うパターンがいくつかあります。

  • DoFn サブクラスで情報をフィールドとしてシリアル化できます。
  • 匿名の DoFn でメソッドによって参照される変数が自動的にシリアル化されます。
  • DoFn.startBundle() の内部でデータを計算できます。
  • ParDo.withSideInputs を介してデータ内で渡すことができます。

詳しくは、ParDo のドキュメント、特に DoFn の作成と副入力に関するセクション、および ParDo の API for Java リファレンス ドキュメントをご覧ください。

Java の例外は Cloud Dataflow でどのように処理されますか?

データの処理中にパイプラインによって例外がスローされることがあります。これらのエラーのいくつかは一過性ですが(たとえば、一時的に外部サービスにアクセスできないなど)、破損したか解析不能な入力データや計算中の null ポインタを原因とするエラーなど、一部のエラーは永続的です。

エラーが任意のバンドル内の要素についてスローされた場合、Cloud Dataflow はそのバンドル内の要素を処理し、バンドル全体を再試行します。バッチモードで実行している場合、失敗した項目を含むバンドルは 4 回再試行されます。単一のバンドルが 4 回失敗した場合はパイプラインが完全に失敗します。ストリーミング モードで実行している場合、失敗した項目を含むバンドルは無期限に再試行され、パイプラインが恒久的に滞るおそれがあります。

ユーザーコード(DoFn インスタンスなど)における例外が Cloud Dataflow モニタリング インターフェースで報告されます。パイプラインを BlockingDataflowPipelineRunner で実行すると、コンソールまたはターミナル ウィンドウにもエラー メッセージが表示されます。

例外ハンドラを追加することでコード内のエラーから保護することを検討してください。たとえば、ParDo で実行されたいくつかのカスタム入力検証が失敗する要素を削除する場合は、ParDo 内で try/catch ブロックを使用して、例外を処理し、要素を削除します。また、Aggregator を使用してエラー件数をトラッキングすることもできます。

Cloud Dataflow SDK for Python を使用したプログラミング

NameError を処理するにはどうすればよいですか?

Cloud Dataflow サービスを使用してパイプラインを実行すると NameError を受け取る一方、ローカルで実行すると(つまり、DirectRunner を使用して実行すると)このエラーが発生しない場合、DoFn で、Cloud Dataflow ワーカーでは使用できない、グローバル名前空間にある値を使用していることが考えられます。

デフォルトでは、メイン セッションで定義されたグローバルなインポート、関数、変数は、Cloud Dataflow ジョブのシリアル化時に保存されません。たとえば、DoFn がメインファイルで定義され、グローバルな名前区間にあるインポートと関数を参照する場合、--save_main_session パイプライン オプションを True に設定できます。これにより、グローバルな名前空間の状態がピクル化され、Cloud Dataflow ワーカーに読み込まれます。

ピクル化できないグローバルな名前空間にオブジェクトが存在する場合は、ピクル化のエラーが発生します。エラーが、Python ディストリビューションで使用可能なモジュールに関するものである場合は、オブジェクトが使用されているモジュールをローカルでインポートすることで、これを解決できます。

たとえば、次のコマンドは使用しません。

import re
…
def myfunc():
  # use re module

次のコマンドを使用します。

def myfunc():
  import re
  # use re module

または、DoFn が複数のファイルにまたがっている場合は、別の方法でワークフローをパッケージ化して依存関係を管理してください。

パイプライン I/O

TextIO ソースとシンクは GZip のような圧縮ファイルをサポートしますか?

はい。Cloud Dataflow Java は gzipbzip2 で圧縮されたファイルを読み取ることができます。追加情報については、TextIO のドキュメントをご覧ください。

TextIO ソースで特定のファイルを対象とする正規表現を使用することはできますか?

Cloud Dataflow では、一般的なワイルドカード パターンがサポートされます。glob 式は、ファイルパス内の任意の場所で使用できます。ただし、Cloud Dataflow は再帰的なワイルドカード(**)をサポートしません。

TextIO 入力ソースでは JSON がサポートされますか?

はい。ただし、Cloud Dataflow サービスで入力と出力を並列化できるように、ソースデータを改行で区切る必要があります。

カスタムソースでダイナミック ワーク リバランシングがアクティブになっていないのはなぜですか?

ダイナミック ワーク リバランシングでは、アクティブ化にカスタムソースの getProgress() メソッドの戻り値が使用されます。getProgress() のデフォルト実装では null が返されます。自動スケーリングが確実にアクティブになるように、カスタムソースで getProgress() をオーバーライドして適切な値を返してください。

別の Google Cloud Platform プロジェクト(つまり、Cloud Dataflow を使用しているプロジェクトではないもの)で所有する BigQuery データセットや Pub/Sub トピックまたはサブスクリプションにアクセスするには、どうすればよいですか?

Cloud Dataflow を使用しているプロジェクトとは別のプロジェクトに含まれる BigQuery または Cloud Pub/Sub データにアクセスする方法については、Cloud Dataflow のセキュリティと権限に関するガイドをご覧ください。

ストリーミング

ストリーミング モードでパイプラインを実行するにはどうすればよいですか?

パイプラインの実行時に、コマンドライン--streaming フラグを設定できます。また、パイプラインを作成する際に、プログラムによってストリーミング モードを設定することもできます。

ストリーミング モードではどのようなデータソースとシンクがサポートされますか?

Cloud Pub/Sub からのストリーミング データの読み込み、Cloud Pub/Sub または BigQuery へのストリーミング データの書き込みを実行できます。

ストリーミング モードの現在の制限事項は何ですか?

Cloud Dataflow のストリーミング モードには次の制限があります。

  • バッチソースはまだストリーミング モードではサポートされません。
  • Cloud Dataflow サービスの自動スケーリング機能はベータ版でサポートされています。

Pub/Sub から読み取るストリーミング パイプラインの速度が低下しているようです。どうすればよいですか?

プロジェクトの Cloud Pub/Sub 割り当てが不十分な可能性があります。429 (Rate limit exceeded) クライアント エラーをチェックすることにより、プロジェクトに不十分な割り当てがあるかどうかを調べることができます。

  1. Google Cloud Platform Console に移動します。
  2. 左側のメニューで、[API とサービス] を選択します。
  3. 検索ボックスで、Cloud Pub/Sub を検索します。
  4. [使用状況] タブをクリックします。
  5. [レスポンス コード] を確認し、(4xx) クライアント エラーコードを探します。

ワーカーの大きなプールでパイプラインを更新した場合、ストリーミング ジョブが適切にアップスケーリングされないのはなぜですか?

Cloud Dataflow ジョブを更新する場合、新しいジョブの指定でワーカー数を増やすときには、元のジョブに対して指定した --maxNumWorkers と等しいワーカー数だけを指定できます。元のジョブの開始時に割り当てた元のワーカー数(したがって、永続ディスク リソース)を超えてスケールすることはできません。

これは、Cloud Dataflow サービスでの既知の問題であり、現在積極的に調査が進められています。

ストリーミング自動スケーリング

は、

注: Cloud Dataflow SDK for Python では現在、ストリーミングの自動スケーリングをサポートしていません。

ワーカー数を固定にするにはどうすればいいですか?

ストリーミング自動スケーリングを有効にするには、オプトインする必要がありますが、デフォルトでは、オプトインが有効になっていません。現在のオプションのセマンティックは変更されていないため、ワーカー数を固定にするには何もする必要はありません。

自動スケーリングで請求額が増えることが心配です。自動スケーリングを制限するにはどうすればよいですか?

--maxNumWorkers を指定して、ジョブの処理に使用するスケーリング範囲を制限できます。

ストリーミング自動スケーリングのパイプラインのスケーリング範囲はどれくらいですか?

ストリーミング自動スケーリングのパイプラインに使用するワーカー数の範囲は、N/15~N ワーカーです。この N は --maxNumWorkers の値です。たとえば、パイプラインで定常状態のワーカーが 3 つまたは 4 つ必要な場合、--maxNumWorkers=15 を設定できます。これにより、パイプラインは 1~15 の間でワーカーの自動スケーリングを行います。

--maxNumWorkers の最大値は 1000 です。

自動スケーリングで使用できるワーカーの最大数はいくつですか?

Cloud Dataflow は、プロジェクトの Compute Engine インスタンス数の割り当てまたは maxNumWorkers のいずれか少ないほうの制限内で動作します。

ストリーミング パイプラインで自動スケーリングをオフにできますか?

はい。--autoscalingAlgorithm=NONE を設定できます。スケーリング範囲内の numWorkers を含む固定クラスタ仕様(手動スケーリングのドキュメントを参照)を指定して、パイプラインを更新してください。

ストリーミング パイプラインのスケーリング範囲を変更できますか?

はい。ただし、[更新]* を使用してこの範囲を変更することはできません。[キャンセル] または [ドレイン] を使用してパイプラインを停止し、新しい適切な maxNumWorkers を使ってパイプラインを再デプロイする必要があります。

Cloud Dataflow を使用するように Google Cloud Platform プロジェクトを設定する

Cloud Dataflow で使用しているプロジェクトが読み取り / 書き込み対象の Cloud Storage バケットを所有しているかどうかを確認するには、どうすればよいですか?

GCP プロジェクトが特定の Cloud Storage バケットを所有しているかどうかを確認するには、次のコンソール コマンドを使用できます。

gsutil acl get gs://<your-bucket>

このコマンドは、次のような JSON 文字列を出力します。

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

関連するエントリは、"role" がオーナーであるエントリです。関連付けられた projectNumber は、そのバケットを所有しているプロジェクトを示します。プロジェクト番号がプロジェクトの番号と一致しない場合は、次のいずれかを行う必要があります。

  • プロジェクトによって所有されている新しいバケットを作成します。
  • バケットに適切なアカウント アクセス権を付与します。

Cloud Dataflow プロジェクトが所有する新しいバケットを作成するにはどうすればよいですか?

GCP プロジェクト内に、Cloud Dataflow を使用する新しいバケットを作成するには、次のコンソール コマンドを使用できます。

gsutil mb -p <Project to own the bucket> <bucket-name>

別のプロジェクトが所有するバケットを、Cloud Dataflow で使用している Google Cloud Platform プロジェクトで読み取り / 書き込み可能にするには、どうすればよいですか?

別の GCP プロジェクトが所有する GCP リソースに Cloud Dataflow パイプラインでアクセスする方法について、詳しくは Cloud Dataflow のセキュリティと権限ガイドをご覧ください。

Cloud Dataflow ジョブを実行しようとすると、「Some Cloud APIs need to be enabled for your project in order for Cloud Dataflow to run this job」(Cloud Dataflow でこのジョブを実行するには、プロジェクトでいくつかの Cloud API を有効にする必要があります)というエラーが表示されます。どうすればよいですか?

Cloud Dataflow ジョブを実行するには、プロジェクトで次の GCP API を有効にする必要があります。

  • Compute Engine API(Compute Engine)
  • Cloud Logging API
  • Cloud Storage
  • Cloud Storage JSON API
  • BigQuery API
  • Cloud Pub/Sub
  • Cloud Datastore API

手順について詳しくは、GCP API の有効化に関するスタートガイド セクションをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。