기본 템플릿 실행

Dataflow 템플릿을 만들고 스테이징한 후 Google Cloud 콘솔, REST API, Google Cloud CLI를 사용하여 템플릿을 실행합니다. App Engine 표준 환경, Cloud Functions, 기타 제한된 환경 등 다양한 환경에서 Dataflow 템플릿 작업을 배포할 수 있습니다.

Google Cloud 콘솔 사용

Google Cloud 콘솔을 사용하여 Google 제공 Dataflow 템플릿 및 커스텀 Dataflow 템플릿을 실행할 수 있습니다.

Google 제공 템플릿

Google 제공 템플릿을 실행하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔의 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 생성을 클릭합니다.
  4. Google Cloud 콘솔 템플릿에서 작업 만들기 버튼
  5. Dataflow 템플릿 드롭다운 메뉴에서 실행할 Google 제공 템플릿을 선택합니다.
  6. WordCount 템플릿 실행 양식
  7. 작업 이름 필드에 작업 이름을 입력합니다.
  8. 제공된 매개변수 필드에 매개변수 값을 입력합니다. Google 제공 템플릿을 사용할 때는 추가 매개변수 섹션이 필요하지 않습니다.
  9. 작업 실행을 클릭합니다.

커스텀 템플릿

커스텀 템플릿을 실행하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔의 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 생성을 클릭합니다.
  4. Google Cloud 콘솔 템플릿에서 작업 만들기 버튼
  5. Dataflow 템플릿 드롭다운 메뉴에서 커스텀 템플릿을 선택합니다.
  6. 커스텀 템플릿 실행 양식
  7. 작업 이름 필드에 작업 이름을 입력합니다.
  8. 템플릿 Cloud Storage 경로 필드에 템플릿 파일의 Cloud Storage 경로를 입력합니다.
  9. 템플릿에 매개변수가 필요할 경우 추가 매개변수 섹션의 항목 추가를 클릭합니다. 매개변수의 이름을 입력합니다. 필요한 매개변수마다 이 과정을 반복합니다.
  10. 작업 실행을 클릭합니다.

REST API 사용

REST API 요청으로 템플릿을 실행하려면 프로젝트 ID로 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

사용 가능한 매개변수에 대한 자세한 내용은 projects.locations.templates.launch의 REST API 참조를 참조하세요.

커스텀 템플릿 일괄 작업 만들기

projects.locations.templates.launch 요청 예시에서는 템플릿에서 텍스트 파일을 읽고 출력 텍스트 파일을 작성하는 일괄 작업을 만듭니다. 요청이 성공한 경우 응답 본문에는 LaunchTemplateResponse의 인스턴스가 포함됩니다.

다음 값을 수정합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • LOCATION을 원하는 Dataflow 리전으로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • gcsPath를 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • parameters를 키/값 쌍 목록으로 설정합니다.
  • tempLocation을 쓰기 권한이 있는 위치로 설정합니다. 이 값은 Google 제공 템플릿을 실행하는 데 필요합니다.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

커스텀 템플릿 스트리밍 작업 만들기

projects.locations.templates.launch 요청 예시에서는 기본 템플릿에서 Pub/Sub 구독을 읽고 BigQuery 테이블에 작성하는 스트리밍 작업을 만듭니다. Flex 템플릿을 실행하려는 경우 projects.locations.flexTemplates.launch를 대신 사용하세요. 템플릿 예시는 Google 제공 템플릿입니다. 커스텀 템플릿을 가리키도록 템플릿에서 경로를 수정할 수 있습니다. Google 제공 및 커스텀 템플릿을 실행하는 데 동일한 논리가 사용됩니다. 이 예시에서 BigQuery 테이블은 이미 적합한 스키마와 함께 있어야 합니다. 성공한 경우 응답 본문에는 LaunchTemplateResponse의 인스턴스가 포함됩니다.

다음 값을 수정합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • LOCATION을 원하는 Dataflow 리전으로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • GCS_PATH를 템플릿 파일의 Cloud Storage 위치로 바꿉니다. 위치는 gs://로 시작해야 합니다.
  • parameters를 키/값 쌍 목록으로 설정합니다. 나열된 매개변수는 이 템플릿 예시에 적용됩니다. 커스텀 템플릿을 사용하는 경우 매개변수를 필요에 따라 수정합니다. 예시 템플릿을 사용하는 경우 다음 변수를 바꿉니다.
    • YOUR_SUBSCRIPTION_NAME을 Pub/Sub 구독 이름으로 바꿉니다.
    • YOUR_DATASET를 BigQuery 데이터 세트로 바꾸고 YOUR_TABLE_NAME을 BigQuery 테이블 이름으로 바꿉니다.
  • tempLocation을 쓰기 권한이 있는 위치로 설정합니다. 이 값은 Google 제공 템플릿을 실행하는 데 필요합니다.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

