Como criar solicitações HTTP em lote para o Data Catalog

Cada conexão HTTP feita pelo aplicativo requer uma certa quantidade de sobrecarga. A API Data Catalog solicita o agrupamento de lotes, o que permite combinar várias chamadas da API em uma única solicitação HTTP. É recomendável usar o agrupamento de HTTP em caso de muitas solicitações pequenas e querer minimizar a sobrecarga das solicitações HTTP. Observe que os lotes reduzem a sobrecarga, mas as solicitações em um lote ainda contam como várias solicitações para fins de cota da API.

Para obter uma documentação genérica sobre o uso do lote HTTP com o Google Cloud, consulte a documentação do cliente Python da API do Google.

Como criar solicitações HTTP em lote no Python

Para usar solicitações em lote para criar ou manipular entradas no Data Catalog, primeiro você precisa pesquisar as entradas que quer alterar usando catalog.search() ou entries.lookup().

Em seguida, siga estas etapas para criar uma solicitação HTTP em lote usando a API Google Python:

  1. Crie um objeto BatchHttpRequest chamando new_batch_http_request() ou com o construtor BatchHttpRequest(). Você pode transmitir um retorno de chamada, que será chamado em resposta a cada solicitação.
  2. Chame add() no objeto BatchHttpRequest de cada solicitação que você quer executar. Se você transmitiu um callback ao criar seu objeto BatchHttpRequest, cada add() poderá incluir parâmetros a serem transmitidos para o callback.
  3. Depois de adicionar as solicitações, chame execute() no objeto BatchHttpRequest para executá-las. A função execute() bloqueia até que todos os callbacks sejam chamados.

As solicitações em um BatchHttpRequest podem ser executadas em paralelo, e não há garantias para a ordem de execução. Isso significa que as solicitações no mesmo lote não podem ser dependentes entre si. Por exemplo, não crie um EntryGroup e um Entry pertencentes a ele na mesma solicitação, já que a criação de Entry pode ser executada antes da criação do EntryGroup, fazendo com que a execução falhe.

Solicitações em lote com endpoints regionais

Ao usar solicitações em lote HTTP com endpoints da API regional do Data Catalog, todas as solicitações de API em um lote precisam pertencer à mesma região. Ao executar o lote, você precisa chamar o endpoint regional correto. Por exemplo, se seus recursos estiverem em us-central1, chame https://us-central1-datacatalog.googleapis.com/batch.

APIs independentes de região

As APIs independentes de região (como catalog.lookup() e entries.search() podem ser agrupadas umas com as outras), mas não podem ser agrupadas com APIs de acordo com a região. Para APIs independentes de região, use o endpoint: https://datacatalog.googleapis.com/batch.

Exemplo

Com esta amostra de aplicativo em Python, demonstramos como usar uma solicitação em lote HTTP para criar várias tags a partir de um modelo de tag usando a API do Data Catalog.

 
from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'

project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            print('request_id: {}, exception: {}'.format(request_id, str(exception)))
            pass
        else:
            print(request_id)
            self.data[request_id] = response

# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """
    Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request
def generated_uui():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # For more info on supported regions
    # check: https://cloud.google.com/data-catalog/docs/concepts/regions

    region='us-datacatalog.googleapis.com'

    return BatchHttpRequest(batch_uri='https://{}/batch'.format(region), callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
  'query': 'type=TABLE system=BIGQUERY',
  'scope': {'includeProjectIds': [project_id]}
}

# Generated a unique ID for the request.
request_id = generated_uui()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body), request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]

results = response['results']

first_table = results[0]

# Verify that a first table is present.
print(first_table)

second_table = results[1]

# Verify that a second table is present
print(second_table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container
container = DataContainer()

# Create a new batch request
batch = create_batch_request(container.callback)

# Set the template name config
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generated a unique id for request.
    request_id = generated_uui()

    # Add the entry name as the tag parent.
    parent=result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
      'template': template_name,
       # CHANGE for your template field values.
      'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body,
                     parent=parent),
              request_id=request_id)

# Execute the batch request.

# Since the Batch Client works with regions
# If you receive [HttpError 400 errors]
# 1. Verify the region you used to create the Batch client
# 2. Verify the region where the Entry is located.
# 3. verify the region of the parent tag template used by the tag.

batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container)