작업 빌더로 커스텀 작업 만들기

작업 빌더를 사용하면 커스텀 일괄 및 스트리밍 Dataflow 작업을 만들 수 있습니다. 작업 빌더 작업을 Apache Beam YAML 파일로 저장하여 공유하고 재사용할 수도 있습니다.

새 파이프라인 만들기

작업 빌더에서 새 파이프라인을 만들려면 다음 단계를 수행합니다.

  1. Google Cloud 콘솔에서 작업 페이지로 이동합니다.

    작업으로 이동

  2. 빌더에서 작업 만들기를 클릭합니다.

  3. 작업 이름에 작업 이름을 입력합니다.

  4. 일괄 또는 스트리밍을 선택합니다.

  5. 스트리밍을 선택하는 경우 윈도잉 모드를 선택합니다. 그런 다음 다음과 같이 윈도우 사양을 입력합니다.

    • 고정 윈도우: 윈도우 크기를 초 단위로 입력합니다.
    • 슬라이딩 윈도우: 윈도우 크기와 윈도우 기간을 초 단위로 입력합니다.
    • 세션 윈도우: 세션 간격을 초 단위로 입력합니다.

    윈도잉에 대한 자세한 내용은 윈도우 및 윈도우 함수를 참조하세요.

다음으로 다음 섹션의 설명대로 소스, 변환, 싱크를 파이프라인에 추가합니다.

파이프라인에 소스 추가

파이프라인에는 소스가 최소 하나 이상 있어야 합니다. 처음에는 작업 빌더가 빈 소스로 채워집니다. 소스를 구성하려면 다음 단계를 수행합니다.

  1. 소스 이름 상자에 소스 이름을 입력하거나 기본 이름을 사용합니다. 이 이름은 작업을 실행할 때 작업 그래프에 표시됩니다.

  2. 소스 유형 목록에서 데이터 소스 유형을 선택합니다.

  3. 소스 유형에 따라 추가 구성 정보를 제공합니다. 예를 들어 BigQuery를 선택하면 읽을 테이블을 지정합니다.

    Pub/Sub를 선택하면 메시지 스키마를 지정합니다. Pub/Sub 메시지에서 읽으려는 각 필드의 이름과 데이터 유형을 입력합니다. 파이프라인은 스키마에 지정되지 않은 모든 필드를 삭제합니다.

  4. (선택사항) 일부 소스 유형의 경우 소스 데이터 미리보기를 클릭하여 소스 데이터를 미리 볼 수 있습니다.

파이프라인에 다른 소스를 추가하려면 소스 추가를 클릭합니다. 여러 소스의 데이터를 결합하려면 파이프라인에 SQL 또는 Join 변환을 추가합니다.

파이프라인에 변환 추가

원하는 경우 파이프라인에 변환을 하나 이상 추가합니다. 다음 변환을 사용하여 소스 및 기타 변환의 데이터를 조작, 집계 또는 조인할 수 있습니다.

변환 유형 설명 Beam YAML 변환 정보
필터링(Python) Python 표현식으로 레코드를 필터링합니다.
SQL 변환 SQL 문으로 레코드를 조작하거나 여러 입력을 조인합니다.
필드 매핑(Python) Python 표현식과 함수로 새 필드를 추가하거나 전체 레코드를 다시 매핑합니다.
필드 매핑(SQL) SQL 표현식으로 레코드 필드를 추가하거나 매핑합니다.
YAML 변환:
  1. AssertEqual
  2. AssignTimestamps
  3. 결합
  4. 분할
  5. 필터
  6. Flatten
  7. 참여
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Beam YAML SDK에서 원하는 변환을 사용합니다.

YAML 변환 구성: YAML 변환의 구성 파라미터를 YAML 맵으로 제공합니다. 키-값 쌍은 결과 Beam YAML 변환의 구성 섹션을 채우는 데 사용됩니다. 각 변환 유형에 지원되는 구성 파라미터는 Beam YAML 변환 문서를 참고하세요. 샘플 구성 매개변수:

