Dataflow SQL로 스트리밍 데이터 조인

이 가이드에서는 Dataflow SQL을 사용하여 Pub/Sub의 데이터 스트림을 BigQuery 테이블의 데이터와 조인하는 방법을 다룹니다.

목표

이 가이드의 목표는 다음과 같습니다.

  • Pub/Sub 스트리밍 데이터를 BigQuery 테이블 데이터와 조인하는 Dataflow SQL 쿼리를 작성합니다.
  • Dataflow SQL UI에서 Dataflow 작업을 배포합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, Data Catalog. API를 사용 설정합니다.

    API 사용 설정

  5. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  6. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  7. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  8. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  9. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  10. Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, Data Catalog. API를 사용 설정합니다.

    API 사용 설정

  11. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  12. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  13. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  14. Cloud SDK를 설치하고 초기화합니다. 설치 옵션 중 하나를 선택하세요. 이 둘러보기에서 사용할 프로젝트에 project 속성을 설정해야 할 수도 있습니다.
  15. Cloud Console에서 Dataflow SQL 웹 UI로 이동합니다. 그러면 가장 최근에 액세스한 프로젝트가 열립니다. 다른 프로젝트로 전환하려면 Dataflow SQL 웹 UI 맨 위에 있는 프로젝트 이름을 클릭하고 사용할 프로젝트를 검색합니다.
    Dataflow SQL 웹 UI로 이동

소스 예 만들기

이 가이드에서 제공된 예시를 수행하려면 다음 소스를 만들어 가이드 단계에서 사용합니다.

  • transactions라는 Pub/Sub 주제 - Pub/Sub 주제 구독을 통해 도착하는 트랜잭션 데이터 스트림입니다. 각 트랜잭션의 데이터에는 구매한 제품, 할인가, 구매한 시/도와 같은 정보가 포함됩니다. Pub/Sub 주제를 만든 후 주제에 메시지를 게시하는 스크립트를 만듭니다. 이 스크립트는 이 가이드의 뒷부분에서 실행됩니다.
  • us_state_salesregions라는 BigQuery 테이블 - 시/도와 판매 지역이 매핑된 테이블입니다. 이 테이블을 만들기 전에 BigQuery 데이터세트를 만들어야 합니다.

Pub/Sub 주제에 스키마 할당

스키마를 할당하면 Pub/Sub 주제 데이터에서 SQL 쿼리를 실행할 수 있습니다. 현재 Dataflow SQL은 Pub/Sub 주제의 메시지가 JSON 형식으로 직렬화되도록 합니다.

Cloud Pub/Sub 주제 예시 transactions에 스키마를 할당하려면 다음 안내를 따르세요.

  1. 텍스트 파일을 만들고 이름을 transactions_schema.yaml로 지정합니다. 다음 스키마 텍스트를 복사하여 transactions_schema.yaml에 붙여넣습니다.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: tr_time_str
    description: Transaction time string
    mode: NULLABLE
    type: STRING
  - column: first_name
    description: First name
    mode: NULLABLE
    type: STRING
  - column: last_name
    description: Last name
    mode: NULLABLE
    type: STRING
  - column: city
    description: City
    mode: NULLABLE
    type: STRING
  - column: state
    description: State
    mode: NULLABLE
    type: STRING
  - column: product
    description: Product
    mode: NULLABLE
    type: STRING
  - column: amount
    description: Amount of transaction
    mode: NULLABLE
    type: FLOAT
  1. gcloud 명령줄 도구를 이용해 스키마를 할당합니다.

    a. 다음 명령어gcloud 도구를 업데이트합니다. gcloud 도구 버전이 242.0.0 이상인지 확인합니다.

      gcloud components update
    

    b. 명령줄 창에서 다음 명령어를 실행합니다. project-id를 프로젝트 ID로 바꾸고 path-to-filetransactions_schema.yaml 파일의 경로로 바꿉니다.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    명령어 매개변수와 허용되는 스키마 파일 형식에 대한 자세한 내용은 gcloud data-catalog entry update 문서 페이지를 참조하세요.

    c. transactions Pub/Sub 주제에 스키마가 할당되었는지 확인합니다. project-id를 프로젝트 ID로 바꿉니다.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Pub/Sub 소스 찾기

Dataflow SQL UI는 액세스 권한이 있는 프로젝트의 Pub/Sub 데이터 소스 객체를 찾을 수 있으므로 전체 이름을 기억할 필요가 없습니다.

