使用 Apache Beam 将数据写入 Cloud Bigtable。
深入探索
如需查看包含此代码示例的详细文档，请参阅以下内容：
代码示例
Java
如需了解如何安装和使用 Bigtable 的客户端库，请参阅“Bigtable 客户端库”。
如需向 Bigtable 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅“为本地开发环境设置身份验证”。
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Sample Apache Beam pipeline that writes two rows to a Cloud Bigtable table through the
 * HBase-based {@link CloudBigtableIO} connector.
 *
 * <p>Each input row key receives a single cell {@code stats_summary:os_build = "android"}, stamped
 * with {@link System#currentTimeMillis()} as read when the element is processed.
 */
public class HelloWorldWrite {

  /**
   * Parses pipeline options from {@code args}, runs the write pipeline, and blocks until it
   * finishes.
   *
   * @param args command-line pipeline arguments (for example {@code --bigtableTableId=...})
   */
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    CloudBigtableTableConfiguration tableConfig = baseTableConfig(options).build();

    pipeline
        .apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                // Kept anonymous so the transform keeps Beam's default step name.
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(
                      @Element String key, OutputReceiver<Mutation> receiver) {
                    // Timestamp is captured before the Put is constructed.
                    final long nowMillis = System.currentTimeMillis();
                    final Put put = new Put(Bytes.toBytes(key));
                    put.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        nowMillis,
                        Bytes.toBytes("android"));
                    receiver.output(put);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(tableConfig));

    pipeline.run().waitUntilFinish();
  }

  /**
   * Builds a table configuration that also sets
   * {@link BigtableOptionsFactory#BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL} to {@code "true"}.
   *
   * <p>Not used by {@link #main}; it only shows how the flag is applied.
   *
   * @param options pipeline options supplying the project, instance, and table IDs
   * @return the built table configuration
   */
  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    return baseTableConfig(options)
        .withConfiguration(
            BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, "true")
        .build();
  }

  /**
   * Returns a table-configuration builder already populated with the project, instance, and table
   * IDs taken from {@code options}.
   */
  private static CloudBigtableTableConfiguration.Builder baseTableConfig(BigtableOptions options) {
    return new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId());
  }

  /** Pipeline options that identify the Bigtable table the sample writes to. */
  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例，请参阅“Google Cloud 示例浏览器”。