Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Halaman ini menjelaskan cara menggunakan DataflowTemplateOperator
untuk meluncurkan
pipeline Dataflow dari
Cloud Composer.
Pipeline Teks Cloud Storage ke BigQuery adalah pipeline batch yang memungkinkan Anda mengupload file teks yang disimpan di Cloud Storage, mengubahnya menggunakan User Defined Function (UDF) JavaScript yang Anda berikan, dan menampilkan hasilnya ke BigQuery.
Ringkasan
Sebelum memulai alur kerja, Anda akan membuat entitas berikut:
Tabel BigQuery kosong dari set data kosong yang akan menyimpan kolom informasi berikut:
location
,average_temperature
,month
, dan, secara opsional,inches_of_rain
,is_current
, danlatest_measurement
.File JSON yang akan menormalisasi data dari file
.txt
ke dalam format yang benar untuk skema tabel BigQuery. Objek JSON akan memiliki arrayBigQuery Schema
, dengan setiap objek akan berisi nama kolom, jenis input, dan apakah kolom tersebut wajib diisi atau tidak.File
.txt
input yang akan menyimpan data yang akan diupload secara batch ke tabel BigQuery.Fungsi yang Ditentukan Pengguna yang ditulis dalam JavaScript yang akan mengubah setiap baris file
.txt
menjadi variabel yang relevan untuk tabel kita.File DAG Airflow yang akan mengarah ke lokasi file ini.
Selanjutnya, Anda akan mengupload file
.txt
, file UDF.js
, dan file skema.json
ke bucket Cloud Storage. Anda juga akan mengupload DAG ke lingkungan Cloud Composer.Setelah DAG diupload, Airflow akan menjalankan tugas dari DAG tersebut. Tugas ini akan meluncurkan pipeline Dataflow yang akan menerapkan Fungsi yang Ditetapkan Pengguna ke file
.txt
dan memformatnya sesuai dengan skema JSON.Terakhir, data akan diupload ke tabel BigQuery yang Anda buat sebelumnya.
Sebelum memulai
- Panduan ini memerlukan pemahaman tentang JavaScript untuk menulis User Defined Function.
- Panduan ini mengasumsikan bahwa Anda sudah memiliki lingkungan Cloud Composer. Lihat Membuat lingkungan untuk membuatnya. Anda dapat menggunakan versi Cloud Composer apa pun dengan panduan ini.
-
Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.
Membuat tabel BigQuery kosong dengan definisi skema
Buat tabel BigQuery dengan definisi skema. Anda akan menggunakan definisi skema ini nanti dalam panduan ini. Tabel BigQuery ini akan menyimpan hasil upload batch.
Untuk membuat tabel kosong dengan definisi skema:
Konsol
Di konsol Google Cloud, buka halaman BigQuery:
Di panel navigasi, di bagian Resource, luaskan project Anda.
Di panel detail, klik Create dataset.
Di halaman Create dataset, di bagian Dataset ID, beri nama Set Data Anda
average_weather
. Biarkan semua kolom lain dalam status defaultnya.Klik Create dataset.
Kembali ke panel navigasi, di bagian Resource, luaskan project Anda. Kemudian, klik set data
average_weather
.Di panel detail, klik Buat tabel.
Di halaman Create table, di bagian Source, pilih Empty table.
Di halaman Create table, di bagian Destination:
Untuk Dataset name, pilih set data
average_weather
.Di kolom Table name, masukkan nama
average_weather
.Pastikan Table type disetel ke Native table.
Di bagian Schema, masukkan definisi skema. Anda dapat menggunakan salah satu pendekatan berikut:
Masukkan informasi skema secara manual dengan mengaktifkan Edit sebagai teks dan memasukkan skema tabel sebagai array JSON. Ketik di kolom berikut:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Gunakan Tambahkan kolom untuk memasukkan skema secara manual:
Untuk Partition and cluster settings, gunakan nilai default,
No partitioning
.Di bagian Advanced options, untuk Encryption, biarkan nilai defaultnya,
Google-owned and managed key
.Klik Create table.
bq
Gunakan perintah bq mk
untuk membuat set data kosong dan tabel dalam set data ini.
Jalankan perintah berikut untuk membuat set data cuaca global rata-rata:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Ganti kode berikut:
LOCATION
: region tempat lingkungan berada.PROJECT_ID
: Project ID.
Jalankan perintah berikut untuk membuat tabel kosong dalam set data ini dengan definisi skema:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Setelah tabel dibuat, Anda dapat memperbarui masa berlaku, deskripsi, dan label tabel. Anda juga dapat mengubah definisi skema.
Python
Simpan kode ini sebagai
dataflowtemplateoperator_create_dataset_and_table_helper.py
dan perbarui variabel di dalamnya untuk mencerminkan project dan lokasi Anda, lalu jalankan dengan perintah berikut:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Membuat bucket Cloud Storage
Buat bucket untuk menyimpan semua file yang diperlukan untuk alur kerja. DAG yang Anda buat nanti dalam panduan ini akan mereferensikan file yang Anda upload ke bucket penyimpanan ini. Untuk membuat bucket penyimpanan baru:
Konsol
Buka Cloud Storage di konsol Google Cloud.
Klik Create Bucket untuk membuka formulir pembuatan bucket.
Masukkan informasi bucket Anda, lalu klik Continue untuk menyelesaikan setiap langkah:
Tentukan Nama yang unik secara global untuk bucket Anda. Panduan ini menggunakan
bucketName
sebagai contoh.Pilih Region untuk jenis lokasi. Selanjutnya, pilih Lokasi tempat data bucket akan disimpan.
Pilih Standard sebagai kelas penyimpanan default untuk data Anda.
Pilih kontrol akses Seragam untuk mengakses objek Anda.
Klik Done.
gcloud
Gunakan perintah gcloud storage buckets create
:
gcloud storage buckets create gs://bucketName/
Ganti kode berikut:
bucketName
: nama bucket yang Anda buat sebelumnya dalam panduan ini.
Contoh kode
C#
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Go
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Java
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Python
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Ruby
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Membuat skema BigQuery berformat JSON untuk tabel output Anda
Buat file skema BigQuery berformat JSON yang cocok dengan
tabel output yang Anda buat sebelumnya. Perhatikan bahwa nama, jenis, dan mode kolom
harus cocok dengan yang ditentukan sebelumnya dalam skema tabel
BigQuery Anda. File ini akan menormalisasi data dari file .txt
menjadi format yang kompatibel dengan skema BigQuery Anda. Beri nama file ini
jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Membuat file JavaScript untuk memformat data
Dalam file ini, Anda akan menentukan UDF (Fungsi yang Ditentukan Pengguna) yang menyediakan logika untuk mengubah baris teks dalam file input. Perhatikan bahwa fungsi ini mengambil setiap baris teks dalam file input Anda sebagai argumennya sendiri, sehingga fungsi akan berjalan satu kali untuk setiap baris file input Anda. Beri nama file ini
transformCSVtoJSON.js
.
Membuat file input
File ini akan menyimpan informasi yang ingin Anda upload ke tabel BigQuery. Salin file ini secara lokal dan beri nama
inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Mengupload file ke bucket
Upload file berikut ke bucket Cloud Storage yang Anda buat sebelumnya:
- Skema BigQuery berformat JSON (
.json
) - Fungsi yang Ditentukan Pengguna JavaScript (
transformCSVtoJSON.js
) File input teks yang ingin Anda proses (
.txt
)
Konsol
- Di Konsol Google Cloud, buka halaman Bucket Cloud Storage.
Dalam daftar bucket, klik bucket Anda.
Di tab Objek untuk bucket, lakukan salah satu hal berikut:
Tarik lalu lepas file yang diinginkan dari desktop atau pengelola file ke panel utama di Konsol Google Cloud.
Klik tombol Upload File, pilih file yang ingin diupload dalam dialog yang muncul, lalu klik Buka.
gcloud
Jalankan perintah gcloud storage cp
:
gcloud storage cp OBJECT_LOCATION gs://bucketName
Ganti kode berikut:
bucketName
: nama bucket yang Anda buat sebelumnya dalam panduan ini.OBJECT_LOCATION
: jalur lokal ke objek Anda. Contoh,Desktop/transformCSVtoJSON.js
.
Contoh kode
Python
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Ruby
Untuk melakukan autentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Mengonfigurasi DataflowTemplateOperator
Sebelum menjalankan DAG, tetapkan variabel Airflow berikut.
Variabel Airflow | Nilai |
---|---|
project_id
|
Project ID |
gce_zone
|
Zona Compute Engine tempat cluster Dataflow harus dibuat |
bucket_path
|
Lokasi bucket Cloud Storage yang Anda buat sebelumnya |
Sekarang Anda akan mereferensikan file yang dibuat sebelumnya untuk membuat DAG yang memulai alur kerja Dataflow. Salin DAG ini dan simpan secara lokal sebagai composer-dataflow-dag.py
.
Mengupload DAG ke Cloud Storage
Upload DAG ke folder /dags
di bucket lingkungan Anda. Setelah upload berhasil diselesaikan, Anda dapat melihatnya dengan
mengklik link Folder DAG di halaman Lingkungan
Cloud Composer.
Melihat status tugas
- Buka antarmuka web Airflow.
- Di halaman DAG, klik nama DAG (seperti
composerDataflowDAG
). - Di halaman Detail DAG, klik Tampilan Grafik.
Periksa status:
Failed
: Tugas memiliki kotak merah di sekelilingnya. Anda juga dapat menahan kursor di atas tugas dan mencari Status: Gagal.Success
: Tugas memiliki kotak hijau di sekelilingnya. Anda juga dapat mengarahkan kursor ke tugas dan memeriksa Status: Berhasil.
Setelah beberapa menit, Anda dapat memeriksa hasilnya di Dataflow dan BigQuery.
Melihat tugas Anda di Dataflow
Di konsol Google Cloud, buka halaman Dataflow.
Tugas Anda diberi nama
dataflow_operator_transform_csv_to_bq
dengan ID unik yang ditambahkan di akhir nama dengan tanda hubung, seperti ini:Klik nama untuk melihat detail tugas.
Melihat hasil di BigQuery
Di Konsol Google Cloud, buka halaman BigQuery.
Anda dapat mengirimkan kueri menggunakan SQL standar. Gunakan kueri berikut untuk melihat baris yang ditambahkan ke tabel Anda:
SELECT * FROM projectId.average_weather.average_weather