列出和获取政策

本页提供代码示例,演示如何列出和获取政策。

准备工作

请先完成 Policy API 的设置。

列出政策

以下示例展示了如何使用 Python 列出组织中的政策。

  """Sample script to demonstrate the use of the List method in the Policy API."""

  from collections.abc import Mapping, Sequence
  import json
  import pprint
  import time
  from typing import Any
  import urllib.request
  from absl import app
  from absl import flags
  import google.auth.transport.requests
  from google.oauth2 import service_account

  # OAuth 2.0 scopes requested when the service account credentials are loaded.
  SCOPES = ['https://www.googleapis.com/auth/cloud-identity.policies']
  # Root endpoint of the Cloud Identity API.
  BASE_URL = 'https://cloudidentity.googleapis.com/'
  # Root endpoint with the API version appended; request URLs are built on it.
  VERSIONED_BASE_URL = f'{BASE_URL}v1/'

  # Required: path of the service account key file used to build credentials.
  _SA_FILE = flags.DEFINE_string(
      name='sa_file',
      default=None,
      help='Service account credentials file path',
      required=True,
  )

  # Required: administrator account that the service account acts on behalf of.
  _ADMIN_EMAIL = flags.DEFINE_string(
      name='admin_email',
      default=None,
      help='Administrator email to call as',
      required=True,
  )

  # Optional: page size sent with each List request (defaults to 50).
  _PAGE_SIZE = flags.DEFINE_integer(
      name='page_size',
      default=50,
      help='Page size for the List API request',
      required=False,
  )

  # Optional: value sent as the `filter` query parameter of the List request.
  # To list all policies, set FILTER to '';
  # To list policies for a specific customer, set FILTER to
  # 'customer == "customers/{obfuscated_target_customer_id}"';
  # To list policies for a specific Application, such as Gmail, set FILTER to
  # 'setting.type.matches("gmail.*")';
  # To list policies for a specific Setting, such as service_status, set FILTER to
  # 'setting.type.matches(".*service_status")'.
  _LIST_FILTER = flags.DEFINE_string(
      name='list_filter',
      default='',
      help='Filter for the List API request',
      required=False,
  )


  def create_delegated_credentials(
      sa_file: str, user_email: str
  ) -> service_account.Credentials:
    """Builds service account credentials that act on behalf of a user.

    The credentials are loaded from the key file with the module-level SCOPES
    and then given `user_email` as their subject.

    Args:
      sa_file: Path of the service account credentials file.
      user_email: Email address of the administrator to call as.

    Returns:
      The service account credentials with `user_email` set as the subject.
    """
    return service_account.Credentials.from_service_account_file(
        sa_file, scopes=SCOPES
    ).with_subject(user_email)


  def build_list_policies_request(
      page_size: int, list_filter: str, page_token: str, access_token: str
  ) -> urllib.request.Request:
    """Builds the request for the List Policies API.

    The query parameters are percent-encoded, so filters containing spaces,
    quotes or other reserved characters (for example
    'customer == "customers/C0123"') and opaque page tokens still produce a
    valid URL.

    Args:
      page_size: The page size for the request.
      list_filter: The filter for the request; '' lists all policies.
      page_token: The page token for the request; '' requests the first page.
      access_token: The access token for the API.

    Returns:
      The request for the List Policies API, carrying the access token in its
      Authorization header.
    """
    # Imported locally under an alias so the module-level `urllib` name that
    # the rest of this function relies on is not shadowed.
    from urllib import parse as urllib_parse  # pylint: disable=g-import-not-at-top

    query = urllib_parse.urlencode(
        {
            'page_size': page_size,
            'filter': list_filter,
            'page_token': page_token,
        },
        # Encode spaces as %20 rather than '+', which is unambiguous in URLs.
        quote_via=urllib_parse.quote,
    )
    request = urllib.request.Request(f'{VERSIONED_BASE_URL}policies?{query}')
    request.add_header('Authorization', 'Bearer ' + access_token)
    return request


  def call_list_policies_api(
      request: urllib.request.Request,
  ) -> Mapping[str, Any]:
    """Calls the List Policies API and decodes the JSON response.

    Args:
      request: The request for the List Policies API.

    Returns:
      The JSON-decoded response body of the List Policies API.
    """
    # Use the response as a context manager so the underlying connection is
    # closed even if reading the body fails.
    with urllib.request.urlopen(request) as http_response:
      content = http_response.read()
    return json.loads(content)


  def call_list_policies_api_till_last_page(
      access_token: str, page_size: int, list_filter: str
  ) -> None:
    """Fetches and prints every page of the List Policies API.

    Starts with an empty page token and keeps requesting the next page until a
    response carries no (or an empty) `nextPageToken`.

    Args:
      access_token: The access token for the API.
      page_size: The page size for each request.
      list_filter: The filter for each request.
    """
    next_token = ''
    while True:
      response = call_list_policies_api(
          build_list_policies_request(
              page_size, list_filter, next_token, access_token
          )
      )
      print_list_policies_response(response)

      next_token = response.get('nextPageToken')
      if not next_token:
        print('This is the last page.')
        return

      # Pause between pages; the Policy API quota is 1 query per second.
      time.sleep(1)


  def print_list_policies_response(response: Mapping[str, Any]) -> None:
    """Pretty-prints one page of the List Policies API response to stdout.

    Args:
      response: The JSON-decoded response. Each entry under 'policies' is
        printed, followed by the 'nextPageToken' value when that key is present.
    """
    for policy in response.get('policies', ()):
      pprint.pprint(policy, indent=4)

    if 'nextPageToken' not in response:
      return
    print('Next page token: ' + response['nextPageToken'])


  def main(argv: Sequence[str]) -> None:
    """Lists policies as the delegated administrator and prints every page.

    Args:
      argv: Command-line arguments left over after flag parsing; only the
        program name is expected.

    Raises:
      app.UsageError: If unexpected positional arguments are supplied.
    """
    # absl removes the parsed flags before calling main, so anything beyond the
    # program name is a stray positional argument.
    if len(argv) > 1:
      raise app.UsageError('Too many command-line arguments.')

    credentials = create_delegated_credentials(
        _SA_FILE.value, _ADMIN_EMAIL.value
    )
    # Refresh so that `token` holds the access token sent with each request.
    credentials.refresh(google.auth.transport.requests.Request())
    call_list_policies_api_till_last_page(
        credentials.token,
        _PAGE_SIZE.value,
        _LIST_FILTER.value,
    )


  # Script entry point: hand control to absl, which invokes main.
  if __name__ == '__main__':
    app.run(main)

