This sample shows how to use a customer-managed encryption key (CMEK) in a Dataflow pipeline that reads from and writes to BigQuery.
Code sample
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
// Query from the NASA wildfires public dataset:
// https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=nasa_wildfire&t=past_week&page=table
String query =
    "SELECT latitude,longitude,acq_date,acq_time,bright_ti4,confidence "
    + "FROM `bigquery-public-data.nasa_wildfire.past_week` "
    + "LIMIT 10";

// Schema for the output BigQuery table.
final TableSchema outputSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("latitude").setType("FLOAT"),
    new TableFieldSchema().setName("longitude").setType("FLOAT"),
    new TableFieldSchema().setName("acq_date").setType("DATE"),
    new TableFieldSchema().setName("acq_time").setType("TIME"),
    new TableFieldSchema().setName("bright_ti4").setType("FLOAT"),
    new TableFieldSchema().setName("confidence").setType("STRING")));

// Create the BigQuery options from the command line arguments.
BigQueryKmsKeyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(BigQueryKmsKeyOptions.class);

// String outputBigQueryTable = "<project>:<dataset>.<table>";
String outputBigQueryTable = options.getOutputBigQueryTable();

// String kmsKey =
//    "projects/<project>/locations/<kms-location>/keyRings/<kms-keyring>/cryptoKeys/<kms-key>";
String kmsKey = options.getKmsKey();

// Create and run an Apache Beam pipeline.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read from BigQuery with KMS key",
        BigQueryIO.readTableRows()
            .fromQuery(query)
            .usingStandardSql()
            .withKmsKey(kmsKey))
    .apply("Write to BigQuery with KMS key",
        BigQueryIO.writeTableRows()
            .to(outputBigQueryTable)
            .withSchema(outputSchema)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
            .withKmsKey(kmsKey));
pipeline.run().waitUntilFinish();
Python
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
import apache_beam as beam

# output_bigquery_table = '<project>:<dataset>.<table>'
# kms_key = 'projects/<project>/locations/<kms-location>/keyRings/<kms-keyring>/cryptoKeys/<kms-key>' # noqa
# beam_args = [
#     '--project', 'your-project-id',
#     '--runner', 'DataflowRunner',
#     '--temp_location', 'gs://your-bucket/samples/dataflow/kms/tmp',
#     '--region', 'us-central1',
# ]

# Query from the NASA wildfires public dataset:
# https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=nasa_wildfire&t=past_week&page=table
query = """
SELECT latitude,longitude,acq_date,acq_time,bright_ti4,confidence
FROM `bigquery-public-data.nasa_wildfire.past_week`
LIMIT 10
"""

# Schema for the output BigQuery table.
schema = {
    "fields": [
        {"name": "latitude", "type": "FLOAT"},
        {"name": "longitude", "type": "FLOAT"},
        {"name": "acq_date", "type": "DATE"},
        {"name": "acq_time", "type": "TIME"},
        {"name": "bright_ti4", "type": "FLOAT"},
        {"name": "confidence", "type": "STRING"},
    ],
}

options = beam.options.pipeline_options.PipelineOptions(beam_args)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "Read from BigQuery with KMS key"
        >> beam.io.Read(
            beam.io.BigQuerySource(
                query=query,
                use_standard_sql=True,
                kms_key=kms_key,
            )
        )
        | "Write to BigQuery with KMS key"
        >> beam.io.WriteToBigQuery(
            output_bigquery_table,
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            kms_key=kms_key,
        )
    )
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.