Airflow 연결 관리

이 페이지에서는 Airflow 연결을 사용하는 방법을 설명합니다.

Airflow 연결을 사용하면 Cloud Composer 환경에서 Google Cloud 프로젝트의 리소스에 액세스 할 수 있습니다. 로그인 및 호스트 이름과 같은 정보를 저장할 Airflow 연결 ID를 만들면 워크플로에서 연결 ID를 참조합니다. Airflow 연결은 워크플로에 사용되는 보안 비밀과 사용자 인증 정보를 저장하는 데 권장되는 방법입니다.

Airflow 연결을 사용하면 Cloud Composer 환경이 Google Cloud 프로젝트, 다른 클라우드 제공 업체 또는 제3자 서비스와 같은 다른 API와 통신하는 데 필요한 연결 정보를 저장할 수 있습니다.

Airflow 연결은 사용자 인증 정보, 호스트 이름, 추가 API 매개변수와 같은 세부정보를 저장할 수 있습니다. 각 연결에는 사전 설정 세부정보를 참조하기 위해 워크플로 작업에서 사용할 수 있는 연결된 ID가 있습니다. Airflow 연결을 사용하여 워크플로 작업에 대한 보안 비밀 및 사용자 인증 정보를 저장하는 것이 좋습니다.

Google Cloud 연결 유형으로 Google Cloud 통합을 사용 설정합니다.

퍼넷 키 및 보안 연결

새 환경을 만들면 Cloud Composer가 환경의 고유 영구 퍼넷 키를 생성하고 기본적으로 연결 엑스트라를 보호합니다. Airflow 구성에서 fernet_key를 볼 수 있습니다. 연결 보안 방법에 대한 자세한 내용은 연결 보안을 참조하세요.

기본 연결 사용

기본적으로 Cloud Composer는 다음 Google Cloud Platform용 Airflow 연결을 구성합니다.

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

기본 연결 ID를 사용하여 사용자의 DAG로 부터 이러한 연결을 사용할 수 있습니다. 다음 예에서는 기본 연결과 함께 BigQueryOperator를 사용합니다.

Airflow 2