이 튜토리얼의 예시에서는 사용자가 만든 transactions Pub/Sub 주제를 검색합니다.

  1. 왼쪽 패널에서 projectid=project-id transactions를 검색합니다. project-id를 프로젝트 ID로 바꿉니다.

    Dataflow SQL 작업공간의 Data Catalog 검색 패널

스키마 보기

  1. Dataflow SQL UI의 왼쪽 탐색기 패널에서 트랜잭션을 클릭하거나 projectid=project-id system=cloud_pubsub를 입력하여 Pub/Sub 주제를 검색하고 주제를 선택합니다.
  2. 스키마 아래에서 Pub/Sub 주제에 할당한 스키마를 볼 수 있습니다.

    필드 이름 목록 및 설명을 포함하여 주제에 할당된 스키마

SQL 쿼리 만들기

Dataflow SQL UI에서 Dataflow 작업을 실행하는 SQL 쿼리를 만들 수 있습니다.

다음 SQL 쿼리는 데이터 보강 쿼리입니다. 시/도와 판매 지역을 매핑하는 BigQuery 테이블(us_state_salesregions)을 사용하여 이벤트(transactions)의 Pub/Sub 스트림에 다른 필드 sales_region을 추가합니다.

다음 SQL 쿼리를 복사하여 쿼리 편집기에 붙여넣습니다. project-id는 프로젝트 ID로 바꿉니다.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI에 쿼리를 입력하면 쿼리 검사기쿼리 구문을 확인합니다. 쿼리가 유효하면 녹색 체크표시 아이콘이 표시되고 쿼리가 잘못되면 빨간색 느낌표 아이콘이 표시됩니다. 쿼리 구문이 잘못된 경우 검사기 아이콘을 클릭하면 수정해야 하는 항목의 정보가 표시됩니다.

다음 스크린샷은 쿼리 편집기에서 유효한 쿼리를 보여줍니다. 검사기에 녹색 체크표시가 표시되어 있습니다.

편집기에 표시되는 튜토리얼의 쿼리가 포함된 Dataflow SQL 작업공간

SQL 쿼리를 실행하는 Dataflow 작업 만들기

SQL 쿼리를 실행하려면 Dataflow SQL UI에서 Dataflow 작업을 만듭니다.

  1. 쿼리 편집기 위에 있는 작업 만들기를 클릭합니다.

  2. 오른쪽에 열리는 Dataflow 작업 만들기 패널의 대상 옵션에서 BigQuery를 선택합니다. 그런 다음 데이터 세트 IDdataflow_sql_tutorial을 선택하고 테이블 이름sales로 설정합니다.

    Dataflow SQL 작업 양식 만들기
  3. (선택사항)Dataflow는 Dataflow SQL 작업에 가장 적합한 설정을 자동으로 선택하지만, 선택적 매개변수 메뉴를 확장하여 수동으로 다음 파이프 라인 옵션을 지정할 수 있습니다.

    • 최대 작업자 수
    • 영역
    • 서비스 계정 이메일
    • 머신 유형
    • 추가 실험
    • 작업자 IP 주소 구성
    • 네트워크
    • 서브네트워크
  4. 만들기를 클릭합니다. Dataflow 작업을 실행하는 데 몇 분 정도 걸립니다.

Dataflow 작업 보기

Dataflow는 SQL 쿼리를 Apache Beam 파이프라인으로 변환합니다. 새 브라우저 탭에서 열린 Dataflow 웹 UI에서는 파이프라인이 그래픽으로 표시됩니다.

Dataflow 웹 UI에 표시되는 SQL 쿼리의 파이프라인

상자를 클릭하면 파이프라인에서 발생하는 변환을 세부적으로 확인할 수 있습니다. 예를 들어 그래픽 표시에서 맨 위에 있는 SQL 쿼리 실행이라는 상자를 클릭하면 백그라운드로 수행되는 작업을 보여주는 그래픽이 나타납니다.

맨 위에 있는 상자 두 개는 Pub/Sub 주제 transactions와 BigQuery 테이블 us_state_salesregions를 조인한 입력 두 개를 나타냅니다.

두 입력 조인의 쓰기 출력이 25초 이내에 완료됩니다.

