创建预定查询

安排查询每 24 小时运行一次,并具有基于运行日期的目标表标识符。

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

Go

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
	datatransfer "cloud.google.com/go/bigquery/datatransfer/apiv1"
	datatransferpb "google.golang.org/genproto/googleapis/cloud/bigquery/datatransfer/v1"
	"google.golang.org/protobuf/types/known/structpb"
)

// createScheduledQuery schedule a query to run every
// 24 hours with a destination table identifier based on the run date.
func createScheduledQuery(projectID, datasetID, query string) error {
	// projectID := "my-project-id"
	// datasetID := "my-dataset-id"
	// query = `SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time,
	// @run_date as intended_run_date, 17 as some_integer`
	ctx := context.Background()
	dtc, err := datatransfer.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("datatransfer.NewClient: %w", err)
	}
	defer dtc.Close()

	paramsMap := map[string]interface{}{}
	paramsMap["destination_table_name_template"] = "my_destination_table_{run_date}"
	paramsMap["write_disposition"] = string(bigquery.WriteTruncate)
	paramsMap["partitioning_field"] = ""
	paramsMap["query"] = query
	params, err := structpb.NewStruct(paramsMap)
	if err != nil {
		return fmt.Errorf("structpb.NewStruct: %w", err)
	}

	req := &datatransferpb.CreateTransferConfigRequest{
		Parent: fmt.Sprintf("projects/%s", projectID),
		TransferConfig: &datatransferpb.TransferConfig{
			Destination: &datatransferpb.TransferConfig_DestinationDatasetId{
				DestinationDatasetId: datasetID,
			},
			DisplayName:  "Your Scheduled Query Name",
			DataSourceId: "scheduled_query",
			Params:       params,
			Schedule:     "every 24 hours",
		},
	}

	_, err = dtc.CreateTransferConfig(ctx, req)
	if err != nil {
		return fmt.Errorf("dtc.CreateTransferConfig: %w", err)
	}

	return nil
}

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to create a scheduled query
public class CreateScheduledQuery {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String datasetId = "MY_DATASET_ID";
    final String query =
        "SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
            + "@run_date as intended_run_date, 17 as some_integer";
    Map<String, Value> params = new HashMap<>();
    params.put("query", Value.newBuilder().setStringValue(query).build());
    params.put(
        "destination_table_name_template",
        Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
    params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
    params.put("partitioning_field", Value.newBuilder().build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Scheduled Query Name")
            .setDataSourceId("scheduled_query")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .build();
    createScheduledQuery(projectId, transferConfig);
  }

  public static void createScheduledQuery(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
      System.out.println("\nScheduled query created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("\nScheduled query was not created." + ex.toString());
    }
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# The project where the query job runs is the same as the project
# containing the destination dataset.
project_id = "your-project-id"
dataset_id = "your_dataset_id"

# This service account will be used to execute the scheduled queries. Omit
# this request parameter to run the query as the user with the credentials
# associated with this client.
service_account_name = "abcdef-test-sa@abcdef-test.iam.gserviceaccount.com"

# Use standard SQL syntax for the query.
query_string = """
SELECT
  CURRENT_TIMESTAMP() as current_time,
  @run_time as intended_run_time,
  @run_date as intended_run_date,
  17 as some_integer
"""

parent = transfer_client.common_project_path(project_id)

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id=dataset_id,
    display_name="Your Scheduled Query Name",
    data_source_id="scheduled_query",
    params={
        "query": query_string,
        "destination_table_name_template": "your_table_{run_date}",
        "write_disposition": "WRITE_TRUNCATE",
        "partitioning_field": "",
    },
    schedule="every 24 hours",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=parent,
        transfer_config=transfer_config,
        service_account_name=service_account_name,
    )
)

print("Created scheduled query '{}'".format(transfer_config.name))

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器