Menulis data dari Kafka ke BigQuery dengan Dataflow

Dokumen ini memberikan panduan tingkat tinggi tentang cara membuat dan men-deploy pipeline Dataflow yang melakukan streaming dari Apache Kafka ke BigQuery.

Apache Kafka adalah platform open source untuk streaming peristiwa. Kafka biasanya digunakan dalam arsitektur terdistribusi untuk memungkinkan komunikasi antarkomponen yang terikat longgar. Anda dapat menggunakan Dataflow untuk membaca peristiwa dari Kafka, memprosesnya, dan menulis hasilnya ke tabel BigQuery untuk analisis lebih lanjut.

Membaca peristiwa Kafka ke BigQuery

Google menyediakan template Dataflow yang mengonfigurasi pipeline Kafka ke BigQuery. Template ini menggunakan konektor BigQueryIO yang disediakan di Apache Beam SDK.

Untuk menggunakan template ini, Anda harus melakukan langkah-langkah berikut:

  1. Deploy Kafka, baik di Google Cloud maupun di tempat lain.
  2. Mengonfigurasi jaringan.
  3. Tetapkan izin Identity and Access Management (IAM).
  4. Tulis fungsi untuk mengubah data peristiwa.
  5. Buat tabel output BigQuery.
  6. Deploy template Dataflow.

Men-deploy Kafka

Dalam Google Cloud, Anda dapat men-deploy cluster Kafka di instance virtual machine (VM) Compute Engine atau menggunakan layanan Kafka terkelola pihak ketiga. Untuk mengetahui informasi selengkapnya tentang opsi deployment di Google Cloud, lihat Apa itu Apache Kafka?. Anda dapat menemukan solusi Kafka pihak ketiga di Google Cloud Marketplace.

Atau, Anda mungkin memiliki cluster Kafka yang ada di luar Google Cloud. Misalnya, Anda mungkin memiliki workload yang sudah ada dan di-deploy di infrastruktur lokal atau di cloud publik lainnya.

Mengonfigurasi jaringan

Secara default, Dataflow meluncurkan instance dalam jaringan Virtual Private Cloud (VPC) default Anda. Bergantung pada konfigurasi Kafka, Anda mungkin perlu mengonfigurasi jaringan dan subnet yang berbeda untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Menentukan jaringan dan subnetwork. Saat mengonfigurasi jaringan, buat aturan firewall yang memungkinkan mesin pekerja Dataflow menjangkau broker Kafka.

Jika Anda menggunakan Kontrol Layanan VPC, tempatkan cluster Kafka dalam perimeter Kontrol Layanan VPC, atau perluas perimeter ke VPN atau Cloud Interconnect resmi.

Jika cluster Kafka di-deploy di luar Google Cloud, Anda harus membuat koneksi jaringan antara Dataflow dan cluster Kafka. Ada beberapa opsi jaringan dengan kompromi yang berbeda:

Dedicated Interconnect adalah opsi terbaik untuk performa dan keandalan yang dapat diprediksi, tetapi penyiapannya dapat memerlukan waktu lebih lama karena pihak ketiga harus menyediakan sirkuit baru. Dengan topologi berbasis IP publik, Anda dapat memulai dengan cepat karena hanya perlu melakukan sedikit pekerjaan jaringan.

Dua bagian berikutnya menjelaskan opsi ini secara lebih mendetail.

Ruang alamat RFC 1918 bersama

Dedicated Interconnect dan IPsec VPN memberi Anda akses langsung ke alamat IP RFC 1918 di Virtual Private Cloud (VPC), yang dapat menyederhanakan konfigurasi Kafka Anda. Jika Anda menggunakan topologi berbasis VPN, sebaiknya siapkan VPN throughput tinggi.

Secara default, Dataflow meluncurkan instance di jaringan VPC default Anda. Dalam topologi jaringan pribadi dengan rute yang ditentukan secara eksplisit di Cloud Router yang menghubungkan subjaringan di Google Cloud ke cluster Kafka tersebut, Anda memerlukan kontrol lebih besar atas lokasi instance Dataflow. Anda dapat menggunakan Dataflow untuk mengonfigurasi parameter eksekusi network dan subnetwork.

