テキスト ファイル

Dataflow SDK にはテキスト ファイルの組み込み Read 変換と Write 変換があります。これらの変換では、読み取りまたは書き込み対象のテキスト ファイルは改行文字で区切られていると仮定されます。つまり、ファイル内の各「レコード」は、末尾に改行文字がある 1 行のテキストです。

ローカル ファイル(Dataflow プログラムが実行しているシステム上のファイル)と Google Cloud Storage のリモート ファイルの両方の、読み取りと書き込みを行うことができます。

Java

注: パイプラインでローカル ファイルの読み取りや書き込みを行う場合は、DirectPipelineRunner を使用してパイプラインをローカルで実行する必要があります。Dataflow サービスがパイプラインの実行に使用する Google Compute Engine インスタンスは、読み取りや書き込みのためにローカルマシン上のファイルにアクセスできないためです。

テキスト ファイルから読み取る

Java

テキスト ファイルから読み取るには、TextIO.ReadPipeline オブジェクトに適用します。TextIO.Readテキスト ファイルを読み取り、PCollection<String> を返します。結果の PCollection のそれぞれの String は、テキスト ファイルから取り出した 1 行を表します。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<String> lines = p.apply(
    TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

この例では、コードは Pipeline オブジェクトの apply を呼び出し、Read 変換として TextIO.Read に渡します。.named オペレーションは read オペレーションの変換名を提供し、.from オペレーションはファイルパスを提供します。apply の戻り値は、lines という名前の結果の PCollection<String> です。

他のファイルベース Dataflow ソースと同様、TextIO.Read 変換は複数の入力ファイルを読み取ることができます。ファイルベースのソースを読み取る際に複数のファイルを処理する方法について詳しくは、入力データを読み取るをご覧ください。

テキスト ファイルに書き込む

Java

データをテキスト ファイルに出力するには、出力する PCollectionTextIO.Write を適用します TextIO.Write を使用する場合は次のことに注意してください。

  • PCollection<String> にのみ TextIO.Write を適用できます。場合によっては、TextIO.Write で書き込む前に、単純な ParDo を使って途中の PCollection から PCollection<String> にデータを整形する必要があります。
  • 出力 PCollection の各要素は、結果のテキスト ファイルの 1 行を表します。
  • Dataflow のファイルベースの書き込みオペレーション(TextIO.Write など)は、デフォルトで複数の出力ファイルに書き込みます。詳しくは、出力データを書き込むをご覧ください。
  PCollection<String> filteredWords = ...;
  filteredWords.apply(TextIO.Write.named("WriteMyFile")
                                  .to("gs://some/outputData"));

この例では、コードは PCollectionapply を呼び出して出力し、Write 変換として TextIO.Write に渡します。.named オペレーションは、書き込みオペレーションの変換名を提供し、.to オペレーションは結果の出力ファイルへのパスを提供します。

圧縮されたテキスト ファイルから読み取る

Java

圧縮テキスト ファイル(具体的には gzipbzip2 で圧縮されたテキスト ファイル)による TextIO.Read 変換を使用できます。圧縮されたファイルを読み取るには、圧縮タイプを指定する必要があります。

圧縮タイプを指定するには、.withCompressionType メソッドを使用します。

  Pipeline p = ...;
  p.apply(TextIO.Read.named("ReadMyFile")
                     .from("gs://some/inputData.gz")
                     .withCompressionType(TextIO.CompressionType.GZIP));

現在、TextIO を使用して、圧縮されたファイルに書き込むことはできません。

ファイルに .gz または .bz2 拡張機能がある場合、圧縮タイプを明示的に指定する必要はありません。デフォルトの圧縮タイプ AUTO は、ファイル拡張子を調べて、ファイルの正しい圧縮タイプを判別します。これは glob でも機能します。glob の結果として得られるファイルには、.gz.bz2 の他に非圧縮タイプが混在することがあります。