Dataflow managed I/O

The managed I/O connector is an Apache Beam transform that provides a common API for creating sources and sinks.

Overview

You create the managed I/O connector using Apache Beam code, just like any other I/O connector. You specify a source or sink to instantiate and pass in a set of configuration parameters. You can also put the configuration parameters into a YAML file and provide a URL to the file.
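Quoted JSON strings, numbers, objects, and arrays are also valid YAML. Because of that, a configuration map can be written to a YAML file without a third-party library. The following plain-Python sketch renders one key per line. It assumes a flat map whose values are strings, numbers, lists, or maps of those. The helper name `to_yaml` and the example values are hypothetical, and for complex configurations a dedicated YAML library is the safer choice.

```python
import json


def to_yaml(config: dict) -> str:
    """Render a configuration map as block-style YAML (hypothetical helper).

    Every key and value is serialized with json.dumps, so strings are
    double-quoted and nested maps or lists become YAML flow collections.
    """
    lines = [f"{json.dumps(key)}: {json.dumps(value)}" for key, value in config.items()]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical Apache Iceberg settings; the warehouse bucket is a placeholder.
    config = {
        "table": "db.table1",
        "catalog_name": "local",
        "catalog_properties": {"type": "hadoop", "warehouse": "gs://my-bucket/warehouse"},
    }
    print(to_yaml(config), end="")
```

Save the printed text to a file at a location your pipeline can read, and provide that file's URL instead of passing the parameters inline.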

On the backend, Dataflow treats the managed I/O connector as a service, which allows Dataflow to manage runtime operations for the connector. You can then focus on the business logic in your pipeline, rather than managing these details.

For more information about the managed I/O API, see Managed in the Apache Beam Java SDK documentation.

Dynamic destinations

For some sinks, the managed I/O connector can dynamically select a destination based on field values in the incoming records.

To use dynamic destinations, provide a template string for the destination. The template string can include field names within curly brackets, such as "tables.{field1}". At runtime, the connector substitutes each referenced field's value from the incoming record to determine that record's destination.

For example, suppose your data has a field named airport. You could set the destination to "flights.{airport}". If airport=SFO, the record is written to flights.SFO. For nested fields, use dot-notation. For example: {top.middle.nested}.
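The substitution rule can be modeled in a few lines of plain Python. This sketch only illustrates the documented behavior and is not the connector's implementation. The name `resolve_destination` is hypothetical, and raising `KeyError` for a missing field is an assumption, because the text does not say how the connector handles missing fields.

```python
import re

# Matches one {field} or {dotted.field.path} placeholder in a template.
_PLACEHOLDER = re.compile(r"\{([^{}]+)\}")


def resolve_destination(template: str, record: dict) -> str:
    """Substitute record field values into a destination template.

    Nested fields use dot-notation, for example {top.middle.nested}.
    Raises KeyError if a referenced field is missing (an assumption of this sketch).
    """

    def lookup(match) -> str:
        value = record
        for part in match.group(1).split("."):
            value = value[part]  # descend one nesting level per path segment
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


if __name__ == "__main__":
    print(resolve_destination("flights.{airport}", {"airport": "SFO"}))
```

Running it prints `flights.SFO`, matching the example above.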

Filtering

You might want to filter out certain fields before they are written to the destination table. For sinks that support dynamic destinations, you can use the drop, keep, or only parameter for this purpose. These parameters let you include destination metadata in the input records without writing the metadata to the destination.

You can set at most one of these parameters for a given sink.

Configuration parameters:

  • drop (list of strings): A list of field names to drop before writing to the destination.
  • keep (list of strings): A list of field names to keep when writing to the destination. All other fields are dropped.
  • only (string): The name of exactly one field to keep when writing to the destination. All other fields are dropped. This field must be of row type.
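The three options can be modeled on dict-shaped records with plain Python. This sketch is illustrative only. The function name and the field names `airport`, `flight`, and `delay_min` are hypothetical, only top-level field names are handled, and for `only` it assumes that the contents of the nested row become the written record.

```python
def filter_record(record: dict, drop=None, keep=None, only=None) -> dict:
    """Model the drop, keep, and only options on a dict-shaped record.

    At most one option may be set, matching the sink's restriction.
    """
    options_set = [opt for opt in (drop, keep, only) if opt is not None]
    if len(options_set) > 1:
        raise ValueError("Set at most one of drop, keep, or only.")
    if drop is not None:
        return {k: v for k, v in record.items() if k not in drop}
    if keep is not None:
        return {k: v for k, v in record.items() if k in keep}
    if only is not None:
        nested = record[only]
        if not isinstance(nested, dict):
            raise TypeError(f"Field {only!r} must be a row (nested record).")
        # Assumption: the nested row's contents become the written record.
        return dict(nested)
    return dict(record)


if __name__ == "__main__":
    # "airport" selects the destination (for example "flights.{airport}")
    # but is dropped so that it is not written to the table.
    record = {"airport": "SFO", "flight": "UA100", "delay_min": 5}
    print(filter_record(record, drop=["airport"]))
```

This pairs naturally with dynamic destinations: the routing field stays in the input record, but it never reaches the destination table.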

Apache Iceberg

Managed I/O supports the following capabilities for Apache Iceberg. Each capability is supported for Hadoop, Hive, and REST-based catalogs:

  • Batch read
  • Batch write
  • Streaming write
  • Dynamic table creation
  • Dynamic destinations

For BigQuery tables for Apache Iceberg, use the BigQueryIO connector with the BigQuery Storage API. The table must already exist; dynamic table creation is not supported.

Managed I/O uses the following configuration parameters for Apache Iceberg.

Read and write configuration:

  • table (string): The identifier of the Apache Iceberg table. Example: "db.table1".
  • catalog_name (string): The name of the catalog. Example: "local".
  • catalog_properties (map): A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see CatalogUtil in the Apache Iceberg documentation.
  • config_properties (map): An optional set of Hadoop configuration properties. For more information, see CatalogUtil in the Apache Iceberg documentation.

Write configuration:

  • triggering_frequency_seconds (integer): For streaming write pipelines, the frequency, in seconds, at which the sink attempts to produce snapshots.
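As an illustration, the following plain-Python sketch assembles an Apache Iceberg configuration map for a Hadoop catalog. The helper name and the bucket path are hypothetical. `type` and `warehouse` are catalog property keys that Iceberg's CatalogUtil recognizes. The positivity check on the triggering frequency is a sanity check added for this sketch, not a documented rule.

```python
def iceberg_config(table, catalog_name, catalog_properties,
                   config_properties=None, triggering_frequency_seconds=None):
    """Assemble an Apache Iceberg configuration map (hypothetical helper)."""
    config = {
        "table": table,
        "catalog_name": catalog_name,
        "catalog_properties": dict(catalog_properties),
    }
    if config_properties:
        # Optional Hadoop configuration properties.
        config["config_properties"] = dict(config_properties)
    if triggering_frequency_seconds is not None:
        # Streaming writes only: how often the sink attempts to produce a snapshot.
        if triggering_frequency_seconds <= 0:
            raise ValueError("triggering_frequency_seconds must be positive")
        config["triggering_frequency_seconds"] = triggering_frequency_seconds
    return config


if __name__ == "__main__":
    print(iceberg_config(
        "db.table1",
        "local",
        {"type": "hadoop", "warehouse": "gs://my-bucket/warehouse"},  # placeholder bucket
        triggering_frequency_seconds=60,
    ))
```

Leaving `triggering_frequency_seconds` unset keeps the same map usable for batch reads and writes.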

For more information, including code examples, see the following topics:

Apache Kafka

For Apache Kafka, managed I/O uses the following configuration parameters.

Read and write configuration:

  • bootstrap_servers (string): Required. A comma-separated list of Kafka bootstrap servers. Example: localhost:9092.
  • topic (string): Required. The Kafka topic to read or write.
  • file_descriptor_path (string): The path to a protocol buffer file descriptor set. Applies only if data_format is "PROTO".
  • data_format (string): The format of the messages. Supported values: "AVRO", "JSON", "PROTO", "RAW". The default value is "RAW", which reads or writes the raw bytes of the message payload.
  • message_name (string): The name of the protocol buffer message. Required if data_format is "PROTO".
  • schema (string): The Kafka message schema. The expected schema type depends on the data format. For read pipelines, this parameter is ignored if confluent_schema_registry_url is set.

Read configuration:

  • auto_offset_reset_config (string): Specifies the behavior when there is no initial offset or the current offset no longer exists on the Kafka server. Supported values are "earliest", which resets the offset to the earliest offset, and "latest", which resets the offset to the latest offset. The default value is "latest".
  • confluent_schema_registry_subject (string): The subject of a Confluent schema registry. Required if confluent_schema_registry_url is specified.
  • confluent_schema_registry_url (string): The URL of a Confluent schema registry. If specified, the schema parameter is ignored.
  • consumer_config_updates (map): Configuration parameters that customize the Kafka consumer. For more information, see Consumer configs in the Kafka documentation.
  • max_read_time_seconds (integer): The maximum read time, in seconds. This option produces a bounded PCollection and is mainly intended for testing or other non-production scenarios.

Write configuration:

  • producer_config_updates (map): Configuration parameters that customize the Kafka producer. For more information, see Producer configs in the Kafka documentation.
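For example, a bounded read for a test pipeline combines several of the read parameters. The following plain-Python sketch builds such a map. The helper name, the topic name, and the consumer group ID are hypothetical; `group.id` is a standard Kafka consumer setting.

```python
def kafka_test_read_config(bootstrap_servers, topic, max_read_time_seconds,
                           from_earliest=True, consumer_overrides=None):
    """Build a bounded Kafka read configuration (hypothetical helper)."""
    config = {
        "bootstrap_servers": bootstrap_servers,
        "topic": topic,
        "data_format": "RAW",  # the default: raw bytes of the message payload
        # "earliest" starts from the beginning of the topic when no offset exists.
        "auto_offset_reset_config": "earliest" if from_earliest else "latest",
        # Produces a bounded PCollection; intended for testing, not production.
        "max_read_time_seconds": max_read_time_seconds,
    }
    if consumer_overrides:
        config["consumer_config_updates"] = dict(consumer_overrides)
    return config


if __name__ == "__main__":
    print(kafka_test_read_config(
        "localhost:9092", "flights", 60,
        consumer_overrides={"group.id": "dataflow-test"},
    ))
```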

To read Avro or JSON messages, you must specify a message schema. To set a schema directly, use the schema parameter. To provide the schema through a Confluent schema registry, set the confluent_schema_registry_url and confluent_schema_registry_subject parameters.

To read or write Protocol Buffer messages, either specify a message schema or set the file_descriptor_path parameter.
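The schema rules above, together with the required parameters, can be checked before a pipeline is launched. The following plain-Python sketch encodes only the rules stated in this section. The function name is hypothetical, and the connector may apply additional validation of its own.

```python
SUPPORTED_FORMATS = {"AVRO", "JSON", "PROTO", "RAW"}


def kafka_config_problems(config: dict, reading: bool) -> list:
    """Return the problems found in a Kafka configuration map (empty if none)."""
    problems = []
    for required in ("bootstrap_servers", "topic"):
        if not config.get(required):
            problems.append(f"{required} is required")
    fmt = config.get("data_format", "RAW")  # "RAW" is the default
    if fmt not in SUPPORTED_FORMATS:
        problems.append(f"unsupported data_format {fmt!r}")
    has_registry = bool(config.get("confluent_schema_registry_url"))
    if has_registry and not config.get("confluent_schema_registry_subject"):
        problems.append("confluent_schema_registry_subject is required with a registry URL")
    if fmt == "PROTO":
        if not config.get("message_name"):
            problems.append("message_name is required for PROTO")
        if not (config.get("schema") or config.get("file_descriptor_path")):
            problems.append("PROTO needs schema or file_descriptor_path")
    elif reading and fmt in ("AVRO", "JSON"):
        # Avro and JSON reads need a schema, set directly or through a registry.
        if not (config.get("schema") or has_registry):
            problems.append(f"reading {fmt} needs schema or a schema registry")
    return problems


if __name__ == "__main__":
    print(kafka_config_problems(
        {"bootstrap_servers": "localhost:9092", "topic": "t", "data_format": "AVRO"},
        reading=True,
    ))
```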

BigQuery

Requires Apache Beam SDK 2.61.0 or later.

Managed I/O supports the following capabilities for BigQuery:

Managed I/O uses the following configuration parameters for BigQuery:

Read and write configuration:

  • table (string): The BigQuery table to read or write, in the format "PROJECT.DATASET.TABLE". Example: "my_project.dataset1.table1".
  • kms_key (string): A Cloud Key Management Service (Cloud KMS) key used to encrypt the BigQuery table when writing, or to encrypt any temporary tables created during reads.

Read configuration:

  • fields (list of strings): A list of columns to read from the table. Reading only the columns you need is more efficient when a table contains many columns.
  • query (string): A SQL query to run. If specified, the connector runs the query on BigQuery and reads the query results.
  • row_restriction (string): A predicate that filters data on the server side. Example: "age > 18".

Write configuration:

  • triggering_frequency (integer): For unbounded sources, the frequency, in seconds, at which file writes are triggered.

For reads, you must specify either table or query. For writes, you must specify table.
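These requirements can be checked with a small plain-Python sketch. The function name is hypothetical. Treating "both table and query" as an error is an assumption based on the wording "either table or query", not documented behavior. The table-format check is rough and does not handle domain-scoped project IDs that contain a dot.

```python
import re

# PROJECT.DATASET.TABLE: three non-empty, dot-separated parts (a rough check).
_TABLE_SPEC = re.compile(r"[^.\s]+\.[^.\s]+\.[^.\s]+")


def bigquery_config_problems(config: dict, reading: bool) -> list:
    """Return the problems found in a BigQuery configuration map (empty if none)."""
    problems = []
    table, query = config.get("table"), config.get("query")
    if reading:
        if bool(table) == bool(query):
            # Assumption: a read sets exactly one of table or query.
            problems.append("set exactly one of table or query")
    elif not table:
        problems.append("table is required for writes")
    if table and not _TABLE_SPEC.fullmatch(table):
        problems.append(f"table {table!r} is not in PROJECT.DATASET.TABLE format")
    return problems


if __name__ == "__main__":
    # Column names are hypothetical; the values follow the examples in the table above.
    read_config = {
        "table": "my_project.dataset1.table1",
        "fields": ["name", "age"],
        "row_restriction": "age > 18",
    }
    print(bigquery_config_problems(read_config, reading=True))
```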