Apache Beam 노트북으로 개발하기

JupyterLab 노트북에서 Apache Beam 대화형 실행자를 사용하면 REPL(Read-eval-print-loop) 워크플로에서 반복적으로 파이프라인을 개발하고 파이프라인 그래프를 검사하고 개별 PCollection을 파싱할 수 있습니다. 이 Apache Beam 노트북은 최신 데이터 과학 및 머신러닝 프레임워크가 미리 설치된 노트북 가상 머신을 호스팅하는 관리형 서비스인 Notebooks를 통해 제공됩니다.

이 가이드에서는 Apache Beam 노트북에서 도입한 기능을 중점적으로 설명하지만 빌드하는 방법은 보여주지 않습니다. Apache Beam에 대한 자세한 내용은 Apache Beam 프로그래밍 가이드를 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Compute Engine, Notebooks API를 사용 설정합니다.

    API 사용 설정

  5. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  6. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  7. Compute Engine, Notebooks API를 사용 설정합니다.

    API 사용 설정

이 가이드를 완료하면 생성된 리소스를 삭제하여 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

Apache Beam 메모장 인스턴스 실행

  1. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
  2. 측면 패널에서 Dataflow로 이동하고 Workbench를 클릭합니다.
  3. 툴바에서 새 인스턴스를 클릭합니다.
  4. Apache Beam > GPU 제외를 선택합니다.
  5. (선택사항) GPU에서 노트북을 실행하려면 Apache Beam > 1개의 NVIDIA Tesla T4를 선택합니다.
  6. 새 메모장 인스턴스 페이지에서 메모장 VM의 네트워크를 선택하고 만들기를 클릭합니다.
  7. (선택사항) GPU가 있는 노트북 인스턴스를 만들기로 선택한 경우 새 노트북 인스턴스 페이지를 보려면 NVIDIA GPU 드라이버 자동 설치 옵션을 클릭하기 전에만들기를 선택해야 합니다.
  8. (선택사항) 커스텀 노트북 인스턴스를 설정하려면 맞춤설정을 클릭합니다. 인스턴스 속성 맞춤설정에 대한 자세한 내용은 특정 속성으로 Notebooks 인스턴스 만들기를 참조하세요.
  9. 링크가 활성화되면 JupyterLab 열기를 클릭합니다. Notebooks에서 새 Apache Beam 노트북 인스턴스를 만듭니다.

종속 항목 설치(선택사항)

Apache Beam 노트북에는 Apache Beam과 Google Cloud 커넥터 종속 항목이 이미 설치되어 있습니다. 파이프라인에 타사 라이브러리에 종속된 커스텀 커넥터 또는 커스텀 PTransform이 포함된 경우 노트북 인스턴스를 만든 후 설치할 수 있습니다. 자세한 내용은 Notebooks 문서의 종속 항목 설치를 참조하세요.

Apache Beam 노트북 시작하기

Notebooks 인스턴스를 열면 예시 폴더의 예시 노트북을 사용할 수 있습니다. 다음을 현재 사용할 수 있습니다.

  • 단어 수
  • 단어 수 스트리밍
  • NYC 택시 탑승 데이터 스트리밍
  • Dataflow 단어 수 확인
  • Apache Beam에서 GPU 사용
  • 데이터 시각화

Apache Beam의 기초를 설명하는 추가 가이드는 가이드 폴더에 있습니다. 다음을 현재 사용할 수 있습니다.

  • 기본 작업
  • 요소별 작업
  • 집계
  • Windows
  • I/O 작업
  • 스트리밍

이 노트에는 Apache Beam 개념 및 API 사용을 이해하는 데 도움이 되는 설명 텍스트 및 댓글 코드 블록이 포함되어 있습니다. 또한 가이드에서 학습된 개념을 연습할 수 있습니다.

메모장 인스턴스 만들기

파일 > 새 항목 > 메모장으로 이동하여 Apache Beam 2.22 이상인 커널을 선택합니다.

