Meluncurkan pipeline Dataflow dengan Cloud Composer

Cloud Composer 1 | Cloud Composer 2

Halaman ini menjelaskan cara menggunakan DataflowTemplateOperator untuk meluncurkan pipeline Dataflow dari Cloud Composer. Pipeline Teks Cloud Storage ke BigQuery adalah pipeline batch yang dapat Anda gunakan untuk mengupload file teks yang disimpan di Cloud Storage, mengubahnya menggunakan JavaScript User Defined Function (UDF) yang Anda sediakan, dan menampilkan hasilnya ke BigQuery.

fungsi yang ditentukan pengguna, file input, dan skema JSON akan diupload ke bucket Cloud Storage. DAG yang merujuk ke file ini akan meluncurkan pipeline batch Dataflow, yang akan menerapkan fungsi yang ditentukan pengguna dan file skema JSON ke file input. Setelah itu, konten ini akan diunggah ke tabel BigQuery

Ringkasan

  • Sebelum memulai alur kerja, Anda akan membuat entity berikut:

    • Tabel BigQuery kosong dari set data kosong yang akan menyimpan kolom informasi berikut: location, average_temperature,month dan, secara opsional, inches_of_rain, is_current, dan latest_measurement.

    • File JSON yang akan menormalkan data dari file .txt ke format yang benar untuk skema tabel BigQuery. Objek JSON akan memiliki array BigQuery Schema, dengan setiap objek akan berisi nama kolom, jenis input, dan apakah kolom tersebut adalah kolom yang wajib diisi atau tidak.

    • File .txt input yang akan menyimpan data yang akan diupload banyak ke tabel BigQuery.

    • Fungsi Buatan Pengguna yang ditulis dalam JavaScript yang akan mengubah setiap baris file .txt menjadi variabel yang relevan untuk tabel kita.

    • File DAG Airflow yang akan mengarah ke lokasi file ini.

  • Selanjutnya, Anda akan mengupload file .txt, file UDF .js, dan file skema .json ke bucket Cloud Storage. Anda juga akan mengupload DAG ke lingkungan Cloud Composer.

  • Setelah DAG diupload, Airflow akan menjalankan tugas dari DAG tersebut. Tugas ini akan meluncurkan pipeline Dataflow yang akan menerapkan Fungsi yang Ditetapkan Pengguna ke file .txt dan memformatnya sesuai dengan skema JSON.

  • Terakhir, data akan diupload ke tabel BigQuery yang Anda buat sebelumnya.

Sebelum memulai

  • Panduan ini mengharuskan Anda memahami JavaScript untuk menulis Fungsi yang Ditetapkan Pengguna.
  • Panduan ini mengasumsikan bahwa Anda sudah memiliki lingkungan Cloud Composer. Lihat Membuat lingkungan untuk membuatnya. Anda dapat menggunakan Cloud Composer versi apa pun dengan panduan ini.
  • Aktifkan API Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Mengaktifkan API

Membuat tabel BigQuery kosong dengan definisi skema

Buat tabel BigQuery dengan definisi skema. Anda akan menggunakan definisi skema ini nanti dalam panduan ini. Tabel BigQuery ini akan menyimpan hasil upload banyak.

Untuk membuat tabel kosong dengan definisi skema:

