Dataflow SQL로 스트리밍 데이터 조인

이 가이드에서는 Dataflow SQL을 사용하여 Pub/Sub의 데이터 스트림을 BigQuery 테이블의 데이터와 조인하는 방법을 다룹니다.

목표

이 가이드의 목표는 다음과 같습니다.

  • Pub/Sub 스트리밍 데이터를 BigQuery 테이블 데이터와 조인하는 Dataflow SQL 쿼리를 작성합니다.
  • Dataflow SQL UI에서 Dataflow 작업을 배포합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataflow
  • Cloud Storage
  • Pub/Sub

가격 계산기를 사용하면 예상 사용량을 기준으로 예상 비용을 산출할 수 있습니다. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager API를 사용 설정합니다.

    API 사용 설정

  5. 인증 설정:
    1. Cloud Console에서 서비스 계정 키 만들기 페이지로 이동합니다.

      서비스 계정 키 만들기 페이지로 이동
    2. 서비스 계정 목록에서 새 서비스 계정을 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다.
    4. 역할 목록에서 프로젝트 > 소유자.

    5. 만들기를 클릭합니다. 키가 포함된 JSON 파일이 컴퓨터에 다운로드됩니다.
  6. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  7. Cloud SDK를 설치하고 초기화합니다. 설치 옵션 중 하나를 선택하세요. 이 둘러보기에서 사용할 프로젝트에 project 속성을 설정해야 할 수도 있습니다.
  8. Cloud Console에서 BigQuery 웹 UI로 이동합니다. 그러면 가장 최근에 액세스한 프로젝트가 열립니다. 다른 프로젝트로 전환하려면 BigQuery 웹 UI 맨 위에 있는 프로젝트 이름을 클릭하고 사용할 프로젝트를 검색합니다.
    BigQuery 웹 UI로 이동

Dataflow SQL UI로 전환

BigQuery 웹 UI에서 Dataflow UI로 전환하려면 다음 단계를 따르세요.

  1. 더보기 드롭다운 메뉴를 클릭하고 쿼리 설정을 선택합니다.

  2. 오른쪽에 열리는 쿼리 설정 메뉴에서 Dataflow 엔진을 선택합니다.

  3. 프로젝트에 Dataflow와 Data Catalog API가 사용 설정되어 있지 않으면 API를 사용 설정하라는 메시지가 표시됩니다. API 사용 설정을 클릭합니다. Dataflow 및 Data Catalog API 사용 설정은 몇 분 정도 걸릴 수 있습니다.

  4. API 사용 설정이 완료되면 저장을 클릭합니다.

소스 예 만들기

이 가이드에서 제공된 예시를 수행하려면 다음 소스를 만들어 가이드 단계에서 사용합니다.

  • transactions라는 Pub/Sub 주제 - Pub/Sub 주제 구독을 통해 도착하는 트랜잭션 데이터 스트림입니다. 각 트랜잭션의 데이터에는 구매한 제품, 할인가, 구매한 시/도와 같은 정보가 포함됩니다. Pub/Sub 주제를 만든 후 주제에 메시지를 게시하는 스크립트를 만듭니다. 이 스크립트는 이 가이드의 뒷부분에서 실행됩니다.
  • us_state_salesregions라는 BigQuery 테이블 - 시/도와 판매 지역이 매핑된 테이블입니다. 이 테이블을 만들기 전에 BigQuery 데이터세트를 만들어야 합니다.

Pub/Sub 소스 찾기

Dataflow SQL UI는 액세스 권한이 있는 프로젝트의 Pub/Sub 데이터 소스 객체를 찾을 수 있으므로 전체 이름을 기억할 필요가 없습니다.

이 가이드의 예시에서는 사용자가 만든 transactions Pub/Sub 주제를 추가합니다.

  1. 왼쪽 탐색 패널에서 데이터 추가 드롭다운 목록을 클릭하고 Cloud Dataflow 소스를 선택합니다.

  2. 오른쪽에 열리는 Cloud Dataflow 소스 추가 패널에서 Pub/Sub 주제를 선택합니다. 검색창에서 transactions를 검색합니다. 주제를 선택하고 추가를 클릭합니다.

Pub/Sub 주제에 스키마 할당

스키마를 할당하면 Pub/Sub 주제 데이터에서 SQL 쿼리를 실행할 수 있습니다. 현재 Dataflow SQL은 Pub/Sub 주제의 메시지가 JSON 형식으로 직렬화되도록 합니다. Avro와 같은 다른 형식도 추가로 지원될 예정입니다.

