데이터 파이프라인 작업

개요

Dataflow 데이터 파이프라인을 사용하여 반복 작업 일정을 만들고, 여러 작업 실행에서 리소스가 소비되는 위치를 파악하고, 데이터 최신 상태 목표를 정의 및 관리하고, 개별 파이프라인 단계를 드릴다운하여 파이프라인을 수정하고 최적화할 수 있습니다.

데이터 파이프라인 기능:

  • 일정에 따라 일괄 작업을 실행할 반복 일괄 파이프라인을 만듭니다.
  • 최신 버전의 입력 데이터에 대해 일괄 작업을 실행하려면 반복 증분 일괄 파이프라인을 만듭니다.
  • 파이프라인 요약 스코어카드를 사용하여 파이프라인의 집계 용량 사용량 및 리소스 소비를 확인합니다.
  • 스트리밍 파이프라인의 데이터 최신 상태를 확인합니다. 시간 경과에 따라 발전하는 이 측정항목은 최신 상태가 지정된 목표 미만이 되면 알려주는 알림에 연결될 수 있습니다.
  • 파이프라인 측정항목 그래프를 사용하여 일괄 파이프라인 작업을 비교하고 이상치를 찾습니다.

데이터 파이프라인 사용 제한:

  • 리전별 가용성: Dataflow 데이터 파이프라인은 App Engine 애플리케이션인 Cloud Scheduler를 사용하므로 사용 가능한 App Engine 리전에서 데이터 파이프라인을 사용할 수 있습니다.

  • 할당량 한도

    • 프로젝트당 최대 파이프라인 수: 500개
    • 조직당 최대 파이프라인 수: 2,500개

API 참조 문서:

API 문서는 데이터 파이프라인 참조를 확인하세요.

데이터 파이프라인 유형

Dataflow 데이터 파이프라인에는 스트리밍과 배치의 두 가지 유형이 있습니다. 두 가지 파이프라인 유형 모두 Dataflow 템플릿에 정의된 작업을 실행합니다.

스트리밍 데이터 파이프라인
스트리밍 데이터 파이프라인은 Dataflow 스트리밍 작업이 생성되면 바로 실행합니다.
일괄 데이터 파이프라인
일괄 데이터 파이프라인은 사용자 정의 일정에 따라 Dataflow 일괄 작업을 실행합니다. 일괄 파이프라인 입력 파일 이름을 매개변수화하여 증분 일괄 파이프라인 처리를 허용할 수 있습니다.

증분 일괄 파이프라인

날짜/시간 자리표시자를 사용하여 일괄 파이프라인의 증분 입력 파일 형식을 지정할 수 있습니다.

  • 연도, 월, 날짜, 시간, 분, 초 자리표시자를 사용할 수 있으며 strftime() 형식을 따라야 합니다. 자리표시자 앞에는 백분율 기호(%)가 옵니다.
  • 파이프라인 생성 중에는 매개변수 형식이 확인되지 않습니다.
    • 예: 매개변수화된 입력 파일 경로로 'gs://bucket/Y'를 지정한 경우 앞에 '%'가 없는 'Y'는 strftime() 형식에 매핑되지 않기 때문에 'gs://bucket/Y'로 평가됩니다.

각 예약된 일괄 파이프라인 실행 시에 입력 파일 경로의 자리표시자 부분은 현재(또는 타임 시프트) 날짜/시간으로 평가됩니다(날짜 값은 예약 작업 시간대의 현재 날짜를 사용하여 평가됨). 평가된 파일 경로가 입력 파일의 경로와 일치할 경우 파일은 예약된 시간에 일괄 파이프라인이 처리하기 위해 선택됩니다.

  • 예: 일괄 파이프라인은 해당 시간(PST) 정각에 반복되도록 예약됩니다. 입력 파일 경로를 gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv로 매개변수화하면 2021년 4월 15일 오후 6시(PST)에 입력 파일 경로가 gs://bucket-name/2021-04-15/prefix-18_00.csv로 평가됩니다.

타임 시프트 매개변수 사용

