Cloud Dataflow SQL 사용

이 가이드에서는 SQL과 Cloud Dataflow SQL UI를 사용하여 Cloud Dataflow 작업을 실행하는 방법을 설명합니다. 이를 위해 Cloud Pub/Sub의 데이터 스트림을 BigQuery 테이블의 데이터와 조인하는 예를 단계별로 설명합니다.

목표

이 가이드의 목표는 다음과 같습니다.

  • SQL을 사용하여 Cloud Pub/Sub 스트리밍 데이터를 BigQuery 테이블 데이터와 조인
  • Cloud Dataflow SQL UI에서 Cloud Dataflow 작업 배포

비용

이 가이드에서는 다음과 같이 비용이 청구될 수 있는 Google Cloud Platform 구성요소를 사용합니다.

  • Cloud Dataflow
  • Cloud Storage
  • Cloud Pub/Sub

가격 계산기를 사용하면 예상 사용량을 기준으로 예상 비용을 산출할 수 있습니다. GCP 신규 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google 계정에 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Google Cloud Platform 프로젝트를 선택하거나 만듭니다.

    리소스 관리 페이지로 이동

  3. Google Cloud Platform 프로젝트에 결제가 사용 설정되어 있는지 확인하세요.

    결제 사용 설정 방법 알아보기

  4. Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager APIs를 사용 설정합니다.

    APIs 사용 설정

  5. 인증 설정:
    1. GCP Console에서 서비스 계정 키 만들기 페이지로 이동합니다.

      서비스 계정 키 만들기 페이지로 이동
    2. 서비스 계정 목록에서 새 서비스 계정을 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다.
    4. 역할 목록에서 프로젝트 > 소유자를 선택합니다.

      참고: 역할 필드가 리소스에 액세스할 수 있도록 서비스 계정을 승인합니다. 나중에 GCP Console을 사용하여 이 필드를 보고 변경할 수 있습니다. 프로덕션 애플리케이션을 개발하는 경우 프로젝트 > 소유자보다 세부적인 권한을 지정합니다. 자세한 내용은 서비스 계정에 역할 부여를 참조하세요.
    5. 만들기를 클릭합니다. 키가 포함된 JSON 파일이 컴퓨터에 다운로드됩니다.
  6. 환경 변수 GOOGLE_APPLICATION_CREDENTIALS를 서비스 계정 키가 포함된 JSON 파일의 파일 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  7. Cloud SDK를 설치하고 초기화합니다. 설치 옵션 중 하나를 선택합니다. 이 둘러보기에서 사용할 프로젝트에 project 속성을 설정해야 할 수도 있습니다.
  8. GCP Console에서 BigQuery 웹 UI로 이동합니다. 그러면 가장 최근에 액세스한 프로젝트가 열립니다. 다른 프로젝트로 전환하려면 BigQuery 웹 UI 맨 위에 있는 프로젝트 이름을 클릭하고 사용할 프로젝트를 검색합니다.
    BigQuery 웹 UI로 이동

Cloud Dataflow SQL UI로 전환

BigQuery 웹 UI에서 다음 단계를 따라 Cloud Dataflow UI로 전환할 수 있습니다.

  1. 더보기 드롭다운 메뉴를 클릭하고 쿼리 설정을 선택합니다.

  2. 오른쪽에 열리는 쿼리 설정 메뉴에서 Cloud Dataflow 엔진을 선택합니다.

  3. 프로젝트에 Cloud Dataflow API와 Data Catalog API가 사용 설정되지 않은 경우에는 사용 설정하라는 메시지가 표시됩니다. API 사용 설정을 클릭합니다. Cloud Dataflow API와 Data Catalog API를 사용 설정하는 데 몇 분 정도 걸릴 수 있습니다.

  4. API 사용 설정이 완료되면 저장을 클릭합니다.

소스 예 만들기