Dataflow 소스Pub/Sub 주제 예시를 추가한 후 Dataflow SQL UI의 주제에 스키마를 할당하려면 다음 단계를 따르세요.

  1. 리소스 패널에서 주제를 선택합니다.

  2. 스키마 탭에서 스키마 수정을 클릭합니다. 스키마 측면 패널이 오른쪽에 열립니다.

  3. 텍스트로 편집 버튼을 전환하고 편집기에 다음 인라인 스키마를 붙여넣습니다. 그런 다음 제출을 클릭합니다.

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT64"
      }
    ]
    
  4. (선택사항) 주제 미리보기을 클릭하여 메시지 콘텐츠를 검토하고 정의한 스키마와 일치하는지 확인합니다.

스키마 보기

  1. Dataflow SQL UI의 왼쪽 탐색 패널에서 Cloud Dataflow 소스를 클릭합니다.
  2. Pub/Sub 주제를 클릭합니다.
  3. transactions를 클릭합니다.
  4. 스키마 아래에서 transactions Pub/Sub 주제에 할당한 스키마를 볼 수 있습니다.
필드 이름 목록 및 설명을 포함하여 주제에 할당된 스키마

SQL 쿼리 만들기

Dataflow SQL UI에서 Dataflow 작업을 실행하는 SQL 쿼리를 만들 수 있습니다.

다음 SQL 쿼리는 데이터 보강 쿼리입니다. 시/도와 판매 지역을 매핑하는 BigQuery 테이블(us_state_salesregions)을 사용하여 이벤트(transactions)의 Pub/Sub 스트림에 다른 필드 sales_region을 추가합니다.

다음 SQL 쿼리를 복사하여 쿼리 편집기에 붙여넣습니다. project-id는 프로젝트 ID로 바꿉니다.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI에 쿼리를 입력하면 쿼리 검사기쿼리 구문을 확인합니다. 쿼리가 유효하면 녹색 체크표시 아이콘이 표시되고 쿼리가 잘못되면 빨간색 느낌표 아이콘이 표시됩니다. 쿼리 구문이 잘못된 경우 검사기 아이콘을 클릭하면 수정해야 하는 항목의 정보가 표시됩니다.

다음 스크린샷은 쿼리 편집기에서 유효한 쿼리를 보여줍니다. 검사기에 녹색 체크표시가 표시되어 있습니다.

SQL 쿼리를 실행하는 Dataflow 작업 만들기

SQL 쿼리를 실행하려면 Dataflow SQL UI에서 Dataflow 작업을 만듭니다.

  1. 쿼리 편집기에서 Dataflow 작업 만들기를 클릭합니다.

  2. 오른쪽에 열리는 Dataflow 작업 만들기 패널에서 기본 테이블 이름dfsqltable_sales로 변경합니다.

  3. (선택사항)Dataflow는 Dataflow SQL 작업에 가장 적합한 설정을 자동으로 선택하지만, 선택적 매개변수 메뉴를 확장하여 수동으로 다음 파이프 라인 옵션을 지정할 수 있습니다.

    • 최대 작업자 수
    • 영역
    • 서비스 계정 이메일
    • 머신 유형
    • 추가 실험
    • 작업자 IP 주소 구성
    • 네트워크
    • 서브네트워크
  4. 만들기를 클릭합니다. Dataflow 작업을 실행하는 데 몇 분 정도 걸립니다.

  5. 쿼리 결과 패널이 UI에 나타납니다. 나중에 작업의 쿼리 결과 패널로 돌아오려면 작업 기록 패널에서 작업을 찾고 Cloud Dataflow 작업 및 출력 보기에서와 같이 편집기에서 쿼리 열기 버튼을 사용합니다.

  6. 작업 정보에서 작업 ID 링크를 클릭합니다. 그러면 Dataflow 웹 UI의 Dataflow 작업 세부정보 페이지가 새 브라우저 탭에서 열립니다.

Dataflow 작업 및 출력 보기

Dataflow는 SQL 쿼리를 Apache Beam 파이프라인으로 변환합니다. 새 브라우저 탭에서 열린 Dataflow 웹 UI에서는 파이프라인이 그래픽으로 표시됩니다.

Dataflow 웹 UI에 표시된 SQL 쿼리의 파이프라인

상자를 클릭하면 파이프라인에서 발생하는 변환을 세부적으로 확인할 수 있습니다. 예를 들어 그래픽 표시에서 맨 위에 있는 SQL 쿼리 실행이라는 상자를 클릭하면 백그라운드로 수행되는 작업을 보여주는 그래픽이 나타납니다.

맨 위에 있는 상자 두 개는 Pub/Sub 주제 transactions와 BigQuery 테이블 us_state_salesregions를 조인한 입력 두 개를 나타냅니다.

두 입력 조인의 쓰기 출력이 25초 이내에 완료됩니다.

작업 결과가 포함된 출력 테이블을 보려면 Dataflow SQL UI가있는 브라우저 탭으로 돌아갑니다. 왼쪽 탐색 패널에 있는 프로젝트에서, 생성한 dataflow_sql_dataset 데이터세트를 클릭합니다. 그런 다음 출력 테이블인 dfsqltable_sales를 클릭합니다. 미리보기 탭에 출력 테이블의 콘텐츠가 표시됩니다.