Pastikan subnet yang sesuai memiliki alamat IP yang cukup untuk digunakan Dataflow dalam meluncurkan instance saat mencoba melakukan penskalaan keluar. Selain itu, saat membuat jaringan terpisah untuk meluncurkan instance Dataflow, pastikan Anda memiliki aturan firewall yang mengaktifkan traffic TCP di antara semua virtual machine dalam project. Jaringan default sudah mengonfigurasi aturan firewall ini.

Ruang alamat IP publik

Arsitektur ini menggunakan Transport Layer Security (TLS) untuk mengamankan traffic antara klien eksternal dan Kafka, serta menggunakan traffic yang tidak dienkripsi untuk komunikasi antar-broker. Saat pemroses Kafka terikat ke antarmuka jaringan yang digunakan untuk komunikasi internal dan eksternal, mengonfigurasi pemroses akan mudah. Namun, dalam banyak skenario, alamat broker Kafka yang diiklankan secara eksternal di cluster berbeda dengan antarmuka jaringan internal yang digunakan Kafka. Dalam skenario tersebut, Anda dapat menggunakan properti advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Klien eksternal terhubung menggunakan port 9093 melalui saluran "SSL", dan klien internal terhubung menggunakan port 9092 melalui saluran teks biasa. Saat Anda menentukan alamat di bagian advertised.listeners, gunakan nama DNS (kafkabroker-n.mydomain.com, dalam contoh ini) yang me-resolve ke instance yang sama untuk traffic eksternal dan internal. Menggunakan alamat IP publik mungkin tidak berfungsi karena alamat tersebut mungkin gagal di-resolve untuk traffic internal.

Menetapkan izin IAM

Tugas Dataflow menggunakan dua akun layanan IAM:

  • Layanan Dataflow menggunakan akun layanan Dataflow untuk memanipulasi resource Google Cloud, seperti membuat VM.
  • VM pekerja Dataflow menggunakan akun layanan pekerja untuk mengakses file pipeline dan resource lainnya. Akun layanan ini memerlukan akses tulis ke tabel output BigQuery. Tugas ini juga memerlukan akses ke resource lain yang dirujuk oleh tugas pipeline.

Pastikan kedua akun layanan ini memiliki peran yang sesuai. Untuk informasi selengkapnya, lihat Keamanan dan izin Dataflow.

Mentransformasi data untuk BigQuery

Template Kafka-to-BigQuery membuat pipeline yang membaca peristiwa dari satu atau beberapa topik Kafka dan menulisnya ke tabel BigQuery. Secara opsional, Anda dapat memberikan fungsi yang ditetapkan pengguna (UDF) JavaScript yang mengubah data peristiwa sebelum ditulis ke BigQuery.

Output dari pipeline harus berupa data berformat JSON yang cocok dengan skema tabel output. Jika data peristiwa Kafka sudah dalam format JSON, Anda dapat membuat tabel BigQuery dengan skema yang cocok dan meneruskan peristiwa langsung ke BigQuery. Atau, tulis UDF yang menggunakan data peristiwa sebagai input dan menampilkan data JSON yang cocok dengan tabel BigQuery Anda.

Misalnya, data peristiwa berisi dua kolom:

  • name (string)
  • customer_id (bilangan bulat)

Output dari pipeline Dataflow mungkin terlihat seperti berikut:

{ "name": "Alice", "customer_id": 1234 }

Dengan asumsi data peristiwa belum dalam format JSON, Anda akan menulis UDF yang mengubah data, sebagai berikut:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

UDF dapat melakukan pemrosesan tambahan pada data peristiwa, seperti memfilter peristiwa, menghapus informasi identitas pribadi (PII), atau memperkaya data dengan kolom tambahan.

Untuk mengetahui informasi selengkapnya tentang cara menulis UDF untuk template, lihat Memperluas template Dataflow dengan UDF. Upload file JavaScript ke Cloud Storage.