Apache Beam은 메모장 인스턴스에 설치되므로 메모장에 interactive_runnerinteractive_beam 모듈을 포함합니다.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

메모장에서 다른 Google API를 사용하는 경우 다음 가져오기 구문을 추가합니다.

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

상호작용 옵션 설정

다음은 대화형 실행자가 제한되지 않은 소스에서 데이터를 기록하는 시간을 설정하는 시간입니다. 이 예시에서 시간은 10분으로 설정됩니다.

ib.options.recording_duration = '10m'

recording_size_limit 속성을 통해 제한되지 않은 소스의 기록 크기 제한(바이트)을 변경할 수도 있습니다.

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

추가 대화형 옵션은 interactive_beam.options 클래스를 참조하세요.

파이프라인 만들기

InteractiveRunner 객체를 사용하여 파이프라인을 초기화합니다.

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

데이터 읽기 및 시각화

다음 예시는 지정된 Pub/Sub 주제에 대한 구독을 만들고 구독에서 읽는 Apache Beam 파이프라인을 보여줍니다.

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

파이프라인은 소스의 윈도우를 기준으로 단어 수를 계산합니다. 각 윈도우가 10초 길이인 고정 윈도우가 생성됩니다.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

데이터에 윈도우가 만들어지면 단어는 윈도우를 기준으로 계산됩니다.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

show() 메서드는 메모장에서 결과 PCollection을 시각화합니다.

ib.show(windowed_word_counts, include_window_info=True)

PCollection을 테이블 형식으로 시각화하는 show 메서드입니다.

선택적 매개변수 nduration을 설정하여 show()에서 결과 집합의 범위를 다시 지정할 수 있습니다. n을 설정하면 결과 집합이 최대 n개의 요소(예: 20개)로 표시됩니다. n이 설정되지 않은 경우 기본 동작은 소스 기록이 끝날 때까지 캡처된 가장 최근 요소를 나열하는 것입니다. duration을 설정하면 결과 집합이 소스 기록을 시작할 때 지정된 시간(초) 분량의 데이터로 제한됩니다. duration을 설정하지 않으면 기본 동작은 기록이 완료될 때까지 모든 요소를 나열하는 것입니다.

두 선택적 매개변수가 모두 설정되면 임계값이 둘 중 하나라도 충족될 때마다 show()가 중지됩니다. 다음 예시에서 show()는 기록된 소스에서 처음 30초 분량의 데이터를 기반으로 계산된 최대 20개의 요소를 반환합니다.

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

데이터 시각화를 표시하려면 show() 메서드로 visualize_data=True를 전달합니다. 여러 필터를 시각화에 적용할 수 있습니다. 다음 시각화를 사용하면 라벨과 축을 기준으로 필터링할 수 있습니다.

PCollection을 풍부한 필터링 가능한 UI 요소 집합으로 시각화하는 show 메서드입니다.

Apache Beam 노트북의 또 다른 유용한 시각화는 Pandas DataFrame입니다. 다음 예시는 먼저 단어를 소문자로 변환한 다음 각 단어의 빈도를 계산합니다.

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

collect() 메서드는 Pandas DataFrame에 출력을 제공합니다.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame의 PCollection을 나타내는 수집 메서드입니다.

대화형 Beam 검사기를 통해 데이터 시각화

특히 출력이 화면 공간을 상당히 차지하고 노트북 간 탐색이 어려울 때는 show()collect()를 지속적으로 호출하여 PCollection의 데이터를 검사하는 것이 쉽지 않을 수 있습니다. PCollection 하나가 변환을 통과해서 다른 PCollection을 생성할 때와 같이 변환이 의도한 대로 작동하는지 확인하기 위해 여러 PCollection을 나란히 비교해봐야 할 수도 있습니다. 이러한 사용 사례에서는 대화형 Beam 검사기가 보다 편리한 솔루션입니다.

