기본 템플릿 실행

Dataflow 템플릿을 만들고 스테이징한 후 Google Cloud Console, REST API 또는 gcloud 명령줄 도구를 사용하여 템플릿을 실행합니다. App Engine 표준 환경, Cloud Functions, 기타 제한된 환경 등 다양한 환경에서 Dataflow 템플릿 작업을 배포할 수 있습니다.

참고: 템플릿 기반 파이프라인을 실행할 때는 템플릿 파일 이외에도 템플릿 생성 시 스테이징 및 참조된 파일을 사용합니다. 스테이징된 파일을 이동하거나 삭제하면 파이프라인 작업이 실패합니다.

Cloud Console 사용

Cloud Console을 사용하여 Google 제공 Dataflow 템플릿 및 커스텀 Dataflow 템플릿을 실행할 수 있습니다.

Google 제공 템플릿

Google 제공 템플릿을 실행하려면 다음 안내를 따르세요.

  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 생성을 클릭합니다.
  4. 템플릿에서 Cloud Platform Console 생성 작업 버튼
  5. Dataflow 템플릿 드롭다운 메뉴에서 실행할 Google 제공 템플릿을 선택합니다.
  6. WordCount 템플릿 실행 양식
  7. 작업 이름 필드에 작업 이름을 입력합니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  8. 제공된 매개변수 필드에 매개변수 값을 입력합니다. Google에서 제공하는 템플릿을 사용할 경우 추가 매개변수 섹션은 필요하지 않습니다.
  9. 작업 실행을 클릭합니다.

커스텀 템플릿

커스텀 템플릿을 실행하려면 다음 안내를 따르세요.

  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 생성을 클릭합니다.
  4. 템플릿에서 Cloud Platform Console 생성 작업 버튼
  5. Dataflow 템플릿 드롭다운 메뉴에서 커스텀 템플릿을 선택합니다.
  6. 커스텀 템플릿 실행 양식
  7. 작업 이름 필드에 작업 이름을 입력합니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  8. 템플릿 Cloud Storage 경로 필드에 템플릿 파일의 Cloud Storage 경로를 입력합니다.
  9. 템플릿에 매개변수가 필요할 경우 추가 매개변수 섹션의 항목 추가를 클릭합니다. 매개변수의 이름을 입력합니다. 필요한 매개변수마다 이 과정을 반복합니다.
  10. 작업 실행을 클릭합니다.

REST API 사용

REST API 요청으로 템플릿을 실행하려면 프로젝트 ID로 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

사용 가능한 매개변수에 대한 자세한 내용은 projects.templates.launch의 REST API 참조를 참조하세요.

예 1: 커스텀 템플릿 일괄 작업 만들기

projects.templates.launch 요청 예시에서는 템플릿에서 텍스트 파일을 읽고 출력 텍스트 파일을 작성하는 배치 작업을 만듭니다. 요청이 성공한 경우 응답 본문에는 LaunchTemplateResponse의 인스턴스가 포함됩니다.

다음 값을 수정해야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • gcsPath를 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • parameters를 키/값 쌍 목록으로 설정합니다.
  • tempLocation을 쓰기 권한이 있는 위치로 설정합니다. 이 값은 Google 제공 템플릿을 실행하는 데 필요합니다.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

예 2: 커스텀 템플릿 스트리밍 작업 만들기

projects.templates.launch 요청 예시에서는 템플릿에서 Pub/Sub 주제를 읽고 BigQuery 테이블에 작성하는 스트리밍 작업을 만듭니다. BigQuery 테이블은 적절한 스키마와 함께 이미 존재해야 합니다. 성공한 경우 응답 본문에는 LaunchTemplateResponse의 인스턴스가 포함됩니다.

다음 값을 수정해야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • YOUR_TOPIC_NAME을 Pub/Sub 주제 이름으로 바꿉니다.
  • YOUR_DATASET를 BigQuery 데이터세트로 바꾸고 YOUR_TABLE_NAME을 BigQuery 테이블 이름으로 바꿉니다.
  • gcsPath를 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • parameters를 키/값 쌍 목록으로 설정합니다.
  • tempLocation을 쓰기 권한이 있는 위치로 설정합니다. 이 값은 Google 제공 템플릿을 실행하는 데 필요합니다.
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
            "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

