サンプルの WordCount パイプライン

クイックスタートの手順をまだ行っていない場合は、こちらを続行する前に行ってください。

サンプル WordCount では、テキストを読み取り、テキスト行を個別の単語にトークン化し、それらの単語ごとに頻度計算を実行できる処理パイプラインの設定方法について説明します。Dataflow SDK には、以下に示す 4 つのサンプル WordCount が含まれています。これらは相互をベースに構築されており、処理の詳細度によって分けられています。いずれのサンプルも、入力テキストにはシェイクスピア作品の文章が使用されています。

各サンプル WordCount は、Dataflow SDK に異なるコンセプトを提供するものとなっています。

  • Minimal WordCount は、Dataflow パイプラインの作成に関する基本原則を示すものです。
  • WordCount は、再利用や保守が可能なパイプラインを作成するための一般的なベスト プラクティスを導入したものとなっています。
  • Debugging WordCount は、ロギングとデバッグに関するプラクティスを取り入れたものとなっています。
  • Windowed WordCount は、Dataflow のプログラミング モデルを使用して、制限付きと制限なしの両方のデータセットを処理する方法を示したものです。

まずは、最もシンプルな Minimal WordCount サンプルから理解します。パイプライン作成の基本原則に慣れたら、WordCount で Dataflow プログラムの記述に関するベスト プラクティスを学習します。その後、Debugging WordCount に進んで、ロギングとデバッグに関する一般的なプラクティスの活用方法を理解します。最後に、Windowed WordCount で、制限付きデータセットと制限なしデータセットの両方に同じ計算パターンを使用する方法について学習します。

MinimalWordCount

Minimal WordCount はシンプルなパイプラインを示したものです。このパイプラインでは、Google Cloud Storage にあるファイルからテキストのブロックを読み取り、変換を適用して単語をトークン化し、単語数をカウントして、Cloud Storage バケット内の出力ファイルにデータを書き込むことができます。このサンプルでは、入力ファイルと出力ファイルの場所をハードコードし、エラーチェックは実行しません。このサンプルは単に、Dataflow パイプラインの作成方法の「ベアボーン」を示すものです。パイプラインの入力ソースと出力ソースをパラメータ化する方法や、その他のベスト プラクティスについては、以降のサンプルで示します。

Java