대화형 Beam 검사기는 Apache Beam 노트북에 사전 설치된 JupyterLab 확장 프로그램 apache-beam-jupyterlab-sidepanel로 제공됩니다. 이 확장 프로그램을 사용하면 show() 또는 collect()를 명시적으로 호출하지 않아도 각 PCollection과 연결된 파이프라인 및 데이터 상태를 대화형으로 검사할 수 있습니다.

검사기를 여는 데에는 세 가지 방법이 있습니다.

  • JupyterLab의 상단 메뉴 바에서 Interactive Beam을 클릭합니다. 드롭다운에서 Open Inspector를 찾아 클릭하여 검사기를 엽니다.

    메뉴를 통해 검사기 열기

  • 런처 페이지를 사용합니다. 런처 페이지가 열려 있지 않으면 File -> New Launcher를 클릭하여 엽니다. 런처 페이지에서 Interactive Beam을 찾아 Open Inspector를 클릭하여 검사기를 엽니다.

    런처를 통해 검사기 열기

  • 명령어 팔레트를 사용합니다. JupyterLab 상단 메뉴 바에서 View -> Activate Command Palette를 클릭합니다. 팝업에서 Interactive Beam을 검색하여 확장 프로그램의 모든 옵션을 나열합니다. Open Inspector를 클릭하여 검사기를 엽니다.

    명령어 팔레트를 통해 검사기 열기

검사기가 열릴 때:

  1. 정확히 1개의 노트북이 열려 있으면 검사기가 자동으로 여기에 연결됩니다.

  2. 그렇지 않으면 연결할 커널(열린 노트북이 없을 때) 또는 노트북 세션(여러 노트북이 열려 있을 때)을 선택할 수 있도록 대화상자가 표시됩니다.

    연결할 노트북 선택

열린 노트북에 대해 여러 검사기를 열고 작업공간에 해당 탭을 자유롭게 드래그 앤 드롭하여 검사기를 배열할 수 있습니다.

2개 검사기를 열고 나란히 배열

노트북에서 셀을 실행하면 검사기 페이지가 자동으로 새로고침됩니다. 왼쪽에서 연결된 노트북에 정의된 파이프라인 및 PCollection이 나열됩니다. PCollection은 속하는 파이프라인에 따라 정리되며 헤더 파이프라인을 클릭하여 축소할 수 있습니다.

파이프라인 및 PCollection 목록에서 항목을 클릭하면 검사기가 오른쪽에서 해당 시각화를 렌더링합니다.

  • PCollection인 경우에는 검사기가 APPLY 버튼을 클릭한 후 시각화를 조정하는 추가 위젯과 함께 데이터를 렌더링합니다(데이터가 계속 unbounded PCollection에 대해 수신되는 경우에는 동적으로).

    검사기 페이지

  • 파이프라인의 경우 검사기가 파이프라인 그래프를 표시합니다.

    검사기 페이지

익명 파이프라인이 있는 것을 확인할 수 있습니다. 이것들은 액세스할 수 있지만 기본 세션에서 더 이상 참조되지 않는 PCollection 파이프라인입니다. 예를 들면 다음과 같습니다.

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

위 예시는 비어 있는 파이프라인 p 및 1개 PCollection pcoll을 포함하는 익명 파이프라인을 만듭니다. 여전히 pcoll.pipeline을 통해 익명 파이프라인에 액세스할 수 있습니다.

큰 시각화의 경우 공간을 절약하기 위해 왼쪽에서 파이프라인 및 PCollection 목록을 토글할 수 있습니다. 왼쪽 목록 토글

파이프라인의 기록 상태 이해

시각화를 제공할 뿐만 아니라 describe를 호출하여 노트북 인스턴스에 있는 1개 또는 모든 파이프라인의 기록 상태를 검사할 수도 있습니다.

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

describe() 메서드는 다음 세부정보를 제공합니다.

  • 디스크에 저장된 파이프라인의 모든 기록 파일의 총 크기(바이트)
  • 백그라운드 기록 작업이 시작된 시작 시간(초 단위, Unix 에포크 기준)
  • 백그라운드 기록 작업의 현재 파이프라인 상태
  • 파이프라인의 Python 변수

