Cloud Composer로 Dataflow 파이프라인 실행

이 페이지에서는 DataflowTemplateOperator를 사용하여 Cloud Composer에서 Dataflow 파이프라인을 실행하는 방법을 설명합니다. The Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 업로드하고 제공된 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고 BigQuery로 결과를 출력하는 일괄 파이프라인입니다.

사용자 정의 함수, 입력 파일, json 스키마가 Cloud Storage 버킷에 업로드됩니다. 이러한 파일을 참조하는 DAG는 사용자 정의 함수 및 json 스키마 파일을 입력 파일에 적용하는 Dataflow 일괄 파이프라인을 시작합니다. 그러면 이 콘텐츠가 BigQuery 테이블에 업로드됩니다.

  • 워크플로를 시작하기 전에 다음 항목을 만들어야 합니다.

    • location, average_temperature, month 및 선택 사항으로 inches_of_rain, is_current, latest_measurement 열이 있는 빈 데이터 세트의 빈 BigQuery 테이블.

    • .txt 파일의 데이터를 BigQuery 테이블 스키마의 올바른 형식으로 정규화하는 JSON 파일입니다. JSON 객체에는 BigQuery Schema 배열이 포함되며, 각 객체에는 열 이름, 입력 유형, 필수 필드 여부가 포함됩니다.

    • BigQuery 테이블에 일괄 업로드할 데이터를 저장할 입력 .txt 파일입니다.

    • .txt 파일의 각 행을 테이블의 관련 변수로 변환하는 자바스크립트로 작성된 사용자 정의 함수입니다.

    • 위에서 언급한 파일의 위치를 가리키는 방향성 비순환 그래프(DAG) 파일입니다.

  • 다음으로 .txt 파일, .js UDF 파일, .json 스키마 파일을 스토리지 버킷에 업로드합니다. DAG도 Cloud Composer 환경에 업로드합니다.

  • DAG가 업로드되면 Airflow 작업이 시작됩니다. 이 작업은 사용자 정의 함수를 .txt 파일에 적용하고 JSON 스키마에 따라 형식을 지정하는 Cloud Dataflow 파이프라인을 시작합니다.

  • 마지막으로 데이터는 앞에서 만든 BigQuery 테이블에 업로드됩니다.

비용

이 가이드에서는 다음과 같은 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.

  • Cloud Composer
  • Dataflow
  • Cloud Storage
  • BigQuery

기본 요건

  • Cloud Composer 환경을 만들었는지 확인합니다.
  • Cloud Composer 버전은 최소 1.9.0 이상이어야 합니다. 이미지 버전을 확인하려면 환경 세부정보를 참조하세요.
  • 이 가이드를 따라 사용자 정의 함수를 작성하려면 자바스크립트에 익숙해야 합니다.
  • Cloud Composer, Dataflow, Cloud Storage, BigQuery API를 사용 설정합니다.

    API 사용 설정

환경 설정

스키마 정의가 있는 빈 BigQuery 테이블 만들기

먼저 스키마 정의가 있는 BigQuery 테이블을 만듭니다. 이 스키마 정의는 가이드의 뒷부분에서 사용합니다. 이 BigQuery 테이블에는 일괄 업로드 결과가 저장됩니다.

스키마 정의가 있는 빈 테이블을 만들려면 다음과 같이 하세요.

Console

  1. Cloud Console에서 BigQuery 웹 UI를 엽니다.
    BigQuery 웹 UI로 이동

  2. 탐색 패널의 리소스 섹션에서 프로젝트를 펼칩니다.

  3. 창 오른쪽의 세부정보 패널에서 데이터세트 만들기를 클릭합니다.

창의 오른쪽에 있는 데이터 세트 만들기 버튼을 클릭합니다.

  1. 데이터 세트 만들기 페이지의 데이터 세트 ID 섹션에서 데이터 세트 이름을 average_weather로 지정합니다. 다른 필드는 모두 기본 상태로 둡니다.

