Crea solicitudes por lotes HTTP para Data Catalog

Cada conexión HTTP que realiza tu aplicación requiere una cierta cantidad de sobrecarga. Las solicitudes a la API de Data Catalog admiten lotes, lo que te permite combinar varias llamadas a la API en una sola solicitud HTTP. Te recomendamos usar procesamiento por lotes HTTP si tiene muchas solicitudes pequeñas y quiere minimizar la sobrecarga de solicitudes HTTP. Ten en cuenta que el procesamiento por lotes reduce la sobrecarga, pero las solicitudes en un lote se siguen contando como varias solicitudes para fines de cuota de API.

Para ver la documentación genérica sobre el uso de lotes de HTTP con Google Cloud, consulta la documentación de cliente de las API de Google para Python.

Crea solicitudes por lotes HTTP en Python

Para usar solicitudes por lotes a fin de crear o manipular entradas en Data Catalog, primero debes buscar las entradas que deseas cambiar con catalog.search() o entries.lookup().

A continuación, sigue estos pasos para compilar una solicitud por lotes de HTTP con la API de Python de Google:

  1. Crea un objeto BatchHttpRequest mediante un llamado a new_batch_http_request() o con el constructor BatchHttpRequest(). Puedes pasar una devolución de llamada, que llamará en respuesta a cada solicitud.
  2. Llama a add() en el objeto BatchHttpRequest para cada solicitud que desees ejecutar. Si pasaste una devolución de llamada cuando creas tu objeto BatchHttpRequest, cada add() puede incluir parámetros que se pasarán a la devolución de llamada.
  3. Después de agregar las solicitudes, llama a execute() en el objeto BatchHttpRequest para ejecutarlas. La función execute() bloquea hasta que se llame a todas las devoluciones de llamada.

Las solicitudes en una BatchHttpRequest se pueden ejecutar en paralelo, y no hay garantías para el orden de ejecución. Esto significa que las solicitudes en el mismo lote no deberían depender de sí mismas. Por ejemplo, no debes crear un EntryGroup y un Entry que pertenezcan a la misma solicitud, ya que la creación de Entry puede ejecutarse antes de la creación del EntryGroup (lo que provoca que la ejecución falle).

Solicitudes por lotes con extremos regionales

Cuando uses solicitudes por lotes HTTP con extremos de la API regional de Data Catalog, todas las solicitudes a la API en un lote deben pertenecer a la misma región. Cuando ejecutas el lote, debes llamar al extremo regional correcto. Por ejemplo, si tus recursos están en us-central1, llama a https://us-central1-datacatalog.googleapis.com/batch.

API independientes de la región

Las API independientes de la región (como catalog.lookup() y entries.search() se pueden agrupar entre sí, pero no deben agruparse con API dependientes de la región). Para las API independientes de la región, usa el extremo: https://datacatalog.googleapis.com/batch.

Ejemplo

Esta aplicación de Python de muestra indica cómo usar una solicitud por lotes de HTTP para crear varias etiquetas a partir de una plantilla de etiqueta mediante la API de Data Catalog.

 
from googleapiclient.discovery import build
from googleapiclient.http import BatchHttpRequest
from oauth2client.service_account import ServiceAccountCredentials
import uuid

#-------------------------------------------------------------#
# 0. Helper and initialization logic
#-------------------------------------------------------------#

# Set the environment configuration.
service_key_file_location = '[SA_PATH]'

project_id = '[MY_PROJECT_ID]'

# Helper container to store results.
class DataContainer:
    def __init__(self):
        self.data = {}

    def callback(self, request_id, response, exception):
        if exception is not None:
            print('request_id: {}, exception: {}'.format(request_id, str(exception)))
            pass
        else:
            print(request_id)
            self.data[request_id] = response


# Helper function to build the Discovery Service config.
def get_service(api_name, api_version, scopes, key_file_location):
    """
    Get a service that communicates to a Google API.

    Args:
        api_name: The name of the API to connect to.
        api_version: The API version to connect to.
        scopes: A list auth scopes to authorize for the application.
        key_file_location: The path to a valid service account JSON key file.

    Returns:
        A service that is connected to the specified API.
    """
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=scopes)

    # Build the service object.
    service = build(api_name, api_version, credentials=credentials)

    return service

# Helper function to create a UUID for each request
def generated_uui():
    return str(uuid.uuid4())

def create_batch_request(callback):
    # For more info on supported regions
    # check: https://cloud.google.com/data-catalog/docs/concepts/regions

    region='us-datacatalog.googleapis.com'

    return BatchHttpRequest(batch_uri='https://{}/batch'.format(region), callback=callback)

container = DataContainer()

# Scope to set up the Discovery Service config.
scope = 'https://www.googleapis.com/auth/cloud-platform'

# Create service.
service = get_service(
    api_name='datacatalog',
    api_version='v1',
    scopes=[scope],
    key_file_location=service_key_file_location)

# Create the batch request config.
batch = create_batch_request(container.callback)

#-------------------------------------------------------------#
# 1. Start by fetching a list of entries using search call
#-------------------------------------------------------------#

# Create the search request body.
# This example searches for all BigQuery tables in a project.
search_request_body = {
  'query': 'type=TABLE system=BIGQUERY',
  'scope': {'includeProjectIds': [project_id]}
}

# Generated a unique ID for the request.
request_id = generated_uui()

# Add the request to the batch client.
batch.add(service.catalog().search(body=search_request_body), request_id=request_id)

# Execute the batch request.
batch.execute()

# Uncomment to verify the full response from search.
# print(container.data)

response = container.data[request_id]

results = response['results']

first_table = results[0]

# Verify that a first table is present.
print(first_table)

second_table = results[1]

# Verify that a second table is present
print(second_table)

#-------------------------------------------------------------------#
# 2. Send the batch request to attach tags over the entire result set
#-------------------------------------------------------------------#

# Create a new container
container = DataContainer()

# Create a new batch request
batch = create_batch_request(container.callback)

# Set the template name config
template_name = 'projects/[MY_PROJECT_ID]/locations/[MY-LOCATION]/tagTemplates/[MY-TEMPLATE-NAME]'

for result in results:
    # Generated a unique id for request.
    request_id = generated_uui()

    # Add the entry name as the tag parent.
    parent=result['relativeResourceName']

    # Create the tag request body.
    create_tag_request_body = {
      'template': template_name,
       # CHANGE for your template field values.
      'fields': {'etl_score': {'doubleValue': 0.5}}
    }

    # Add requests to the batch client.
    batch.add(service.projects().locations().
              entryGroups().entries().tags().
              create(body=create_tag_request_body,
                     parent=parent),
              request_id=request_id)

# Execute the batch request.

# Since the Batch Client works with regions
# If you receive [HttpError 400 errors]
# 1. Verify the region you used to create the Batch client
# 2. Verify the region where the Entry is located.
# 3. verify the region of the parent tag template used by the tag.

batch.execute()

# Uncomment to verify the full response from tag creation.
# print(container)