よくある質問

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

次のセクションでは、Dataflow に関してよく寄せられる質問をご紹介します。

一般的な質問

追加サポートはどこにありますか?

Google Cloud サポートにアクセスして、Dataflow を含む Google Cloud のサポート パッケージを入手できます。

StackOverflow を使用して、質問の検索や新しい質問の投稿を行うことができます。投稿する場合は、質問に google-cloud-dataflow のタグを付けてください。Google のエンジニアはグループをチェックし、質問に回答しています。

Google 公開バグトラッカーを使用して、製品に関する質問、機能リクエスト、バグや不具合の報告、その他のフィードバックを送信することもできます。

パイプライン インスタンス間でデータを共有できますか?

パイプライン間でデータや処理コンテキストを共有するための Dataflow 固有のパイプライン間通信メカニズムはありません。Cloud Storage のような永続ストレージ、または App Engine のようなメモリ内キャッシュを使用して、パイプライン インスタンス間でデータを共有できます。

特定の時間または間隔でパイプラインを実行するための組み込みのスケジュール設定メカニズムはありますか?

パイプラインの実行を自動化するには、次の操作を行います。

自分の環境でインストールまたは実行されている Dataflow SDK のバージョンはどうすればわかりますか?

インストールの詳細は開発環境によって異なります。Maven を使用している場合は、1 つ以上のローカル Maven レポジトリに複数のバージョンの Dataflow SDK を "インストール" できます。

Java

特定のパイプラインで実行している Dataflow SDK のバージョンを確認するには、DataflowPipelineRunner または BlockingDataflowPipelineRunner で実行中のコンソールの出力を調べます。コンソールには、Dataflow SDK のバージョン情報を含む次のようなメッセージが含まれます。

Python

特定のパイプラインで実行している Dataflow SDK のバージョンを確認するには、DataflowRunner で実行中のコンソールの出力を調べます。コンソールには、Dataflow SDK のバージョン情報を含む次のようなメッセージが含まれます。

Go

特定のパイプラインで実行している Dataflow SDK のバージョンを確認するには、DataflowRunner で実行中のコンソールの出力を調べます。コンソールには、Dataflow SDK のバージョン情報を含む次のようなメッセージが含まれます。

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Dataflow はいくつの DoFn のインスタンスを作成しますか?

Dataflow のメモリ不足エラーのトラブルシューティング ページの DoFn のメモリ使用量セクションをご覧ください。

Cloud Dataflow ジョブの操作

パイプラインの実行中にジョブのワーカーマシン(Compute Engine VM)にアクセスできますか?

Google Cloud Console を使用して、特定のパイプラインの VM インスタンスを表示できます。そこから、SSH を使用して各インスタンスにアクセスできます。ただし、ジョブが完了または失敗すると、Dataflow サービスが自動的にシャットダウンし、VM インスタンスがクリーンアップされます。

Cloud Dataflow モニタリング インターフェースに、ストリーミング ジョブに予約された CPU 時間が表示されないのはなぜですか?

Dataflow サービスでは、ジョブの完了後に予約済みの CPU 時間が報告されます。バインドされないジョブの場合、ジョブがキャンセルされるか失敗した後にのみ予約済みの CPU 時間が報告されることになります。

Cloud Dataflow Monitoring Interface で、最近更新されたストリーミング ジョブの状態とウォーターマーク情報を使用できないのはなぜですか?

更新オペレーションで行われる複数の変更が Dataflow Monitoring Interface に伝播されるまでには数分かかります。ジョブを更新してから 5 分後に、モニタリング インターフェースを更新してみてください。

Dataflow Monitoring Interface でカスタム複合変換が展開して表示されるのはなぜですか?

パイプラインのコードで、複合変換を次のように呼び出した可能性があります。

result = transform.apply(input);

このようにして呼び出された複合変換では予期されるネストが省略されるため、Dataflow Monitoring Interface で展開して表示されることがあります。パイプラインでは、パイプラインの実行時に安定した一意名に関する警告やエラーが生成されることもあります。