'{[+|-][0-9]+[m|h]}' 중괄호 형식으로 둘러싼 + 또는 - 분 또는 시간 타임 시프트 매개변수를 사용하여 입력 파일 경로와 파이프라인 일정의 현재 날짜/시간 전후로 이동된 평가 날짜/시간의 일치를 지원할 수 있습니다. 배치 파이프라인은 예약된 시간에 계속 반복되지만 입력 파일 경로는 지정된 타임스탬프로 평가됩니다.

  • 예: 일괄 파이프라인은 해당 시간(PST) 정각에 반복되도록 예약됩니다. 입력 파일 경로를 gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}로 매개변수화하면 2021년 4월 15일 오후 6시(PST)에 입력 파일 경로가 gs://bucket-name/2021-04-15/prefix-16_00.csv로 평가됩니다.

데이터 파이프라인 역할

데이터 파이프라인 작업이 성공하려면 다음과 같이 사용자에게 필요한 IAM 역할을 부여해야 합니다.

  1. 작업을 수행하려면 사용자에게 적절한 역할이 있어야 합니다.

    • Datapipelines.admin: 모든 데이터 파이프라인 작업을 수행할 수 있습니다.
    • Datapipelines.viewer: 데이터 파이프라인 및 작업을 볼 수 있습니다.
    • Datapipelines.invoker: 데이터 파이프라인 작업 을 호출할 수 있습니다(API를 사용하여 이 역할을 사용 설정할 수 있음).
  2. 사용자는 해당 계정에 대한 roles/iam.serviceAccountUser 역할이 부여되어 Cloud Scheduler 및 Dataflow에서 사용하는 서비스 계정의 역할을 할 수 있어야 합니다. 사용자가 Cloud Scheduler 및 Dataflow의 서비스 계정을 선택하지 않으면 기본 Compute Engine 서비스 계정이 사용됩니다.

데이터 파이프라인 만들기

다음 두 가지 방법으로 데이터 파이프라인을 만들 수 있습니다.

  1. 작업 가져오기 또는
  2. 데이터 파이프라인 만들기

데이터 파이프라인 설정 페이지: Cloud Console에서 Dataflow 파이프라인 기능에 처음 액세스하면 설정 페이지가 열립니다.

  1. 나열된 API 사용 설정
  2. Cloud Scheduler에서 파이프라인을 예약하는 데 사용할 App Engine 애플리케이션의 리전을 선택합니다.

작업 가져오기

기본 또는 Flex 템플릿을 기반으로 하는 Dataflow 일괄 또는 스트리밍 작업을 가져와서 데이터 파이프라인으로 만들 수 있습니다.

  1. Cloud Console의 Dataflow 작업 페이지로 이동하여 완료된 작업을 선택한 후 작업 세부정보 페이지에서 '+파이프라인으로 가져오기'를 선택합니다.

  2. 템플릿에서 파이프라인 만들기 페이지에서 '데이터 파이프라인' 파이프라인 옵션이 선택됩니다. 다른 매개변수는 가져온 작업의 옵션으로 채워집니다.

    1. 일괄 작업의 경우 템플릿 매개변수 아래의 '파이프라인 예약' 섹션에 반복 일정을 입력합니다. 일괄 실행을 예약하는 데 사용되는 Cloud Scheduler의 이메일 계정 주소를 제공하는 것은 선택사항입니다. 지정하지 않으면 기본 Compute Engine 서비스 계정이 사용됩니다. 참고: 사용자 지정 또는 기본 Compute Engine 사용자 계정인지 여부에 관계없이(데이터 파이프라인 역할 참조) 사용자에게 Cloud Scheduler에서 사용하는 서비스 계정의 roles/iam.serviceAccountUser 역할이 부여되어야 합니다.

