Menulis data dari Kafka ke BigQuery dengan Dataflow

Dokumen ini memberikan panduan tingkat tinggi tentang cara membuat dan men-deploy pipeline Dataflow yang melakukan streaming dari Apache Kafka ke BigQuery.

Apache Kafka adalah platform open source untuk acara streaming. Kafka biasanya digunakan dalam arsitektur terdistribusi untuk memungkinkan komunikasi antara komponen yang dikaitkan secara longgar. Anda dapat menggunakan Dataflow untuk membaca peristiwa dari Kafka, memprosesnya, dan menulis hasilnya ke tabel BigQuery untuk dianalisis lebih lanjut.

Membaca peristiwa Kafka ke dalam BigQuery

Google menyediakan template Dataflow yang mengonfigurasi pipeline Kafka-to-BigQuery. Template ini menggunakan konektor BigQueryIO yang disediakan di Apache Beam SDK.

Untuk menggunakan template ini, lakukan langkah-langkah berikut:

  1. Men-deploy Kafka, baik di Google Cloud maupun di tempat lain.
  2. Konfigurasi jaringan.
  3. Menetapkan izin Identity and Access Management (IAM).
  4. Tulis fungsi untuk mengubah data peristiwa.
  5. Membuat tabel output BigQuery.
  6. Deploy template Dataflow.

Men-deploy Kafka

Dalam Google Cloud, Anda dapat men-deploy cluster Kafka di instance virtual machine (VM) Compute Engine atau menggunakan layanan Kafka yang dikelola pihak ketiga. Untuk mengetahui informasi selengkapnya tentang opsi deployment di Google Cloud, lihat Apa itu Apache Kafka?. Anda dapat menemukan solusi Kafka pihak ketiga di Google Cloud Marketplace.

Atau, Anda mungkin sudah memiliki cluster Kafka yang berada di luar Google Cloud. Misalnya, Anda mungkin memiliki workload yang di-deploy di infrastruktur lokal atau di cloud publik lainnya.

Mengonfigurasi jaringan

Secara default, Dataflow meluncurkan instance dalam jaringan Virtual Private Cloud (VPC) default Anda. Bergantung pada konfigurasi Kafka, Anda mungkin perlu mengonfigurasi jaringan dan subnet yang berbeda untuk Dataflow. Untuk informasi selengkapnya, lihat Menentukan jaringan dan subnetwork dalam dokumentasi Dataflow. Saat mengonfigurasi jaringan, buat aturan firewall yang mengizinkan mesin pekerja Dataflow untuk menjangkau broker Kafka.

Jika Anda menggunakan Kontrol Layanan VPC, tempatkan cluster Kafka dalam perimeter Kontrol Layanan VPC, atau perluas perimeter ke VPN atau Cloud Interconnect resmi.

Menghubungkan ke cluster eksternal

Jika cluster Kafka di-deploy di luar Google Cloud, Anda harus membuat koneksi jaringan antara Dataflow dan cluster Kafka. Ada beberapa opsi jaringan dengan konsekuensi yang berbeda:

Dedicated Interconnect adalah opsi terbaik untuk performa dan keandalan yang dapat diprediksi, tetapi penyiapannya dapat memerlukan waktu lebih lama karena pihak ketiga harus menyediakan sirkuit baru. Dengan topologi berbasis IP publik, Anda dapat memulai dengan cepat karena sedikit pekerjaan jaringan yang perlu dilakukan.

Dua bagian berikutnya akan menjelaskan opsi ini secara lebih detail.

Ruang alamat RFC 1918 bersama

Dedicated Interconnect dan IPsec VPN memberi Anda akses langsung ke alamat IP RFC 1918 di Virtual Private Cloud (VPC), yang dapat menyederhanakan konfigurasi Kafka Anda. Jika Anda menggunakan topologi berbasis VPN, pertimbangkan untuk menyiapkan VPN dengan throughput tinggi.