task_default = bigquery.BigQueryInsertJobOperator(
    task_id='task_default_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    sql='SELECT 1', use_legacy_sql=False)

연산자를 만들 때 연결 ID를 명시적으로 지정할 수도 있습니다.

Airflow 2

# Composer creates a 'google_cloud_default' connection by default.
task_explicit = bigquery.BigQueryInsertJobOperator(
    task_id='task_explicit_connection',
    gcp_conn_id='google_cloud_default',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

다른 프로젝트의 리소스에 액세스하기

Cloud Composer 환경에서 Google Cloud 프로젝트의 리소스에 액세스할 수 있도록 허용하는 방법은 기본 연결을 사용하고, 환경과 연결된 서비스 계정에 적절한 ID 및 액세스 관리 권한을 할당하는 것입니다.

다음 섹션에서는 프로젝트 ID your-composer-project에 배포된 Cloud Composer 환경의 your-storage-project에서 Cloud Storage 버킷에 대한 읽기 및 쓰기를 허용하는 방법의 예시를 보여줍니다.

환경과 연결된 서비스 계정 결정

Console

  1. Cloud Console에서 환경 페이지를 엽니다.

    환경 페이지 열기

  2. 이름 열에서 환경 이름을 클릭하여 환경 세부정보 페이지를 엽니다.
  3. 서비스 계정을 확인합니다. 이 값은 service-account-name@your-composer-project.iam.gserviceaccount.com과 같은 이메일 주소입니다.

gcloud

다음 명령어를 입력하고 VARIABLES를 적절한 값으로 바꿉니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

결과에서 service-account-name@your-composer-project.iam.gserviceaccount.com과 같은 주소가 표시됩니다.

서비스 계정에 적절한 IAM 권한 부여

your-storage-project에서 Cloud Storage 버킷에 대한 읽기 및 쓰기를 허용하려면 Cloud Composer 환경과 연결된 서비스 계정에 roles/storage.objectAdmin 역할을 부여합니다.

Console

  1. 스토리지 프로젝트의 IAM 및 관리자 페이지

    IAM 및 관리자 페이지 열기

  2. 멤버 추가를 클릭합니다.

  3. 멤버 추가 대화상자에서 Cloud Composer 환경과 연결된 서비스 계정의 전체 이메일 주소를 지정합니다.

  4. 역할 선택 드롭다운에서 적절한 권한을 선택합니다. 이 예시에서는 스토리지 > 객체 관리자 역할을 선택합니다.

  5. 추가를 클릭합니다.

gcloud

gcloud projects add-iam-policy-binding 명령어를 사용하여 프로젝트 수준의 IAM 권한을 추가합니다. VARIABLES를 적절한 값으로 바꿉니다.

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

적절한 권한이 부여되면 your-composer-project 프로젝트의 리소스에 액세스하는 데 사용하는 것과 동일한 기본 Airflow 연결로 your-storage-project 프로젝트의 리소스에 액세스할 수 있습니다.

새 Airflow 연결 만들기

시작하기 전에

Cloud Composer 환경과 연결된 서비스 계정에 적절한 IAM 권한을 부여하고 DAG 정의의 기본 연결을 사용합니다. 이렇게 할 수 없는 경우에는 이 섹션의 단계를 따르세요.

다른 프로젝트에 대한 연결 만들기

다음 단계에서는 프로젝트 ID your-composer-project에 배포된 Cloud Composer 환경의 your-storage-project에서 Cloud Storage 버킷에 대한 읽기 및 쓰기를 허용하는 방법의 예시를 보여줍니다.

  1. your-storage-project에서 서비스 계정을 만들고 JSON 키를 다운로드합니다.

    1. Cloud Console에서 서비스 계정 페이지를 엽니다.

      서비스 계정 페이지 열기

    2. 프로젝트 선택을 클릭합니다.

    3. 프로젝트를 선택하고 열기를 클릭합니다.

    4. 서비스 계정 만들기를 클릭합니다.

    5. 서비스 계정 이름을 입력하고 서비스 계정에 부여할 역할(예시: 스토리지 > 객체 관리자)을 선택합니다.

    6. 새 비공개 키 제공을 선택하고 저장을 클릭합니다.

    7. 일반 텍스트 편집기로 JSON 파일을 엽니다. 콘텐츠가 다음과 같이 표시됩니다.

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. 새 연결 만들기:

    Airflow UI

    1. Cloud Composer 환경의 Airflow 웹 인터페이스에 액세스합니다.

    2. Airflow 웹 인터페이스에서 관리 > 연결 페이지를 엽니다.

      Airflow 스크린샷 관리자 연결 메뉴를 엽니다.

    3. 새 연결 양식을 열려면 만들기 탭을 클릭합니다.

      Airflow 스크린샷 만들기 탭을 클릭합니다.

    4. 새 연결 만들기:

      1. 연결 ID를 선택하려면 Conn ID 필드(예: my_gcp_connection)를 작성합니다. DAG 정의 파일에서 이 ID를 사용합니다.
      2. 연결 유형 필드에서 Google Cloud Platform 옵션을 선택합니다.
      3. 서비스 계정이 속한 프로젝트에 해당하는 프로젝트 ID 값을 입력합니다.
      4. 다음 중 하나를 수행합니다.

        1. 다운로드한 서비스 계정 JSON 키 파일을 환경의 Cloud Storage 버킷의 data/ 디렉터리에 복사합니다. 그런 다음 키 파일 경로에 Airflow 작업자의 로컬 파일 경로를 JSON 키 파일의 위치(예시: /home/airflow/gcs/data/keyfile.json)로 입력합니다.
        2. 키 파일 JSON에서 다운로드한 서비스 계정 JSON 키 파일의 콘텐츠를 복사합니다.

        CLI 또는 웹 UI를 통해 Airflow 연결에 액세스할 수 있는 사용자는 keyfile_dict에 저장된 사용자 인증 정보를 읽을 수 있습니다. 이러한 사용자 인증 정보를 보호하려면 키 파일 경로를 사용하고 Cloud Storage ACL을 사용하여 키 파일에 대한 액세스를 제한하는 것이 좋습니다.

      5. 범위 필드에 값을 입력합니다. https://www.googleapis.com/auth/cloud-platform을 범위로 사용하고 서비스 계정에 대한 IAM 권한을 사용하여 Google Cloud 리소스로 액세스를 제한하는 것이 좋습니다.

      6. 연결을 만들려면 저장을 클릭합니다.

        Airflow 스크린샷 만들기 탭을 클릭합니다.

    gcloud

    다음 명령어를 입력합니다.

    Airflow 1.10 CLI

    gcloud composer environments run \
      ENVIRONMENT_NAME \
      --location LOCATION \
      connections -- --add \
      --conn_id=CONNECTION_ID \
      --conn_type=google_cloud_platform \
      --conn_extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
      "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    Airflow 2.0 CLI

    gcloud beta composer environments run \
    ENVIRONMENT_NAME \
      --location LOCATION \
      connections add -- \
      CONNECTION_ID \
      --conn-type=google_cloud_platform \
      --conn-extra '{"extra\__google\_cloud\_platform\__CMD_ARGS": "...",
      "extra\__google\_cloud\_platform\__CMD_ARGS": "...", ...}'
    

    각 매개변수는 다음과 같습니다.

    • ENVIRONMENT_NAME은 환경 이름입니다.
    • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
    • CONNECTION_ID는 연결을 위한 식별자입니다. 소문자를 사용하고 밑줄로 단어를 구분합니다.
    • CMD_ARGS는 다음과 같습니다.
      • project는 프로젝트 ID입니다. extra__google_cloud_platform__project만 필요합니다.
      • key_path는 JSON 키 파일에 대한 Airflow 작업자의 로컬 파일 경로입니다(예: /home/airflow/gcs/data/keyfile.json). 제공된 경우 scope도 필요합니다. key_path 또는 keyfile_dict 중 하나만 사용합니다.
      • keyfile_dict는 다운로드한 JSON 키 파일의 콘텐츠를 지정하는 JSON 객체입니다. 제공된 경우 scope도 필요합니다. keyfile_dict 또는 key_path 중 하나만 사용합니다. CLI 또는 웹 UI를 통해 Airflow 연결에 액세스할 수 있는 사용자는 keyfile_dict에 저장된 사용자 인증 정보를 읽을 수 있습니다. 이러한 사용자 인증 정보를 보호하려면 key_path를 사용하고 Cloud Storage ACL을 적용하여 키 파일에 대해 액세스를 제한하는 것이 좋습니다.
      • scope는 쉼표로 구분된 OAuth 범위 목록입니다.

    예를 들면 다음과 같습니다.

    Airflow 1.10 CLI

    gcloud composer environments run test-environment \
       --location us-central1 connections -- --add \
       --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
       --conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
       "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

    Airflow 2.0 CLI

    gcloud beta composer environments run test-environment \
       --location us-central1 connections add -- \
       --conn-id=my_gcp_connection --conn-type=google_cloud_platform \
       --conn-extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
       "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
       "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'
    

새 Airflow 연결 사용

생성한 연결을 사용하려면 Google Cloud Airflow 연산자를 생성할 때 해당 연결 ID 인수로 설정합니다.

Airflow 2

# Set a gcp_conn_id to use a connection that you have created.
task_custom = bigquery.BigQueryInsertJobOperator(
    task_id='task_custom_connection',
    gcp_conn_id='my_gcp_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Airflow 1

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    sql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')