Membuat tabel output BigQuery

Buat tabel output BigQuery sebelum Anda menjalankan template. Skema tabel harus kompatibel dengan output JSON dari pipeline. Untuk setiap properti dalam payload JSON, pipeline akan menulis nilai ke kolom tabel BigQuery dengan nama yang sama. Setiap properti yang hilang dalam JSON ditafsirkan sebagai nilai NULL.

Dengan menggunakan contoh sebelumnya, tabel BigQuery akan memiliki kolom berikut:

Nama kolom Jenis data
name STRING
customer_id INTEGER

Anda dapat menggunakan pernyataan SQL CREATE TABLE untuk membuat tabel:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Atau, Anda dapat menentukan skema tabel menggunakan file definisi JSON. Untuk mengetahui informasi selengkapnya, lihat Menentukan skema dalam dokumentasi BigQuery.

Menjalankan tugas Dataflow

Setelah membuat tabel BigQuery, jalankan template Dataflow.

Konsol

Untuk membuat tugas Dataflow menggunakan konsol Google Cloud, lakukan langkah-langkah berikut:

  1. Buka halaman Dataflow di konsol Google Cloud.
  2. Klik Create job from template.
  3. Di kolom Nama Tugas, masukkan nama tugas.
  4. Untuk Regional endpoint, pilih region.
  5. Pilih template "Kafka ke BigQuery".
  6. Di bagian Required parameters, masukkan nama tabel output BigQuery. Tabel harus sudah ada dan memiliki skema yang valid.
  7. Klik Tampilkan parameter opsional dan masukkan nilai untuk setidaknya parameter berikut:

    • Topik Kafka untuk membaca input.
    • Daftar server bootstrap Kafka, dipisahkan dengan koma.
    • Email akun layanan.

    Masukkan parameter tambahan sesuai kebutuhan. Secara khusus, Anda mungkin perlu menentukan hal berikut:

    • Jaringan: Untuk menggunakan jaringan VPC selain jaringan default, tentukan jaringan dan subnet.
    • UDF: Untuk menggunakan UDF JavaScript, tentukan lokasi Cloud Storage skrip dan nama fungsi JavaScript yang akan dipanggil.

gcloud

Untuk membuat tugas Dataflow menggunakan Google Cloud CLI, jalankan perintah berikut:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Ganti variabel berikut:

  • JOB_NAME. Nama tugas yang Anda pilih.
  • LOCATION. Region tempat menjalankan tugas. Untuk informasi selengkapnya tentang region dan lokasi, lihat Lokasi dataflow.
  • KAFKA_TOPICS. Daftar topik Kafka yang dipisahkan koma untuk dibaca.
  • BOOTSTRAP_SERVERS. Daftar server bootstrap Kafka yang dipisahkan koma. Contoh: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. Tabel output BigQuery, yang ditentukan sebagai PROJECT_ID:DATASET_NAME.TABLE_NAME. Contoh: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Opsional. Alamat email akun layanan yang akan menjalankan tugas ini.
  • UDF_SCRIPT_PATH. Opsional. Jalur Cloud Storage ke file JavaScript yang berisi UDF. Contoh: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Opsional. Nama fungsi JavaScript yang akan dipanggil sebagai UDF.
  • VPC_NETWORK_NAME. Opsional. Jaringan tempat pekerja akan ditetapkan.
  • SUBNET_NAME. Opsional. Subnetwork tempat pekerja akan ditetapkan.

Jenis data

Bagian ini menjelaskan cara menangani berbagai jenis data dalam skema tabel BigQuery.

Secara internal, pesan JSON dikonversi menjadi objek TableRow, dan nilai kolom TableRow diterjemahkan ke jenis BigQuery.

Jenis skalar