Secara default, Dataflow meluncurkan instance pada jaringan VPC default. Dalam topologi jaringan pribadi dengan rute yang ditentukan secara eksplisit di Cloud Router yang menghubungkan subnetwork di Google Cloud ke cluster Kafka, Anda memerlukan kontrol lebih besar untuk menentukan lokasi instance Dataflow Anda. Anda dapat menggunakan Dataflow untuk mengonfigurasi parameter eksekusi network dan subnetwork.

Pastikan subnetwork yang sesuai memiliki alamat IP yang cukup yang tersedia agar Dataflow dapat meluncurkan instance saat mencoba menyebarkan skala. Selain itu, saat membuat jaringan terpisah untuk meluncurkan instance Dataflow, pastikan Anda memiliki aturan firewall yang mengaktifkan traffic TCP di antara semua virtual machine dalam project tersebut. Jaringan default telah mengonfigurasi aturan firewall ini.

Ruang alamat IP publik

Arsitektur ini menggunakan Transport Layer Security (TLS) untuk mengamankan traffic antara klien eksternal dan Kafka, serta menggunakan teks biasa untuk komunikasi antar-broker. Saat pemroses Kafka terikat ke antarmuka jaringan yang digunakan untuk komunikasi internal dan eksternal, mengonfigurasi pemroses akan menjadi mudah. Namun, dalam banyak skenario, alamat broker Kafka yang diiklankan secara eksternal di cluster berbeda dengan antarmuka jaringan internal yang digunakan Kafka. Dalam skenario seperti itu, Anda dapat menggunakan properti advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Klien eksternal terhubung menggunakan port 9093 melalui saluran "SSL", dan klien internal terhubung menggunakan port 9092 melalui saluran teks biasa. Saat Anda menentukan alamat di advertised.listeners, gunakan nama DNS (dalam contoh ini, kafkabroker-n.mydomain.com) yang di-resolve ke instance yang sama untuk traffic eksternal dan internal. Penggunaan alamat IP publik mungkin tidak berhasil karena alamat tersebut mungkin gagal diselesaikan untuk traffic internal.

Menetapkan izin IAM

Tugas Dataflow menggunakan dua akun layanan IAM:

  • Layanan Dataflow menggunakan akun layanan Dataflow untuk memanipulasi resource Google Cloud, seperti membuat VM.
  • VM pekerja Dataflow menggunakan akun layanan pekerja untuk mengakses file pipeline dan resource lainnya. Akun layanan ini memerlukan akses tulis ke tabel output BigQuery. Pipeline ini juga memerlukan akses ke resource lain yang dirujuk oleh tugas pipeline.

Pastikan kedua akun layanan ini memiliki peran yang sesuai. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.

Mentransformasi data untuk BigQuery

Template Kafka-to-BigQuery membuat pipeline yang membaca peristiwa dari satu atau beberapa topik Kafka dan menuliskannya ke dalam tabel BigQuery. Atau, Anda dapat menyediakan fungsi yang ditentukan pengguna (UDF) JavaScript, yang mengubah data peristiwa sebelum ditulis ke BigQuery.

Output dari pipeline harus berupa data berformat JSON yang cocok dengan skema tabel output. Jika data peristiwa Kafka sudah dalam format JSON, Anda dapat membuat tabel BigQuery dengan skema yang cocok dan meneruskan peristiwa tersebut langsung ke BigQuery. Jika tidak, tulis UDF yang mengambil data peristiwa sebagai input dan menampilkan data JSON yang cocok dengan tabel BigQuery Anda.

Misalnya, data peristiwa berisi dua kolom:

  • name (string)
  • customer_id (bilangan bulat)

Output dari pipeline Dataflow mungkin terlihat seperti berikut:

{ "name": "Alice", "customer_id": 1234 }

Dengan asumsi data peristiwa belum dalam format JSON, Anda akan menulis UDF yang mengubah data tersebut, sebagai berikut:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

UDF dapat melakukan pemrosesan tambahan pada data peristiwa, seperti memfilter peristiwa, menghapus informasi identitas pribadi (PII), atau memperkaya data dengan kolom tambahan.

Untuk mengetahui informasi selengkapnya tentang cara menulis UDF untuk template, lihat Memperluas template Dataflow dengan UDF. Upload file JavaScript ke Cloud Storage.

Membuat tabel output BigQuery