主なコンセプト:
  1. パイプラインを作成する
  2. パイプラインに変換を適用する
    • 入力を読み取る(このサンプルの場合: テキスト ファイルを読み取る)
    • ParDo 変換を適用する
    • SDK で提供されている変換を適用する(このサンプルの場合: Count
    • 出力を書き込む(このサンプルの場合: Google Cloud Storage に書き込む)
  3. パイプラインを実行する

以下のセクションでは、Minimal WordCount パイプラインから抜粋した関連コードとともに、上記のコンセプトについて詳しく説明します。

パイプラインを作成する

Cloud Dataflow パイプラインを作成するための最初のステップは、Pipeline Options オブジェクトを作成することです。このオブジェクトを使用すると、パイプラインについてのさまざまなオプションを設定できます。たとえば、パイプラインを実行するパイプライン ランナー、プロジェクトの ID、パイプラインのファイルを保存するステージング ロケーション(jar をクラウド上でアクセス可能にするために使用される場所)などです。このサンプルではこれらのオプションをプログラムで設定しますが、多くの場合、パイプライン オプションの設定にはコマンドラインの引数が使用されます

サンプルでは、PipelineRunner として BlockingDataflowPipelineRunner を指定しています。これは、パイプラインがクラウド上で Google Cloud Dataflow サービスを使用して実行されるようにするためです。クラウド上でパイプラインを実行するために設定できるオプションは他にもあります。また、このオプションを完全に省略することもできます。その場合、パイプラインはデフォルトのランナーによってローカルに実行されます。これらについては、後述の 2 つの WordCount サンプルで示されており、実行パラメータを指定するでさらに詳しく説明されています。

Java

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("SET-YOUR-PROJECT-ID-HERE");
// The 'gs' URI means that this is a Google Cloud Storage path
options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");

次のステップは、上記で作成したオプションを使用して、Pipeline オブジェクトを作成することです。Pipeline オブジェクトは、実行される変換(そのパイプラインに関連付けられた変換)のグラフを構築します。

Java

Pipeline p = Pipeline.create(options);

パイプライン オブジェクトの詳しい説明と、その機能のしくみについては、パイプラインを参照してください。

パイプラインの変換を適用する

Minimal WordCount パイプラインにはいくつかの変換が含まれています。これらは、データをパイプライン内に読み込み、データを操作または変換し、結果を書き出すためのものです。各変換は、パイプライン内でのオペレーションを表しています。

各変換は、なんらかの入力(データなど)を受け取り、なんらかの出力データを生成します。入力データと出力データは、SDK クラス PCollection によって表されます。 PCollection は、Dataflow SDK によって提供される特殊なクラスであり、事実上任意のサイズのデータセット(制限なしデータセットを含む)を表すために使用できます。

図 1 は、パイプラインのデータフローを示したものです。

このパイプラインは、TextIO.Read 変換を使用して、入力データファイルに保存されたデータから PCollection を作成します。CountWords 変換は、Raw テキストの PCollection からワードカウントの PCollection を生成します。TextIO.Write は、フォーマットされたワードカウントを出力データファイルに書き込みます。
図 1: パイプラインのデータフロー。

Minimal WordCount パイプラインには、5 つの変換が含まれています。

  1. テキスト ファイルの Read 変換は、Pipeline オブジェクト自体に適用され、出力として PCollection を生成します。出力の PCollection に含まれる各要素は、入力ファイルから取得された 1 行のテキストを表します。
  2. Java

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
    
  3. (匿名クラスとしてインラインで定義される)DoFn を各要素で呼び出す ParDo 変換。これは、テキスト行を個別の単語にトークン化します。この変換の入力は、前述の TextIO.Read 変換で生成された PCollection のテキスト行です。ParDo 変換は、新しい PCollection を出力します。この出力に含まれる各要素は、テキスト内の個別の単語を表します。
  4. Java

    変換に名前を付けることができ、それが Dataflow Monitoring Interface に表示されます。名前を付けるには、このサンプルで示すように .named() オペレーションを使用します。Dataflow サービスによってパイプラインが実行されると、モニタリング インターフェースによって、各 ParDo 変換がいつ実行されるかが示されます。

      .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
           for (String word : c.element().split("[^a-zA-Z']+")) {
             if (!word.isEmpty()) {
               c.output(word);
             }
           }
         }
      }))
    
  5. SDK に付属の Count 変換は、任意の型の PCollection を取得して Key-Value ペアからなる PCollection を返す汎用変換です。各キーは、入力のコレクションから取得された一意の要素を表し、各値は、入力コレクション内でのキーの出現回数を表します。

    このパイプラインでは、Count の入力は前述の ParDo によって生成された個別の単語の PCollection であり、出力は Key-Value ペアの PCollection です。各キーは、テキスト内の一意の単語を表し、それに関連付けられた値は、各単語の出現回数です。
  6. Java

      .apply(Count.<String>perElement())
    
  7. 次は、一意の単語と出現回数の各 Key-Value ペアを、出力ファイルへの書き込みに適した印刷可能文字列へとフォーマットする変換です。
  8. Java

    MapElements は、単純な ParDo をカプセル化する高水準の複合変換です。MapElements は、入力 PCollection 内の要素ごとに厳密に 1 つの出力要素を生成する関数を適用します。この MapElements は、フォーマットを実行する SimpleFunction(匿名クラスとしてインラインで定義される)を呼び出します。この MapElements は、Count で生成された Key-Value ペアの PCollection を入力として取り、印刷可能文字列の PCollection を新規に生成します。

      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
         @Override
         public String apply(KV<String, Long> element) {
           return element.getKey() + ": " + element.getValue();
         }
      }))
    
  9. テキスト ファイルの Write です。この変換は、フォーマットされた String の最終的な PCollection を入力として取り、各要素を出力のテキスト ファイルに書き込みます。入力の PCollection に含まれる各要素は、結果の出力ファイルに含まれる 1 行のテキストを表します。
  10. Java

      .apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
    

    Write 変換は、型 PDone の自明な結果値を生成します。このケースでは、この値は無視されます。