이 가이드에서 제공된 예를 수행하려면 다음 소스를 만들어 가이드 단계에서 사용합니다.

  • transactions라는 Cloud Pub/Sub 주제 - Cloud Pub/Sub 주제 구독을 통해 도착하는 트랜잭션 데이터 스트림입니다. 각 트랜잭션의 데이터에는 구매한 제품, 할인가, 구매한 시/도와 같은 정보가 포함됩니다. Cloud Pub/Sub 주제를 만든 후에는 주제에 메시지를 게시하는 스크립트를 만듭니다. 이 스크립트는 이 가이드의 뒷부분에서 실행됩니다.
  • us_state_salesregions라는 BigQuery 테이블 - 시/도와 판매 지역이 매핑된 테이블입니다. 이 테이블을 만들기 전에 BigQuery 데이터세트를 만들어야 합니다.

Cloud Pub/Sub 주제에 스키마 할당

스키마를 할당하면 Cloud Pub/Sub 주제 데이터에서 SQL 쿼리를 실행할 수 있습니다. 현재 Cloud Dataflow SQL에 사용할 Cloud Pub/Sub 주제의 메시지는 JSON 형식으로 직렬화되어야 합니다. Avro와 같은 다른 형식도 추가로 지원될 예정입니다.

Cloud Pub/Sub 주제 예 transactions에 스키마를 할당하려면 다음 안내를 따르세요.

  1. 텍스트 파일을 만들고 이름을 transactions_schema.yaml로 지정합니다. 다음 스키마 텍스트를 복사하여 transactions_schema.yaml에 붙여넣습니다.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: attributes
    description: Pub/Sub message attributes
    mode: NULLABLE
    type: MAP<STRING,STRING>
  - column: payload
    description: Pub/Sub message payload
    mode: NULLABLE
    type: STRUCT
    subcolumns:
    - column: tr_time_str
      description: Transaction time string
      mode: NULLABLE
      type: STRING
    - column: first_name
      description: First name
      mode: NULLABLE
      type: STRING
    - column: last_name
      description: Last name
      mode: NULLABLE
      type: STRING
    - column: city
      description: City
      mode: NULLABLE
      type: STRING
    - column: state
      description: State
      mode: NULLABLE
      type: STRING
    - column: product
      description: Product
      mode: NULLABLE
      type: STRING
    - column: amount
      description: Amount of transaction
      mode: NULLABLE
      type: FLOAT
  1. gcloud 명령줄 도구를 사용하여 스키마를 할당합니다.

    a. 다음 명령어gcloud 도구를 업데이트합니다. gcloud 도구 버전이 242.0.0 이상인지 확인합니다.

      gcloud components update
    

    b. 명령줄 창에서 다음 명령어를 실행합니다. project-id를 프로젝트 ID로 바꾸고 paths-to-filetransactions_schema.yaml 파일의 경로로 바꿉니다.

      gcloud beta data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    명령어 매개변수와 허용되는 스키마 파일 형식에 대한 자세한 내용은 gcloud 베타 데이터 카탈로그 항목 업데이트 문서를 참조하세요.

    c. 스키마가 transactions Cloud Pub/Sub 주제에 성공적으로 할당되었는지 확인합니다. project-id를 프로젝트 ID로 바꿉니다.

      gcloud beta data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Cloud Pub/Sub 소스 찾기

Cloud Dataflow SQL UI를 사용하면 액세스할 수 있는 모든 프로젝트의 Cloud Pub/Sub 데이터 소스 객체를 찾을 수 있으므로 전체 이름을 기억할 필요가 없습니다.

이 가이드의 예로, 앞에서 만든 transactions Cloud Pub/Sub 주제를 추가합니다.

  1. 왼쪽 탐색 패널에서 데이터 추가 드롭다운 목록을 클릭하고 Cloud Dataflow 소스를 선택합니다.

  2. 오른쪽에 열리는 Cloud Dataflow 소스 추가 패널에서 Cloud Pub/Sub 주제를 선택합니다. 검색창에서 transactions를 검색합니다. 주제를 선택하고 추가를 클릭합니다.

스키마 보기

  1. Cloud Dataflow SQL UI의 왼쪽 탐색 패널에서 Cloud Dataflow 소스를 클릭합니다.
  2. Cloud Pub/Sub 주제를 클릭합니다.
  3. transactions를 클릭합니다.
  4. 스키마에서 transactions Cloud Pub/Sub 주제에 할당된 스키마가 표시됩니다.