Buat tabel output BigQuery sebelum Anda menjalankan template. Skema tabel harus kompatibel dengan output JSON dari pipeline. Untuk setiap properti dalam payload JSON, pipeline akan menulis nilai ini ke kolom tabel BigQuery dengan nama yang sama. Setiap properti yang hilang di JSON ditafsirkan sebagai nilai NULL.

Menggunakan contoh sebelumnya, tabel BigQuery akan memiliki kolom berikut:

Nama kolom Jenis data
name STRING
customer_id INTEGER

Anda dapat menggunakan pernyataan SQL CREATE TABLE untuk membuat tabel:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Atau, Anda dapat menentukan skema tabel menggunakan file definisi JSON. Untuk informasi selengkapnya, lihat Menentukan skema di dokumentasi BigQuery.

Menjalankan tugas Dataflow

Setelah Anda membuat tabel BigQuery, jalankan template Dataflow.

Konsol

Untuk membuat tugas Dataflow menggunakan Konsol Google Cloud, lakukan langkah-langkah berikut:

  1. Buka halaman Dataflow di konsol Google Cloud.
  2. Klik Create job from template.
  3. Di kolom Job Name, masukkan nama pekerjaan.
  4. Untuk Endpoint regional, pilih region.
  5. Pilih {i>template<i} "Kafka ke BigQuery".
  6. Di bagian Parameter yang diperlukan, masukkan nama tabel output BigQuery. Tabel tersebut harus sudah ada dan memiliki skema yang valid.
  7. Klik Tampilkan parameter opsional, lalu masukkan nilai untuk setidaknya parameter berikut:

    • Topik Kafka untuk membaca input.
    • Daftar server bootstrap Kafka, yang dipisahkan oleh koma.
    • Email akun layanan.

    Masukkan parameter tambahan yang diperlukan. Secara khusus, Anda mungkin perlu menentukan hal berikut:

    • Jaringan: Untuk menggunakan jaringan VPC selain jaringan default, tentukan jaringan dan subnet.
    • UDF: Untuk menggunakan UDF JavaScript, tentukan lokasi Cloud Storage untuk skrip dan nama fungsi JavaScript yang akan dipanggil.

gcloud

Untuk membuat tugas Dataflow menggunakan Google Cloud CLI, jalankan perintah berikut:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Ganti variabel berikut:

  • JOB_NAME. Nama pekerjaan pilihan Anda.
  • LOCATION. Region tempat menjalankan tugas. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi Dataflow.
  • KAFKA_TOPICS. Daftar yang dipisahkan koma berisi topik Kafka yang akan dibaca.
  • BOOTSTRAP_SERVERS. Daftar server bootstrap Kafka yang dipisahkan koma. Contoh: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. Tabel output BigQuery, yang ditentukan sebagai PROJECT_ID:DATASET_NAME.TABLE_NAME. Contoh: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Opsional. Alamat email akun layanan tempat tugas akan dijalankan.
  • UDF_SCRIPT_PATH. Opsional. Jalur Cloud Storage ke file JavaScript yang berisi UDF. Contoh: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Opsional. Nama fungsi JavaScript yang akan dipanggil sebagai UDF.
  • VPC_NETWORK_NAME. Opsional. Jaringan tempat pekerja akan ditetapkan.
  • SUBNET_NAME. Opsional. Subnetwork tempat pekerja akan ditetapkan.

Jenis data

Bagian ini menjelaskan cara menangani berbagai jenis data dalam skema tabel BigQuery.

Secara internal, pesan JSON dikonversi menjadi objek TableRow, dan nilai kolom TableRow diterjemahkan ke jenis BigQuery.

Jenis skalar

Contoh berikut membuat tabel BigQuery dengan berbagai jenis data skalar, termasuk jenis string, numerik, Boolean, tanggal/waktu, interval, dan geografis:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Berikut adalah payload JSON dengan kolom yang kompatibel:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Catatan:

  • Untuk kolom TIMESTAMP, Anda dapat menggunakan metode Date.toJSON JavaScript untuk memformat nilai.
  • Untuk kolom GEOGRAPHY, Anda dapat menentukan geografi menggunakan teks terkenal (WKT) atau GeoJSON, yang diformat sebagai string. Untuk mengetahui informasi selengkapnya, lihat Memuat data geospasial.