dfsqltable_sales 미리보기 테이블에는 tr_time_str, first_name, last_name, city, state, product, amount, sales_region 열이 포함됩니다.

이전 작업 보기 및 쿼리 편집

Dataflow SQL UI는 이전 작업 및 쿼리를 작업 기록 패널에 저장합니다. 작업은 작업이 시작된 날짜별로 나열됩니다. 작업 목록에는 실행 중인 작업이 있는 날짜가 가장 먼저 표시됩니다. 그런 다음 실행 중인 작업이 없는 날짜가 목록에 표시됩니다.

작업 기록 목록을 사용하여 이전 SQL 쿼리를 편집하고 새 Dataflow 작업을 실행할 수 있습니다. 예를 들어 15초마다 판매 지역별로 판매를 집계하도록 쿼리를 수정하려고 합니다. 작업 기록 패널을 사용하여 이 가이드의 앞부분에서 시작한 실행 중인 작업에 액세스하고 SQL 쿼리를 변경한 후 수정된 쿼리로 다른 작업을 실행합니다.

  1. 왼쪽 탐색 패널에서 작업 기록을 클릭합니다.

  2. 작업 기록에서 Cloud Dataflow를 클릭합니다. 프로젝트의 모든 이전 작업이 나타납니다.

    작업이 실행된 날짜 및 시간과 작업의 상태 아이콘이 나열된 작업 기록
  3. 수정할 작업을 클릭합니다. 쿼리 편집기에서 열기를 클릭합니다.

  4. 쿼리 편집기에서 SQL 쿼리를 수정하여 텀블링 기간을 추가합니다. 다음 쿼리를 복사하는 경우 project-id를 프로젝트 ID로 바꾸세요.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. 쿼리 편집기에서 Cloud Dataflow 작업 만들기를 클릭하여 수정된 쿼리로 새 작업을 만듭니다.

삭제

이 가이드에서 사용한 리소스 비용이 Cloud Billing 계정에 청구되지 않게 하려면 다음을 따르세요.

  1. transactions_injector.py 게시 스크립트가 실행 중이면 스크립트를 중지합니다.

  2. 실행 중인 Dataflow 작업을 중지합니다. Cloud Console에서 Dataflow 웹 UI로 이동합니다.

    Dataflow 웹 UI로 이동합니다

    이 둘러보기를 따라 만든 각 작업에 다음 단계를 수행합니다.

    1. 작업 이름을 클릭합니다.

    2. 작업의 작업 요약 패널에서 작업 중지를 클릭합니다. 작업 중지 방법 옵션이 있는 작업 중지 대화상자가 나타납니다.

    3. 취소를 클릭합니다.

    4. 작업 중지를 클릭합니다. 서비스가 모든 데이터 수집 및 처리를 최대한 빨리 중지합니다. 취소는 처리를 즉시 중단하므로 '처리 중인' 데이터가 손실될 수 있습니다. 작업을 중지하는 데 몇 분 정도 걸릴 수 있습니다.

  3. BigQuery 데이터세트를 삭제합니다. Cloud Console에서 BigQuery 웹 UI로 이동합니다.

    BigQuery 웹 UI로 이동

    1. 탐색 패널의 리소스 섹션에서 만든 dataflow_sql_dataset 데이터세트를 클릭합니다.

    2. 오른쪽에 있는 세부정보 패널에서 데이터세트 삭제를 클릭합니다. 이 작업으로 데이터세트, 테이블, 모든 데이터가 삭제됩니다.

    3. 데이터세트 삭제 대화상자에서 데이터세트 이름(dataflow_sql_dataset)을 입력하여 삭제 명령어를 확인한 후 삭제를 클릭합니다.

  4. Pub/Sub 주제를 삭제합니다. Cloud Console에서 Pub/Sub 주제 페이지로 이동합니다.

    Pub/Sub 주제 페이지로 이동

    1. transactions 주제 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 주제를 영구 삭제합니다.

    3. Pub/Sub 구독 페이지로 이동합니다.

    4. 남아 있는 transactions 구독 옆에 있는 체크박스를 선택합니다. 작업이 더 이상 실행되지 않는다면 구독이 없을 수도 있습니다.

    5. 삭제를 클릭하여 구독을 영구 삭제합니다.

  5. Cloud Storage에서 Dataflow 스테이징 버킷을 삭제합니다. Cloud Console에서 Cloud Storage 브라우저로 이동합니다.

    Cloud Storage 브라우저로 이동

    1. Dataflow 스테이징 버킷 옆에 있는 체크박스를 선택합니다.

    2. 삭제를 클릭하여 버킷을 영구 삭제합니다.

다음 단계