Write from Dataflow to Cloud Storage (Java)

This document describes how to write text data from Dataflow to Cloud Storage by using the TextIO I/O connector in Java.

Include the Google Cloud library dependency

To use the TextIO connector with Cloud Storage, include the following dependency. This library provides a scheme handler for "gs://" file paths.

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
   <version>${beam.version}</version>
</dependency>
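
To run the pipeline on the Dataflow service rather than locally, you typically also need the Dataflow runner dependency. A minimal sketch, assuming your build defines the same ${beam.version} property:

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
   <version>${beam.version}</version>
</dependency>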

For more information, see Install the Apache Beam SDK.

Write text files to Cloud Storage

The following example creates a batch pipeline that writes text files to Cloud Storage using GZIP compression:

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
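
One common way to set up Application Default Credentials on a development machine is with the gcloud CLI:

gcloud auth application-default login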

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage.
  public static void main(String[] args) {
    // Sample data to write.
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    // Parse the custom and standard pipeline options from the command line.
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create a PCollection from the in-memory list.
        .apply(Create.of(wordsList))
        // Write the elements as GZIP-compressed text files.
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP));
    // Run the pipeline and block until it finishes.
    pipeline.run().waitUntilFinish();
  }
}
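
To run the example, pass the output location through the custom --bucketName option, which Beam derives from the Options interface. To execute on Dataflow rather than locally, also pass standard pipeline options such as --runner=DataflowRunner, --project, --region, and --tempLocation for your environment.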

For the output path, specify a Cloud Storage path that includes the bucket name and a filename prefix. For example, if you specify gs://my_bucket/output/file, the TextIO connector writes to the Cloud Storage bucket named my_bucket, and the names of the output files start with output/file.

By default, the TextIO connector shards the output files, using a naming convention like this: <file-prefix>-00000-of-00001. Optionally, you can specify a filename suffix and a compression scheme, as shown in the example.
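
If you need to control the number of output files, TextIO also supports an explicit shard count and a custom shard name template. The following is a minimal sketch that replaces the write step in the example above; the gs://my_bucket/output/file prefix is illustrative:

.apply(TextIO
    .write()
    .to("gs://my_bucket/output/file")
    .withSuffix(".txt")
    .withNumShards(1)                      // write exactly one output file
    .withShardNameTemplate("-SS-of-NN"));  // produces output/file-00-of-01.txt

Note that forcing a small fixed shard count limits write parallelism, so only set it when downstream consumers require a specific file layout.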

If the input PCollection is unbounded, you must define a window or a trigger on the collection, and then specify windowed writes by calling TextIO.Write.withWindowedWrites.
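
For example, the following sketch (assuming a hypothetical unbounded PCollection<String> named messages) assigns one-minute fixed windows before the write; an explicit shard count is typically also required for unbounded input:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// ...
messages
    // Assign each element to a one-minute fixed window.
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(TextIO
        .write()
        .to(options.getBucketName())
        .withSuffix(".txt")
        .withWindowedWrites()  // write one set of files per window
        .withNumShards(1));    // explicit sharding for unbounded input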

What's next