Cloud Composer로 Dataflow 파이프라인 실행

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 DataflowTemplateOperator를 사용하여 Cloud Composer에서 Dataflow 파이프라인을 실행하는 방법을 설명합니다. Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 업로드하고 제공된 JavaScript 사용자 정의 함수(UDF)를 사용하여 변환하고 결과를 BigQuery에 출력할 수 있게 해주는 일괄 파이프라인입니다.

사용자 정의 함수, 입력 파일, json 스키마가 Cloud Storage 버킷에 업로드됩니다. 이러한 파일을 참조하는 DAG는 사용자 정의 함수와 json 스키마 파일을 입력 파일에 적용하는 Dataflow 일괄 파이프라인을 실행합니다. 그러면 이 콘텐츠가 BigQuery 테이블에 업로드됩니다.

개요

  • 워크플로를 시작하기 전에 다음 항목을 만듭니다.

    • location, average_temperature, month 및 선택 사항으로 inches_of_rain, is_current, latest_measurement 열이 있는 빈 데이터 세트의 빈 BigQuery 테이블

    • .txt 파일의 데이터를 BigQuery 테이블 스키마의 올바른 형식으로 정규화하는 JSON 파일. JSON 객체에는 BigQuery Schema 배열이 있습니다. 여기서 각 객체에는 열 이름, 입력 유형, 필수 필드인지 여부가 포함됩니다.

    • BigQuery 테이블에 일괄 업로드할 데이터를 저장할 입력 .txt 파일

    • .txt 파일의 각 행을 테이블의 관련 변수로 변환하는 자바스크립트로 작성된 사용자 정의 함수

    • 이러한 파일의 위치를 가리키는 Airflow DAG 파일

  • 다음으로 .txt 파일, .js UDF 파일, .json 스키마 파일을 Cloud Storage 버킷에 업로드합니다. DAG를 Cloud Composer 환경에도 업로드합니다.

  • DAG가 업로드되면 Airflow가 여기에서 태스크를 실행합니다. 이 태스크에서는 사용자 정의 함수를 .txt 파일에 적용하고 형식을 JSON 스키마에 따라 지정하는 Dataflow 파이프라인을 실행합니다.

  • 마지막으로 데이터는 앞에서 만든 BigQuery 테이블에 업로드됩니다.

시작하기 전에

  • 이 가이드를 따라 사용자 정의 함수를 작성하려면 JavaScript에 익숙해야 합니다.
  • 이 가이드는 Cloud Composer 환경이 이미 있다고 가정하고 작성되었습니다. 환경을 만들려면 환경 만들기를 참조하세요. 이 가이드에서는 모든 버전의 Cloud Composer를 사용할 수 있습니다.
  • API Cloud Composer, Dataflow, Cloud Storage, BigQuery 사용 설정

    API 사용 설정

스키마 정의가 있는 빈 BigQuery 테이블 만들기

스키마 정의가 있는 BigQuery 테이블을 만듭니다. 이 스키마 정의는 이 가이드의 후반부에서 사용됩니다. 이 BigQuery 테이블에는 일괄 업로드 결과가 저장됩니다.

스키마 정의가 있는 빈 테이블을 만들려면 다음과 같이 하세요.