데이터 세트 ID에 평균 날씨를 입력합니다.

  1. 데이터세트 만들기를 클릭합니다.

  2. 탐색 패널로 돌아가 리소스 섹션에서 프로젝트를 펼칩니다. 그런 다음 average_weather 데이터 세트를 클릭합니다.

  3. 창 오른쪽에 있는 세부정보 패널에서 테이블 만들기를 클릭합니다.

창의 오른쪽에 있는 테이블 만들기를 클릭합니다.

  1. 테이블 만들기 페이지의 소스 섹션에서 빈 테이블을 선택합니다.

  2. 테이블 만들기 페이지의 대상 섹션에서 다음을 수행합니다.

    • 데이터 세트 이름에서 average_weather 데이터 세트를 선택합니다.

      평균 날씨 데이터 세트의 데이터 세트 옵션 선택

    • 테이블 이름 필드에 average_weather라는 이름을 입력합니다.

    • 테이블 유형기본 테이블로 설정되어 있는지 확인합니다.

  3. 스키마 섹션에 스키마 정의를 입력합니다.

    • 다음과 같이 스키마 정보를 직접 입력합니다.

      • 텍스트로 편집을 사용 설정하고 테이블 스키마를 JSON 배열로 입력합니다. 이 옵션에서 다음 필드를 입력합니다.

        [
        {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
        },
        {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
        },
        {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
        },
        {
        "name": "inches_of_rain",
        "type": "NUMERIC"
        },
        {
        "name": "is_current",
        "type": "BOOLEAN"
        },
        {
        "name": "latest_measurement",
        "type": "DATE"
        }
        ]
        

      • 필드 추가를 사용하여 스키마를 직접 입력합니다.

      화면 하단의 필드 추가를 클릭하여 필드를 입력합니다.

  4. 파티션 및 클러스터 설정에서 기본값(No partitioning)을 그대로 둡니다.

  5. 고급 옵션 섹션에서 암호화의 기본값(Google-managed key)을 그대로 둡니다. 기본적으로 Compute Engine은 미사용으로 저장된 고객 콘텐츠를 암호화합니다.

  6. 테이블 만들기를 클릭합니다.

CLI

--location 플래그와 함께 bq mk 명령어를 사용하여 빈 데이터 세트를 만듭니다. PROJECT_ID를 프로젝트 ID로 바꾸고 LOCATION을 원하는 위치로 바꿉니다. 지연 시간을 최소화하려면 Composer 환경이 위치한 동일한 리전을 선택하는 것이 좋습니다.

다음 명령어를 복사하여 전 세계 평균 날씨 데이터 세트를 만듭니다.

bq --location=LOCATION mk \
--dataset \
PROJECT_ID:average_weather

스키마 정의로 이 데이터 세트의 빈 테이블을 만들려면 아래 명령어에서 PROJECT_ID를 프로젝트 ID로 바꾸고 터미널에 입력합니다.

bq mk \
--table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

테이블을 만든 후 테이블의 만료 시간, 설명, 라벨을 업데이트할 수 있습니다. 또한 스키마 정의를 수정할 수 있습니다.

Python

샘플을 실행하기 전에 다음 명령어를 실행하여 환경에 라이브러리를 설치해야 합니다.

pip install google.cloud.bigquery

이 코드를 dataflowtemplateoperator_create_dataset_and_table_helper.py로 저장하고 프로젝트 및 위치를 반영하도록 변수를 업데이트한 후 다음 명령어로 실행합니다.

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = 'your-project'  # Your GCP Project
location = 'US'  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = 'average_weather'

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

스토리지 버킷 만들기

다음으로 워크플로에 필요한 모든 파일을 보관할 스토리지 버킷을 만들어야 합니다. 앞으로 만드는 DAG는 이 스토리지 버킷에 업로드한 파일을 참조합니다. 새 저장소 버킷을 만들려면 다음 안내를 따르세요.