결합
group_by:
combine:
참여
type:
equalities:
fields:
로그 작업의 작업자 로그에 레코드를 로깅합니다.
그룹화 기준 count()sum()과 같은 함수로 레코드를 결합합니다.
참여 동일한 필드의 여러 입력을 조인합니다.
분할 배열 필드를 평면화하여 레코드를 분할합니다.

변환을 추가하려면 다음 안내를 따르세요.

  1. 변환 추가를 클릭합니다.

  2. 변환 이름 상자에 변환 이름을 입력하거나 기본 이름을 사용합니다. 이 이름은 작업을 실행할 때 작업 그래프에 표시됩니다.

  3. 변환 유형 목록에서 변환 유형을 선택합니다.

  4. 변환 유형에 따라 추가 구성 정보를 제공합니다. 예를 들어 필터(Python)를 선택하면 필터로 사용할 Python 표현식을 입력합니다.

  5. 변환 입력 단계를 선택합니다. 입력 단계는 출력에서 이 변환 입력을 제공하는 소스나 변환입니다.

파이프라인에 싱크 추가

파이프라인에는 싱크가 최소 하나 이상 있어야 합니다. 처음에는 작업 빌더가 빈 싱크로 채워집니다. 싱크를 구성하려면 다음 단계를 수행합니다.

  1. 싱크 이름 상자에 싱크 이름을 입력하거나 기본 이름을 사용합니다. 이 이름은 작업을 실행할 때 작업 그래프에 표시됩니다.

  2. 싱크 유형 목록에서 싱크 유형을 선택합니다.

  3. 싱크 유형에 따라 추가 구성 정보를 제공합니다. 예를 들어 BigQuery 싱크를 선택하면 쓸 BigQuery 테이블을 선택합니다.

  4. 싱크 입력 단계를 선택합니다. 입력 단계는 출력에서 이 변환 입력을 제공하는 소스나 변환입니다.

  5. 파이프라인에 다른 싱크를 추가하려면 싱크 추가를 클릭합니다.

파이프라인 실행

작업 빌더에서 파이프라인을 실행하려면 다음 단계를 수행합니다.

  1. (선택사항) Dataflow 작업 옵션을 설정합니다. Dataflow 옵션 섹션을 펼치려면 펼치기 화살표를 클릭합니다.

  2. 작업 실행을 클릭합니다. 작업 빌더가 제출된 작업의 작업 그래프로 이동합니다. 작업 그래프를 사용하여 작업 상태를 모니터링할 수 있습니다.

실행하기 전에 파이프라인 유효성 검사

Python 필터 및 SQL 표현식과 같이 구성이 복잡한 파이프라인의 경우 실행하기 전에 파이프라인 구성에 문법 오류가 있는지 확인하는 것이 좋습니다. 파이프라인 문법 유효성을 검사하려면 다음 단계를 수행합니다.

  1. 검사를 클릭하여 Cloud Shell을 열고 유효성 검사 서비스를 시작합니다.
  2. 검사 시작을 클릭합니다.
  3. 유효성 검사 중에 오류가 발견되면 빨간색 느낌표가 표시됩니다.
  4. 감지된 오류를 수정하고 검사를 클릭하여 수정사항을 확인합니다. 오류가 발견되지 않으면 녹색 체크표시가 나타납니다.

gcloud CLI로 실행

gcloud CLI를 사용하여 Beam YAML 파이프라인을 실행할 수도 있습니다. gcloud CLI로 작업 빌더 파이프라인을 실행하려면 다음을 실행하세요.

  1. YAML 저장을 클릭하여 YAML 저장 창을 엽니다.

  2. 다음 작업 중 하나를 수행합니다.

    • Cloud Storage에 저장하려면 Cloud Storage 경로를 입력하고 저장을 클릭합니다.
    • 로컬 파일을 다운로드하려면 다운로드를 클릭합니다.
  3. 셸 또는 터미널에서 다음 명령어를 실행합니다.

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    YAML_FILE_PATH을 로컬 또는 Cloud Storage에 있는 YAML 파일의 경로로 바꿉니다.

다음 단계