Data Catalog の HTTP バッチ リクエストの作成

アプリケーションが実行する各 HTTP 接続には、ある程度のオーバーヘッドが必要です。Data Catalog API リクエストでは、複数の API 呼び出しを 1 つの HTTP リクエストにまとめることができるバッチ処理をサポートしています。多くの小規模リクエストを実行する必要があり、HTTP リクエストのオーバーヘッドを最小限に抑えるたい場合は、HTTP バッチ処理が適しています。バッチ処理を行うとオーバーヘッドが少なくなりますが、バッチ内のリクエストは API 割り当てに関する複数のリクエストとしてカウントされます

Google Cloud での HTTP バッチの使用に関する一般的なドキュメントについては、Google API Python クライアントのドキュメントをご覧ください。

Python で HTTP バッチ リクエストを作成する

Data Catalog でバッチ リクエストを使用してエントリを作成または操作するには、まず catalog.search() または entries.lookup() を使用して、変更するエントリを検索する必要があります。

次に、次の手順に従って、Google Python API を使用して HTTP バッチ リクエストを作成します。

  1. new_batch_http_request() を呼び出すか、BatchHttpRequest() コンストラクタを使用して BatchHttpRequest オブジェクトを作成します。各リクエストに応答して呼び出されるコールバックを渡すことができます。
  2. 実行するリクエストごとに BatchHttpRequest オブジェクトの add() を呼び出します。BatchHttpRequest オブジェクトの作成時にコールバックを渡した場合、各 add() にコールバックに渡されるパラメータが含まれる場合があります。
  3. リクエストを追加したら、BatchHttpRequest オブジェクトの execute() を呼び出して、これらのリクエストを実行します。execute() 関数は、すべてのコールバックが呼び出されるまでブロックされます。

BatchHttpRequest のリクエストは並行して実行される可能性があり、実行順序は保証されません。つまり、同じバッチ内のリクエストは互いに依存してはなりません。たとえば、同じ EntryGroup とそれに属する Entry を同じリクエストで作成することは避けてください。EntryGroup の作成前に Entry の作成が実行される(実行が失敗する)可能性があるためです。

リージョン エンドポイントを使用したバッチ リクエスト

Data Catalog リージョン API エンドポイントで HTTP バッチ リクエストを使用する場合、バッチ内のすべての API リクエストは、同じリージョンに属している必要があります。バッチを実行する際は、正しいリージョン エンドポイントを呼び出す必要があります。たとえば、リソースが us-central1 にある場合、https://us-central1-datacatalog.googleapis.com/batch を呼び出します。

リージョンに依存しない API

リージョンに依存しない API(catalog.lookup()entries.search() など)は互いにグループ化できますが、リージョン依存の API とはグループ化できません。リージョンに依存しない API の場合は、エンドポイント https://datacatalog.googleapis.com/batch を使用します。

この Python アプリケーションのサンプルでは、Data Catalog API を使用して HTTP バッチ リクエストでタグ テンプレートから複数のタグを作成する方法を紹介します。

 
from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'

project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            print('request_id: {}, exception: {}'.format(request_id, str(exception)))
            pass
        else:
            print(request_id)
            self.data[request_id] = response

# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """
    Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request
def generated_uui():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # For more info on supported regions
    # check: https://cloud.google.com/data-catalog/docs/concepts/regions

    region='us-datacatalog.googleapis.com'

    return BatchHttpRequest(batch_uri='https://{}/batch'.format(region), callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
  'query': 'type=TABLE system=BIGQUERY',
  'scope': {'includeProjectIds': [project_id]}
}

# Generated a unique ID for the request.
request_id = generated_uui()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body), request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]

results = response['results']

first_table = results[0]

# Verify that a first table is present.
print(first_table)

second_table = results[1]

# Verify that a second table is present
print(second_table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container
container = DataContainer()

# Create a new batch request
batch = create_batch_request(container.callback)

# Set the template name config
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generated a unique id for request.
    request_id = generated_uui()

    # Add the entry name as the tag parent.
    parent=result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
      'template': template_name,
       # CHANGE for your template field values.
      'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body,
                     parent=parent),
              request_id=request_id)

# Execute the batch request.

# Since the Batch Client works with regions
# If you receive [HttpError 400 errors]
# 1. Verify the region you used to create the Batch client
# 2. Verify the region where the Entry is located.
# 3. verify the region of the parent tag template used by the tag.

batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container)