콘솔

  1. Cloud Console에서 Cloud Storage를 엽니다.

    Cloud Storage 열기

  2. 버킷 만들기를 클릭하여 버킷 생성 양식을 엽니다.

  3. 각 단계를 완료하려면 버킷 정보를 입력하고 계속을 클릭합니다.

    • 버킷의 전역 고유 이름을 지정합니다(이 가이드의 나머지 부분에서는 bucketName으로 참조됨).

    • 위치 유형으로 리전을 선택합니다. 그런 다음 버킷 데이터가 영구적으로 저장될 위치를 선택합니다.

    • 데이터의 기본 스토리지 클래스로 표준을 선택합니다.

    • 균일 액세스 제어를 선택하여 객체에 액세스합니다.

  4. 완료를 클릭합니다.

gsutil

  1. gsutil mb 명령어를 사용합니다.
    gsutil mb gs://bucketName/
    

Python

Python

이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print("Bucket {} created".format(bucket.name))

출력 테이블용 JSON 형식 BigQuery 스키마 만들기

앞에서 만든 출력 테이블과 일치하는 JSON 형식의 BigQuery 스키마 파일을 만듭니다. 필드 이름, 유형, 모드는 앞서 BigQuery 테이블 스키마에서 정의한 것과 일치해야 합니다. 이 파일은 .txt 파일의 데이터를 BigQuery 스키마와 호환되는 형식으로 정규화합니다. 파일 이름을 jsonSchema.json으로 지정합니다.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

데이터 형식 지정을 위해 자바 스크립트(.js) 파일 만들기

이 파일에서 입력 파일의 텍스트 줄을 변환하는 논리를 제공하는 UDF(사용자 정의 함수)를 정의합니다. 이 함수는 입력 파일의 각 텍스트 줄을 자체 인수로 취하므로 이 함수는 입력 파일의 각 행에 대해 한 번씩 실행됩니다. 파일 이름을 transformCSVtoJSON.js으로 지정합니다.

Node.js

이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Compute Engine Node.js API 참조 문서를 참조하세요.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  const weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  var jsonString = JSON.stringify(weatherInCity);
  return jsonString;
}

입력 파일 만들기

이 파일에는 BigQuery 테이블에 업로드할 정보가 저장됩니다. 이 파일을 로컬에서 복사하고 이름을 inputFile.txt로 지정합니다.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

스토리지 버킷에 파일 업로드 및 스테이징 폴더 만들기

앞에서 만든 스토리지 버킷에 다음 파일을 업로드합니다.

  • JSON 형식의 BigQuery 스키마(.json)
  • 자바 스크립트 사용자 정의 함수(transformCSVtoJSON.js)
  • 처리하려는 텍스트의 입력 파일(.txt)

콘솔

  1. Google Cloud Console에서 Cloud Storage 브라우저를 엽니다.
    Cloud Storage 브라우저 열기
  2. 버킷 목록에서 bucketName 버킷을 클릭합니다.

  3. 버킷의 객체 탭에서 다음 중 하나를 수행합니다.

    • 원하는 파일을 바탕화면이나 파일 관리자에서 Cloud Console의 기본 창으로 드래그 앤 드롭합니다.

    • 파일 업로드 버튼을 클릭하고 나타나는 대화상자에서 업로드할 파일을 선택하고 열기를 클릭합니다.

gsutil

[gsutil cp] 명령어를 사용합니다.

gsutil cp [OBJECT_LOCATION] gs://bucketName

여기에서

  • [OBJECT_LOCATION]은 객체의 로컬 경로입니다. 예: Desktop/dog.png

  • [bucketName]은 앞에서 만든 전역적으로 고유한 버킷 이름입니다.

성공하면 응답은 다음 예와 같습니다.

Operation completed over 1 objects/58.8 KiB.

Python

Python

이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # bucket_name = "your-bucket-name"
    # source_file_name = "local/path/to/file"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

DataflowTemplateOperator 구성

샘플을 실행하기 전에 적절한 환경 변수를 설정해야 합니다. gcloud 또는 Airflow UI를 사용하면 됩니다.

gcloud

다음 명령어를 입력합니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set project_id PROJECT_ID