노트북에서 만든 파이프라인에서 Dataflow 작업 실행

  1. (선택사항) Dataflow 작업을 실행하기 전에 메모장을 사용하여 커널을 다시 시작한 다음 모든 셀을 다시 실행하고 출력을 확인합니다. 이 단계를 건너뛰면 노트북의 숨겨진 상태가 파이프라인 객체의 작업 그래프에 영향을 줄 수 있습니다.
  2. Dataflow API를 사용 설정합니다.
  3. 다음 import 문을 추가합니다.

    from apache_beam.runners import DataflowRunner
    
  4. 파이프라인 옵션을 전달합니다.

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # Set the SDK location. This is used by Dataflow to locate the
    # SDK needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    매개변수 값을 조정할 수 있습니다. 예를 들어 us-central1에서 region 값을 변경할 수 있습니다.

  5. DataflowRunner를 사용하여 파이프라인을 실행합니다. 그러면 Dataflow 서비스에서 작업을 실행합니다.

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p파이프라인 만들기의 파이프라인 객체입니다.

대화형 노트북에서 이 변환을 수행하는 방법에 대한 예시는 노트북 인스턴스의 Dataflow 단어 수 노트북을 참조하세요.

또는 노트북을 실행 가능한 스크립트로 내보내고 이전 단계를 사용하여 생성된 .py 파일을 수정한 다음 Dataflow 서비스에 파이프라인을 배포할 수 있습니다.

노트북 저장

생성된 노트북은 실행 중인 노트북 인스턴스에 로컬로 저장됩니다. 개발 중에 노트북 인스턴스를 재설정하거나 종료하는 경우 새 노트북은 /home/jupyter 디렉터리에 만드는 한 유지됩니다. 하지만 노트북 인스턴스가 삭제되면 해당 노트북도 삭제됩니다.

나중에 사용할 수 있도록 노트북을 로컬에 다운로드하거나 GitHub에 저장하거나 다른 파일 형식으로 내보냅니다.

노트북을 추가 영구 디스크에 저장

노트북 및 스크립트와 같은 작업을 다양한 노트북 인스턴스에서 유지하려면 Persistent Disk에 저장할 수 있습니다.

  1. Persistent Disk를 만들거나 연결합니다. 안내에 따라 ssh를 사용하여 노트북 인스턴스의 VM에 연결하고 열린 Cloud Shell에서 명령어를 실행합니다.

  2. Persistent Disk가 마운트된 디렉터리를 확인합니다(예: /mnt/myDisk).

  3. 노트북 인스턴스의 VM 세부정보를 수정하여 Custom metadata에 다음 항목을 추가합니다. 키 - container-custom-params, 값 - -v /mnt/myDisk:/mnt/myDisk 마운트된 PD를 결합하는 데 필요한 추가 메타데이터

  4. 저장을 클릭합니다.

  5. 이러한 변경사항을 업데이트하려면 노트북 인스턴스를 재설정하세요. 노트북 인스턴스 재설정

  6. 재설정 후 링크가 활성화되면 JupyterLab 열기를 클릭합니다. JupyterLab UI를 사용할 수 있게 될 때까지 시간이 조금 걸릴 수 있습니다. UI가 표시되면 터미널을 열고 다음 명령어를 실행합니다. ls -al /mnt /mnt/myDisk 디렉터리가 나열됩니다. 볼륨 결합 나열

이제 /mnt/myDisk 디렉터리에 작업을 저장할 수 있습니다. 노트북 인스턴스가 삭제되더라도 Persistent Disk는 프로젝트 아래에 계속 존재합니다. 그런 다음 이 Persistent Disk를 다른 노트북 인스턴스에 연결할 수 있습니다.

삭제

Apache Beam 노트북 인스턴스 사용을 마친 후 노트북 인스턴스를 종료하여 Google Cloud에서 만든 리소스를 삭제합니다.