Go를 사용하여 Dataflow 파이프라인 만들기

이 페이지에서는 Go용 Apache Beam SDK를 사용하여 파이프라인을 정의하는 프로그램을 빌드하는 방법을 설명합니다. 그런 다음 로컬과 Dataflow 서비스에서 파이프라인을 실행합니다. WordCount 파이프라인 소개는 Apache Beam에서 WordCount를 사용하는 방법 동영상을 참조하세요.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Compute Engine 기본 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • 여기에서 PROJECT_NUMBER를 프로젝트 번호로 바꿉니다. 프로젝트 번호를 찾으려면 프로젝트 식별을 참조하거나 gcloud projects describe 명령어를 사용합니다.
    • SERVICE_ACCOUNT_ROLE을 각 개별 역할로 바꿉니다.
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S(Standard).
    • 스토리지 위치를 다음과 같이 설정합니다. US(미국)
    • BUCKET_NAME을 고유한 버킷 이름으로 바꿉니다. 버킷 네임스페이스는 전역적이며 공개로 표시되므로 버킷 이름에 민감한 정보를 포함해서는 안 됩니다.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Google Cloud 프로젝트 ID와 Cloud Storage 버킷 이름을 복사합니다. 이러한 값은 이 빠른 시작 뒷부분에 필요합니다.

개발 환경 설정

Apache Beam SDK는 데이터 파이프라인용 오픈소스 프로그래밍 모델입니다. Apache Beam 프로그램으로 파이프라인을 정의한 후 Dataflow와 같은 실행기를 선택하여 파이프라인을 실행합니다.

Go용 Apache Beam SDK로 작업할 때는 최신 버전의 Go를 사용하는 것이 좋습니다. 최신 버전의 Go가 설치되어 있지 않으면 Go의 다운로드 및 설치 가이드를 사용하여 특정 운영체제에 맞는 Go를 다운로드하고 설치합니다.

설치한 Go 버전을 확인하려면 로컬 터미널에서 다음 명령어를 실행합니다.

go version

Beam WordCount 예시 실행

Go용 Apache Beam SDK에는 wordcount 파이프라인 예시가 포함되어 있습니다. wordcount 예시에서는 다음을 수행합니다.

  1. 텍스트 파일을 입력으로 읽습니다. 기본적으로 Cloud Storage 버킷에 있는 gs://dataflow-samples/shakespeare/kinglear.txt라는 리소스가 포함된 텍스트 파일을 읽습니다.
  2. 각 줄을 단어로 파싱합니다.
  3. 토큰화된 단어에서 빈도 카운트를 수행합니다.

로컬 머신에서 최신 버전의 Beam wordcount 예시를 실행하려면 다음 명령어를 사용합니다. input 플래그는 읽을 파일을 지정하고 output 플래그는 빈도 수 출력의 파일 이름을 지정합니다.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

파이프라인이 완료되면 출력 결과를 봅니다.

more outputs*

종료하려면 q를 누릅니다.

파이프라인 코드 수정

Beam wordcount 파이프라인은 대문자와 소문자 단어를 구분합니다. 다음 단계에서는 자체 Go 모듈을 만들고 파이프라인이 대소문자를 구분하지 않도록 wordcount 파이프라인을 수정한 후 Dataflow에서 실행하는 방법을 설명합니다.

Go 모듈 만들기

파이프라인 코드를 변경하려면 다음 단계를 수행합니다.

  1. 원하는 위치에 Go 모듈 디렉터리를 만듭니다.

    mkdir wordcount
    cd wordcount
  2. Go 모듈을 만듭니다. 이 예시에서는 example/dataflow를 모듈 경로로 사용합니다.

    go mod init example/dataflow
  3. Apache Beam GitHub 저장소에서 wordcount 코드의 최신 사본을 다운로드합니다. 생성한 wordcount 디렉터리에 이 파일을 저장합니다.

  4. Linux 이외의 운영체제를 사용하는 경우 Go unix 패키지를 가져와야 합니다. 이 패키지는 Dataflow 서비스에서 파이프라인을 실행하는 데 필요합니다.

    go get -u golang.org/x/sys/unix
  5. go.mod 파일이 모듈의 소스 코드와 일치하는지 확인합니다.

    go mod tidy

수정되지 않은 파이프라인 실행

수정되지 않은 wordcount 파이프라인이 로컬에서 실행되는지 확인합니다.

  1. 터미널에서 로컬로 파이프라인을 빌드하고 실행합니다.

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. 출력 결과를 봅니다.

     more outputs*
  3. 종료하려면 q를 누릅니다.

파이프라인 코드 변경

대소문자를 구분하지 않도록 파이프라인을 변경하려면 모든 단어에 strings.ToLower 함수를 적용하도록 코드를 수정합니다.

  1. 원하는 편집기에서 wordcount.go 파일을 엽니다.

  2. init 블록을 조사합니다(명확한 설명을 위해 주석이 삭제됨).

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. 새 줄을 추가하여 strings.ToLower 함수를 등록합니다.

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. CountWords 함수를 검사합니다.

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. 단어를 소문자로 만들려면 모든 단어에 strings.ToLower를 적용하는 ParDo를 추가합니다.

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. 파일을 저장합니다.

로컬에서 업데이트된 파이프라인 실행

업데이트된 wordcount 파이프라인을 로컬에서 실행하고 출력이 변경되었는지 확인합니다.

  1. 수정된 wordcount 파이프라인을 빌드하고 실행합니다.

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
  2. 수정된 파이프라인의 출력 결과를 봅니다. 모든 단어는 소문자여야 합니다.

     more outputs*
  3. 종료하려면 q를 누릅니다.

Dataflow 서비스에서 파이프라인 실행

Dataflow 서비스에서 업데이트된 wordcount 예시를 실행하려면 다음 명령어를 사용합니다.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷 이름

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.

  • DATAFLOW_REGION: Dataflow 작업을 배포할 리전입니다. 예를 들면 europe-west1입니다. 사용 가능한 위치 목록은 Dataflow 위치를 참조하세요. --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

결과 보기

Google Cloud 콘솔에서 Dataflow 작업 목록을 확인할 수 있습니다. Google Cloud 콘솔에서 Dataflow 작업 페이지로 이동합니다.

작업으로 이동

작업 페이지에는 wordcount 작업의 세부정보가 표시되는데, 처음에는 실행 중에서 이후 성공으로 상태가 바뀝니다.

Dataflow를 사용하여 파이프라인을 실행하면 결과는 Cloud Storage 버킷에 저장됩니다. Google Cloud 콘솔 또는 로컬 터미널을 사용하여 출력 결과를 봅니다.

콘솔

Google Cloud 콘솔에서 결과를 보려면 Cloud Storage 버킷 페이지로 이동합니다.

버킷으로 이동

프로젝트의 버킷 목록에서 앞서 만든 스토리지 버킷을 클릭합니다. 작업을 통해 만든 출력 파일은 results 디렉터리에 표시됩니다.

터미널

터미널에서 또는 Cloud Shell을 사용하여 결과를 확인합니다.

  1. 출력 파일을 나열하려면 gcloud storage ls 명령어를 사용합니다.

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

    BUCKET_NAME을 지정된 출력 Cloud Storage 버킷의 이름으로 바꿉니다.

  2. 출력 파일의 결과를 보려면 gcloud storage cat 명령어를 사용합니다.

    gcloud storage cat gs://BUCKET_NAME/results/outputs*

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. 프로젝트를 유지할 경우 Compute Engine 기본 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

다음 단계