SQL 쿼리 만들기

Cloud Dataflow SQL UI에서 Cloud Dataflow 작업을 실행하는 SQL 쿼리를 만들 수 있습니다.

다음 SQL 쿼리는 데이터 보강 쿼리입니다. 시/도와 판매 지역이 매핑된 BigQuery 테이블(us_state_salesregions)을 사용하여 이벤트(transactions)의 Cloud Pub/Sub 스트림에 다른 필드 sales_region을 추가합니다.

다음 SQL 쿼리를 복사하여 쿼리 편집기에 붙여넣습니다. project-id를 프로젝트 ID로 바꿉니다.

SELECT tr.payload.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.payload.state = sr.state_code

Cloud Dataflow SQL UI에 쿼리를 입력하면 쿼리 검사기쿼리 구문을 확인합니다. 쿼리가 유효하면 녹색 체크표시 아이콘이 표시되고 쿼리가 잘못되면 빨간색 느낌표 아이콘이 표시됩니다. 쿼리 구문이 잘못된 경우 검사기 아이콘을 클릭하면 수정해야 하는 항목의 정보가 표시됩니다.

다음 스크린샷은 쿼리 편집기에서 유효한 쿼리를 보여줍니다. 검사기에 녹색 체크표시가 표시되어 있습니다.

편집기에 쿼리 입력

SQL 쿼리를 실행하는 Cloud Dataflow 작업 만들기

SQL 쿼리를 실행하려면 Cloud Dataflow SQL UI에서 Cloud Dataflow 작업을 만듭니다.

  1. 쿼리 편집기에서 Cloud Dataflow 작업 만들기를 클릭합니다.

  2. 오른쪽에 열리는 Cloud Dataflow 작업 만들기 패널에서 기본 테이블 이름dfsqltable_sales로 변경합니다.

  3. 만들기를 클릭합니다. Cloud Dataflow 작업을 실행하는 데 몇 분 정도 걸립니다.

  4. 쿼리 결과 패널이 UI에 나타납니다. 나중에 작업의 쿼리 결과 패널로 돌아오려면 작업 기록 패널에서 작업을 찾고 Cloud Dataflow 작업 및 출력 보기에서와 같이 편집기에서 쿼리 열기 버튼을 사용합니다.

  5. 작업 정보에서 작업 ID 링크를 클릭합니다. 그러면 Cloud Dataflow 웹 UI의 Cloud Dataflow 작업 세부정보 페이지가 새 브라우저 탭에서 열립니다.

Cloud Dataflow 작업 및 출력 보기

Cloud Dataflow는 SQL 쿼리를 Apache Beam 파이프라인으로 변환합니다. 새 브라우저 탭에서 열린 Cloud Dataflow 웹 UI에서는 파이프라인이 그래픽으로 표시됩니다.

상자를 클릭하면 파이프라인에서 발생하는 변환을 세부적으로 확인할 수 있습니다. 예를 들어 그래픽 표시에서 맨 위에 있는 SQL 쿼리 실행이라는 상자를 클릭하면 백그라운드로 수행되는 작업을 보여주는 그래픽이 나타납니다.

맨 위에 있는 상자 두 개는 Cloud Pub/Sub 주제 transactions와 BigQuery 테이블 us_state_salesregions를 조인한 입력 두 개를 나타냅니다.

작업 결과가 포함된 출력 테이블을 보려면 Cloud Dataflow SQL UI가 있는 브라우저 탭으로 돌아갑니다. 왼쪽 탐색 패널의 프로젝트에서 사용자가 만든 dataflow_sql_dataset 데이터세트를 클릭합니다. 그런 다음 출력 테이블 dfsqltable_sales를 클릭합니다. 미리보기 탭에 출력 테이블의 콘텐츠가 표시됩니다.

이전 작업 보기 및 쿼리 편집

Cloud Dataflow SQL UI는 이전 작업 및 쿼리를 작업 기록 패널에 저장합니다. 작업은 작업이 시작된 날짜별로 나열됩니다. 작업 목록에는 실행 중인 작업이 있는 날짜가 가장 먼저 표시됩니다. 그런 다음 실행 중인 작업이 없는 날짜가 목록에 표시됩니다.