Untuk mengetahui informasi lebih lanjut mengenai jenis data di BigQuery, baca artikel Jenis data.

Array

Anda dapat menyimpan array di BigQuery menggunakan jenis data ARRAY. Pada contoh berikut, payload JSON berisi properti bernama scores yang nilainya adalah array JSON:

{"name":"Emily","scores":[10,7,10,9]}

Pernyataan SQL CREATE TABLE berikut akan membuat tabel BigQuery dengan skema yang kompatibel:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

Tabel yang dihasilkan akan terlihat seperti berikut:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Structure

Jenis data STRUCT di BigQuery berisi daftar kolom bernama yang diurutkan. Anda dapat menggunakan STRUCT untuk menyimpan objek JSON yang mengikuti skema yang konsisten.

Pada contoh berikut, payload JSON berisi properti bernama val yang nilainya adalah objek JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

Pernyataan SQL CREATE TABLE berikut akan membuat tabel BigQuery dengan skema yang kompatibel:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

Tabel yang dihasilkan akan terlihat seperti berikut:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Data peristiwa semi-terstruktur

Jika data peristiwa Kafka tidak mengikuti skema yang ketat, sebaiknya simpan data tersebut di BigQuery sebagai jenis data JSON (Pratinjau). Dengan menyimpan data JSON sebagai jenis data JSON, Anda tidak perlu menentukan skema peristiwa di awal. Setelah penyerapan data, Anda dapat membuat kueri tabel output menggunakan akses kolom (notasi titik) dan operator akses array.

Pertama, buat tabel dengan kolom JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Kemudian, tentukan UDF JavaScript yang menggabungkan payload peristiwa di dalam objek JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Setelah data ditulis ke BigQuery, Anda dapat membuat kueri untuk masing-masing kolom menggunakan operator akses kolom. Misalnya, kueri berikut menampilkan nilai kolom name untuk setiap kumpulan data:

SELECT event_data.name FROM my_dataset1.kafka_events;

Untuk mengetahui informasi lebih lanjut tentang penggunaan JSON di BigQuery, lihat Bekerja dengan data JSON di SQL Standar Google.

Error dan logging

Anda mungkin mengalami error saat menjalankan pipeline, atau error saat menangani peristiwa Kafka individual.

Untuk informasi selengkapnya tentang menangani error pipeline, lihat Pemecahan masalah dan proses debug pipeline.

Jika tugas berhasil dijalankan, tetapi terjadi error saat memproses peristiwa Kafka satu per satu, tugas pipeline akan menulis data error ke tabel di BigQuery. Tugas itu sendiri tidak gagal, dan error tingkat peristiwa tidak muncul sebagai error dalam log tugas Dataflow.

Tugas pipeline secara otomatis membuat tabel untuk menyimpan kumpulan data error. Secara default, nama tabelnya adalah "output_table_error_records", dengan output_table adalah nama tabel output. Misalnya, jika tabel output diberi nama kafka_events, tabel error akan diberi nama kafka_events_error_records. Anda dapat menentukan nama yang berbeda dengan menetapkan parameter template outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

Kemungkinan kesalahan meliputi:

  • Error serialisasi, termasuk JSON yang diformat dengan tidak benar.
  • Error konversi jenis, yang disebabkan oleh ketidakcocokan dalam skema tabel dan data JSON.
  • Kolom tambahan dalam data JSON yang tidak ada dalam skema tabel.

Contoh pesan error:

Jenis error Data peristiwa errorMessage
Error serialisasi "Halo dunia" Gagal menserialisasi json ke baris tabel: "Hello world"
Error konversi jenis {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Tidak dapat mengonversi nilai menjadi bilangan bulat (nilai buruk): abc", "reason" : "invalid" } ], "index" : 0 }
Kolom tidak dikenal {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "tidak ada kolom tersebut: customer_id.", "reason" : "tidak valid" } ], "indeks" : 0 }

Langkah berikutnya