예 3: 커스텀 템플릿 스트리밍 작업 업데이트

projects.templates.launch 요청 예시에서는 템플릿 스트리밍 작업을 업데이트하는 방법을 보여줍니다.

  1. 예 2: 커스텀 템플릿 스트리밍 작업 만들기를 실행하여 스트리밍 템플릿 작업을 시작합니다.
  2. 다음 수정된 값을 사용하여 다음 HTTP POST 요청을 보냅니다.
    • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • JOB_NAME을 원하는 작업 이름으로 바꿉니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
    • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
    • YOUR_TOPIC_NAME을 Pub/Sub 주제 이름으로 바꿉니다.
    • YOUR_DATASET를 BigQuery 데이터세트로 바꾸고 YOUR_TABLE_NAME을 BigQuery 테이블 이름으로 바꿉니다.
    • gcsPath를 템플릿 파일의 Cloud Storage 위치로 설정합니다.
    • parameters를 키/값 쌍 목록으로 설정합니다.
    • tempLocation을 쓰기 권한이 있는 위치로 설정합니다. 이 값은 Google 제공 템플릿을 실행하는 데 필요합니다.
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
            "update": true
        }
    
  3. Dataflow 모니터링 인터페이스에 액세스하여 같은 이름의 새 작업이 생성되었는지 확인합니다. 이 작업의 상태가 업데이트되었습니다.

Google API 클라이언트 라이브러리 사용

Google API 클라이언트 라이브러리를 사용하면 Dataflow REST API를 간편하게 호출할 수 있습니다. 이 샘플 스크립트에서는 Python용 Google API 클라이언트 라이브러리를 사용합니다.

이 예시에서는 다음 변수를 설정해야 합니다.

  • project: 프로젝트 ID로 설정합니다.
  • job: 원하는 고유 작업 이름으로 설정합니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  • template: 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • parameters: 템플릿 매개변수가 있는 사전으로 설정합니다.
from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template,
    body={
        'jobName': job,
        'parameters': parameters,
    }
)

response = request.execute()

사용 가능한 옵션에 대한 자세한 내용은 Cloud Dataflow REST API 참조의 projects.templates.launch 메서드를 참조하세요.

gcloud 사용

참고: gcloud 명령줄 도구를 사용하여 템플릿을 실행하려면 Cloud SDK 버전 138.0.0 이상이 필요합니다.

gcloud 명령줄 도구는 gcloud dataflow jobs run 명령어를 사용하여 커스텀 템플릿이나 Google 제공 템플릿을 실행할 수 있습니다. Google 제공 템플릿을 실행하는 예시는 Google 제공 템플릿 페이지에 설명되어 있습니다.

다음 커스텀 템플릿 예시의 경우 다음 값을 설정하세요.

  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. [a-z]([-a-z0-9]{0,38}[a-z0-9])? 정규 표현식과 일치하는 작업 이름만 유효합니다.
  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.
  • --gcs-location 플래그를 포함해야 합니다. --gcs-location을 템플릿 파일의 Cloud Storage 위치로 설정합니다.
  • --parameters를 작업에 전달할 쉼표로 구분된 매개변수 목록으로 설정합니다. 쉼표와 값 사이에는 공백이 없어야 합니다.

예시 1: 커스텀 템플릿, 배치 작업

이 예시에서는 템플릿에서 텍스트 파일을 읽고 출력 텍스트 파일을 작성하는 배치 작업을 생성합니다.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output

이 요청은 다음 형식의 응답을 반환합니다.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

예 2: 커스텀 템플릿, 스트리밍 작업

이 예시에서는 Pub/Sub 주제에서 읽고 BigQuery 테이블에 쓰는 템플릿에서 스트리밍 작업을 만듭니다. BigQuery 테이블은 적절한 스키마와 함께 이미 존재해야 합니다.

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

이 요청은 다음 형식의 응답을 반환합니다.

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

gcloud dataflow jobs run 명령어의 전체 플래그 목록은 gcloud 도구 참조를 확인하세요.

모니터링 및 문제해결

Dataflow 모니터링 인터페이스를 사용하면 Dataflow 작업을 모니터링할 수 있습니다. 작업이 실패하면 파이프라인 문제해결 가이드의 문제해결 팁, 디버깅 전략, 일반적인 오류 카탈로그를 확인할 수 있습니다.