これらの問題を回避するには、推奨される形式を使用して変換を呼び出してください。

result = input.apply(transform);

以前は表示されていた、進行中のジョブに関する情報が、Cloud Dataflow Monitoring Interface で表示されなくなりました。どうしてですか?

現在、1 か月以上実行されていた一部の Dataflow ジョブに影響を与える可能性のある既知の問題があります。このようなジョブでは、Dataflow Monitoring Interface でロードに失敗したり、ジョブが以前に表示されていた場合でも古い情報が表示されたりすることがあります。

Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用しているときに、ジョブリストでジョブのステータスを取得することはできます。ただし、この問題が存在する場合は、ジョブの詳細を表示できません。

Apache Beam SDK for Java を使用したプログラミング

既存の ParDo オペレーションに追加(帯域外)データを渡すことはできますか?

はい。ユースケースに応じて従うパターンがいくつかあります。

  • DoFn サブクラスで情報をフィールドとしてシリアル化できます。
  • 匿名の DoFn でメソッドによって参照される変数が自動的にシリアル化されます。
  • DoFn.startBundle() の内部でデータを計算できます。
  • ParDo.withSideInputs を介してデータ内で渡すことができます。

詳しくは、ParDo のドキュメント(特に DoFn の作成と副入力に関するセクション)と ParDo の API for Java リファレンス ドキュメントをご覧ください。

Apache Beam SDK for Python を使用したプログラミング

NameError を処理するにはどうすればよいですか?

Dataflow サービスを使用してパイプラインを実行すると NameError を受け取る一方、ローカルで実行すると(つまり、DirectRunner を使用して実行すると)このエラーが発生しない場合、DoFn において、Dataflow ワーカーでは使用できない、グローバル名前空間にある値を使用していることが考えられます。

デフォルトでは、メイン セッションで定義されたグローバルなインポート、関数、変数は、Dataflow ジョブのシリアル化時に保存されません。たとえば、DoFn がメインファイルで定義され、グローバルな名前区間にあるインポートと関数を参照する場合、--save_main_session パイプライン オプションを True に設定できます。これにより、グローバルな名前空間の状態がピクル化され、Dataflow ワーカーに読み込まれます。

ピクル化できないグローバルな名前空間にオブジェクトが存在する場合は、ピクル化のエラーが発生します。エラーが、Python ディストリビューションで使用可能なモジュールに関するものである場合は、オブジェクトが使用されているモジュールをローカルでインポートすることで、これを解決できます。

たとえば、次のコマンドは使用しません。

import re
…
def myfunc():
  # use re module

次のコマンドを使用します。

def myfunc():
  import re
  # use re module

または、DoFn が複数のファイルにまたがっている場合は、別の方法でワークフローをパッケージ化して依存関係を管理してください。

パイプライン I/O

TextIO ソースとシンクは GZip のような圧縮ファイルをサポートしますか?

はい。Dataflow Java は gzipbzip2 で圧縮されたファイルを読み取ることができます。追加情報については、TextIO のドキュメントをご覧ください。

TextIO ソースで特定のファイルを対象とする正規表現を使用することはできますか?

Dataflow では、一般的なワイルドカード パターンがサポートされます。glob 式は、ファイルパス内の任意の場所で使用できます。ただし、Dataflow は再帰的なワイルドカード(**)をサポートしません。

TextIO 入力ソースでは JSON がサポートされますか?

はい。ただし、Dataflow サービスで入力を並列化できるように、ソースデータを改行で区切る必要があります。

カスタムソースでダイナミック ワーク リバランシングがアクティブになっていないのはなぜですか?

ダイナミック ワーク リバランシングでは、アクティブ化にカスタムソースの getProgress() メソッドの戻り値が使用されます。getProgress() のデフォルト実装では null が返されます。自動スケーリングが確実にアクティブになるように、カスタムソースで getProgress() をオーバーライドして適切な値を返してください。

