クイックスタートの手順をまだ行っていない場合は、こちらを続行する前に行ってください。
サンプル WordCount では、テキストを読み取り、テキスト行を個別の単語にトークン化し、それらの単語ごとに頻度計算を実行できる処理パイプラインの設定方法について説明します。Dataflow SDK には、以下に示す 4 つのサンプル WordCount が含まれています。これらは相互をベースに構築されており、処理の詳細度によって分けられています。いずれのサンプルも、入力テキストにはシェイクスピア作品の文章が使用されています。
各サンプル WordCount は、Dataflow SDK に異なるコンセプトを提供するものとなっています。
- Minimal WordCount は、Dataflow パイプラインの作成に関する基本原則を示すものです。
- WordCount は、再利用や保守が可能なパイプラインを作成するための一般的なベスト プラクティスを導入したものとなっています。
- Debugging WordCount は、ロギングとデバッグに関するプラクティスを取り入れたものとなっています。
- Windowed WordCount は、Dataflow のプログラミング モデルを使用して、制限付きと制限なしの両方のデータセットを処理する方法を示したものです。
まずは、最もシンプルな Minimal WordCount サンプルから理解します。パイプライン作成の基本原則に慣れたら、WordCount で Dataflow プログラムの記述に関するベスト プラクティスを学習します。その後、Debugging WordCount に進んで、ロギングとデバッグに関する一般的なプラクティスの活用方法を理解します。最後に、Windowed WordCount で、制限付きデータセットと制限なしデータセットの両方に同じ計算パターンを使用する方法について学習します。
MinimalWordCount
Minimal WordCount はシンプルなパイプラインを示したものです。このパイプラインでは、Google Cloud Storage にあるファイルからテキストのブロックを読み取り、変換を適用して単語をトークン化し、単語数をカウントして、Cloud Storage バケット内の出力ファイルにデータを書き込むことができます。このサンプルでは、入力ファイルと出力ファイルの場所をハードコードし、エラーチェックは実行しません。このサンプルは単に、Dataflow パイプラインの作成方法の「ベアボーン」を示すものです。パイプラインの入力ソースと出力ソースをパラメータ化する方法や、その他のベスト プラクティスについては、以降のサンプルで示します。
Java
- パイプラインを作成する
- パイプラインに変換を適用する
- 入力を読み取る(このサンプルの場合: テキスト ファイルを読み取る)
ParDo
変換を適用する- SDK で提供されている変換を適用する(このサンプルの場合:
Count
) - 出力を書き込む(このサンプルの場合: Google Cloud Storage に書き込む)
- パイプラインを実行する
以下のセクションでは、Minimal WordCount パイプラインから抜粋した関連コードとともに、上記のコンセプトについて詳しく説明します。
パイプラインを作成する
Cloud Dataflow パイプラインを作成するための最初のステップは、Pipeline Options オブジェクトを作成することです。このオブジェクトを使用すると、パイプラインについてのさまざまなオプションを設定できます。たとえば、パイプラインを実行するパイプライン ランナー、プロジェクトの ID、パイプラインのファイルを保存するステージング ロケーション(jar をクラウド上でアクセス可能にするために使用される場所)などです。このサンプルではこれらのオプションをプログラムで設定しますが、多くの場合、パイプライン オプションの設定にはコマンドラインの引数が使用されます。
サンプルでは、PipelineRunner
として BlockingDataflowPipelineRunner
を指定しています。これは、パイプラインがクラウド上で Google Cloud Dataflow サービスを使用して実行されるようにするためです。クラウド上でパイプラインを実行するために設定できるオプションは他にもあります。また、このオプションを完全に省略することもできます。その場合、パイプラインはデフォルトのランナーによってローカルに実行されます。これらについては、後述の 2 つの WordCount サンプルで示されており、実行パラメータを指定するでさらに詳しく説明されています。
Java
DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); options.setRunner(BlockingDataflowPipelineRunner.class); options.setProject("SET-YOUR-PROJECT-ID-HERE"); // The 'gs' URI means that this is a Google Cloud Storage path options.setStagingLocation("gs://SET-YOUR-BUCKET-NAME-HERE");
次のステップは、上記で作成したオプションを使用して、Pipeline
オブジェクトを作成することです。Pipeline
オブジェクトは、実行される変換(そのパイプラインに関連付けられた変換)のグラフを構築します。
Java
Pipeline p = Pipeline.create(options);
パイプライン オブジェクトの詳しい説明と、その機能のしくみについては、パイプラインを参照してください。
パイプラインの変換を適用する
Minimal WordCount パイプラインにはいくつかの変換が含まれています。これらは、データをパイプライン内に読み込み、データを操作または変換し、結果を書き出すためのものです。各変換は、パイプライン内でのオペレーションを表しています。
各変換は、なんらかの入力(データなど)を受け取り、なんらかの出力データを生成します。入力データと出力データは、SDK クラス PCollection
によって表されます。
PCollection は、Dataflow SDK によって提供される特殊なクラスであり、事実上任意のサイズのデータセット(制限なしデータセットを含む)を表すために使用できます。
図 1 は、パイプラインのデータフローを示したものです。

