Create batch HTTP requests for Data Catalog

Every HTTP connection that your application makes results in a certain amount of overhead. Data Catalog API requests support batching, which lets you combine several API calls into a single HTTP request. You might want to use HTTP batching when you have many small requests to make and want to minimize HTTP request overhead.

For more information on using HTTP batching with Google Cloud, see the Google API Python client documentation.

Create a batch HTTP request in Python

To create or manipulate entries in Data Catalog by using batch requests, first find the entries that you want to change by calling catalog.search() or entries.lookup().

Next, follow these steps to build an HTTP batch request using the Google Python API:

  1. Create a BatchHttpRequest object by calling new_batch_http_request() or by using the BatchHttpRequest() constructor. You can pass in a callback, which is called in response to each request.
  2. Call add() on the BatchHttpRequest object for each request you want to execute. If you passed a callback when creating the BatchHttpRequest object, each add() can include parameters that will be passed to that callback.
  3. After you have added the requests, call execute() on the BatchHttpRequest object to execute them. The execute() function blocks until all callbacks have been called.
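The add/execute/callback contract described in the steps above can be illustrated with a small stand-in class. This is a pure-Python sketch of the behavior only, not the real BatchHttpRequest (the real class packs every added request into a single HTTP round trip), and the ToyBatch name and toy requests are our own:

```python
# A toy stand-in mimicking the BatchHttpRequest contract:
# add() registers a request under a request_id, and execute()
# runs each request and reports its outcome through the callback.
# The real client gives no ordering guarantee, so callbacks must
# not assume requests complete in the order they were added.
class ToyBatch:
    def __init__(self, callback):
        self.callback = callback
        self.requests = []

    def add(self, request, request_id):
        self.requests.append((request_id, request))

    def execute(self):
        for request_id, request in self.requests:
            try:
                response = request()
                self.callback(request_id, response, None)
            except Exception as exc:
                self.callback(request_id, None, exc)

results = {}
batch = ToyBatch(lambda rid, resp, exc: results.update({rid: resp if exc is None else exc}))
batch.add(lambda: 'entry-a', request_id='req-1')
batch.add(lambda: 'entry-b', request_id='req-2')
batch.execute()
print(results)  # each response is delivered to the callback under its request_id
```

The request_id passed to add() is what lets the callback match each response back to the request that produced it, which is the pattern the full example below relies on.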

Requests in a BatchHttpRequest may be executed in parallel, and there is no guarantee on the order of execution. This means that requests in the same batch should not depend on each other. For example, you should not create an EntryGroup and an Entry belonging to it in the same batch, because the creation of the Entry might execute before the creation of the EntryGroup, causing it to fail.

Batch requests with regional endpoints

When using HTTP batch requests with Data Catalog regional API endpoints, all API requests in a batch must belong to the same region. When executing the batch, you must call the correct regional endpoint. For example, if your resources are in us-central1, call https://us-central1-datacatalog.googleapis.com/batch.

Region-independent APIs

Region-independent APIs (such as catalog.lookup and entries.search) can be grouped with each other, but must not be grouped with region-dependent APIs. For region-independent APIs, use the endpoint https://datacatalog.googleapis.com/batch.
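The endpoint choice can be captured in a small helper. This is a sketch under the URL patterns given above; the helper name is our own, not part of the client library:

```python
# Build the batch URI for the Data Catalog API.
# Region-dependent requests must go to the endpoint of their region;
# region-independent requests (catalog.lookup, entries.search) go to
# the global endpoint.
def datacatalog_batch_uri(region=None):
    if region:
        return 'https://{}-datacatalog.googleapis.com/batch'.format(region)
    return 'https://datacatalog.googleapis.com/batch'

print(datacatalog_batch_uri('us-central1'))
# https://us-central1-datacatalog.googleapis.com/batch
print(datacatalog_batch_uri())
# https://datacatalog.googleapis.com/batch
```

A URI built this way can be passed as the batch_uri argument of the BatchHttpRequest constructor, as the example below does.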

Example

This example Python application demonstrates how to use HTTP batch requests to create multiple tags from a tag template with the Data Catalog API.

from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'

project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            print('request_id: {}, exception: {}'.format(request_id, str(exception)))
        else:
            print(request_id)
            self.data[request_id] = response

# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """
    Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list of auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request
def generated_uui():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # For the list of supported regions, see:
    # https://cloud.google.com/data-catalog/docs/concepts/regions
    region_endpoint = 'us-datacatalog.googleapis.com'

    return BatchHttpRequest(batch_uri='https://{}/batch'.format(region_endpoint), callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using the search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
  'query': 'type=TABLE system=BIGQUERY',
  'scope': {'includeProjectIds': [project_id]}
}

# Generate a unique ID for the request.
request_id = generated_uui()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body), request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]

results = response['results']

first_table = results[0]

# Verify that a first table is present.
print(first_table)

second_table = results[1]

# Verify that a second table is present.
print(second_table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container
container = DataContainer()

# Create a new batch request
batch = create_batch_request(container.callback)

# Set the template name config
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generate a unique ID for the request.
    request_id = generated_uui()

    # Use the entry name as the tag parent.
    parent = result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
      'template': template_name,
      # CHANGE for your template field values.
      'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body,
                     parent=parent),
              request_id=request_id)

# Execute the batch request.

# Since the batch client works with regional endpoints,
# if you receive HttpError 400 errors:
# 1. Verify the region you used to create the batch client.
# 2. Verify the region where the Entry is located.
# 3. Verify the region of the parent tag template used by the tag.

batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container.data)