Cloud Composer 1 | Cloud Composer 2
Halaman ini menjelaskan cara menggunakan DataflowTemplateOperator
untuk meluncurkan pipeline Dataflow dari Cloud Composer.
Pipeline Teks Cloud Storage ke BigQuery
adalah pipeline batch yang dapat Anda gunakan untuk mengupload file teks yang disimpan di
Cloud Storage, mengubahnya menggunakan JavaScript User Defined Function (UDF) yang Anda sediakan, dan menampilkan hasilnya ke
BigQuery.
Ringkasan
Sebelum memulai alur kerja, Anda akan membuat entity berikut:
Tabel BigQuery kosong dari set data kosong yang akan menyimpan kolom informasi berikut:
location
,average_temperature
,month
dan, secara opsional,inches_of_rain
,is_current
, danlatest_measurement
.File JSON yang akan menormalkan data dari file
.txt
ke format yang benar untuk skema tabel BigQuery. Objek JSON akan memiliki arrayBigQuery Schema
, dengan setiap objek akan berisi nama kolom, jenis input, dan apakah kolom tersebut adalah kolom yang wajib diisi atau tidak.File
.txt
input yang akan menyimpan data yang akan diupload banyak ke tabel BigQuery.Fungsi Buatan Pengguna yang ditulis dalam JavaScript yang akan mengubah setiap baris file
.txt
menjadi variabel yang relevan untuk tabel kita.File DAG Airflow yang akan mengarah ke lokasi file ini.
Selanjutnya, Anda akan mengupload file
.txt
, file UDF.js
, dan file skema.json
ke bucket Cloud Storage. Anda juga akan mengupload DAG ke lingkungan Cloud Composer.Setelah DAG diupload, Airflow akan menjalankan tugas dari DAG tersebut. Tugas ini akan meluncurkan pipeline Dataflow yang akan menerapkan Fungsi yang Ditetapkan Pengguna ke file
.txt
dan memformatnya sesuai dengan skema JSON.Terakhir, data akan diupload ke tabel BigQuery yang Anda buat sebelumnya.
Sebelum memulai
- Panduan ini mengharuskan Anda memahami JavaScript untuk menulis Fungsi yang Ditetapkan Pengguna.
- Panduan ini mengasumsikan bahwa Anda sudah memiliki lingkungan Cloud Composer. Lihat Membuat lingkungan untuk membuatnya. Anda dapat menggunakan Cloud Composer versi apa pun dengan panduan ini.
-
Aktifkan API Cloud Composer, Dataflow, Cloud Storage, BigQuery.
Membuat tabel BigQuery kosong dengan definisi skema
Buat tabel BigQuery dengan definisi skema. Anda akan menggunakan definisi skema ini nanti dalam panduan ini. Tabel BigQuery ini akan menyimpan hasil upload banyak.
Untuk membuat tabel kosong dengan definisi skema:
Konsol
Di konsol Google Cloud, buka halaman BigQuery:
Di panel navigasi, di bagian Resources, luaskan project Anda.
Di panel detail, klik Create set data.
create a dataset<i}" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/createDatasetComposerTutorial.png" />
Di halaman Create set data, di bagian Dataset ID, beri nama
average_weather
Set Data Anda. Biarkan semua kolom lain dalam status default-nya.dataset<i} dengan nama average_weather" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/rename-dataset.png" />
Klik Create dataset.
Kembali ke panel navigasi, di bagian Resource, luaskan project Anda. Kemudian, klik set data
average_weather
.Di panel detail, klik Buat tabel.
Di halaman Create table, di bagian Source, pilih Empty table.
Di halaman Create table, di bagian Destination:
Untuk Dataset name, pilih set data
average_weather
.Dataset<i} untuk {i>dataset<i} average_weather" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/name-table.png" />
Di kolom Table name, masukkan nama
average_weather
.Pastikan Table type disetel ke Native table.
Di bagian Schema, masukkan definisi skema. Anda dapat menggunakan salah satu pendekatan berikut:
Masukkan informasi skema secara manual dengan mengaktifkan Edit as text dan memasukkan skema tabel sebagai array JSON. Ketik di kolom berikut:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Gunakan Add field untuk memasukkan skema secara manual:
add field<i} untuk memasukkan {i>field<i}" class="l10n-absolute-url-src screenshot" l10n-attrs-original-order="src,class,alt" src="https://cloud.google.com/static/composer/docs/images/schema-fields.png" />
Untuk Setelan partisi dan cluster, biarkan nilai default,
No partitioning
.Di bagian Advanced options, untuk Encryption biarkan nilai default,
Google-managed key
.Klik Create table.
bq
Gunakan perintah bq mk
untuk membuat set data kosong dan tabel dalam
set data ini.
Jalankan perintah berikut untuk membuat set data cuaca global rata-rata:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Ganti kode berikut:
LOCATION
: region tempat lingkungan berada.PROJECT_ID
: Project ID.
Jalankan perintah berikut untuk membuat tabel kosong dalam set data ini dengan definisi skema:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Setelah tabel dibuat, Anda dapat memperbarui masa berlaku, deskripsi, dan label tabel. Anda juga dapat mengubah definisi skema.
Python
Simpan kode ini sebagai dataflowtemplateoperator_create_dataset_and_table_helper.py
dan perbarui variabel di dalamnya untuk mencerminkan project dan lokasi Anda, lalu jalankan dengan perintah berikut:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Membuat bucket Cloud Storage
Buat bucket untuk menyimpan semua file yang diperlukan untuk alur kerja. DAG yang Anda buat nanti dalam panduan ini akan merujuk file yang Anda upload ke bucket penyimpanan ini. Untuk membuat bucket penyimpanan baru:
Konsol
Buka Cloud Storage di Konsol Google Cloud.
Klik Create Bucket untuk membuka formulir pembuatan bucket.
Masukkan informasi bucket Anda, lalu klik Continue untuk menyelesaikan setiap langkah:
Tentukan Name yang unik secara global untuk bucket Anda. Panduan ini menggunakan
bucketName
sebagai contoh.Pilih Wilayah untuk jenis lokasi. Selanjutnya, pilih Lokasi tempat data bucket akan disimpan.
Pilih Standard sebagai kelas penyimpanan default untuk data Anda.
Pilih kontrol akses Uniform untuk mengakses objek Anda.
Klik Done.
gsutil
Gunakan perintah gsutil mb
:
gsutil mb gs://bucketName/
Ganti kode berikut:
bucketName
: nama bucket yang Anda buat sebelumnya dalam panduan ini.
Contoh kode
C#
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Go
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Java
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Python
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Ruby
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Membuat skema BigQuery berformat JSON untuk tabel output
Buat file skema BigQuery berformat JSON yang cocok dengan tabel output yang Anda buat sebelumnya. Perhatikan bahwa nama, jenis, dan mode kolom harus sama dengan yang ditentukan sebelumnya dalam skema tabel BigQuery Anda. File ini akan menormalkan data dari file .txt
ke format yang kompatibel dengan skema BigQuery. Beri nama file ini
jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Membuat file JavaScript untuk memformat data Anda
Dalam file ini, Anda akan menentukan UDF (Fungsi Buatan Pengguna) yang menyediakan logika untuk mengubah baris teks dalam file input Anda. Perhatikan bahwa fungsi ini menggunakan setiap baris teks dalam file input Anda sebagai argumennya sendiri, sehingga fungsi ini akan dijalankan satu kali untuk setiap baris file input Anda. Beri nama file ini
transformCSVtoJSON.js
.
Membuat file input
File ini akan menyimpan informasi yang ingin Anda upload ke
tabel BigQuery. Salin file ini secara lokal dan beri nama
inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Mengupload file ke bucket Anda
Upload file berikut ke bucket Cloud Storage yang Anda buat sebelumnya:
- Skema BigQuery berformat JSON (
.json
) - Fungsi yang Ditetapkan Pengguna JavaScript (
transformCSVtoJSON.js
) File input teks yang ingin Anda proses (
.txt
)
Konsol
- Di Konsol Google Cloud, buka halaman Bucket Cloud Storage.
Pada daftar bucket, klik bucket Anda.
Di tab Objects untuk bucket, lakukan salah satu langkah berikut:
Tarik lalu lepas file yang diinginkan dari desktop atau pengelola file ke panel utama di Konsol Google Cloud.
Klik tombol Upload Files, pilih file yang ingin diupload dalam dialog yang muncul, lalu klik Open.
gsutil
Jalankan perintah gsutil cp
:
gsutil cp OBJECT_LOCATION gs://bucketName
Ganti kode berikut:
bucketName
: nama bucket yang Anda buat sebelumnya dalam panduan ini.OBJECT_LOCATION
: jalur lokal ke objek Anda. Misalnya,Desktop/transformCSVtoJSON.js
.
Contoh kode
Python
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Ruby
Untuk mengautentikasi ke Cloud Composer, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Mengonfigurasi DataflowTemplateOperator
Sebelum menjalankan DAG, tetapkan variabel Airflow berikut.
Variabel Airflow | Nilai |
---|---|
project_id
|
Project ID |
gce_zone
|
Zona Compute Engine tempat cluster Dataflow harus dibuat |
bucket_path
|
Lokasi bucket Cloud Storage yang Anda buat sebelumnya |
Sekarang Anda akan mereferensikan file yang telah dibuat sebelumnya untuk membuat DAG yang memulai alur kerja Dataflow. Salin DAG ini dan simpan secara lokal
sebagai composer-dataflow-dag.py
.
Mengupload DAG ke Cloud Storage
Upload DAG ke folder /dags
di bucket
lingkungan Anda. Setelah upload berhasil diselesaikan, Anda dapat melihatnya dengan mengklik link DAGs Folder di halaman Lingkungan Cloud Composer.
Melihat status tugas
- Buka Airflow web interface.
- Di halaman DAG, klik nama DAG (seperti
composerDataflowDAG
). - Di halaman Detail DAG, klik Graph View.
Periksa status:
Failed
: Tugas memiliki kotak merah di sekelilingnya. Anda juga dapat menahan pointer pada tugas dan mencari State: Failed.Success
: Tugas memiliki kotak hijau di sekelilingnya. Anda juga dapat menahan pointer ke tugas dan memeriksa State: Success.
Setelah beberapa menit, Anda dapat memeriksa hasilnya di Dataflow dan BigQuery.
Melihat tugas Anda di Dataflow
Di konsol Google Cloud, buka halaman Dataflow.
Tugas Anda diberi nama
dataflow_operator_transform_csv_to_bq
dengan ID unik yang dilampirkan di akhir nama dengan tanda hubung, seperti:Klik nama untuk melihat detail pekerjaan.
Lihat hasil Anda di BigQuery
Di konsol Google Cloud, buka halaman BigQuery.
Anda dapat mengirimkan kueri menggunakan SQL standar. Gunakan kueri berikut untuk melihat baris yang telah ditambahkan ke tabel Anda:
SELECT * FROM projectId.average_weather.average_weather