パイプラインを実行する

run メソッドを呼び出してパイプラインを実行します。これにより、パイプラインの作成時に指定したパイプライン ランナーによってパイプラインが実行されます。

Java

p.run();

WordCount サンプル

この WordCount サンプルには、パイプラインの読み取り、書き込み、保守を容易にするための推奨プログラミング プラクティスが複数導入されています。必ずしも必要なものではありませんが、これらのプラクティスを使用することで、パイプラインをより柔軟に実行できたり、パイプラインのテストが行いやすくなったり、パイプラインのコードが再利用可能になる場合があります。

このセクションは、読者がパイプライン作成の基本コンセプトを十分に理解していることを前提としています。まだ理解が十分でないと判断される場合は、上のセクション Minimal WordCount をお読みください。

Java

新しいコンセプト:
  1. 明示的な DoFn を使用して ParDo を適用する
  2. 複合変換を作成する
  3. パラメータ化可能な PipelineOptions を使用する

以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。

明示的な DoFn を指定する

ParDo 変換を使用する際は、入力 PCollection 内の各要素に適用される処理オペレーションを指定する必要があります。この処理オペレーションは、SDK クラス DoFn のサブクラスです。前述の(Minimal WordCount)セクションのサンプル パイプラインでは、ParDoDoFn サブクラスを、匿名の内部クラス インスタンスとしてインラインで作成しました。

ただし多くの場合は、DoFn をグローバル レベルで定義したほうが、単体テストが行いやすくなり、ParDo コードがより読み取りやすくなるため合理的です。

前述の Minimal WordCount サンプルで述べたように、パイプラインを実行する際、Dataflow Monitoring Interface は各 ParDo 変換がいつ実行されるかを示します。Dataflow サービスは、渡した DoFn の名前から ParDo 変換の変換名を自動的に生成します。たとえば、FormatAsTextFn() を適用する ParDo は、モニタリング インターフェースに ParDo(FormatAsText) として表示されます。

Java

このサンプルでは、DoFn は静的クラスとして定義されます。

/** A DoFn that converts a Word and Count into a printable string. */
static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
  ...

  @Override
  public void processElement(ProcessContext c) {
    ...
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform.
  p.apply(...)
   .apply(...)
   .apply(ParDo.of(new FormatAsTextFn()))
   ...
}

ParDo 変換用に DoFn サブクラスを作成して指定する方法については、ParDo による並列処理をご覧ください。

複合変換を作成する

複数の変換または ParDo ステップで構成された処理オペレーションがある場合は、それを PTransform のサブクラスとして作成できます。PTransform サブクラスを作成すると、再利用可能な複合変換を作成でき、パイプラインの構造がわかりやすいモジュール型になり、簡単に単体テストを行えるようになります。

また、PTransform サブクラスを使ってパイプラインの論理構造を明示的にすることで、パイプラインのモニタリングがより簡単になります。Dataflow サービスによってパイプラインの最終的な最適化構造が作成されると、作成した変換が Dataflow Monitoring Interface によって使用され、パイプラインの構造がより正確に反映されます。

Java

このサンプルでは、2 つの変換が PTransform サブクラスの CountWords としてカプセル化されます。CountWords には、ExtractWordsFn と SDK 付属の Count 変換を実行する ParDo が含まれます。