Minimal WordCount パイプラインには、5 つの変換が含まれています。
- テキスト ファイルの Read 変換は、
Pipeline
オブジェクト自体に適用され、出力としてPCollection
を生成します。出力のPCollection
に含まれる各要素は、入力ファイルから取得された 1 行のテキストを表します。 - (匿名クラスとしてインラインで定義される)
DoFn
を各要素で呼び出す ParDo 変換。これは、テキスト行を個別の単語にトークン化します。この変換の入力は、前述のTextIO.Read
変換で生成されたPCollection
のテキスト行です。ParDo
変換は、新しいPCollection
を出力します。この出力に含まれる各要素は、テキスト内の個別の単語を表します。 - SDK に付属の
Count
変換は、任意の型のPCollection
を取得して Key-Value ペアからなるPCollection
を返す汎用変換です。各キーは、入力のコレクションから取得された一意の要素を表し、各値は、入力コレクション内でのキーの出現回数を表します。
このパイプラインでは、Count
の入力は前述のParDo
によって生成された個別の単語のPCollection
であり、出力は Key-Value ペアのPCollection
です。各キーは、テキスト内の一意の単語を表し、それに関連付けられた値は、各単語の出現回数です。 - 次は、一意の単語と出現回数の各 Key-Value ペアを、出力ファイルへの書き込みに適した印刷可能文字列へとフォーマットする変換です。
- テキスト ファイルの Write です。この変換は、フォーマットされた
String
の最終的なPCollection
を入力として取り、各要素を出力のテキスト ファイルに書き込みます。入力のPCollection
に含まれる各要素は、結果の出力ファイルに含まれる 1 行のテキストを表します。
Java
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
Java
変換に名前を付けることができ、それが Dataflow Monitoring Interface に表示されます。名前を付けるには、このサンプルで示すように .named()
オペレーションを使用します。Dataflow サービスによってパイプラインが実行されると、モニタリング インターフェースによって、各 ParDo
変換がいつ実行されるかが示されます。
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { c.output(word); } } } }))
Java
.apply(Count.<String>perElement())
Java
MapElements
は、単純な ParDo をカプセル化する高水準の複合変換です。MapElements
は、入力 PCollection
内の要素ごとに厳密に 1 つの出力要素を生成する関数を適用します。この MapElements
は、フォーマットを実行する SimpleFunction
(匿名クラスとしてインラインで定義される)を呼び出します。この MapElements
は、Count
で生成された Key-Value ペアの PCollection
を入力として取り、印刷可能文字列の PCollection
を新規に生成します。
.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> element) { return element.getKey() + ": " + element.getValue(); } }))
Java
.apply(TextIO.Write.to("gs://my-bucket/counts.txt"));
Write
変換は、型 PDone
の自明な結果値を生成します。このケースでは、この値は無視されます。
パイプラインを実行する
run
メソッドを呼び出してパイプラインを実行します。これにより、パイプラインの作成時に指定したパイプライン ランナーによってパイプラインが実行されます。
Java
p.run();
WordCount サンプル
この WordCount サンプルには、パイプラインの読み取り、書き込み、保守を容易にするための推奨プログラミング プラクティスが複数導入されています。必ずしも必要なものではありませんが、これらのプラクティスを使用することで、パイプラインをより柔軟に実行できたり、パイプラインのテストが行いやすくなったり、パイプラインのコードが再利用可能になる場合があります。
このセクションは、読者がパイプライン作成の基本コンセプトを十分に理解していることを前提としています。まだ理解が十分でないと判断される場合は、上のセクション Minimal WordCount をお読みください。
Java
- 明示的な
DoFn
を使用してParDo
を適用する - 複合変換を作成する
- パラメータ化可能な
PipelineOptions
を使用する
以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。
明示的な DoFn を指定する
ParDo
変換を使用する際は、入力 PCollection
内の各要素に適用される処理オペレーションを指定する必要があります。この処理オペレーションは、SDK クラス DoFn
のサブクラスです。前述の(Minimal WordCount)セクションのサンプル パイプラインでは、ParDo
の DoFn
サブクラスを、匿名の内部クラス インスタンスとしてインラインで作成しました。
ただし多くの場合は、DoFn
をグローバル レベルで定義したほうが、単体テストが行いやすくなり、ParDo
コードがより読み取りやすくなるため合理的です。
前述の Minimal WordCount サンプルで述べたように、パイプラインを実行する際、Dataflow Monitoring Interface は各 ParDo
変換がいつ実行されるかを示します。Dataflow サービスは、渡した DoFn
の名前から ParDo
変換の変換名を自動的に生成します。たとえば、FormatAsTextFn()
を適用する ParDo
は、モニタリング インターフェースに ParDo(FormatAsText)
として表示されます。
Java
このサンプルでは、DoFn
は静的クラスとして定義されます。
/** A DoFn that converts a Word and Count into a printable string. */ static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { ... @Override public void processElement(ProcessContext c) { ... } } public static void main(String[] args) throws IOException { Pipeline p = ... // Our pipeline passes an instance of static FormatAsTextFn() to the ParDo transform. p.apply(...) .apply(...) .apply(ParDo.of(new FormatAsTextFn())) ... }
ParDo
変換用に DoFn
サブクラスを作成して指定する方法については、ParDo による並列処理をご覧ください。
複合変換を作成する
複数の変換または ParDo
ステップで構成された処理オペレーションがある場合は、それを PTransform
のサブクラスとして作成できます。PTransform
サブクラスを作成すると、再利用可能な複合変換を作成でき、パイプラインの構造がわかりやすいモジュール型になり、簡単に単体テストを行えるようになります。
また、PTransform
サブクラスを使ってパイプラインの論理構造を明示的にすることで、パイプラインのモニタリングがより簡単になります。Dataflow サービスによってパイプラインの最終的な最適化構造が作成されると、作成した変換が Dataflow Monitoring Interface によって使用され、パイプラインの構造がより正確に反映されます。
Java
このサンプルでは、2 つの変換が PTransform
サブクラスの CountWords
としてカプセル化されます。CountWords
には、ExtractWordsFn
と SDK 付属の Count
変換を実行する ParDo
が含まれます。
CountWords
が定義されたら、最終的な入力と出力を指定します。入力は抽出オペレーションのための PCollection<String>
で、出力はカウント オペレーションによって生成される PCollection<KV<String, Long>>
です。
public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } public static void main(String[] args) throws IOException { Pipeline p = ... p.apply(...) .apply(new CountWords()) ... }
パラメータ化可能な PipelineOptions
を使用する
前述のサンプル Minimal WordCount では、パイプラインの作成時にさまざまな実行オプションを設定しました。このサンプルでは、PipelineOptions
を拡張して独自のカスタム構成オプションを定義します。
コマンドライン パーサーによって処理される独自の引数を追加し、それらのデフォルト値を指定できます。その後、パイプライン コード内のオプション値にアクセスできます。
Minimal WordCount サンプルでは、パイプライン オプションをハードコードしました。ただし、PipelineOptions
を作成する最も一般的な方法は、コマンドライン引数を解析する方法です。
Java
public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); ... } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); ... }
WordCount サンプルをデバッグする
Debugging WordCount サンプルでは、パイプライン コードを実装するうえでのベスト プラクティスを示します。Dataflow Monitoring Interface と Aggregators を使用すると、パイプラインの実行時にパイプラインの可視性を高めることができます。
Java
また、SDK の DataflowAssert を使用して、変換の出力をパイプラインの複数のステージでテストできます。
Java
- Dataflow Monitoring Interface でログを表示する
- Dataflow ワーカーのログレベルを管理する
Aggregators
を作成するDataflowAssert
を通じてパイプラインをテストする
以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。
Dataflow Monitoring Interface でログを表示する
Google Cloud Logging は、Dataflow ジョブのすべてのワーカーから、Google Cloud Platform Console 内の 1 つの場所にログを集約します。Dataflow Monitoring Interface を使用すると、Dataflow が Dataflow ジョブを完了するために起動したすべての Compute Engine インスタンスのログを表示できます。パイプラインの DoFn
インスタンスにログ ステートメントを追加すると、パイプラインの実行時にそれらを Monitoring Interface に表示できます。
Java
次の SLF4J ロガーでは、FilterTextFn
の完全修飾クラス名がロガー名として使用されています。このロガーによって出力されたすべてのログ ステートメントはこの名前によって参照され、Dataflow Monitoring Interface に表示されます(適切なログレベル設定が指定された場合)。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DebuggingWordCount { public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); ... public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // using the Dataflow service, these log lines will appear in the Dataflow Monitoring UI // only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); ... } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. LOG.trace("Did not match: " + c.element().getKey()); ... } } } }
Dataflow ワーカーのログレベルを管理する
Java
ユーザーコードを実行する Dataflow ワーカーは、デフォルトで、INFO ログレベル以上で Cloud Logging にロギングされるように構成されます。次のオプションを指定すれば、特定のロギング名前空間のログレベルをオーバーライドすることもできます。
--workerLogLevelOverrides={"Name1"="Level1","Name2"="Level2",...}
たとえば、次のように指定したとします。
--workerLogLevelOverrides={"com.example":"DEBUG"}
Dataflow サービスを使用してこのパイプラインを実行すると、Monitoring Interface にはデフォルトの INFO レベル以上のログに加え、com.example
パッケージに関して DEBUG レベル以上のログが記録されます。
また、次のように指定すれば、Dataflow ワーカーのデフォルトのロギング構成をオーバーライドできます。
--defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>
たとえば、次のように指定したとします。
--defaultWorkerLogLevel=DEBUG
Dataflow サービスを使用してこのパイプラインを実行すると、Monitoring Interface には DEBUG レベル以上のすべてのログが記録されます。なお、デフォルトのワーカー ログレベルを TRACE または DEBUG に変更すると、ログの情報量が大幅に増加します。
詳細については、Cloud Dataflow 内でのロギングを参照してください。
アグリゲータを作成する
カスタム アグリゲータを使用すると、パイプラインの実行時にパイプライン内の値を追跡できます。これらの値は、Dataflow サービスを使用してパイプラインを実行する際に Dataflow Monitoring Interface に表示されます。
アグリゲータは、それらを作成した ParDo
変換の実行がシステムによって開始されるか、それらの初期値が変更されるまでは表示されない場合があります。表示後は、モニタリング インターフェースの [ジョブの概要] の下部でアグリゲータを確認できます。
以下のカスタム アグリゲータは、一致した単語数と一致しない単語数を追跡します。
Java
public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { ... private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); @Override public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { ... matchedWords.addValue(1L); ... } else { ... unmatchedWords.addValue(1L); } } } }
バッチ パイプラインとストリーミング パイプラインのアグリゲータ
バッチ パイプラインのアグリゲータには一貫性があります。成功したバンドルに対しては一度だけコミットされ、失敗したバンドルに対してはコミットされません。
ストリーミング パイプラインでは、アグリゲータはより寛容なセマンティクスを提供します。成功したバンドルからの提供はベスト エフォートであり、失敗したバンドルは最終的な値に反映される可能性があります。
DataflowAssert
を通じてパイプラインをテストする
Java
DataflowAssert は、Hamcrest collection matcher のスタイルを使用した便利な PTransforms
のセットです。パイプライン レベルでのテストを作成する際にこれを使用すると、PCollections
の内容を検証できます。DataflowAssert
は、小さなデータセットを使用した単体テストで使用するのが最善ですが、ここでは学習ツールとして示されています。
以下のコードでは、フィルタされた一連の単語が予想のカウント数に一致するかどうかを検証しています。なお、DataflowAssert
では出力は提供されません。パイプラインが正常に完了すれば、結果が予想と一致したことになります。詳細については、パイプラインのテスト方法をご覧ください。単体テストの例については、DebuggingWordCountTest をご覧ください。
public static void main(String[] args) { ... List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); ... }
WindowedWordCount
Java
このサンプル(WindowedWordCount
)では、前述のサンプルと同様にテキスト内の単語をカウントしますが、いくつかの高度なコンセプトが導入されています。WindowedWordCount
の入力は、固定のデータセット(前述のサンプルと同様)でも、制限なしのデータ ストリームでも構いません。
Dataflow SDK は、制限付きと制限なしの両方の入力型を処理できる、単一のパイプラインを作成できるという点で便利です。入力が制限なしの場合は、パイプラインの PCollections
も制限なしになります。制限付きの入力についても同様です。
このセクションを読む前に、ご自身がパイプライン作成の基本原則について理解していることと、それらに慣れていることを確認してください。
新しいコンセプト:- 制限なしおよび制限付きの入力を読み取る
- データにタイムスタンプを追加する
- ウィンドウ処理をする
- 制限なしおよび制限付きの出力を書き込む
以下のセクションでは、これらのコンセプトについて詳しく説明しながら、パイプライン コードをさらに小さなセクションに分けて見ていきます。
制限なしおよび制限付きの入力を読み取る
WindowedWordCount
の入力は制限付きでも制限なしでも構いません。入力に固定数の要素が含まれる場合、その入力は「制限付き」のデータセットと見なされます。入力が継続的に更新される場合、その入力は「制限なし」と見なされます。入力型の詳細については、制限付き PCollection と制限なし PCollection を参照してください。
このサンプルでは、入力が制限付きになるか制限なしになるかを選択できます。先にも述べましたが、いずれのサンプルも、入力テキストにはシェイクスピア作品の文章(制限なし入力)が使用されています。ただし、このサンプルでの新しいコンセプトを説明する目的上、入力はシェイクスピアの文章の繰り返しとなっています。
このサンプルでは、入力が制限なしの場合、Google Cloud Pub/Sub トピックから入力が読み取られます。その場合、パイプラインに適用される Read
変換は PubSubIO.Read
です。その他の場合、入力は Google Cloud Storage から読み取られます。
public static void main(String[] args) throws IOException { ... PCollection<String> input; if (options.isUnbounded()) { LOG.info("Reading from PubSub."); // Read from the Pub/Sub topic. A topic will be created if it wasn't specified as an arg. input = pipeline.apply(PubsubIO.Read.topic(options.getPubsubTopic())); } else { // Else, this is a bounded pipeline. Read from the Google Cloud Storage file. input = pipeline.apply(TextIO.Read.from(options.getInputFile())) ... } ... }
データにタイムスタンプを追加する
PCollection
内の各要素にはタイムスタンプが関連付けられます。各要素のタイムスタンプは、PCollection
を作成するソースによって割り当てられます。このサンプルでは、パイプラインに制限なしの入力を選択した場合、タイムスタンプは Pub/Sub データソースから取得されます。制限付きの入力を選択した場合、AddTimestampsFn
という名前の DoFn
メソッド(ParDo
によって呼び出される)が、PCollection
内の各要素に対してタイムスタンプを設定します。
public static void main(String[] args) throws IOException { ... input = pipeline .apply(...) // Add an element timestamp, using an artificial time. .apply(ParDo.of(new AddTimestampFn())); }
以下に示すのは、要素自身から取得されたタイムスタンプのデータ要素を設定する AddTimestampsFn
のコード(ParDo
によって呼び出される DoFn
)です。たとえば、要素がログ行の場合、その ParDo
はログ文字列から時刻を解析し、その時刻を要素のタイムスタンプとして設定できる可能性があります。シェイクスピアの作品にタイムスタンプはないので、このケースでは、コンセプトを説明するためにランダムなタイムスタンプを生成しています。入力テキストの各行には、2 時間の間のいずれかの時点で、ランダムなタイムスタンプが関連付けられます。
static class AddTimestampFn extends DoFn<String, String> { private static final Duration RAND_RANGE = Duration.standardHours(2); private final Instant minTimestamp; AddTimestampFn() { this.minTimestamp = new Instant(System.currentTimeMillis()); } @Override public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past 2 hours. long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant randomTimestamp = minTimestamp.plus(randMillis); // Set the data element with that timestamp. c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); } }
タイムスタンプの詳細については、PCollection 要素のタイムスタンプを参照してください。
ウィンドウ処理をする
Dataflow SDK ではウィンドウ処理というコンセプトを使用し、個々の要素のタイムスタンプに従って PCollection
を細分化します。複数の要素を集約する Dataflow 変換は、コレクション全体が無制限のサイズ(制限なし)であっても、各 PCollection
を複数の無制限ウィンドウの連続として処理します。
WindowingWordCount
サンプルは、固定時間のウィンドウ処理を適用します。この処理では、各ウィンドウが固定の時間間隔を表します。このサンプルの固定ウィンドウ サイズは、デフォルトで 1 分に設定されます(これはコマンドライン オプションで変更できます)。その後、パイプラインは CountWords
変換を適用します。
PCollection<KV<String, Long>> wordCounts = input .apply(Window.<String>into( FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) .apply(new WordCount.CountWords());
制限なしおよび制限付きの出力を書き込む
入力は制限付きと制限なしのどちらも場合もあるので、出力 PCollection
にも同じことが言えます。適切なシンクを選択するようにしてください。出力シンクによっては、制限付きの出力しかサポートしないものや、制限なしの出力しかサポートしないものがあります。たとえば、テキスト ファイルは制限付きのデータのみを受け取ることができるシンクです。BigQuery 出力ソースは、制限付きと制限なしの両方の入力をサポートするシンクです。
このサンプルでは、BigQuery テーブルに結果をストリームします。その後、結果は BigQuery テーブル用にフォーマットされ、BigQueryIO.Write
を使って BigQuery に書き込まれます。
wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema()));