콘솔

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 탐색 패널의 리소스 섹션에서 프로젝트를 확장합니다.

  3. 세부정보 패널에서 데이터 세트 만들기를 클릭합니다.

    데이터 세트 만들기 버튼 클릭

  4. 데이터 세트 만들기 페이지의 데이터 세트 ID 섹션에서 데이터 세트 이름을 average_weather로 지정합니다. 다른 모든 필드를 기본 상태로 둡니다.

    데이터 세트 ID에 평균 날씨를 입력합니다.

  5. 데이터 세트 만들기를 클릭합니다.

  6. 탐색 패널로 돌아가 리소스 섹션에서 프로젝트를 펼칩니다. 그런 다음 average_weather 데이터 세트를 클릭합니다.

  7. 세부정보 패널에서 테이블 만들기를 클릭합니다.

    테이블 만들기 클릭

  8. 테이블 만들기 페이지의 소스 섹션에서 빈 테이블을 선택합니다.

  9. 테이블 만들기 페이지의 대상 섹션에서 다음을 수행합니다.

    • 데이터 세트 이름에서 average_weather 데이터 세트를 선택합니다.

      평균 날씨 데이터 세트의 데이터 세트 옵션 선택

    • 테이블 이름 필드에 average_weather라는 이름을 입력합니다.

    • 테이블 유형기본 테이블로 설정되어 있는지 확인합니다.

  10. 스키마 섹션에 스키마 정의를 입력합니다. 다음 방법 중 하나를 사용할 수 있습니다.

    • 텍스트로 편집을 사용 설정하고 테이블 스키마를 JSON 배열로 입력하여 스키마 정보를 수동으로 입력합니다. 다음 필드를 입력합니다.

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • 필드 추가를 사용하여 스키마를 수동으로 입력합니다.

      필드 추가를 클릭하여 필드 입력

  11. 파티션 및 클러스터 설정에 기본값(No partitioning)을 그대로 둡니다.

  12. 고급 옵션 섹션의 암호화에 기본값(Google-managed key)을 그대로 둡니다.

  13. 테이블 만들기를 클릭합니다.

bq

bq mk 명령어를 사용하여 빈 데이터 세트를 만들고 이 데이터 세트에 테이블을 만듭니다.

다음 명령어를 실행하여 전 세계 평균 날씨 데이터 세트를 만듭니다.

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

다음을 바꿉니다.

다음 명령어를 실행하여 이 데이터 세트에 스키마 정의가 있는 빈 테이블을 만듭니다.

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

테이블을 만든 후 테이블의 만료 시간, 설명, 라벨을 업데이트할 수 있습니다. 또한 스키마 정의를 수정할 수 있습니다.

Python

이 코드를 dataflowtemplateoperator_create_dataset_and_table_helper.py로 저장하고 프로젝트와 위치가 반영되도록 변수를 업데이트한 후 다음 명령어로 실행합니다.

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Cloud Storage 버킷 만들기

워크플로에 필요한 모든 파일을 저장할 버킷을 만듭니다. 이 가이드의 후반부에서 만들 DAG에서 이 스토리지 버킷에 업로드하는 파일을 참조합니다. 새 저장소 버킷을 만들려면 다음 안내를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 Cloud Storage를 엽니다.

    Cloud Storage로 이동

  2. 버킷 만들기를 클릭하여 버킷 생성 양식을 엽니다.

    1. 각 단계를 완료하려면 버킷 정보를 입력하고 계속을 클릭합니다.

      • 버킷의 전역 고유 이름을 지정합니다. 이 가이드에서는 bucketName을 예시로 사용합니다.

      • 위치 유형으로 리전을 선택합니다. 그런 다음 버킷 데이터를 저장할 위치를 선택합니다.

      • 데이터의 기본 스토리지 클래스로 표준을 선택합니다.

      • 균일 액세스 제어를 선택하여 객체에 액세스합니다.

    2. 완료를 클릭합니다.

gsutil

gsutil mb 명령어를 사용합니다.

gsutil mb gs://bucketName/

다음을 바꿉니다.

  • bucketName: 이 가이드의 앞부분에서 만든 버킷의 이름

코드 샘플

C#

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

출력 테이블용 JSON 형식 BigQuery 스키마 만들기

앞에서 만든 출력 테이블과 일치하는 JSON 형식의 BigQuery 스키마 파일을 만듭니다. 필드 이름, 유형, 모드는 앞에서 BigQuery 테이블 스키마에 정의한 것과 일치해야 합니다. 이 파일은 .txt 파일의 데이터를 BigQuery 스키마와 호환되는 형식으로 정규화합니다. 이 파일 이름을 jsonSchema.json으로 지정합니다.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

데이터 형식을 지정하도록 JavaScript 파일 만들기

이 파일에서 입력 파일의 텍스트 줄을 변환하는 로직을 제공하는 UDF(사용자 정의 함수)를 정의합니다. 이 함수는 입력 파일의 각 텍스트 줄을 자체 인수로 취하므로 함수는 입력 파일의 줄마다 한 번씩 실행됩니다. 이 파일 이름을 transformCSVtoJSON.js로 지정합니다.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