Konsol

  1. Di konsol Google Cloud, buka halaman BigQuery:

    Buka BigQuery

  2. Di panel navigasi, di bagian Resources, luaskan project Anda.

  3. Di panel detail, klik Create set data.

    create a dataset<i}" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/createDatasetComposerTutorial.png" />

  4. Di halaman Create set data, di bagian Dataset ID, beri nama average_weather Set Data Anda. Biarkan semua kolom lain dalam status default-nya.

    dataset<i} dengan nama average_weather" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/rename-dataset.png" />

  5. Klik Create dataset.

  6. Kembali ke panel navigasi, di bagian Resource, luaskan project Anda. Kemudian, klik set data average_weather.

  7. Di panel detail, klik Buat tabel.

    klik buat tabel

  8. Di halaman Create table, di bagian Source, pilih Empty table.

  9. Di halaman Create table, di bagian Destination:

    • Untuk Dataset name, pilih set data average_weather.

      Dataset<i} untuk {i>dataset<i} average_weather" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/name-table.png" />

    • Di kolom Table name, masukkan nama average_weather.

    • Pastikan Table type disetel ke Native table.

  10. Di bagian Schema, masukkan definisi skema. Anda dapat menggunakan salah satu pendekatan berikut:

    • Masukkan informasi skema secara manual dengan mengaktifkan Edit as text dan memasukkan skema tabel sebagai array JSON. Ketik di kolom berikut:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Gunakan Add field untuk memasukkan skema secara manual:

      add field<i} untuk memasukkan {i>field<i}" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/schema-fields.png" />

  11. Untuk Setelan partisi dan cluster, biarkan nilai default, No partitioning.

  12. Di bagian Advanced options, untuk Encryption biarkan nilai default, Google-managed key.

  13. Klik Create table.

bq

Gunakan perintah bq mk untuk membuat set data kosong dan tabel dalam set data ini.

Jalankan perintah berikut untuk membuat set data cuaca global rata-rata:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Ganti kode berikut:

  • LOCATION: region tempat lingkungan berada.
  • PROJECT_ID: Project ID.

Jalankan perintah berikut untuk membuat tabel kosong dalam set data ini dengan definisi skema:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Setelah tabel dibuat, Anda dapat memperbarui masa berlaku, deskripsi, dan label tabel. Anda juga dapat mengubah definisi skema.

Python

Simpan kode ini sebagai dataflowtemplateoperator_create_dataset_and_table_helper.py dan perbarui variabel di dalamnya untuk mencerminkan project dan lokasi Anda, lalu jalankan dengan perintah berikut:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Membuat bucket Cloud Storage

Buat bucket untuk menyimpan semua file yang diperlukan untuk alur kerja. DAG yang Anda buat nanti dalam panduan ini akan merujuk file yang Anda upload ke bucket penyimpanan ini. Untuk membuat bucket penyimpanan baru:

Konsol

  1. Buka Cloud Storage di Konsol Google Cloud.

    Buka Cloud Storage

  2. Klik Create Bucket untuk membuka formulir pembuatan bucket.

    1. Masukkan informasi bucket Anda, lalu klik Continue untuk menyelesaikan setiap langkah:

      • Tentukan Name yang unik secara global untuk bucket Anda. Panduan ini menggunakan bucketName sebagai contoh.

      • Pilih Wilayah untuk jenis lokasi. Selanjutnya, pilih Lokasi tempat data bucket akan disimpan.

      • Pilih Standard sebagai kelas penyimpanan default untuk data Anda.

      • Pilih kontrol akses Uniform untuk mengakses objek Anda.

    2. Klik Done.

gsutil

Gunakan perintah gsutil mb:

gsutil mb gs://bucketName/

Ganti kode berikut:

  • bucketName: nama bucket yang Anda buat sebelumnya dalam panduan ini.

Contoh kode

C#

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Membuat skema BigQuery berformat JSON untuk tabel output

Buat file skema BigQuery berformat JSON yang cocok dengan tabel output yang Anda buat sebelumnya. Perhatikan bahwa nama, jenis, dan mode kolom harus sama dengan yang ditentukan sebelumnya dalam skema tabel BigQuery Anda. File ini akan menormalkan data dari file .txt ke format yang kompatibel dengan skema BigQuery. Beri nama file ini jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Membuat file JavaScript untuk memformat data Anda