각 매개변수는 다음과 같습니다.

  • ENVIRONMENT는 Cloud Composer 환경의 이름입니다.
  • LOCATION은 Cloud Composer 환경이 위치한 리전입니다.
  • PROJECT_ID는 Google Cloud 프로젝트 ID입니다.
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_region GCE_REGION

각 매개변수는 다음과 같습니다.

  • GCE_REGION은 Compute Engine 리전입니다.
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_zone GCE_ZONE

각 매개변수는 다음과 같습니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set bucket_path BUCKET_PATH

각 매개변수는 다음과 같습니다.

  • BUCKET_PATH는 앞서 만든 Cloud Storage 버킷의 위치입니다.

Airflow UI

  1. 툴바에서 관리 > 변수를 클릭합니다.

  2. 만들기를 클릭합니다.

  3. 다음 정보를 입력합니다.

    • 키: project_id
    • 값: PROJECT_ID(Google Cloud 프로젝트 ID)
  4. 저장 후 다른 항목 추가를 클릭합니다. 추가 저장을 선택하고 왼쪽 하단에서 다른 옵션 추가

  5. 다음 정보를 입력합니다.

    • 키: bucket_path
    • 값: BUCKET_PATH(Cloud Storage 버킷의 위치. 예: 'gs://my-bucket')
  6. 저장 후 다른 항목 추가를 클릭합니다.

  7. 다음 정보를 입력합니다.

    • 키: gce_region
    • 값: GCE_REGION(Compute Engine 리전의 리전)
  8. 저장 후 다른 항목 추가를 클릭합니다.

  9. 다음 정보를 입력합니다.

  10. 저장을 클릭합니다.

이제 앞서 만든 파일을 참조하여 Dataflow 워크플로를 시작하는 DAG를 만듭니다. 이 DAG를 복사하여 composer-dataflow-dag.py로 로컬에 저장합니다.

Python

이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Cloud Storage에 DAG 업로드

DAG를 환경 폴더에 업로드합니다. 업로드가 성공적으로 완료되면 Cloud Composer 환경 페이지에서 DAG 폴더 링크를 클릭하여 이를 볼 수 있습니다.

환경의 DAG 폴더에 DAG가 있습니다.

작업 상태 보기

  1. Airflow 웹 인터페이스로 이동합니다.
  2. DAG 페이지에서 DAG 이름(예: composerDataflowDAG)을 클릭합니다.
  3. DAG 세부정보 페이지에서 그래프 보기를 클릭합니다.
  4. 상태를 확인합니다.

    • 실패: 작업 주변에 빨간색 상자가 있습니다. 작업 위로 마우스 포인터를 올려놓고 상태: 실패를 찾을 수도 있습니다. 작업 주변에 빨간색 상자가 표시되며 실패했음을 나타냅니다.

몇 분 후 Dataflow와 BigQuery에서 결과를 확인할 수 있습니다.

Dataflow에서 작업 보기

  1. Dataflow 웹 UI로 이동합니다. Dataflow 웹 UI로 이동

  2. 작업 이름은 하이픈으로 이름 끝에 연결된 고유한 ID와 함께 dataflow_operator_transform_csv_to_bq로 지정됩니다. Dataflow 작업에는 고유 ID가 있습니다.

  3. 작업 세부정보를 보려면 이름을 클릭합니다. Dataflow 작업 세부정보 자세히 알아보기 아래의 모든 작업 세부정보 보기

BigQuery에서 결과 보기

  1. BigQuery 웹 UI로 이동합니다. BigQuery 웹 UI로 이동

  2. 표준 SQL을 사용하여 쿼리를 제출할 수 있습니다. 다음 쿼리를 사용하여 테이블에 추가된 행을 확인합니다.

    SELECT * FROM projectId.average_weather
    

삭제

Google Cloud Platform 계정에 요금이 부과되지 않도록 이 가이드에서 사용한 리소스를 삭제할 수 있습니다.

  1. Cloud Composer 환경을 삭제합니다.
  2. Cloud Composer 환경의 Cloud Storage 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.
  3. Dataflow 작업을 중지합니다.
  4. BigQuery 테이블BigQuery 데이터 세트를 삭제합니다.