작업 결과가 포함된 출력 테이블을 보려면 BigQuery UI로 이동하여 테이블을 확인합니다. 왼쪽 탐색기 패널에 있는 프로젝트에서 생성한 dataflow_sql_tutorial 데이터 세트를 클릭합니다. 그런 다음 출력 테이블인 sales를 클릭합니다. 미리보기 탭에 출력 테이블의 콘텐츠가 표시됩니다.

sales preview 미리보기 테이블에는 tr_time_str, first_name, last_name, city, state, product, amount, sales_region의 열이 포함됩니다.

이전 작업 보기 및 쿼리 편집

Dataflow UI는 이전 작업 및 쿼리를 작업 페이지에 저장합니다.

작업 기록 목록을 사용하여 이전 SQL 쿼리를 볼 수 있습니다. 예를 들어 15초마다 판매 지역별로 판매를 집계하도록 쿼리를 수정하려고 합니다. 작업 페이지를 사용하여 이 튜토리얼의 앞부분에서 시작한 실행 중인 작업에 액세스하고 SQL 쿼리를 복사한 후 수정된 쿼리로 다른 작업을 실행합니다.

  1. Dataflow 작업 페이지에서 수정할 작업을 클릭합니다.

  2. 작업 세부정보 페이지에서 파이프라인 옵션 아래의 오른쪽 패널에서 SQL 쿼리를 찾습니다. queryString 행을 찾습니다.

    queryString이라는 작업 파이프라인 옵션
  3. SQL 쿼리를 복사하여 쿼리 편집기에 붙여넣어 텀블링 기간을 추가합니다. 다음 쿼리를 복사하는 경우 project-id를 프로젝트 ID로 바꿉니다.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. 작업 만들기를 클릭하여 수정된 쿼리로 새 작업을 만듭니다.

정리

이 가이드에서 사용한 리소스 비용이 Cloud Billing 계정에 청구되지 않게 하려면 다음을 따르세요.

  1. transactions_injector.py 게시 스크립트가 실행 중이면 스크립트를 중지합니다.

  2. 실행 중인 Dataflow 작업을 중지합니다. Cloud Console에서 Dataflow 웹 UI로 이동합니다.

    Dataflow 웹 UI로 이동

    이 둘러보기를 따라 만든 각 작업에 다음 단계를 수행합니다.

    1. 작업 이름을 클릭합니다.

    2. 작업의 작업 요약 패널에서 작업 중지를 클릭합니다. 작업 중지 방법 옵션이 있는 작업 중지 대화상자가 나타납니다.

    3. 취소를 클릭합니다.

    4. 작업 중지를 클릭합니다. 서비스가 모든 데이터 수집 및 처리를 최대한 빨리 중지합니다. 취소는 처리를 즉시 중단하므로 '처리 중인' 데이터가 손실될 수 있습니다. 작업을 중지하는 데 몇 분 정도 걸릴 수 있습니다.

  3. BigQuery 데이터세트를 삭제합니다. Cloud Console에서 BigQuery 웹 UI로 이동합니다.

    BigQuery 웹 UI로 이동

    1. 탐색기 패널의 리소스 섹션에서 만든 dataflow_sql_tutorial 데이터 세트를 클릭합니다.

    2. 세부정보 패널의 우측에서 데이터세트 삭제를 클릭합니다. 이 작업으로 데이터세트, 테이블, 모든 데이터가 삭제됩니다.

    3. 데이터세트 삭제 대화상자에서 데이터세트 이름(dataflow_sql_tutorial)을 입력하여 삭제 명령어를 확인한 후 삭제를 클릭합니다.

  4. Pub/Sub 주제를 삭제합니다. Cloud Console에서 Pub/Sub 주제 페이지로 이동합니다.

    Pub/Sub 주제 페이지로 이동

    1. transactions 주제 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 주제를 영구 삭제합니다.

    3. Pub/Sub 구독 페이지로 이동합니다.

    4. 남아 있는 transactions 구독 옆에 있는 체크박스를 선택합니다. 작업이 더 이상 실행되지 않는다면 구독이 없을 수도 있습니다.

    5. 삭제를 클릭하여 구독을 영구 삭제합니다.

  5. Cloud Storage에서 Dataflow 스테이징 버킷을 삭제합니다. Cloud Console에서 Cloud Storage 브라우저로 이동합니다.

    Cloud Storage 브라우저로 이동

    1. Dataflow 스테이징 버킷 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 버킷을 영구 삭제합니다.

다음 단계