커스텀 템플릿 스트리밍 작업 업데이트

projects.locations.templates.launch 요청 예시에서는 템플릿 스트리밍 작업을 업데이트하는 방법을 보여줍니다. Flex 템플릿을 업데이트하려는 경우 projects.locations.flexTemplates.launch를 대신 사용하세요.

  1. 예시 2: 커스텀 템플릿 스트리밍 작업 만들기를 실행하여 스트리밍 템플릿 작업을 시작합니다.
  2. 다음 수정된 값을 사용하여 다음 HTTP POST 요청을 보냅니다.
    • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • LOCATION을 업데이트하려는 작업의 Dataflow 리전으로 바꿉니다.
    • JOB_NAME을 업데이트할 작업의 정확한 이름으로 바꿉니다.
    • GCS_PATH를 템플릿 파일의 Cloud Storage 위치로 바꿉니다. 위치는 gs://로 시작해야 합니다.
    • parameters를 키/값 쌍 목록으로 설정합니다. 나열된 매개변수는 이 템플릿 예시에 적용됩니다. 커스텀 템플릿을 사용하는 경우 매개변수를 필요에 따라 수정합니다. 예시 템플릿을 사용하는 경우 다음 변수를 바꿉니다.
      • YOUR_SUBSCRIPTION_NAME을 Pub/Sub 구독 이름으로 바꿉니다.
      • YOUR_DATASET를 BigQuery 데이터 세트로 바꾸고 YOUR_TABLE_NAME을 BigQuery 테이블 이름으로 바꿉니다.
    • environment 매개변수를 사용하여 머신 유형과 같은 환경 설정을 변경합니다. 이 예시에서는 기본 머신 유형보다 작업자당 메모리와 CPU가 더 많은 n2-highmem-2 머신 유형을 사용합니다.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. Dataflow 모니터링 인터페이스에 액세스하여 같은 이름의 새 작업이 생성되었는지 확인합니다. 이 작업의 상태가 업데이트되었습니다.

Google API 클라이언트 라이브러리 사용

Google API 클라이언트 라이브러리를 사용하면 Dataflow REST API를 간편하게 호출할 수 있습니다. 이 샘플 스크립트에서는 Python용 Google API 클라이언트 라이브러리를 사용합니다.

이 예시에서는 다음 변수를 설정해야 합니다.

  • project: 프로젝트 ID로 설정합니다.
  • job: 원하는 고유 작업 이름으로 설정합니다.
  • template: 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • parameters: 템플릿 매개변수가 있는 사전으로 설정합니다.

리전을 설정하려면 location 매개변수를 포함합니다.

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

사용 가능한 옵션에 대한 자세한 내용은 Dataflow REST API 참조의 projects.locations.templates.launch 메서드를 참조하세요.

gcloud CLI 사용

gcloud CLI는 gcloud dataflow jobs run 명령어를 사용하여 커스텀 또는 Google 제공 템플릿을 실행할 수 있습니다. Google 제공 템플릿을 실행하는 예시는 Google 제공 템플릿 페이지에 설명되어 있습니다.

다음 커스텀 템플릿 예시의 경우 다음 값을 설정하세요.

  • JOB_NAME을 원하는 작업 이름으로 바꿉니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • --gcs-location를 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • --parameters를 작업에 전달할 쉼표로 구분된 매개변수 목록으로 설정합니다. 쉼표와 값 사이에는 공백이 없어야 합니다.
  • VM에서 프로젝트 메타데이터에 저장된 SSH 키를 허용하지 않도록 하려면 block_project_ssh_keys 서비스 옵션과 함께 additional-experiments 플래그를 사용합니다: --additional-experiments=block_project_ssh_keys

커스텀 템플릿 일괄 작업 만들기

이 예시에서는 템플릿에서 텍스트 파일을 읽고 출력 텍스트 파일을 작성하는 일괄 작업을 생성합니다.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

이 요청은 다음 형식의 응답을 반환합니다.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

커스텀 템플릿 스트리밍 작업 만들기

이 예시에서는 Pub/Sub 주제에서 읽고 BigQuery 테이블에 쓰는 템플릿에서 스트리밍 작업을 만듭니다. BigQuery 테이블은 적절한 스키마와 함께 이미 존재해야 합니다.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

이 요청은 다음 형식의 응답을 반환합니다.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run 명령어의 전체 플래그 목록은 gcloud CLI 참조를 확인하세요.

모니터링 및 문제 해결

Dataflow 모니터링 인터페이스를 사용하면 Dataflow 작업을 모니터링할 수 있습니다. 작업이 실패하면 파이프라인 문제 해결 가이드의 문제 해결 팁, 디버깅 전략, 일반적인 오류 카탈로그를 확인할 수 있습니다.