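Create HTTP batch requests for Data Catalog

Each HTTP connection your application makes carries a certain amount of overhead. Data Catalog API requests support batching, which lets you combine multiple API calls into a single HTTP request. Consider HTTP batching if you have many small requests to make and want to minimize HTTP request overhead. Note that batching reduces overhead, but each request inside a batch still counts as a separate request against your API quota.

For generic documentation on using HTTP batching with Google Cloud, see the Google API Python client documentation.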

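Create HTTP batch requests in Python

To use batch requests to create or manipulate entries in Data Catalog, first find the entries you want to change with catalog.search() or entries.lookup().

Next, follow these steps to build an HTTP batch request with the Google API Python client: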

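  1. Create a BatchHttpRequest object by calling new_batch_http_request() or by using the BatchHttpRequest() constructor. You can pass in a callback, which is called with the response to each request.
  2. Call add() on the BatchHttpRequest object for each request you want to execute. If you passed a callback when creating the BatchHttpRequest object, each add() call can include parameters (for example, a request_id) that are passed to that callback.
  3. After you have added the requests, call execute() on the BatchHttpRequest object to run them. The execute() function blocks until all callbacks have been called.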
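
As a rough illustration of the callback mentioned in step 1, the sketch below defines a small collector whose method takes the (request_id, response, exception) arguments that BatchHttpRequest passes to a callback. The class name BatchResults and its attributes are hypothetical. The last lines call the callback by hand to stand in for what execute() would do, so the snippet runs without credentials or network access.

```python
class BatchResults:
    """Collects per-request outcomes of a batch (hypothetical helper)."""

    def __init__(self):
        self.responses = {}  # request_id -> response body
        self.errors = {}     # request_id -> exception

    def callback(self, request_id, response, exception):
        # Same argument order that BatchHttpRequest passes to a callback.
        if exception is not None:
            self.errors[request_id] = exception
        else:
            self.responses[request_id] = response


results = BatchResults()

# With the real client this would be wired up roughly as:
#   batch = service.new_batch_http_request(callback=results.callback)
#   batch.add(some_request, request_id='req-1')
#   batch.execute()
# Here the callback is invoked directly to simulate two finished requests.
results.callback('req-1', {'name': 'entries/example'}, None)
results.callback('req-2', None, RuntimeError('simulated failure'))

print(sorted(results.responses))  # ['req-1']
print(sorted(results.errors))     # ['req-2']
```

Keeping failures in a separate dictionary makes it easy to retry or report only the requests that did not succeed.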

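Requests in a BatchHttpRequest may be executed in parallel, and there is no guarantee of execution order. This means that requests in the same batch should not depend on each other. For example, do not create an EntryGroup and an Entry that belongs to it in the same batch, because the Entry creation might run before the EntryGroup creation and fail.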
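
One way to respect this constraint is to split dependent operations into successive batches and execute them one after another. The sketch below is a minimal, library-free illustration of that idea: plan_batches and the operation names are hypothetical, and each returned stage would become its own BatchHttpRequest.

```python
def plan_batches(dependencies):
    """Group operations into ordered stages.

    `dependencies` maps an operation name to the set of operation names that
    must finish first. Operations in the same stage never depend on each
    other, so each stage can safely be sent as one batch.
    """
    remaining = {op: set(deps) for op, deps in dependencies.items()}
    done = set()
    stages = []
    while remaining:
        # An operation is ready once everything it depends on has completed.
        ready = sorted(op for op, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError('circular or unknown dependency')
        stages.append(ready)
        done.update(ready)
        for op in ready:
            del remaining[op]
    return stages


stages = plan_batches({
    'create_entry_group': set(),
    'create_entry_a': {'create_entry_group'},
    'create_entry_b': {'create_entry_group'},
})
print(stages)
# [['create_entry_group'], ['create_entry_a', 'create_entry_b']]
```

The EntryGroup creation lands in the first stage and both Entry creations in the second, so the second batch is only sent after the first has completed.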

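Batch requests with regional endpoints

When you use HTTP batch requests with Data Catalog regional API endpoints, all API requests in a batch must belong to the same region. When you execute the batch, you must call the correct regional endpoint. For example, if your resources are in us-central1, call https://us-central1-datacatalog.googleapis.com/batch.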

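Region-independent APIs

Region-independent APIs (such as catalog.search() and entries.lookup()) can be grouped with each other, but must not be grouped with region-dependent APIs. For region-independent APIs, use the endpoint https://datacatalog.googleapis.com/batch.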
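
The endpoint rules above can be captured in a small helper. This is a sketch under stated assumptions: batch_uri and group_by_endpoint are hypothetical names, the regional URL pattern is extrapolated from the us-central1 example, a location of None marks a region-independent call, and the requests are represented by plain strings rather than real client request objects.

```python
from collections import defaultdict


def batch_uri(location=None):
    """Return the batch endpoint for a region, or the global one for None."""
    if location is None:
        return 'https://datacatalog.googleapis.com/batch'
    # Assumed pattern, generalized from the us-central1 example.
    return 'https://{}-datacatalog.googleapis.com/batch'.format(location)


def group_by_endpoint(calls):
    """Group (location, request) pairs so each group shares one endpoint."""
    groups = defaultdict(list)
    for location, request in calls:
        groups[batch_uri(location)].append(request)
    return dict(groups)


calls = [
    (None, 'catalog.search'),
    ('us-central1', 'tags.create #1'),
    (None, 'entries.lookup'),
    ('us-central1', 'tags.create #2'),
]
for uri, requests in group_by_endpoint(calls).items():
    print(uri, requests)
```

Each group would then be sent through its own BatchHttpRequest created with the matching batch_uri, which keeps region-independent and regional calls in separate batches.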

Example

This sample Python application shows how to use an HTTP batch request to create multiple tags from a tag template with the Data Catalog API.

from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'

project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            # Report the failure; the request gets no entry in self.data.
            print('request_id: {}, exception: {}'.format(request_id, exception))
        else:
            print(request_id)
            self.data[request_id] = response

# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """
    Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list of auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request
def generated_uui():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # All requests in one batch must target the same endpoint.
    # For more info on supported regions, see:
    # https://cloud.google.com/data-catalog/docs/concepts/regions
    endpoint = 'us-datacatalog.googleapis.com'

    return BatchHttpRequest(
        batch_uri='https://{}/batch'.format(endpoint), callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
  'query': 'type=TABLE system=BIGQUERY',
  'scope': {'includeProjectIds': [project_id]}
}

# Generate a unique ID for the request.
request_id = generated_uui()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body), request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]

# The 'results' key may be absent when the search matches nothing.
results = response.get('results', [])

# Verify that the first two tables are present.
for table in results[:2]:
    print(table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container
container = DataContainer()

# Create a new batch request
batch = create_batch_request(container.callback)

# Set the template name config
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generate a unique ID for the request.
    request_id = generated_uui()

    # Add the entry name as the tag parent.
    parent = result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
        'template': template_name,
        # CHANGE for your template field values.
        'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body,
                     parent=parent),
              request_id=request_id)

# Execute the batch request.
#
# The batch client is tied to a region. If you receive HttpError 400 responses:
# 1. Verify the region you used to create the batch client.
# 2. Verify the region where the entry is located.
# 3. Verify the region of the tag template used by the tag.

batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container.data)