アプリケーションが実行する各 HTTP 接続には、ある程度のオーバーヘッドが必要です。Data Catalog API リクエストでは、複数の API 呼び出しを 1 つの HTTP リクエストにまとめることができるバッチ処理をサポートしています。多くの小規模リクエストを実行する必要があり、HTTP リクエストのオーバーヘッドを最小限に抑えるたい場合は、HTTP バッチ処理が適しています。バッチ処理を行うとオーバーヘッドが少なくなりますが、バッチ内のリクエストは API 割り当てに関する複数のリクエストとしてカウントされます。
Google Cloud での HTTP バッチの使用に関する一般的なドキュメントについては、Google API Python クライアントのドキュメントをご覧ください。
Python で HTTP バッチ リクエストを作成する
Data Catalog でバッチ リクエストを使用してエントリを作成または操作するには、まず catalog.search()
または entries.lookup()
を使用して、変更するエントリを検索する必要があります。
次に、次の手順に従って、Google Python API を使用して HTTP バッチ リクエストを作成します。
new_batch_http_request()
を呼び出すか、BatchHttpRequest()
コンストラクタを使用してBatchHttpRequest
オブジェクトを作成します。各リクエストに応答して呼び出されるコールバックを渡すことができます。- 実行するリクエストごとに
BatchHttpRequest
オブジェクトのadd()
を呼び出します。BatchHttpRequest
オブジェクトの作成時にコールバックを渡した場合、各add()
にコールバックに渡されるパラメータが含まれる場合があります。 - リクエストを追加したら、
BatchHttpRequest
オブジェクトのexecute()
を呼び出して、これらのリクエストを実行します。execute()
関数は、すべてのコールバックが呼び出されるまでブロックされます。
BatchHttpRequest
のリクエストは並行して実行される可能性があり、実行順序は保証されません。つまり、同じバッチ内のリクエストは互いに依存してはなりません。たとえば、同じ EntryGroup
とそれに属する Entry
を同じリクエストで作成することは避けてください。EntryGroup
の作成前に Entry
の作成が実行される(実行が失敗する)可能性があるためです。
リージョン エンドポイントを使用したバッチ リクエスト
Data Catalog リージョン API エンドポイントで HTTP バッチ リクエストを使用する場合、バッチ内のすべての API リクエストは、同じリージョンに属している必要があります。バッチを実行する際は、正しいリージョン エンドポイントを呼び出す必要があります。たとえば、リソースが us-central1
にある場合、https://us-central1-datacatalog.googleapis.com/batch
を呼び出します。
リージョンに依存しない API
リージョンに依存しない API(catalog.lookup()
や entries.search()
など)は互いにグループ化できますが、リージョン依存の API とはグループ化できません。リージョンに依存しない API の場合は、エンドポイント https://datacatalog.googleapis.com/batch
を使用します。
例
この Python アプリケーションのサンプルでは、Data Catalog API を使用して HTTP バッチ リクエストでタグ テンプレートから複数のタグを作成する方法を紹介します。
from googleapiclient.discovery import build from googleapiclient.http import BatchHttpRequest from oauth2client.service_account import ServiceAccountCredentials import uuid #-------------------------------------------------------------# # 0. Helper and initialization logic #-------------------------------------------------------------# # Set the environment configuration. service_key_file_location = '[SA_PATH]' project_id = '[MY_PROJECT_ID]' # Helper container to store results. class DataContainer: def __init__(self): self.data = {} def callback(self, request_id, response, exception): if exception is not None: print('request_id: {}, exception: {}'.format(request_id, str(exception))) pass else: print(request_id) self.data[request_id] = response # Helper function to build the Discovery Service config. def get_service(api_name, api_version, scopes, key_file_location): """ Get a service that communicates to a Google API. Args: api_name: The name of the API to connect to. api_version: The API version to connect to. scopes: A list auth scopes to authorize for the application. key_file_location: The path to a valid service account JSON key file. Returns: A service that is connected to the specified API. """ credentials = ServiceAccountCredentials.from_json_keyfile_name( key_file_location, scopes=scopes) # Build the service object. service = build(api_name, api_version, credentials=credentials) return service # Helper function to create a UUID for each request def generated_uui(): return str(uuid.uuid4()) def create_batch_request(callback): # For more info on supported regions # check: https://cloud.google.com/data-catalog/docs/concepts/regions region='us-datacatalog.googleapis.com' return BatchHttpRequest(batch_uri='https://{}/batch'.format(region), callback=callback) container = DataContainer() # Scope to set up the Discovery Service config. scope = 'https://www.googleapis.com/auth/cloud-platform' # Create service. service = get_service( api_name='datacatalog', api_version='v1', scopes=[scope], key_file_location=service_key_file_location) # Create the batch request config. batch = create_batch_request(container.callback) #-------------------------------------------------------------# # 1. Start by fetching a list of entries using search call #-------------------------------------------------------------# # Create the search request body. # This example searches for all BigQuery tables in a project. search_request_body = { 'query': 'type=TABLE system=BIGQUERY', 'scope': {'includeProjectIds': [project_id]} } # Generated a unique ID for the request. request_id = generated_uui() # Add the request to the batch client. batch.add(service.catalog().search(body=search_request_body), request_id=request_id) # Execute the batch request. batch.execute() # Uncomment to verify the full response from search. # print(container.data) response = container.data[request_id] results = response['results'] first_table = results[0] # Verify that a first table is present. print(first_table) second_table = results[1] # Verify that a second table is present print(second_table) #-------------------------------------------------------------------# # 2. Send the batch request to attach tags over the entire result set #-------------------------------------------------------------------# # Create a new container container = DataContainer() # Create a new batch request batch = create_batch_request(container.callback) # Set the template name config template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]' for result in results: # Generated a unique id for request. request_id = generated_uui() # Add the entry name as the tag parent. parent=result['relativeResourceName'] # Create the tag request body. create_tag_request_body = { 'template': template_name, # CHANGE for your template field values. 'fields': {'etl_score': {'doubleValue': 0.5}} } # Add requests to the batch client. batch.add(service.projects().locations(). entryGroups().entries().tags(). create(body=create_tag_request_body, parent=parent), request_id=request_id) # Execute the batch request. # Since the Batch Client works with regions # If you receive [HttpError 400 errors] # 1. Verify the region you used to create the Batch client # 2. Verify the region where the Entry is located. # 3. verify the region of the parent tag template used by the tag. batch.execute() # Uncomment to verify the full response from tag creation. # print(container)