別の Google Cloud Platform プロジェクト(つまり、Cloud Dataflow を使用しているプロジェクトではないもの)で所有する BigQuery データセットや Pub/Sub トピックまたはサブスクリプションにアクセスするには、どうすればよいですか?

Dataflow を使用しているプロジェクトとは別の Google Cloud プロジェクトに含まれる BigQuery または Pub/Sub データにアクセスする方法については、Dataflow のセキュリティと権限に関するガイドをご覧ください。

ストリーミング

ストリーミング モードでパイプラインを実行するにはどうすればよいですか?

パイプラインの実行時に、コマンドライン--streaming フラグを設定できます。また、パイプラインを作成する際に、プログラムによってストリーミング モードを設定することもできます。

ストリーミング モードではどのようなデータソースとシンクがサポートされますか?

Pub/Sub からのストリーミング データの読み込み、Pub/Sub または BigQuery へのストリーミング データの書き込みを実行できます。

ストリーミング モードの現在の制限事項は何ですか?

バッチソースはまだストリーミング モードではサポートされません。

ワーカーの大きなプールでパイプラインを更新した場合、ストリーミング ジョブが適切にアップスケーリングされないのはなぜですか?

Java

ストリーミング エンジンを使用しないストリーミング ジョブでは、元のジョブの開始時に割り当てられたワーカーと永続ディスク リソースの数を超えてスケーリングすることはできません。Dataflow ジョブを更新する場合、新しいジョブの指定でワーカー数を増やすときには、元のジョブに対して指定した --maxNumWorkers と等しいワーカー数だけを指定できます。

Python

ストリーミング エンジンを使用しないストリーミング ジョブでは、元のジョブの開始時に割り当てられたワーカーと永続ディスク リソースの数を超えてスケーリングすることはできません。Dataflow ジョブを更新する場合、新しいジョブの指定でワーカー数を増やすときには、元のジョブに対して指定した --max_num_workers と等しいワーカー数だけを指定できます。

Go

ストリーミング エンジンを使用しないストリーミング ジョブでは、元のジョブの開始時に割り当てられたワーカーと永続ディスク リソースの数を超えてスケーリングすることはできません。Dataflow ジョブを更新する場合、新しいジョブの指定でワーカー数を増やすときには、元のジョブに対して指定した --max_num_workers と等しいワーカー数だけを指定できます。

ストリーミング自動スケーリング

Dataflow の自動スケーリングのトラブルシューティングをご覧ください。

Cloud Dataflow を使用するように Google Cloud Platform プロジェクトを設定する

Cloud Dataflow で使用しているプロジェクトが読み取り / 書き込み対象の Cloud Storage バケットを所有しているかどうかを確認するには、どうすればよいですか?

Google Cloud プロジェクトが特定の Cloud Storage バケットを所有しているかどうかを確認するには、次のコンソール コマンドを使用できます。

gsutil acl get gs://<your-bucket>

このコマンドは、次のような JSON 文字列を出力します。

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

関連するエントリは、"role" がオーナーであるエントリです。関連付けられた projectNumber は、そのバケットを所有しているプロジェクトを示します。プロジェクト番号がプロジェクトの番号と一致しない場合は、次のいずれかを行う必要があります。

  • プロジェクトによって所有されている新しいバケットを作成します。
  • バケットに適切なアカウント アクセス権を付与します。

Cloud Dataflow プロジェクトが所有する新しいバケットを作成するにはどうすればよいですか?

Google Cloud プロジェクト内に、Dataflow を使用する新しいバケットを作成するには、次のコンソール コマンドを使用できます。

gsutil mb -p <Project to own the bucket> <bucket-name>

別のプロジェクトが所有するバケットを、Cloud Dataflow で使用している Google Cloud Platform プロジェクトで読み取り / 書き込み可能にするには、どうすればよいですか?

別の Google Cloud プロジェクトが所有する Google Cloud リソースに Dataflow パイプラインでアクセスする方法については、Dataflow のセキュリティと権限ガイドをご覧ください。