작업 기록 목록을 사용하여 이전 SQL 쿼리를 편집하고 새 Cloud Dataflow 작업을 실행할 수 있습니다. 예를 들어 15초마다 판매 지역별로 판매를 집계하도록 쿼리를 수정하려고 합니다. 작업 기록 패널을 사용하여 이 가이드의 앞부분에서 시작한 실행 중인 작업에 액세스하고 SQL 쿼리를 변경한 후 수정된 쿼리로 다른 작업을 실행합니다.

  1. 왼쪽 탐색 패널에서 작업 기록을 클릭합니다.

  2. 작업 기록에서 Cloud Dataflow를 클릭합니다. 프로젝트의 모든 이전 작업이 나타납니다.

  3. 수정할 작업을 클릭합니다. 쿼리 편집기에서 열기를 클릭합니다.

  4. 쿼리 편집기에서 SQL 쿼리를 수정하여 텀블링 기간을 추가합니다. 다음 쿼리를 복사하는 경우 project-id를 프로젝트 ID로 바꿉니다.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.payload.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.payload.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. 쿼리 편집기에서 Cloud Dataflow 작업 만들기를 클릭하여 수정된 쿼리로 새 작업을 만듭니다.

삭제

이 가이드에서 사용한 리소스 비용이 Google Cloud Platform 계정에 청구되지 않도록 하는 방법은 다음과 같습니다.

  1. transactions_injector.py 게시 스크립트가 아직 실행 중이면 중지합니다.

  2. 실행 중인 Cloud Dataflow 작업을 중지합니다. GCP Console에서 Cloud Dataflow 웹 UI로 이동합니다.

    Cloud Dataflow 웹 UI로 이동

    이 둘러보기를 따라 만든 각 작업에 다음 단계를 수행합니다.

    1. 작업 이름을 클릭합니다.

    2. 작업의 작업 요약 패널에서 작업 중지를 클릭합니다. 작업 중지 방법 옵션이 있는 작업 중지 대화상자가 나타납니다.

    3. 취소를 클릭합니다.

    4. 작업 중지를 클릭합니다. 서비스가 모든 데이터 수집 및 처리를 최대한 빨리 중지합니다. 취소는 처리를 즉시 중단하므로 '처리 중인' 데이터가 손실될 수 있습니다. 작업을 중지하는 데 몇 분 정도 걸릴 수 있습니다.

  3. BigQuery 데이터세트를 삭제합니다. GCP Console에서 BigQuery 웹 UI로 이동합니다.

    BigQuery 웹 UI로 이동

    1. 탐색 패널의 리소스 섹션에서 만든 dataflow_sql_dataset 데이터세트를 클릭합니다.

    2. 오른쪽에 있는 세부정보 패널에서 데이터세트 삭제를 클릭합니다. 이 작업으로 데이터세트, 테이블, 모든 데이터가 삭제됩니다.

    3. 데이터세트 삭제 대화상자에 데이터세트 이름(dataflow_sql_dataset)을 입력하여 삭제 명령어를 확인한 후 삭제를 클릭합니다.

  4. Cloud Pub/Sub 주제를 삭제합니다. GCP Console에서 Cloud Pub/Sub 주제 페이지로 이동합니다.

    Cloud Pub/Sub 주제 페이지로 이동

    1. transactions 주제 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 주제를 영구 삭제합니다.

    3. Cloud Pub/Sub 구독 페이지로 이동합니다.

    4. 남아 있는 transactions 구독 옆에 있는 체크박스를 선택합니다. 작업이 더 이상 실행되지 않으면 구독이 없을 수 있습니다.

    5. 삭제를 클릭하여 구독을 영구 삭제합니다.

  5. Cloud Storage에서 Cloud Dataflow 스테이징 버킷을 삭제합니다. GCP Console에서 Cloud Storage 브라우저로 이동합니다.

    Cloud Storage 브라우저로 이동

    1. Cloud Dataflow 스테이징 버킷 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 버킷을 영구 삭제합니다.

다음 단계

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.