데이터 파이프라인 만들기

  1. Cloud Console에서 Dataflow 파이프라인 페이지로 이동한 다음 '+데이터 파이프라인 만들기'를 선택합니다.

  2. 작업 관리의 템플릿에서 파이프라인 만들기 페이지에서 '데이터 파이프라인'을 선택하고 파이프라인 이름을 입력한 다음 다른 템플릿 선택 및 매개변수 필드를 채웁니다.

    1. 일괄 작업의 경우 템플릿 매개변수 아래의 '파이프라인 예약' 섹션에 반복 일정을 입력합니다. 일괄 실행을 예약하는 데 사용되는 Cloud Scheduler의 이메일 계정 주소를 제공하는 것은 선택사항입니다. 지정하지 않으면 기본 Compute Engine 서비스 계정이 사용됩니다. 참고: 사용자 지정 또는 기본 Compute Engine 사용자 계정인지 여부에 관계없이(데이터 파이프라인 역할 참조) 사용자에게 Cloud Scheduler에서 사용하는 서비스 계정의 roles/iam.serviceAccountUser 역할이 부여되어야 합니다.

일괄 데이터 파이프라인 만들기

이 샘플 일괄 데이터 파이프라인을 만들려면 프로젝트의 다음 리소스에 액세스할 수 있어야 합니다.

이 예시 파이프라인에서는 Cloud Storage의 텍스트 파일을 BigQuery로 일괄 파이프라인 템플릿을 사용하는데, 이 템플릿은 Cloud Storage에서 CSV 형식의 파일을 읽고 변환을 실행한 다음 your-project-id:your-dataset-name.three_column_table에 값을 삽입합니다.

  1. 로컬 드라이브에 다음 파일을 만듭니다.
    1. 대상 BigQuery 테이블에 다음 스키마가 포함된 bq_three_column_table.json 파일입니다.
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
  1. split_csv_3cols.js 자바스크립트 파일입니다. BigQuery에 삽입하기 전에 입력 데이터에 대한 간단한 변환을 구현합니다.
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
  1. BigQuery 테이블에 삽입될 여러 레코드가 있는 file01.csv CSV 파일입니다.
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. 다음과 같이 gsutil을 사용하여 프로젝트의 Cloud Storage 버킷 폴더에 파일을 복사합니다.
    1. bq_three_column_table.jsonsplit_csv_3cols.jsgs://your-bucket/text_to_bigquery/에 복사합니다.
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
        gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. file01.csvgs://your-bucket/inputs/에 복사합니다.
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. Cloud Storage 브라우저에서 your-bucket에 'tmp' 폴더를 만듭니다. 폴더 이름을 선택하여 버킷 세부정보 페이지를 연 후 폴더 만들기를 클릭하여 버킷에 'tmp' 폴더를 만듭니다.
  4. Dataflow 파이프라인 페이지로 이동한 다음 '데이터 파이프라인 만들기'를 선택합니다. 템플릿에서 파이프라인 만들기 페이지에서 다음 항목을 입력하거나 선택합니다.

    1. 작업 관리
      1. '데이터 파이프라인'을 선택합니다.
      2. 파이프라인 이름: 'text_to_bq_batch_data_pipeline'을 입력합니다.
      3. '계속'을 클릭합니다.
    2. 템플릿 선택
      1. 리전 엔드포인트: Compute Engine 리전을 선택합니다.
      2. 템플릿 목록: '일괄로 데이터 처리(일괄 처리)'에서 'Cloud Storage의 텍스트 파일을 BigQuery로'를 선택합니다. 설명: 일괄 파이프라인입니다. Cloud Storage에 저장된 텍스트 파일을 읽고 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고 결과를 BigQuery에 출력합니다.' 참고: '데이터 연속 처리(스트림)'에서 같은 이름의 스트리밍 파이프라인을 선택하지 마세요.
      3. '계속'을 클릭합니다.
    3. 템플릿 매개변수
      1. 파이프라인 예약: 속해 있는 시간대에서 매시간 25분과 같은 일정을 선택합니다. 아래 설명된 대로 파이프라인을 제출한 후 일정을 수정할 수 있습니다.
    4. 필수 매개변수:
      1. Cloud Storage의 자바스크립트 UDF 경로
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. JSON 경로
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. 자바스크립트 UDF 이름: 'transform'
      4. BigQuery 출력 테이블(정규화된 테이블 이름):
        your_project_id:your_dataset.three_column_table
        
      5. Cloud Storage 입력 경로
        gs://your_bucket/inputs/file*.csv
        
      6. _임시 BigQuery 디렉터리
        gs://your_bucket/tmp
        
      7. 임시 위치
        gs://your_bucket/tmp
        
    5. 제출을 클릭합니다.
  5. 파이프라인 세부정보 페이지에서 파이프라인 및 템플릿 정보를 확인하고 현재 및 이전 기록을 봅니다.