获取政策

以下示例展示了如何使用 Python 检索特定政策。

  """Sample script to demonstrate the use of the get method in the Policy API."""

  from collections.abc import Sequence
  import json
  import pprint
  import urllib.request
  from absl import app
  from absl import flags
  import google.auth.transport.requests
  from google.oauth2 import service_account

  # OAuth 2.0 scopes requested when the service account credentials are loaded.
  SCOPES = ['https://www.googleapis.com/auth/cloud-identity.policies']
  # Root endpoint of the Cloud Identity API.
  BASE_URL = 'https://cloudidentity.googleapis.com/'
  # Root endpoint with the API version appended; the policy name is appended to
  # it to form the Get request URL.
  VERSIONED_BASE_URL = f'{BASE_URL}v1/'

  # Required: path of the service account key file used to build credentials.
  _SA_FILE = flags.DEFINE_string(
      name='sa_file',
      default=None,
      help='Service account credentials file path',
      required=True,
  )

  # Required: administrator account that the service account acts on behalf of.
  _ADMIN_EMAIL = flags.DEFINE_string(
      name='admin_email',
      default=None,
      help='Administrator email to call as',
      required=True,
  )

  # Required: name of the policy to fetch; it is appended to the versioned base
  # URL to form the request URL.
  _POLICY_NAME = flags.DEFINE_string(
      name='policy_name',
      default=None,
      help='Policy name of the policy to get',
      required=True,
  )


  def create_delegated_credentials(
      sa_file: str, user_email: str
  ) -> service_account.Credentials:
    """Loads service account credentials and delegates them to a user.

    Args:
      sa_file: Path of the service account credentials file.
      user_email: Email address of the administrator to call as.

    Returns:
      Credentials loaded from `sa_file` with the module-level SCOPES and with
      `user_email` set as their subject.
    """
    base_credentials = service_account.Credentials.from_service_account_file(
        sa_file, scopes=SCOPES
    )
    return base_credentials.with_subject(user_email)


  def build_get_policy_request(
      policy_name: str, access_token: str
  ) -> urllib.request.Request:
    """Creates the authorized request for the Get Policy API.

    Args:
      policy_name: The name of the policy to get; it is appended to the
        versioned base URL.
      access_token: The access token for the API.

    Returns:
      The request for the Get Policy API, carrying the access token in its
      Authorization header.
    """
    get_url = f'{VERSIONED_BASE_URL}{policy_name}'
    return urllib.request.Request(
        get_url, headers={'Authorization': 'Bearer ' + access_token}
    )


  def call_get_policy_api(access_token: str, policy_name: str) -> None:
    """Calls the Get Policy API and prints the decoded response.

    Args:
      access_token: The access token for the API.
      policy_name: The policy name to get.
    """
    request = build_get_policy_request(policy_name, access_token)
    # Use the response as a context manager so the underlying connection is
    # closed even if reading the body fails.
    with urllib.request.urlopen(request) as http_response:
      content = http_response.read()
    print_get_policy_response(json.loads(content))


  def print_get_policy_response(response: dict[str, object]) -> None:
    """Pretty-prints the decoded Get Policy API response to stdout.

    Args:
      response: The JSON-decoded response body, as produced by `json.loads`.
    """
    pprint.pprint(response, indent=4)


  def main(argv: Sequence[str]) -> None:
    """Fetches a single policy as the delegated administrator and prints it.

    Args:
      argv: Command-line arguments left over after flag parsing; only the
        program name is expected.

    Raises:
      app.UsageError: If unexpected positional arguments are supplied.
    """
    # absl removes the parsed flags before calling main, so anything beyond the
    # program name is a stray positional argument.
    if len(argv) > 1:
      raise app.UsageError('Too many command-line arguments.')

    credentials = create_delegated_credentials(
        _SA_FILE.value, _ADMIN_EMAIL.value
    )
    # Refresh so that `token` holds the access token sent with the request.
    credentials.refresh(google.auth.transport.requests.Request())
    call_get_policy_api(credentials.token, _POLICY_NAME.value)


  # Script entry point: hand control to absl, which invokes main.
  if __name__ == '__main__':
    app.run(main)

配额

对于每个 Google Cloud 项目,Cloud Identity Policy API 支持每秒 1 次查询 (QPS)。对于每位客户,即使该客户创建了多个 Google Cloud 项目,Cloud Identity Policy API 总计也仅支持 1 QPS。

不支持申请提高配额。