Dalam file ini, Anda akan menentukan UDF (Fungsi Buatan Pengguna) yang menyediakan logika untuk mengubah baris teks dalam file input Anda. Perhatikan bahwa fungsi ini menggunakan setiap baris teks dalam file input Anda sebagai argumennya sendiri, sehingga fungsi ini akan dijalankan satu kali untuk setiap baris file input Anda. Beri nama file ini transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Membuat file input

File ini akan menyimpan informasi yang ingin Anda upload ke tabel BigQuery. Salin file ini secara lokal dan beri nama inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Mengupload file ke bucket Anda

Upload file berikut ke bucket Cloud Storage yang Anda buat sebelumnya:

  • Skema BigQuery berformat JSON (.json)
  • Fungsi yang Ditetapkan Pengguna JavaScript (transformCSVtoJSON.js)
  • File input teks yang ingin Anda proses (.txt)

Konsol

  1. Di Konsol Google Cloud, buka halaman Bucket Cloud Storage.

    Buka Buckets

  2. Pada daftar bucket, klik bucket Anda.

  3. Di tab Objects untuk bucket, lakukan salah satu langkah berikut:

    • Tarik lalu lepas file yang diinginkan dari desktop atau pengelola file ke panel utama di Konsol Google Cloud.

    • Klik tombol Upload Files, pilih file yang ingin diupload dalam dialog yang muncul, lalu klik Open.

gsutil

Jalankan perintah gsutil cp:

gsutil cp OBJECT_LOCATION gs://bucketName

Ganti kode berikut:

  • bucketName: nama bucket yang Anda buat sebelumnya dalam panduan ini.
  • OBJECT_LOCATION: jalur lokal ke objek Anda. Misalnya, Desktop/transformCSVtoJSON.js.

Contoh kode

Python

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Mengonfigurasi DataflowTemplateOperator

Sebelum menjalankan DAG, tetapkan variabel Airflow berikut.

Variabel Airflow Nilai
project_id Project ID
gce_zone Zona Compute Engine tempat cluster Dataflow harus dibuat
bucket_path Lokasi bucket Cloud Storage yang Anda buat sebelumnya

Sekarang Anda akan mereferensikan file yang telah dibuat sebelumnya untuk membuat DAG yang memulai alur kerja Dataflow. Salin DAG ini dan simpan secara lokal sebagai composer-dataflow-dag.py.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Mengupload DAG ke Cloud Storage

Upload DAG ke folder /dags di bucket lingkungan Anda. Setelah upload berhasil diselesaikan, Anda dapat melihatnya dengan mengklik link DAGs Folder di halaman Lingkungan Cloud Composer.

Folder DAG di lingkungan Anda menyimpan DAG Anda

Melihat status tugas

  1. Buka Airflow web interface.
  2. Di halaman DAG, klik nama DAG (seperti composerDataflowDAG).
  3. Di halaman Detail DAG, klik Graph View.
  4. Periksa status:

    • Failed: Tugas memiliki kotak merah di sekelilingnya. Anda juga dapat menahan pointer pada tugas dan mencari State: Failed.

    • Success: Tugas memiliki kotak hijau di sekelilingnya. Anda juga dapat menahan pointer ke tugas dan memeriksa State: Success.

Setelah beberapa menit, Anda dapat memeriksa hasilnya di Dataflow dan BigQuery.

Melihat tugas Anda di Dataflow

  1. Di konsol Google Cloud, buka halaman Dataflow.

    Buka Dataflow

  2. Tugas Anda diberi nama dataflow_operator_transform_csv_to_bq dengan ID unik yang dilampirkan di akhir nama dengan tanda hubung, seperti:

    tugas dataflow memiliki ID unik

  3. Klik nama untuk melihat detail pekerjaan.

    melihat semua detail pekerjaan

Lihat hasil Anda di BigQuery

  1. Di konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Anda dapat mengirimkan kueri menggunakan SQL standar. Gunakan kueri berikut untuk melihat baris yang telah ditambahkan ke tabel Anda:

    SELECT * FROM projectId.average_weather.average_weather