Each HTTP connection that your application makes incurs a certain amount of overhead. The Data Catalog API supports batching, which lets you combine several API calls into a single HTTP request. HTTP batching is worth using if you are making many small requests and want to minimize HTTP request overhead. Note that while batching reduces overhead, the requests inside a batch still count as multiple requests for API quota purposes.
For information on using HTTP batching with Google Cloud, see the Google API Python client documentation.
Creating an HTTP batch request in Python
To create or manipulate entries in Data Catalog with batch requests, first find the entries you want to change by using catalog.search() or entries.lookup().
Next, follow these steps to build an HTTP batch request using the Google Python API client:
- Create a BatchHttpRequest object, either by calling new_batch_http_request() or by using the BatchHttpRequest() constructor. You can pass in a callback, which is invoked with the response to each request.
- Call add() on the BatchHttpRequest object for each request you want to execute. If you passed a callback when creating the BatchHttpRequest object, each add() call can include parameters to be passed to that callback.
- Once you have added the requests, call execute() on the BatchHttpRequest object to execute them. The execute() function blocks until all callbacks have been called.
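The three steps above can be sketched as follows, assuming a Data Catalog Discovery client (`service`) has already been built elsewhere with `googleapiclient.discovery.build('datacatalog', 'v1')`; `ResultCollector` and `run_search_batch` are hypothetical helper names used here for illustration:

```python
class ResultCollector:
    """Collects each batched response, keyed by its request_id."""

    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        # Invoked once per request when batch.execute() runs.
        if exception is not None:
            print('request {} failed: {}'.format(request_id, exception))
        else:
            self.data[request_id] = response


def run_search_batch(service, queries):
    """Run several catalog.search() calls in a single HTTP batch."""
    collector = ResultCollector()

    # Step 1: create the BatchHttpRequest, passing in a callback.
    batch = service.new_batch_http_request(callback=collector.callback)

    # Step 2: add one request per query, each with its own request_id,
    # which is handed to the callback alongside the response.
    for i, query in enumerate(queries):
        batch.add(service.catalog().search(body={'query': query}),
                  request_id='search-{}'.format(i))

    # Step 3: execute; this blocks until every callback has run.
    batch.execute()
    return collector.data
```

The callback receives either a response or an exception per request, so a failed request in the batch does not raise out of execute() by itself; check the collected results afterwards.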
Requests in a BatchHttpRequest can execute in parallel, and there is no guarantee of execution order, so requests in the same batch should not depend on one another. For example, you should not create an EntryGroup and an Entry that belongs to it in the same batch request, because the creation of the Entry may execute before the creation of the EntryGroup (causing it to fail).
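One way to respect this constraint is to execute dependent requests in separate, sequential batches. The sketch below is illustrative (the helper names are ours, and `service` is assumed to be a pre-built Data Catalog Discovery client): the EntryGroup is created and completed in its own batch before any of its entries are batched.

```python
def entry_group_name(parent, group_id):
    # Resource name of an EntryGroup under its parent location,
    # e.g. 'projects/p/locations/us-central1/entryGroups/g1'.
    return '{}/entryGroups/{}'.format(parent, group_id)


def create_group_then_entries(service, parent, group_id, entries):
    """Create an EntryGroup, then its Entries, in two sequential batches.

    parent:  e.g. 'projects/my-project/locations/us-central1'
    entries: dict mapping entry IDs to Entry request bodies
    """
    # Batch 1: create the EntryGroup alone and wait for it to complete.
    first = service.new_batch_http_request()
    first.add(service.projects().locations().entryGroups().create(
        parent=parent, entryGroupId=group_id, body={}))
    first.execute()

    # Batch 2: the group now exists, so its entries can be batched safely.
    second = service.new_batch_http_request()
    group = entry_group_name(parent, group_id)
    for entry_id, body in entries.items():
        second.add(service.projects().locations().entryGroups()
                   .entries().create(parent=group, entryId=entry_id, body=body))
    second.execute()
```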
Batch requests with regional endpoints
When you use HTTP batch requests with Data Catalog regional API endpoints, all API requests in a batch must belong to the same region. When executing the batch, you must call the correct regional endpoint. For example, if your resources are in us-central1, call https://us-central1-datacatalog.googleapis.com/batch.
Region-independent APIs
Region-independent APIs (such as catalog.search() and entries.lookup()) can be grouped with one another, but must not be grouped with region-dependent APIs. For region-independent APIs, use the endpoint https://datacatalog.googleapis.com/batch.
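The endpoint choice can be captured in a small helper (a sketch; the function name is an assumption, not part of any library): pass the region for region-bound batches, or nothing for batches of region-independent calls.

```python
def data_catalog_batch_uri(region=None):
    """Return the Data Catalog batch endpoint.

    All requests in a regional batch must live in `region`; batches of
    region-independent calls use the global endpoint instead.
    """
    if region:
        return 'https://{}-datacatalog.googleapis.com/batch'.format(region)
    return 'https://datacatalog.googleapis.com/batch'
```

For example, a region-bound batch could then be built with BatchHttpRequest(batch_uri=data_catalog_batch_uri('us-central1'), callback=my_callback).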
Example
This sample Python application demonstrates how to use HTTP batch requests to create multiple tags from a tag template with the Data Catalog API.
```python
from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'
project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            print('request_id: {}, exception: {}'.format(
                request_id, str(exception)))
        else:
            print(request_id)
            self.data[request_id] = response

# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list of auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request.
def generate_uuid():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # For more info on supported regions
    # check: https://cloud.google.com/data-catalog/docs/concepts/regions
    region = 'us-datacatalog.googleapis.com'
    return BatchHttpRequest(batch_uri='https://{}/batch'.format(region),
                            callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using a search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
    'query': 'type=TABLE system=BIGQUERY',
    'scope': {'includeProjectIds': [project_id]}
}

# Generate a unique ID for the request.
request_id = generate_uuid()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body),
          request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]
results = response['results']

first_table = results[0]
# Verify that a first table is present.
print(first_table)

second_table = results[1]
# Verify that a second table is present.
print(second_table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container.
container = DataContainer()

# Create a new batch request.
batch = create_batch_request(container.callback)

# Set the template name config.
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generate a unique ID for the request.
    request_id = generate_uuid()

    # Add the entry name as the tag parent.
    parent = result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
        'template': template_name,
        # CHANGE for your template field values.
        'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body, parent=parent),
              request_id=request_id)

# Execute the batch request.
# Since the batch client works with regions, if you receive
# HttpError 400 errors:
# 1. Verify the region you used to create the batch client.
# 2. Verify the region where the Entry is located.
# 3. Verify the region of the parent tag template used by the tag.
batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container.data)
```