또한 Dataflow 파이프라인 콘솔에서 실행 버튼을 사용하여 필요에 따라 일괄 파이프라인을 실행할 수도 있습니다.

샘플 스트리밍 데이터 파이프라인 만들기

샘플 일괄 파이프라인 안내에 따라 샘플 스트리밍 데이터 파이프라인을 만들 수 있지만 다음과 같은 차이점이 있습니다.

  • 파이프라인 일정. 스트리밍 데이터 파이프라인의 일정을 지정하지 않습니다. Dataflow 스트리밍 작업이 즉시 시작됩니다.

  • 템플릿 선택: '지속적으로 데이터 처리(스트림)'에서 'Cloud Storage의 텍스트 파일을 BigQuery로'를 선택합니다. 설명: Cloud Storage에 저장된 텍스트 파일을 읽고, 사용자 정의 자바스크립트 함수를 통해 변환을 수행하고, 결과를 BigQuery에 스트리밍할 수 있는 스트리밍 파이프라인입니다. 이 파이프라인에는 자바스크립트 함수와 BigQuery TableSchema의 JSON 표현이 필요합니다.

  • 작업자 머신 유형: 파이프라인은 gs://<your_bucket>/inputs/file*.csv 패턴과 일치하는 초기 파일 집합과 inputs/ 폴더에 업로드하는 이 패턴과 일치하는 추가 파일을 처리합니다. CSV 파일 크기가 몇 GB를 초과하는 경우 발생 가능한 메모리 부족 오류를 방지하기 위해 n1-highmem-8과 같이 기본 n1-standard-4 머신 유형보다 메모리가 더 많은 머신 유형을 선택합니다.

파이프라인 목표 위반 조사

반복 일괄 파이프라인

Cloud Console의 파이프라인 세부정보 페이지에서 파이프라인 상태의 초기 분석을 위해 파이프라인 상태 패널의 '개별 작업 상태' 및 '단계당 스레드 시간' 그래프를 사용합니다.

샘플 조사:

  1. 매시간 3분에 실행되는 반복 일괄 파이프라인이 있으며, 각 작업은 일반적으로 약 9분 동안 실행되며, 모든 작업을 10분 이내에 완료하려는 목표가 있습니다.

  2. '작업 상태' 그래프는 작업이 10분 넘게 실행되었음을 보여줍니다.

  3. 업데이트/실행 기록 테이블에서 원하는 시간 동안 실행된 작업을 찾은 다음 클릭하여 Dataflow 작업 세부정보 페이지로 이동합니다. 이 페이지에서 더 오래 실행되는 단계를 찾은 후 로그에서 가능한 오류를 찾아 지연의 원인을 파악합니다.

스트리밍 파이프라인

Cloud Console의 파이프라인 세부정보 페이지에 있는 파이프라인 정보 탭 아래에서 파이프라인 상태 초기 분석을 위해 파이프라인 상태 패널의 데이터 최신 상태 그래프를 사용합니다.

샘플 조사:

  1. 일반적으로 20초의 데이터 최신 상태로 출력을 생성하는 스트리밍 파이프라인이 있습니다.

  2. 30초의 데이터 최신 상태를 보장한다는 목표를 설정합니다. 데이터 최신 상태 그래프를 검토한 결과, 오전 9시에서 10시 사이에 데이터 최신 상태가 거의 40초로 급증한 것을 발견했습니다.

  3. 파이프라인 측정항목 탭으로 전환한 다음 처리량, CPU 사용률, 메모리 사용률 그래프를 확인하여 추가 분석을 수행합니다.