입력 파일 만들기

이 파일에는 BigQuery 테이블에 업로드할 정보가 저장됩니다. 이 파일을 로컬에서 복사하고 이름을 inputFile.txt로 지정합니다.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

버킷에 파일 업로드

앞에서 만든 Cloud Storage 버킷에 다음 파일을 업로드합니다.

  • JSON 형식의 BigQuery 스키마(.json)
  • 자바 스크립트 사용자 정의 함수(transformCSVtoJSON.js)
  • 처리하려는 텍스트의 입력 파일(.txt)

콘솔

  1. Google Cloud 콘솔에서 Cloud Storage 버킷 페이지로 이동합니다.

    버킷으로 이동

  2. 버킷 목록에서 버킷을 클릭합니다.

  3. 버킷의 객체 탭에서 다음 중 하나를 수행합니다.

    • 원하는 파일을 바탕화면이나 파일 관리자에서 Google Cloud 콘솔의 기본 창으로 드래그 앤 드롭합니다.

    • 파일 업로드 버튼을 클릭하고 나타나는 대화상자에서 업로드할 파일을 선택한 후 열기를 클릭합니다.

gsutil

gsutil cp 명령어를 실행합니다.

gsutil cp OBJECT_LOCATION gs://bucketName

다음을 바꿉니다.

  • bucketName: 이 가이드의 앞부분에서 만든 버킷의 이름
  • OBJECT_LOCATION: 객체의 로컬 경로. 예를 들면 Desktop/transformCSVtoJSON.js입니다.

코드 샘플

Python

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Cloud Composer에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

DataflowTemplateOperator 구성

DAG를 실행하기 전에 다음 Airflow 변수를 설정합니다.

Airflow 변수
project_id 프로젝트 ID
gce_zone Dataflow 클러스터를 만들어야 하는 Compute Engine 영역
bucket_path 앞에서 만든 Cloud Storage 버킷의 위치

이제 앞서 만든 파일을 참조하여 Dataflow 워크플로를 시작하는 DAG를 만듭니다. 이 DAG를 복사하여 composer-dataflow-dag.py로 로컬에 저장합니다.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Cloud Storage에 DAG 업로드

DAG를 환경 버킷의 /dags 폴더에 업로드합니다. 업로드가 성공적으로 완료되면 Cloud Composer 환경 페이지에서 DAG 폴더 링크를 클릭하여 확인할 수 있습니다.

DAG가 저장된 환경의 DAG 폴더

태스크 상태 보기

  1. Airflow 웹 인터페이스로 이동합니다.
  2. DAG 페이지에서 DAG 이름(예: composerDataflowDAG)을 클릭합니다.
  3. DAG 세부정보 페이지에서 그래프 보기를 클릭합니다.
  4. 상태를 확인합니다.

    • Failed: 태스크 주변에 빨간색 상자가 있습니다. 태스크 위로 마우스 포인터를 올려놓고 상태: 실패를 찾을 수도 있습니다.

    • Success: 태스크 주변에 녹색 상자가 있습니다. 태스크 위로 마우스 포인터를 올려놓고 상태: 성공을 확인할 수도 있습니다.

몇 분 후 Dataflow와 BigQuery에서 결과를 확인할 수 있습니다.

Dataflow에서 작업 보기

  1. Google Cloud 콘솔에서 Dataflow 페이지로 이동합니다.

    Dataflow로 이동

  2. 작업 이름은 다음과 같이 하이픈으로 이름 끝에 연결된 고유한 ID가 있는 dataflow_operator_transform_csv_to_bq로 지정됩니다.

    Dataflow 작업에는 고유 ID가 있습니다.

  3. 작업 세부정보를 확인하려면 이름을 클릭합니다.

    모든 작업 세부정보 보기

BigQuery에서 결과 보기

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 표준 SQL을 사용하여 쿼리를 제출할 수 있습니다. 다음 쿼리를 사용하여 테이블에 추가된 행을 확인합니다.

    SELECT * FROM projectId.average_weather.average_weather