파이프라인 상태 모니터링

이 페이지에서는 파이프라인 상태와 같은 Cloud Data Fusion 파이프라인 이벤트를 Pub/Sub 주제에 게시하는 방법을 설명합니다. 또한 Pub/Sub 메시지를 처리하고 실패한 파이프라인 식별 및 재시도와 같은 작업을 수행하는 Cloud Run 함수를 만드는 방법도 설명합니다.

시작하기 전에

필요한 역할

Cloud Data Fusion 서비스 계정이 Pub/Sub 주제에 파이프라인 이벤트를 게시하는 데 필요한 권한을 갖도록 하려면 관리자에게 Cloud Data Fusion 서비스 계정에 Pub/Sub 주제를 만드는 프로젝트에 대한 Pub/Sub 게시자(roles/pubsub.publisher) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

관리자는 커스텀 역할이나 다른 사전 정의된 역할을 통해 Cloud Data Fusion 서비스 계정에 필요한 권한을 부여할 수도 있습니다.

Cloud Data Fusion 인스턴스에서 이벤트 게시 관리

버전 6.7.0 이상에서 REST API를 사용하여 신규 및 기존 Cloud Data Fusion 인스턴스의 이벤트 게시를 관리할 수 있습니다.

새 인스턴스에서 이벤트 게시

새 인스턴스를 만들고 EventPublishConfig 필드를 포함합니다. 새 인스턴스의 필수 필드에 대한 자세한 내용은 인스턴스 리소스 참조를 확인하세요.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID
  • LOCATION: 프로젝트의 위치
  • INSTANCE_ID: Cloud Data Fusion 인스턴스의 ID
  • VERSION_NUMBER: 인스턴스를 만드는 Cloud Data Fusion의 버전(예: 6.10.1)
  • TOPIC_ID: Pub/Sub 주제의 ID

기존 Cloud Data Fusion 인스턴스에서 이벤트 게시 사용 설정

기존 Cloud Data Fusion 인스턴스에서 EventPublishConfig 필드를 업데이트합니다.

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID
  • LOCATION: 프로젝트의 위치
  • INSTANCE_ID: Cloud Data Fusion 인스턴스의 ID
  • TOPIC_ID: Pub/Sub 주제의 ID

인스턴스에서 이벤트 게시 삭제

인스턴스에서 이벤트 게시를 삭제하려면 이벤트 게시 enabled 값을 false로 업데이트합니다.

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Pub/Sub 메시지를 읽는 함수 만들기

Cloud Run 함수는 Pub/Sub 메시지를 읽고 실패한 파이프라인을 재시도하는 등 조치를 취할 수 있습니다. Cloud Run 함수를 만들려면 다음을 수행합니다.

  1. Google Cloud 콘솔에서 Cloud Run 함수 페이지로 이동합니다.

    Cloud Run 함수로 이동

  2. 함수 만들기를 클릭합니다.

  3. 함수 이름과 리전을 입력합니다.

  4. 트리거 유형 필드에서 Cloud Pub/Sub를 선택합니다.

  5. Pub/Sub 주제 ID를 입력합니다.

  6. 다음을 클릭합니다.

  7. Pub/Sub 메시지를 읽고 다른 작업을 실행하는 함수를 추가합니다. 예를 들어 다음과 같은 사용 사례에 함수를 추가할 수 있습니다.

    • 파이프라인 오류에 대한 알림을 전송합니다.
    • 레코드 수 또는 실행 정보와 같은 KPI 알림을 보냅니다.
    • 다시 실행되지 않은 실패한 파이프라인을 다시 시작합니다.

    Cloud Run 함수 예시는 사용 사례 섹션을 참조하세요.

  8. 배포를 클릭합니다. 자세한 내용은 Cloud Run 함수 배포를 참조하세요.

사용 사례: 파이프라인 상태 문서화 및 실패한 파이프라인 재시도

다음 Cloud Run 함수 예시는 파이프라인 실행 상태에 대한 Pub/Sub 메시지를 읽은 후 Cloud Data Fusion에서 실패한 파이프라인을 다시 시도합니다.

이 함수 예시는 다음 Google Cloud 구성요소를 참조합니다.

  • Google Cloud 프로젝트: Cloud Run 함수 및 Pub/Sub 주제가 생성된 프로젝트입니다.
  • Pub/Sub 주제: Cloud Data Fusion 인스턴스에 연결된 Pub/Sub 주제
  • Cloud Data Fusion 인스턴스: 파이프라인을 설계하고 실행하는 Cloud Data Fusion 인스턴스
  • BigQuery 테이블: 파이프라인 상태와 실행 및 재실행 세부정보를 캡처하는 BigQuery 테이블
  • Cloud Run 함수: 실패한 파이프라인을 다시 시도하는 코드를 배포하는 Cloud Run 함수
  1. 다음 Cloud Run 함수 예시는 Cloud Data Fusion 상태 이벤트에 대한 Pub/Sub 메시지를 읽습니다.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 다음 함수 예시는 BigQuery 테이블을 만들고 저장하며 파이프라인 실행 세부정보를 쿼리합니다.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 다음 예시 함수는 실패한 파이프라인과 최근 1시간 동안 다시 실행되었는지 여부를 확인합니다.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 실패한 파이프라인이 최근에 실행되지 않은 경우 다음 예시 함수가 실패한 파이프라인을 다시 실행합니다.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

다음 단계