Python을 사용한 빠른 시작

이 빠른 시작에서는 Python용 Apache Beam SDK를 사용하여 파이프라인을 정의하는 프로그램을 빌드하는 방법을 알아봅니다. 그런 다음 직접 로컬 실행기 또는 Dataflow와 같은 클라우드 기반 실행기를 사용하여 파이프라인을 실행합니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, Cloud Resource Manager API를 사용 설정합니다.

    API 사용 설정

  5. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  6. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  7. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  8. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  9. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  10. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, Cloud Resource Manager API를 사용 설정합니다.

    API 사용 설정

  11. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  12. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  13. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  14. Cloud Storage 버킷을 생성합니다.
    1. Cloud Console에서 Cloud Storage 브라우저 페이지로 이동합니다.

      브라우저로 이동

    2. 버킷 만들기를 클릭합니다.
    3. 버킷 만들기 페이지에서 버킷 정보를 입력합니다. 다음 단계로 이동하려면 계속을 클릭합니다.
      • 버킷 이름 지정에 고유한 버킷 이름을 입력합니다. 버킷 네임스페이스는 전역적이며 공개로 표시되므로 버킷 이름에 민감한 정보를 포함해서는 안 됩니다.
      • 데이터 저장 위치 선택에서 다음을 수행합니다.
        • 위치 유형 옵션을 선택합니다.
        • 위치 옵션을 선택합니다.
      • 데이터의 기본 스토리지 클래스를 선택하려면 Standard.
      • 객체 액세스를 제어하는 방법 선택에서 액세스 제어 옵션을 선택합니다.
      • 고급 설정(선택사항)에서 암호화 방법, 보관 정책 또는 버킷 라벨을 지정합니다.
    4. 만들기를 클릭합니다.
  15. Google Cloud 프로젝트 ID와 Cloud Storage 버킷 이름을 복사합니다. 이 값은 이 문서의 뒷부분에서 필요합니다.

환경 설정

이 섹션에서는 명령 프롬프트를 사용하여 venv를 사용해 파이프라인 프로젝트를 실행할 격리된 Python 가상 환경을 설정합니다. 이 프로세스를 통해 한 프로젝트의 종속 항목을 다른 프로젝트의 종속 항목에서 분리할 수 있습니다.

명령 프롬프트를 사용할 수 없으면 Cloud Shell을 사용해도 됩니다. Cloud Shell에는 이미 Python 3용 패키지 관리자가 설치되어 있으므로 가상 환경 만들기로 건너뛸 수 있습니다.

Python을 설치한 후 가상 환경을 만들려면 다음 단계별 안내를 따르세요.

  1. Python 3 및 pip가 시스템에서 실행 중인지 확인합니다.
    python --version
    python -m pip --version
    
  2. 필요한 경우 Python 3을 설치한 후 Python 가상 환경을 설정합니다. Python 설치venv 설정 섹션의 Python 개발 환경 설정 페이지를 참조하세요.

빠른 시작을 완료한 후에는 deactivate를 실행하여 가상 환경을 중지할 수 있습니다.

Apache Beam SDK 가져오기

Apache Beam SDK는 데이터 파이프라인용 오픈소스 프로그래밍 모델입니다. Apache Beam 프로그램으로 파이프라인을 정의한 다음 Dataflow와 같은 실행기를 선택하여 파이프라인을 실행하게 됩니다.

Apache Beam SDK를 다운로드하고 설치하려면 다음 단계별 안내를 따르세요.

  1. 이전 섹션에서 만든 Python 가상 환경에 위치하고 있는지 확인하세요. 메시지가 <env_name>으로 시작되는지 확인합니다. 여기서 env_name은 가상 환경의 이름입니다.
  2. Python wheel 패키징 표준을 설치합니다.
    pip install wheel
    
  3. 최신 버전의 Python용 Apache Beam SDK를 설치합니다.
  4. pip install 'apache-beam[gcp]'

    연결 상태에 따라 설치에 다소 시간이 걸릴 수 있습니다.

로컬에서 파이프라인 실행

파이프라인이 로컬에서 실행되는 방법을 보려면 apache_beam 패키지에 포함된 wordcount 예시의 바로 활용할 수 있는 Python 모듈을 사용합니다.

wordcount 파이프라인 예시는 다음을 수행합니다.

  1. 텍스트 파일을 입력으로 가져옵니다.

    이 텍스트 파일은 리소스 이름 gs://dataflow-samples/shakespeare/kinglear.txt으로 Cloud Storage 버킷에 저장됩니다.

  2. 각 줄을 단어로 파싱합니다.
  3. 토큰화된 단어에서 빈도 카운트를 수행합니다.

