This document describes how to write text data from Dataflow to Cloud Storage by using the Apache Beam TextIO I/O connector.
Include the Google Cloud library dependency
To use the TextIO connector with Cloud Storage, include the following dependency. This library provides a schema handler for "gs://" filenames.
Java
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>
Python
apache-beam[gcp]==VERSION
Go
import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
For more information, see Install the Apache Beam SDK.
Parallelism
Parallelism is determined primarily by the number of shards. By default, the runner automatically sets this value. For most pipelines, using the default behavior is recommended. For more information, see Best practices in this document.
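If you do need a specific shard count, TextIO.Write exposes withNumShards. The following is a minimal sketch; the output path and shard count are illustrative assumptions.
import org.apache.beam.sdk.io.TextIO;

public class ShardingSketch {
  // Omitting withNumShards leaves the shard count to the runner
  // (the recommended default). Setting it pins the number of output files.
  static TextIO.Write pinnedShards() {
    return TextIO.write()
        .to("gs://my_bucket/output/file") // illustrative path
        .withNumShards(10);               // forces exactly 10 output shards
  }
}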
Performance
The following table shows performance metrics for writing to Cloud Storage. The workloads were run on one e2-standard2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.
| 100 M records, 1 kB, 1 column | Throughput (bytes) | Throughput (elements) |
|---|---|---|
| Write | 130 MBps | 130,000 elements per second |
These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.
Best practices
In general, avoid setting a specific number of shards. This allows the runner to select an appropriate value for your scale. If you tune the number of shards, we recommend writing between 100 MB and 1 GB per shard. However, the optimum value might depend on the workload.
Cloud Storage can scale to a very large number of requests per second. However, if your pipeline has large spikes in write volume, consider writing to multiple buckets, to avoid temporarily overloading any single Cloud Storage bucket.
In general, writing to Cloud Storage is more efficient when each write is larger (1 KB or greater). Writing small records to a large number of files can result in worse performance per byte.
When generating file names, consider using non-sequential names in order to distribute load. For more information, see Use a naming convention that distributes load evenly across key ranges.
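As one possible sketch of such a scheme (the bucket path, logical key, and hashing choice are assumptions, not a prescribed convention):
import org.apache.beam.sdk.io.TextIO;

public class HashedPrefixSketch {
  // Prepending a short, stable hash spreads object names across the
  // keyspace instead of clustering them sequentially.
  static TextIO.Write writeWithHashedPrefix(String logicalKey) {
    String hash = String.format("%08x", logicalKey.hashCode()).substring(0, 4);
    return TextIO.write()
        .to("gs://my_bucket/output/" + hash + "-" + logicalKey + "/file");
  }
}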
When naming files, don't use the at sign ('@') followed by a number or an asterisk ('*'). For more information, see "@*" and "@N" are reserved sharding specs.
Example: Write text files to Cloud Storage
The following example creates a batch pipeline that writes text files using GZIP compression:
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
If the input PCollection is unbounded, you must define a window or a trigger on the collection, and then specify windowed writes by calling TextIO.Write.withWindowedWrites.
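A minimal Java sketch of such a pipeline follows, assuming an in-memory input created with Create.of (replace it with your real source). The output path, suffix, and GZIP compression match the options discussed below; the bucket name my_bucket is illustrative.
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class WriteTextToGcs {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Illustrative in-memory input; replace with your real source.
    List<String> lines = Arrays.asList("line one", "line two", "line three");

    pipeline
        .apply(Create.of(lines))
        .apply(TextIO.write()
            .to("gs://my_bucket/output/file")    // bucket name and filename prefix
            .withSuffix(".txt")                  // optional filename suffix
            .withCompression(Compression.GZIP)); // optional compression scheme

    pipeline.run().waitUntilFinish();
  }
}
To run this pipeline on Dataflow, pass --runner=DataflowRunner along with your project and region options.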
For the output path, specify a Cloud Storage path that includes the bucket name and a filename prefix. For example, if you specify gs://my_bucket/output/file, the TextIO connector writes to the Cloud Storage bucket named my_bucket, and the output files have the prefix output/file.
By default, the TextIO connector shards the output files, using a naming convention like this: <file-prefix>-00000-of-00001. Optionally, you can specify a filename suffix and a compression scheme, as shown in the example.
To ensure idempotent writes, Dataflow writes to a temporary file and then copies the completed temporary file to the final file. To control where these temporary files are stored, use the withTempDirectory method.
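A minimal sketch of that call (the temporary path is illustrative):
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;

public class TempDirectorySketch {
  // Stages temporary files under gs://my_bucket/temp before Dataflow
  // copies the completed files to the final output location.
  static TextIO.Write writeWithTempDir() {
    return TextIO.write()
        .to("gs://my_bucket/output/file")
        .withTempDirectory(
            FileBasedSink.convertToFileResourceIfPossible("gs://my_bucket/temp"));
  }
}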
What's next
- Read the TextIO API documentation.
- See the list of Google-provided templates.