CountWords が定義されたら、最終的な入力と出力を指定します。入力は抽出オペレーションのための PCollection<String> で、出力はカウント オペレーションによって生成される PCollection<KV<String, Long>> です。

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}

パラメータ化可能な PipelineOptions を使用する

前述のサンプル Minimal WordCount では、パイプラインの作成時にさまざまな実行オプションを設定しました。このサンプルでは、PipelineOptions を拡張して独自のカスタム構成オプションを定義します。

コマンドライン パーサーによって処理される独自の引数を追加し、それらのデフォルト値を指定できます。その後、パイプライン コード内のオプション値にアクセスできます。

Minimal WordCount サンプルでは、パイプライン オプションをハードコードしました。ただし、PipelineOptions を作成する最も一般的な方法は、コマンドライン引数を解析する方法です。

Java

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}

WordCount サンプルをデバッグする

Debugging WordCount サンプルでは、パイプライン コードを実装するうえでのベスト プラクティスを示します。Dataflow Monitoring InterfaceAggregators を使用すると、パイプラインの実行時にパイプラインの可視性を高めることができます。

Java

また、SDK の DataflowAssert を使用して、変換の出力をパイプラインの複数のステージでテストできます。

Java

新しいコンセプト:
  1. Dataflow Monitoring Interface でログを表示する
  2. Dataflow ワーカーのログレベルを管理する
  3. Aggregators を作成する
  4. DataflowAssert を通じてパイプラインをテストする

以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。

Dataflow Monitoring Interface でログを表示する

Google Cloud Logging は、Dataflow ジョブのすべてのワーカーから、Google Cloud Platform Console 内の 1 つの場所にログを集約します。Dataflow Monitoring Interface を使用すると、Dataflow が Dataflow ジョブを完了するために起動したすべての Compute Engine インスタンスのログを表示できます。パイプラインの DoFn インスタンスにログ ステートメントを追加すると、パイプラインの実行時にそれらを Monitoring Interface に表示できます。

Java

次の SLF4J ロガーでは、FilterTextFn の完全修飾クラス名がロガー名として使用されています。このロガーによって出力されたすべてのログ ステートメントはこの名前によって参照され、Dataflow Monitoring Interface に表示されます(適切なログレベル設定が指定された場合)。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebuggingWordCount {
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...
    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
    ...

    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        // Log at the "DEBUG" level each element that we match. When executing this pipeline
        // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI
        // only if the log level is set to "DEBUG" or lower.
        LOG.debug("Matched: " + c.element().getKey());
        ...
      } else {
        // Log at the "TRACE" level each element that is not matched. Different log levels
        // can be used to control the verbosity of logging providing an effective mechanism
        // to filter less important information.
        LOG.trace("Did not match: " + c.element().getKey());
        ...
      }
    }
  }
}

Dataflow ワーカーのログレベルを管理する

Java

ユーザーコードを実行する Dataflow ワーカーは、デフォルトで、INFO ログレベル以上で Cloud Logging にロギングされるように構成されます。次のオプションを指定すれば、特定のロギング名前空間のログレベルをオーバーライドすることもできます。

--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}

たとえば、次のように指定したとします。

--workerLogLevelOverrides={"com.example":"DEBUG"}

Dataflow サービスを使用してこのパイプラインを実行すると、Monitoring Interface にはデフォルトの INFO レベル以上のログに加え、com.example パッケージに関して DEBUG レベル以上のログが記録されます。

また、次のように指定すれば、Dataflow ワーカーのデフォルトのロギング構成をオーバーライドできます。

--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>

たとえば、次のように指定したとします。

--defaultWorkerLogLevel=DEBUG

Dataflow サービスを使用してこのパイプラインを実行すると、Monitoring Interface には DEBUG レベル以上のすべてのログが記録されます。なお、デフォルトのワーカー ログレベルを TRACE または DEBUG に変更すると、ログの情報量が大幅に増加します。

詳細については、Cloud Dataflow 内でのロギングを参照してください。

