安排查询每 24 小时运行一次,并具有基于运行日期的目标表标识符。查询作为服务账号用户执行。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
Java
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
// Sample to create a scheduled query with service account
public class CreateScheduledQueryWithServiceAccount {
public static void main(String[] args) throws IOException {
// TODO(developer): Replace these variables before running the sample.
final String projectId = "MY_PROJECT_ID";
final String datasetId = "MY_DATASET_ID";
final String serviceAccount = "MY_SERVICE_ACCOUNT";
final String query =
"SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
+ "@run_date as intended_run_date, 17 as some_integer";
Map<String, Value> params = new HashMap<>();
params.put("query", Value.newBuilder().setStringValue(query).build());
params.put(
"destination_table_name_template",
Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
params.put("partitioning_field", Value.newBuilder().build());
TransferConfig transferConfig =
TransferConfig.newBuilder()
.setDestinationDatasetId(datasetId)
.setDisplayName("Your Scheduled Query Name")
.setDataSourceId("scheduled_query")
.setParams(Struct.newBuilder().putAllFields(params).build())
.setSchedule("every 24 hours")
.build();
createScheduledQueryWithServiceAccount(projectId, transferConfig, serviceAccount);
}
public static void createScheduledQueryWithServiceAccount(
String projectId, TransferConfig transferConfig, String serviceAccount) throws IOException {
try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
ProjectName parent = ProjectName.of(projectId);
CreateTransferConfigRequest request =
CreateTransferConfigRequest.newBuilder()
.setParent(parent.toString())
.setTransferConfig(transferConfig)
.setServiceAccountName(serviceAccount)
.build();
TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
System.out.println(
"\nScheduled query with service account created successfully :" + config.getName());
} catch (ApiException ex) {
System.out.print("\nScheduled query with service account was not created." + ex.toString());
}
}
}
Python
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
from google.cloud import bigquery_datatransfer
transfer_client = bigquery_datatransfer.DataTransferServiceClient()
# The project where the query job runs is the same as the project
# containing the destination dataset.
project_id = "your-project-id"
dataset_id = "your_dataset_id"
# This service account will be used to execute the scheduled queries. Omit
# this request parameter to run the query as the user with the credentials
# associated with this client.
service_account_name = "abcdef-test-sa@abcdef-test.iam.gserviceaccount.com"
# Use standard SQL syntax for the query.
query_string = """
SELECT
CURRENT_TIMESTAMP() as current_time,
@run_time as intended_run_time,
@run_date as intended_run_date,
17 as some_integer
"""
parent = transfer_client.common_project_path(project_id)
transfer_config = bigquery_datatransfer.TransferConfig(
destination_dataset_id=dataset_id,
display_name="Your Scheduled Query Name",
data_source_id="scheduled_query",
params={
"query": query_string,
"destination_table_name_template": "your_table_{run_date}",
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
schedule="every 24 hours",
)
transfer_config = transfer_client.create_transfer_config(
bigquery_datatransfer.CreateTransferConfigRequest(
parent=parent,
transfer_config=transfer_config,
service_account_name=service_account_name,
)
)
print("Created scheduled query '{}'".format(transfer_config.name))
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。