wordcount 파이프라인을 로컬에서 스테이징하려면 다음 단계별 안내를 따르세요.

  1. 로컬 터미널에서 wordcount 예시를 실행합니다.
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. 파이프라인의 출력을 확인합니다.
    more outputs*
  3. 종료하려면 q를 누릅니다.
파이프라인을 로컬에서 실행하면 Apache Beam 프로그램을 테스트하고 디버깅할 수 있습니다. Apache Beam GitHub에서 wordcount.py 소스 코드를 확인할 수 있습니다.

Dataflow 서비스에서 파이프라인 실행

이 섹션에서는 Dataflow 서비스의 apache_beam 패키지에서 wordcount 예시 파이프라인을 실행합니다. 이 예시에서는 DataflowRunner--runner의 매개변수로 지정합니다.
  • 파이프라인을 실행합니다.
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://STORAGE_BUCKET/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://STORAGE_BUCKET/tmp/

    다음을 바꿉니다.

    • DATAFLOW_REGION: Dataflow 작업을 배포할 리전 엔드포인트입니다(예: europe-west1).

      --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

    • STORAGE_BUCKET: 이전에 복사한 Cloud Storage 이름입니다.
    • PROJECT_ID: 이전에 복사한 Google Cloud 프로젝트 ID입니다.

결과 보기

Dataflow를 사용하여 파이프라인을 실행한 경우 결과는 Cloud Storage 버킷에 저장됩니다. 이 섹션에서는 Cloud Console 또는 로컬 터미널을 사용하여 파이프라인이 실행 중인지 확인합니다.

Cloud Console

Cloud Console에서 결과를 보려면 다음 단계별 안내를 따르세요.

  1. Cloud Console에서 Dataflow 작업 페이지로 이동합니다.

    작업으로 이동

    작업 페이지에는 wordcount 작업의 상세 정보가 표시되는데, 처음에는 실행 중이 나타나고 다음에는 성공으로 상태가 바뀝니다.

  2. Cloud Storage 브라우저 페이지로 이동하세요.

    브라우저로 이동

  3. 프로젝트의 버킷 목록에서 앞서 만든 스토리지 버킷을 클릭합니다.

    wordcount 디렉터리에 작업을 통해 만든 출력 파일이 표시됩니다.

로컬 터미널

터미널에서 결과를 보려면 gsutil 도구를 사용합니다. Cloud Shell에서 명령어를 실행할 수도 있습니다.

  1. 출력 파일을 나열합니다.
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"  
  2. STORAGE_BUCKET을 파이프라인 프로그램에 사용되는 Cloud Storage 버킷 이름으로 바꿉니다.

  3. 출력 파일에서 결과를 확인합니다.
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

파이프라인 코드 수정

이전 예시의 wordcount 파이프라인은 대소문자를 구분합니다. 다음 단계에서는 wordcount 파이프라인이 대소문자를 구분하지 않도록 파이프라인을 수정하는 방법을 보여줍니다.
  1. 로컬 머신의 Apache Beam GitHub 저장소에서 wordcount 코드의 최신 사본을 다운로드합니다.
  2. 로컬 터미널에서 파이프라인을 실행합니다.
    python wordcount.py --output outputs
  3. 결과 보기
    more outputs*
  4. 종료하려면 q를 누릅니다.
  5. 원하는 편집기에서 wordcount.py 파일을 엽니다.
  6. run 함수에서 파이프라인 단계를 검사합니다.
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split하면 줄이 문자열 단어로 분리됩니다.

  7. 문자열을 소문자로 입력하려면 split 다음의 줄을 수정하세요.
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    이렇게 수정하면 str.lower 함수가 모든 단어에 매핑됩니다. 이 줄은 beam.Map(lambda word: str.lower(word))와 동일합니다.
  8. 파일을 저장하고 수정된 wordcount 작업을 실행합니다.
    python wordcount.py --output outputs
  9. 수정된 파이프라인의 결과를 확인합니다.
    more outputs*
  10. 종료하려면 q를 누릅니다.

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.

  1. Cloud Console에서 Cloud Storage 브라우저 페이지로 이동합니다.

    브라우저로 이동

  2. 삭제할 버킷의 체크박스를 클릭합니다.
  3. 버킷을 삭제하려면 삭제를 클릭한 후 안내를 따르세요.

다음 단계

Apache Beam™은 미국 및/또는 다른 국가에서 사용되는 Apache Software Foundation 또는 해당 계열사의 상표입니다.