アグリゲータを作成する

カスタム アグリゲータを使用すると、パイプラインの実行時にパイプライン内の値を追跡できます。これらの値は、Dataflow サービスを使用してパイプラインを実行する際に Dataflow Monitoring Interface に表示されます。

アグリゲータは、それらを作成した ParDo 変換の実行がシステムによって開始されるか、それらの初期値が変更されるまでは表示されない場合があります。表示後は、モニタリング インターフェースの [ジョブの概要] の下部でアグリゲータを確認できます。

以下のカスタム アグリゲータは、一致した単語数と一致しない単語数を追跡します。

Java

public class DebuggingWordCount {
  /** A DoFn that filters for a specific key based upon a regular expression. */
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    private final Aggregator<Long, Long> matchedWords =
        createAggregator("matchedWords", new Sum.SumLongFn());
    private final Aggregator<Long, Long> unmatchedWords =
      createAggregator("umatchedWords", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      if (filter.matcher(c.element().getKey()).matches()) {
        ...
        matchedWords.addValue(1L);
        ...
      } else {
        ...
        unmatchedWords.addValue(1L);
      }
    }
  }
}

バッチ パイプラインとストリーミング パイプラインのアグリゲータ

バッチ パイプラインのアグリゲータには一貫性があります。成功したバンドルに対しては一度だけコミットされ、失敗したバンドルに対してはコミットされません。

ストリーミング パイプラインでは、アグリゲータはより寛容なセマンティクスを提供します。成功したバンドルからの提供はベスト エフォートであり、失敗したバンドルは最終的な値に反映される可能性があります。

DataflowAssert を通じてパイプラインをテストする

Java

DataflowAssert は、Hamcrest collection matcher のスタイルを使用した便利な PTransforms のセットです。パイプライン レベルでのテストを作成する際にこれを使用すると、PCollections の内容を検証できます。DataflowAssert は、小さなデータセットを使用した単体テストで使用するのが最善ですが、ここでは学習ツールとして示されています。

以下のコードでは、フィルタされた一連の単語が予想のカウント数に一致するかどうかを検証しています。なお、DataflowAssert では出力は提供されません。パイプラインが正常に完了すれば、結果が予想と一致したことになります。詳細については、パイプラインのテスト方法をご覧ください。単体テストの例については、DebuggingWordCountTest をご覧ください。

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}

WindowedWordCount

Java

このサンプル(WindowedWordCount)では、前述のサンプルと同様にテキスト内の単語をカウントしますが、いくつかの高度なコンセプトが導入されています。WindowedWordCount の入力は、固定のデータセット(前述のサンプルと同様)でも、制限なしのデータ ストリームでも構いません。

Dataflow SDK は、制限付きと制限なしの両方の入力型を処理できる、単一のパイプラインを作成できるという点で便利です。入力が制限なしの場合は、パイプラインの PCollections も制限なしになります。制限付きの入力についても同様です。

このセクションを読む前に、ご自身がパイプライン作成の基本原則について理解していることと、それらに慣れていることを確認してください。

新しいコンセプト:
  1. 制限なしおよび制限付きの入力を読み取る
  2. データにタイムスタンプを追加する
  3. ウィンドウ処理をする
  4. 制限なしおよび制限付きの出力を書き込む

以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。

制限なしおよび制限付きの入力を読み取る

WindowedWordCount の入力は制限付きでも制限なしでも構いません。入力に固定数の要素が含まれる場合、その入力は「制限付き」のデータセットと見なされます。入力が継続的に更新される場合、その入力は「制限なし」と見なされます。入力型の詳細については、制限付き PCollection と制限なし PCollection を参照してください。

このサンプルでは、入力が制限付きになるか制限なしになるかを選択できます。先にも述べましたが、いずれのサンプルも、入力テキストにはシェイクスピア作品の文章(制限なし入力)が使用されています。ただし、このサンプルでの新しいコンセプトを説明する目的上、入力はシェイクスピアの文章の繰り返しとなっています。