Contoh berikut membuat tabel BigQuery dengan berbagai jenis data skalar, termasuk jenis string, numerik, Boolean, tanggal/waktu, interval, dan geografi:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Berikut adalah payload JSON dengan kolom yang kompatibel:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Catatan:

  • Untuk kolom TIMESTAMP, Anda dapat menggunakan metode Date.toJSON JavaScript untuk memformat nilai.
  • Untuk kolom GEOGRAPHY, Anda dapat menentukan geografi menggunakan teks terkenal (WKT) atau GeoJSON, yang diformat sebagai string. Untuk informasi selengkapnya, lihat Memuat data geospasial.

Untuk mengetahui informasi selengkapnya tentang jenis data di BigQuery, lihat Jenis data.

Array

Anda dapat menyimpan array di BigQuery menggunakan jenis data ARRAY. Dalam contoh berikut, payload JSON berisi properti bernama scores yang nilainya adalah array JSON:

{"name":"Emily","scores":[10,7,10,9]}

Pernyataan SQL CREATE TABLE berikut membuat tabel BigQuery dengan skema yang kompatibel:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

Tabel yang dihasilkan akan terlihat seperti berikut:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Struktur

Jenis data STRUCT di BigQuery berisi daftar berurutan kolom bernama. Anda dapat menggunakan STRUCT untuk menyimpan objek JSON yang mengikuti skema yang konsisten.

Dalam contoh berikut, payload JSON berisi properti bernama val yang nilainya adalah objek JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

Pernyataan SQL CREATE TABLE berikut membuat tabel BigQuery dengan skema yang kompatibel:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

Tabel yang dihasilkan akan terlihat seperti berikut:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Data peristiwa semi-terstruktur

Jika data peristiwa Kafka tidak mengikuti skema yang ketat, pertimbangkan untuk menyimpannya di BigQuery sebagai jenis data JSON (Pratinjau). Dengan menyimpan data JSON sebagai jenis data JSON, Anda tidak perlu menentukan skema peristiwa di awal. Setelah penyerapan data, Anda dapat membuat kueri tabel output menggunakan operator akses kolom (notasi titik) dan akses array.

Pertama, buat tabel dengan kolom JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Kemudian, tentukan UDF JavaScript yang menggabungkan payload peristiwa di dalam objek JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Setelah data ditulis ke BigQuery, Anda dapat membuat kueri untuk setiap kolom menggunakan operator akses kolom. Misalnya, kueri berikut menampilkan nilai kolom name untuk setiap kumpulan data:

SELECT event_data.name FROM my_dataset1.kafka_events;

Untuk mengetahui informasi selengkapnya tentang penggunaan JSON di BigQuery, lihat Menangani data JSON di Google Standard SQL.

Error dan logging

Anda mungkin mengalami error saat menjalankan pipeline, atau error saat menangani setiap peristiwa Kafka.

Untuk informasi selengkapnya tentang cara menangani error pipeline, lihat Pemecahan masalah dan proses debug pipeline.

Jika tugas berhasil berjalan, tetapi error terjadi saat memproses setiap peristiwa Kafka, tugas pipeline akan menulis kumpulan data error ke tabel di BigQuery. Tugas itu sendiri tidak gagal, dan error tingkat peristiwa tidak muncul sebagai error dalam log tugas Dataflow.

Tugas pipeline otomatis membuat tabel untuk menyimpan data error. Secara default, nama tabel adalah "output_table_error_records", dengan output_table adalah nama tabel output. Misalnya, jika tabel output bernama kafka_events, tabel error akan diberi nama kafka_events_error_records. Anda dapat menentukan nama yang berbeda dengan menetapkan parameter template outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

Kemungkinan error meliputi:

  • Error serialisasi, termasuk JSON yang diformat dengan buruk.
  • Error konversi jenis, yang disebabkan oleh ketidakcocokan dalam skema tabel dan data JSON.
  • Kolom tambahan dalam data JSON yang tidak ada dalam skema tabel.

Contoh pesan error:

Jenis error Data peristiwa errorMessage
Error serialisasi "Hello world" Gagal melakukan serialisasi json ke baris tabel: "Hello world"
Error konversi jenis {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Kolom tidak dikenal {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Langkah berikutnya