Apache Kafka は、リアルタイム データ パイプラインとデータ統合用のオープンソースの配信ストリーミング プラットフォームです。次のようなさまざまなアプリケーションで使用する、効率的でスケーラブルなストリーミング システムを提供します。
- リアルタイム分析
- ストリーム処理
- ログ集計
- 配信メッセージ
- イベント ストリーミング
目標
ZooKeeper で Dataproc HA クラスタに Kafka をインストールします(このチュートリアルでは「Dataproc Kafka クラスタ」と呼びます)。
架空の顧客データを作成し、データを Kafka トピックに公開します。
Cloud Storage に Hive パーケットと ORC テーブルを作成して、ストリーミングされた Kafka トピックデータを受信します。
PySpark ジョブを送信して、Kafka トピックを Cloud Storage にパーケットと ORC 形式で登録してストリーミングします。
ストリーミングされた Hive テーブルデータに対してクエリを実行し、ストリーミングされた Kafka メッセージをカウントします。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
Google Cloud プロジェクトをまだ作成していない場合は、それを行います。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
チュートリアルのステップ
次の手順を実行して、Dataproc Kafka クラスタを作成し、Kafka トピックをパーケットまたは ORC 形式で Cloud Storage に読み込みます。
Kafka インストール スクリプトを Cloud Storage にコピーする
kafka.sh
初期化アクション スクリプトは、Kafka を Dataproc クラスタにインストールします。
コードを参照します。
kafka.sh
初期化アクション スクリプトを Cloud Storage バケットにコピーします。このスクリプトは、Dataproc クラスタに Kafka をインストールします。Cloud Shell を開き、次のコマンドを実行します。
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
次の項目を置き換えます。
- REGION:
kafka.sh
は、Cloud Storage のリージョンでタグ付けされた公開バケットに保存されます。地理的に近い Compute Engine のリージョン(例:us-central1
)を指定します。 - BUCKET_NAME: Cloud Storage バケットの名前。
- REGION:
Dataproc Kafka クラスタを作成する
Cloud Shell を開き、次の
gcloud dataproc clusters create
コマンドを実行して、Kafka と ZooKeeper コンポーネントをインストールする Dataproc HA クラスタを作成します。gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注:
- KAFKA_CLUSTER: クラスタ名。プロジェクト内で一意にする必要があります。 名前は先頭を小文字にして、51 文字以下の小文字、数字、ハイフンを使用できます。末尾をハイフンにすることはできません。削除されたクラスタの名前は再使用できます。
- PROJECT_ID: このクラスタに関連付けるプロジェクト。
- REGION:
クラスタが配置される Compute Engine のリージョン(
us-central1
など)。- オプションの
--zone=ZONE
フラグを追加して、指定したリージョン内のゾーン(us-central1-a
など)を指定できます。ゾーンを指定しない場合、Dataproc の自動ゾーン プレースメント機能は、指定されたリージョンのあるゾーンを選択します。
- オプションの
--image-version
: このチュートリアルでは、Dataproc イメージ バージョン2.1-debian11
をおすすめします。 注: 各イメージ バージョンには、このチュートリアルで使用する Hive コンポーネントなど、プリインストールされたコンポーネントのセットが含まれています(サポートされている Dataproc イメージ バージョンをご覧ください)。--num-master
:3
のマスターノードが HA クラスタを作成します。Kafka に必要な Zookeeper コンポーネントは HA クラスタにプリインストールされています。--enable-component-gateway
: Dataproc コンポーネント ゲートウェイを有効にします。- BUCKET_NAME:
/scripts/kafka.sh
初期化スクリプトを含む Cloud Storage バケットの名前(Kafka インストール スクリプトを Cloud Storage にコピーするをご覧ください)。
Kafka custdata
トピックを作成する
Dataproc Kafka クラスタに Kafka トピックを作成するには:
SSH ユーティリティを使用して、クラスタ マスター VM でターミナル ウィンドウを開きます。
Kafka
custdata
トピックを作成します。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注:
KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
-w-0:9092
は、worker-0
ノードのポート9092
で実行されている Kafka ブローカーを示します。custdata
トピックの作成後は、次のコマンドを実行できます。# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Kafka custdata
トピックにコンテンツを公開する
次のスクリプトは、kafka-console-producer.sh
Kafka ツールを使用して、架空の顧客データを CSV 形式で生成します。
スクリプトをコピーして、Kafka クラスタのマスターノード上の SSH ターミナルに貼り付けます。<return> を押してスクリプトを実行します。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
次の Kafka コマンドを実行して、
custdata
トピックに 10,000 件のメッセージが含まれていることを確認します。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注:
- KAFKA_CLUSTER: Kafka クラスタの名前。
予想される出力:
custdata:0:10000
Cloud Storage に Hive テーブルを作成する
ストリーミングされた Kafka トピックデータを受信する Hive テーブルを作成します。
次の手順を実行して、Cloud Storage バケットに cust_parquet
(パーケット)と cust_orc
(ORC)の Hive テーブルを作成します。
BUCKET_NAME を次のスクリプトに挿入し、スクリプトをコピーして Kafka クラスタのマスターノード上の SSH ターミナルに貼り付けてから、<return> を押して
~/hivetables.hql
(Hive クエリ言語)スクリプトを作成します。次の手順で
~/hivetables.hql
スクリプトを実行して、Cloud Storage バケットにパーケットと ORC の Hive テーブルを作成します。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Kafka クラスタのマスターノードの SSH ターミナルで、
~/hivetables.hql
Hive ジョブを送信して、Cloud Storage バケットにcust_parquet
(パーケット)とcust_orc
(ORC)Hive テーブルを作成します。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注:
- Hive コンポーネントは、Dataproc Kafka クラスタにプリインストールされています。最近リリースされた 2.1 イメージに含まれる Hive コンポーネント バージョンのリストについては、2.1.x リリース バージョンをご覧ください。
- KAFKA_CLUSTER: Kafka クラスタの名前。
- REGION: Kafka クラスタが配置されているリージョン。
Kafka custdata
を Hive テーブルにストリーミングする
- Kafka クラスタのマスターノードの SSH ターミナルで次のコマンドを実行して、
kafka-python
ライブラリをインストールします。 Kafka クライアントは、Kafka トピックのデータを Cloud Storage にストリーミングするのに必要です。
pip install kafka-python
BUCKET_NAME を挿入して、次の PySpark コードを Kafka クラスタ マスターノードの SSH ターミナルに貼り付けてから、<return> を押して
streamdata.py
ファイルを作成します。このスクリプトは、Kafka
custdata
トピックを登録し、データを Cloud Storage の Hive テーブルにストリーミングします。出力形式(パーケットまたは ORC を使用可能)は、パラメータとしてスクリプトに渡します。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Kafka クラスタのマスターノードの SSH ターミナルで、
spark-submit
を実行して Cloud Storage の Hive テーブルにデータをストリーミングします。KAFKA_CLUSTER の名前と出力 FORMAT を挿入し、次のコードをコピーして Kafka クラスタのマスターノード上の SSH ターミナルに貼り付け、<return> を押してコードを実行し、Kafka の
custdata
データをパーケット形式で Cloud Storage の Hive テーブルにストリーミングします。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
注:
- KAFKA_CLUSTER: Kafka クラスタの名前を挿入します。
- FORMAT: 出力形式として
parquet
またはorc
を指定します。コマンドを連続して実行して、両方の形式を Hive テーブルにストリーミングできます。たとえば、最初の呼び出しでは、parquet
を指定して Kafkacustdata
トピックを Hive パーケット テーブルにストリーミングし、次に 2 番目の呼び出しでは、orc
形式を指定して、custdata
を Hive ORC テーブルにストリーミングします。
標準出力が SSH ターミナルで停止したら(これは、すべての
custdata
がストリーミングされたことを示します)、SSH ターミナルで <control-c> を押してプロセスを停止します。Cloud Storage 内の Hive テーブルを一覧表示します。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注:
- BUCKET_NAME: Hive テーブルを含む Cloud Storage バケットの名前を挿入します(Hive テーブルを作成するをご覧ください)。
ストリーミング データをクエリする
Kafka クラスタのマスターノードの SSH ターミナルで、次の
hive
コマンドを実行して、Cloud Storage の Hive テーブルでストリーミングされた Kafkacustdata
メッセージをカウントします。hive -e "select count(1) from TABLE_NAME"
注:
- TABLE_NAME: Hive テーブル名として
cust_parquet
またはcust_orc
を指定します。
予想される出力スニペット:
- TABLE_NAME: Hive テーブル名として
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
クリーンアップ
プロジェクトの削除
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースの削除
-
バケットを削除します。
gcloud storage buckets delete BUCKET_NAME
- Kafka クラスタを削除します。
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}