このサンプルでは、入力が制限なしの場合、Google Cloud Pub/Sub トピックから入力が読み取られます。その場合、パイプラインに適用される Read 変換は PubSubIO.Read です。その他の場合、入力は Google Cloud Storage から読み取られます。

public static void main(String[] args) throws IOException {
    ...
    PCollection<String> input;
    if (options.isUnbounded()) {
      LOG.info("Reading from PubSub.");
      // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg.
      input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic()));

  } else {
      // Else, this is a bounded pipeline. Read from the Google Cloud Storage file.
      input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
      ...
    }
    ...
}

データにタイムスタンプを追加する

PCollection 内の各要素にはタイムスタンプが関連付けられます。各要素のタイムスタンプは、PCollection を作成するソースによって割り当てられます。このサンプルでは、パイプラインに制限なしの入力を選択した場合、タイムスタンプは Pub/Sub データソースから取得されます。制限付きの入力を選択した場合、AddTimestampsFn という名前の DoFn メソッド(ParDo によって呼び出される)が、PCollection 内の各要素に対してタイムスタンプを設定します。

public static void main(String[] args) throws IOException {
  ...
  input = pipeline
    .apply(...)
    // Add an element timestamp, using an artificial time.
    .apply(ParDo.of(new AddTimestampFn()));
}

以下に示すのは、要素自身から取得されたタイムスタンプのデータ要素を設定する AddTimestampsFn のコード(ParDo によって呼び出される DoFn)です。たとえば、要素がログ行の場合、その ParDo はログ文字列から時刻を解析し、その時刻を要素のタイムスタンプとして設定できる可能性があります。シェイクスピアの作品にタイムスタンプはないので、このケースでは、コンセプトを説明するためにランダムなタイムスタンプを生成しています。入力テキストの各行には、2 時間の間のいずれかの時点で、ランダムなタイムスタンプが関連付けられます。

static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp;

  AddTimestampFn() {
    this.minTimestamp = new Instant(System.currentTimeMillis());
  }

  @Override
  public void processElement(ProcessContext c) {
    // Generate a timestamp that falls somewhere in the past 2 hours.
    long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
        Instant randomTimestamp = minTimestamp.plus(randMillis);
    // Set the data element with that timestamp.
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}

タイムスタンプの詳細については、PCollection 要素のタイムスタンプを参照してください。

ウィンドウ処理をする

Dataflow SDK ではウィンドウ処理というコンセプトを使用し、個々の要素のタイムスタンプに従って PCollection を細分化します。複数の要素を集約する Dataflow 変換は、コレクション全体が無制限のサイズ(制限なし)であっても、各 PCollection を複数の無制限ウィンドウの連続として処理します。

WindowingWordCount サンプルは、固定時間のウィンドウ処理を適用します。この処理では、各ウィンドウが固定の時間間隔を表します。このサンプルの固定ウィンドウ サイズは、デフォルトで 1 分に設定されます(これはコマンドライン オプションで変更できます)。その後、パイプラインは CountWords 変換を適用します。

PCollection<KV<String, Long>> wordCounts = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
   .apply(new WordCount.CountWords());

制限なしおよび制限付きの出力を書き込む

入力は制限付きと制限なしのどちらも場合もあるので、出力 PCollection にも同じことが言えます。適切なシンクを選択するようにしてください。出力シンクによっては、制限付きの出力しかサポートしないものや、制限なしの出力しかサポートしないものがあります。たとえば、テキスト ファイルは制限付きのデータのみを受け取ることができるシンクです。BigQuery 出力ソースは、制限付きと制限なしの両方の入力をサポートするシンクです。

このサンプルでは、BigQuery テーブルに結果をストリームします。その後、結果は BigQuery テーブル用にフォーマットされ、BigQueryIO.Write を使って BigQuery に書き込まれます。

wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
  .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。