Kubernetes、Redis、BigQuery を使用したリアルタイム データ分析

このチュートリアルでは、Google Compute Engine、Kubernetes、Redis、および BigQuery で構築されたパイプラインを使用して Twitter データをリアルタイムで分析します。このような分析には、次のような役に立つ応用例が多数あります。

  • 感情分析の実施
  • データ内の一般的なパターンやトレンドの明確化
  • キャンペーンの成果やリーチのモニタリング
  • リアルタイムでのレスポンスの検討

次の図は、サンプル アプリケーションのアーキテクチャを示しています。

サンプル アプリケーションのアーキテクチャ

このサンプル アプリケーションでは、複製ポッドを 1 つ持つ複製コントローラを使用して、受信ツイートをキャッシュする Redis マスターをサポートします。次に、Redis マスターはサービス フロントになります。

デプロイを使用して定義されている複製ポッドは、Twitter streaming API を使用して Twitter のパブリック サンプル ストリームを読み取り、事前定義されたフィルタセットと一致するツイートを Redis キャッシュにダンプします。他の 2 つの複製ポッドでは、Redis キャッシュに対する読み込みをブロックします。ツイートのデータが利用可能になると、これらのポッドによって、BigQuery API を使用してデータが BigQuery に小さな単位で追加されます。

目標

  • ソースコードをダウンロードします。
  • Kubernetes クラスタを構成および起動します。
  • Twitter アプリケーションを作成します。
  • Kubernetes アプリケーションを起動します。
  • BigQuery を使用して Twitter データに対してクエリを実行します。

費用

このチュートリアルでは、以下のような Google Cloud Platform の課金対象となるコンポーネントを使用します。

  • Compute Engine n1-standard-1 仮想マシン インスタンス x 5
  • Compute Engine 10 GB 永続ディスク x 5
  • BigQuery

このチュートリアルの実行にかかる費用は、実行時間によって変動します。料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。

Cloud Platform を初めて使用する方は、無料トライアルをご利用いただけます。

始める前に

新しいプロジェクトを作成、構成する

この例の手順では、必要な API が有効に設定されたプロジェクトを用意する必要があります。Google Cloud Platform Console で新しいプロジェクトを作成した後、Google Compute Engine API を有効にします。課金をまだ有効にしていない場合は、有効にするようにメッセージが表示されます。

このチュートリアルでは、次の API も使用します。これらは新しいプロジェクトを作成したときにデフォルトで有効になります。

  • BigQuery
  • Google Cloud Storage JSON API

Google Cloud SDK の設定

  1. Google アカウントを使用して認証します。

    gcloud auth login
    
  2. Cloud SDK のデフォルトのプロジェクトに、このチュートリアルの前のセクションで選択したプロジェクトを設定します。PROJECT_ID は実際のプロジェクト ID に置き換えます。

    gcloud config set project [PROJECT_ID]
    
  3. コンポーネントを更新します。

    gcloud components update
    
  4. kubectl のバイナリをインストールします。

    gcloud components update kubectl
    
  5. kubectl のディレクトリをパスに追加します。

    export PATH=$PATH:/usr/local/share/google/google-cloud-sdk/bin/
    

サンプルコードのダウンロード

サンプルコードは次の 2 つの方法のいずれかで入手できます。

  • zip アーカイブをダウンロードしますkube-redis-bq という名前のディレクトリにコードを解凍します。

  • GitHub レポジトリのクローンを作成します。ターミナル ウィンドウで次のコマンドを実行します。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-bigquery-python.git kube-redis-bq
    

Kubernetes をダウンロードする

Kubernetes は Docker コンテナ用のオープンソースのオーケストレーション システムです。Compute Engine のクラスタのインスタンスにコンテナをスケジュールし、ワークロードを管理してその状態が宣言されている意図に合致するようにし、管理や検索が容易になるようにラベルタスクの種類ごとにコンテナを整理します。

  1. Kubernetes のバイナリの最新リリースをダウンロードします

  2. サンプルコードをインストールした場所と同じ親ディレクトリにファイルを展開します。tar ファイルは kubernetes という名前のディレクトリに展開されるため、新しいディレクトリを作成する必要はありません。たとえば次のように入力します。

    tar -zxvf kubernetes.tar.gz
    

BigQuery テーブルを作成する

ツイートを保存する BigQuery テーブルを作成します。BigQuery ではテーブルをデータセットと呼ばれる抽象化レイヤにグループ化します。Cloud SDK に含まれている bq コマンドライン ツールを使用して、rtda という名前の新しい BigQuery データセットを作成します。

  1. ターミナル ウィンドウで次のコマンドを入力します。

    bq mk rtda
    
  2. データセットが適切な位置に作成されました。新着ツイートのための tweets という名前の新しいテーブルを作成します。各 BigQuery テーブルはスキーマで定義する必要があります。時間を節約するために、この例では定義済みのスキーマとして schema.json を用意しています。このスキーマを使用してテーブルを定義できます。定義済みのスキーマを使用してテーブルを作成するには、次のコマンドを入力します。

    bq mk -t rtda.tweets kube-redis-bq/bigquery-setup/schema.json
    
  3. 新しいデータセットとテーブルが作成されたことを確認するには、BigQuery ウェブ UI を使用します。左側のサイドバーにデータセットの名前が表示されます。表示されていない場合は、正しいプロジェクトを見ているかどうかを確認してください。データセット名の隣にある矢印をクリックすると、新しいテーブルが表示されます。または、次のコマンドを実行すると、あるプロジェクト内にあるすべてのデータセット、または指定したデータセット内にあるすべてのテーブルを表示することができます。

    bq ls [PROJECT_ID]:
    
    bq ls [DATASET_ID]
    

    この場合、データセット ID は rtda です。

  4. 最後に kube-redis-bq/redis/bigquery-controller.yaml を編集します。次のフィールドの value を更新して、BigQuery の設定を反映させます。

    • PROJECT_ID: 実際のプロジェクト ID です。
    • BQ_DATASET: 使用するテーブルが含まれる BigQuery のデータセットです(rtda)。
    • BQ_TABLE: 作成した BigQuery のテーブルです(tweets)。
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: bigquery-controller
  labels:
    name: bigquery-controller
spec:
  replicas: 2
  template:
    metadata:
      labels:
        name: bigquery-controller
    spec:
      containers:
      - name: bigquery
        image: gcr.io/google-samples/redis-bq-pipe:v5
        env:
        - name: PROCESSINGSCRIPT
          value: redis-to-bigquery
        - name: REDISLIST
          value: twitter-stream
        # Change this to your project ID.
        - name: PROJECT_ID
          value: xxxx
        # Change the following two settings to your dataset and table.
        - name: BQ_DATASET
          value: xxxx
        - name: BQ_TABLE
          value: xxxx

Kubernetes クラスタの構成と起動

Kubernetes は、起動、シャットダウン、構成の管理に一連のシェル スクリプトを使用するポータブルであるため、インストールは不要です。

  1. kubernetes/cluster/gce/config-common.sh を編集します。

  2. bigquery のスコープをファイルの NODE_SCOPES 定義に追加します。

この設定によって、ノードから BigQuery テーブルに書き込みが行えるようになります。設定を保存して、ファイルを閉じます。

(省略可能)Redis の信頼性を長期間維持するために起動スクリプトを更新する

ノードの起動スクリプトにいくつかの推奨する改善項目を追加すると、Kubernetes での Redis の信頼性が長期間にわたって向上します。この手順は Redis マスターを長期間運用する場合に役に立ちます。クラスタを長期間にわたって起動状態にする意図がない場合は、この手順は省略できます。

  1. kubernetes/cluster/gce/configure-vm.sh を編集して、このファイルの末尾までスクロールします。次の if 文で、# 記号の行の後に、

    if [[ -z "${is_push}" ]]; then
    
  2. 次のコマンドを挿入します。

    /sbin/sysctl vm.overcommit_memory=1
    echo never > /sys/kernel/mm/transparent_hugepage/enabled
    

これらのコマンドによって、Linux カーネルが常にメモリを過剰消費するように設定され、カーネルで transparent huge pages が無効になります。ファイルを保存して閉じます。

クラスタを起動する

クラスタを起動する準備が整いました。

  1. 次のコマンドを入力します。

    kubernetes/cluster/kube-up.sh
    

    クラスタが起動するまでに、しばらく時間がかかることがあります。起動プロセスの際に、Compute Engine 用の新しい SSH 認証鍵を作成することを求めるメッセージが表示される場合があります。すでに作成済みの場合は、SSH 認証鍵のパスフレーズを入力してください。

  2. クラスタが起動したら、次のコマンドを入力して、実行されているインスタンスを確認します。

    kubectl get nodes
    kubectl cluster-info

Kubernetes のマスター インスタンスが 1 つと、Kubernetes のノードが 4 つあることを確認します。

Twitter アプリケーションの作成

Twitter からツイートを受信するには、Twitter アプリケーションを作成して、そのキー / シークレットの値およびトークン / シークレットの値を Twitter-to-Redis Kubernetes ポッドの仕様に追加します。全部で 4 つの値をコピーします。手順は次のとおりです。

  1. 新しい Twitter アプリケーションを作成します。
  2. Twitter の [アプリケーション管理] ページで、[キーとアクセス トークン] タブに移動します。
  3. [アクセス トークンの作成] ボタンをクリックして、新しいアクセス トークンを作成します。
  4. kube-redis-bq/redis/twitter-stream.yaml を編集します。
  5. 次の値を使用するコンシューマ キーとコンシューマ シークレットに置き換えます。
    • CONSUMERKEY
    • CONSUMERSECRET
  6. 次の値を使用するアクセス トークンとアクセス トークン シークレットに置き換えます。
    • ACCESSTOKEN
    • ACCESSTOKENSEC

このサンプル アプリケーションでは、デフォルトで Twitter のパブリック ストリームのランダムなサンプルから読み込みを行います。これを使用せず、キーワードに対するフィルタを設定する手順は次のとおりです。

  1. kube-redis-bq/twitter-stream.yaml を編集し、TWSTREAMMODE の値を filter に変更します。
  2. twitter-to-redis.py を編集し、track 変数で定義されているキーワードを、フィルタの対象とするキーワードで置き換えます。
  3. コンテナ イメージを再構築します。コンテナ イメージを構築する方法については、付録をご覧ください。

Kubernetes ポッドの Docker イメージについて

この例では、異なる 3 つの Kubernetes ポッド テンプレートを使用して、分析パイプラインを構築しています。1 つは Redis ポッドで、他の 2 つのポッドのタイプはサンプル アプリケーションのスクリプトを実行します。

仕様ファイルは構築済みの Docker イメージを指しているため、これを使用するために何か特別なことをする必要はありません。イメージを自分で構築したい場合は、付録の手順に従ってください。

Docker イメージには、このソリューションのための操作を実行するメイン スクリプトが 2 つ含まれています。

twitter-to-redis.py のコードは、新着ツイートを Twitter から Redis に送ります。

#!/usr/bin/env python
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script uses the Twitter Streaming API, via the tweepy library,
to pull in tweets and store them in a Redis server.
"""

import datetime
import os

import redis
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

# Get your twitter credentials from the environment variables.
# These are set in the 'twitter-stream.json' manifest file.
consumer_key = os.environ['CONSUMERKEY']
consumer_secret = os.environ['CONSUMERSECRET']
access_token = os.environ['ACCESSTOKEN']
access_token_secret = os.environ['ACCESSTOKENSEC']

# Get info on the Redis host and port from the environment variables.
# The name of this variable comes from the redis service id, 'redismaster'.
REDIS_HOST = os.environ['REDISMASTER_SERVICE_HOST']
REDIS_PORT = os.environ['REDISMASTER_SERVICE_PORT']
REDIS_LIST = os.environ['REDISLIST']

class StdOutListener(StreamListener):
    """A listener handles tweets that are received from the stream.
    This listener dumps the tweets into Redis.
    """

    count = 0
    redis_errors = 0
    allowed_redis_errors = 3
    twstring = ''
    tweets = []
    r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)
    total_tweets = 10000000

    def write_to_redis(self, tw):
        try:
            self.r.lpush(REDIS_LIST, tw)
        except:
            print 'Problem adding data to Redis.'
            self.redis_errors += 1

    def on_data(self, data):
        """What to do when tweet data is received."""
        self.write_to_redis(data)
        self.count += 1
        # if we've grabbed more than total_tweets tweets, exit the script.
        # If this script is being run in the context of a kubernetes
        # replicationController, the pod will be restarted fresh when
        # that happens.
        if self.count > self.total_tweets:
            return False
        if self.redis_errors > self.allowed_redis_errors:
            print 'too many redis errors.'
            return False
        if (self.count % 1000) == 0:
            print 'count is: %s at %s' % (self.count, datetime.datetime.now())
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    print '....'
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    print 'stream mode is: %s' % os.environ['TWSTREAMMODE']

    stream = Stream(auth, listener)
    # set up the streaming depending upon whether our mode is 'sample', which
    # will sample the twitter public stream. If not 'sample', instead track
    # the given set of keywords.
    # This environment var is set in the 'twitter-stream.yaml' file.
    if os.environ['TWSTREAMMODE'] == 'sample':
        stream.sample()
    else:
        stream.filter(
                track=['bigdata', 'kubernetes', 'bigquery', 'docker', 'google',
                       'googlecloud', 'golang', 'dataflow',
                       'containers', 'appengine', 'gcp', 'compute',
                       'scalability', 'gigaom', 'news', 'tech', 'apple',
                       'amazon', 'cluster', 'distributed', 'computing',
                       'cloud', 'android', 'mobile', 'ios', 'iphone',
                       'python', 'recode', 'techcrunch', 'timoreilly']
                )

redis-to-bigquery.py のコードは、キャッシュされたデータを Redis から BigQuery に送ります。

#!/usr/bin/env python
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script grabs tweets from a redis server, and stores them in BiqQuery
using the BigQuery Streaming API.
"""
import datetime
import json
import os

import redis

import utils

# Get info on the Redis host and port from the environment variables.
# The name of this variable comes from the redis service id, 'redismaster'.
REDIS_HOST = os.environ['REDISMASTER_SERVICE_HOST']
REDIS_PORT = os.environ['REDISMASTER_SERVICE_PORT']
REDIS_LIST = os.environ['REDISLIST']

r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)

# Get the project ID from the environment variable set in
# the 'bigquery-controller.yaml' manifest.
PROJECT_ID = os.environ['PROJECT_ID']

def write_to_bq(bigquery):
    """Write the data to BigQuery in small chunks."""
    tweets = []
    CHUNK = 50  # The size of the BigQuery insertion batch.
    tweet = None
    mtweet = None
    count = 0
    count_max = 50000
    redis_errors = 0
    allowed_redis_errors = 3
    while count < count_max:
        while len(tweets) < CHUNK:
            # We'll use a blocking list pop -- it returns when there is
            # new data.
            res = None
            try:
                res = r.brpop(REDIS_LIST)
            except:
                print 'Problem getting data from Redis.'
                redis_errors += 1
                if redis_errors > allowed_redis_errors:
                    print "Too many redis errors: exiting."
                    return
                continue
            try:
                tweet = json.loads(res[1])
            except Exception, e:
                print e
                redis_errors += 1
                if redis_errors > allowed_redis_errors:
                    print "Too many redis-related errors: exiting."
                    return
                continue
            # First do some massaging of the raw data
            mtweet = utils.cleanup(tweet)
            # We only want to write tweets to BigQuery; we'll skip 'delete' and
            # 'limit' information.
            if 'delete' in mtweet:
                continue
            if 'limit' in mtweet:
                continue
            tweets.append(mtweet)
        # try to insert the tweets into bigquery
        response = utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
                             os.environ['BQ_TABLE'], tweets)
        tweets = []
        count += 1
        if count % 25 == 0:
            print ("processing count: %s of %s at %s: %s" %
                   (count, count_max, datetime.datetime.now(), response))

if __name__ == '__main__':
    print "starting write to BigQuery...."
    bigquery = utils.create_bigquery_client()
    write_to_bq(bigquery)

Kubernetes アプリの起動

オプションで Docker イメージを作成して push すると、Kubernetes アプリの起動を開始できるようになります。

Kubernetes では、作成、スケジューリング、管理を行うことができる最小のデプロイ可能なユニットは個々のアプリケーション コンテナではなく、ポッドです。

レプリカセットによって、指定した数のポッドレプリカがどの時点でも確実に実行されます。レプリカの数が多すぎる場合は、一部がシャットダウンされます。レプリカが少なすぎる場合は、さらにレプリカが起動されます。単にシングルトン ポッドを作成したり、ポッドを一括で作成したりする場合とは対照的に、レプリカセットは、ノード障害など、なんらかの理由で削除または終了されたポッドを置き換えます。

デプロイでは、ポッドとレプリカセットに宣言型の更新が行われます。目的の状態をデプロイ オブジェクトに記述するだけで、デプロイ コントローラによって自動制御された速度で実際の状態が目的の状態に変更されます。

Redis-to-BigQuery と Twitter-to-Redis のポッドにデプロイを使用します。

Redis マスターポッドを起動する

最初に Redis マスターポッドを起動します。関心があれば、kube-redis-bq/redis/redis-master.yaml の Redis マスターポッドの仕様ファイルをご覧ください。

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: redis-master
  labels:
    name: redis-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: redis-master
    spec:
      containers:
      - name: master
        image: redis
        ports:
        - containerPort: 6379
  1. ポッドを起動するには、以下のコマンドを実行します。

    kubectl create -f kube-redis-bq/redis/redis-master.yaml
    
  2. Redis マスターポッドが動作していることを確認するには、次のコマンドを実行します。

    kubectl get pods
    

マスターが動作していることを確認するには、STATUS フィールドを確認してください。さらに、他のポッドがすでに動作していることが確認できる場合もあります。これらの他のポッドは Kubernetes のフレームワークで追加および使用されているものです。

ポッドがインスタンスに配置されるまで、30 秒ほどかかる場合があります。この間、Redis マスターポッドにはそのホストが <unassigned> としてリストされます。しかし、Host の最終的な値はポッドが実行されているインスタンスの名前になります。処理が完了すると、ポッドの状態は ContainerCreating から Running に変化します。

Redis マスター サービスの起動

Kubernetes では、サービスによってポッドの論理的なセットと、アクセスするためのポリシーを定義します。サービスは kube-redis-bq/redis/redis-master-service.yaml で次のように指定されています。

apiVersion: v1
kind: Service
metadata:
  name: redismaster
  labels:
    name: redis-master
spec:
  ports:
    # The port that this service should serve on.
    # You don't need to specify the targetPort if it is the same as the port,
    # though here we include it anyway, to show the syntax.
  - port: 6379
    targetPort: 6379
  selector:
    name: redis-master

上記の手順で作成した Redis ポッドには、name: redis-master というラベルが付けられています。サービスのセレクタ フィールドによって、サービスに送信されるトラフィックを受信するポッドが決まります。

  1. 次のコマンドを入力して、サービスを起動します。

    kubectl create -f kube-redis-bq/redis/redis-master-service.yaml
    
  2. 動作しているサービスを表示するには、以下のコマンドを実行します。

    kubectl get services
    

Redis-to-BigQuery デプロイの起動

次に、Redis のキャッシュからツイートを pull して、それらを BigQuery テーブルに送るポッドのデプロイを起動します。

デプロイを起動する前に、kube-redis-bq/redis/bigquery-controller.yaml を確認しておくと役立つ場合があります。このファイルにはデプロイが定義されており、2 つのポッドレプリカを含むレプリカセットが指定されています。複製されたこれらのポッドの特性は、ポッド テンプレートを使用してファイルに定義されています。

  1. 複製されたポッドを起動するには、次のコマンドを実行します。

    kubectl create -f kube-redis-bq/redis/bigquery-controller.yaml
    
  2. ポッドが動作していることを確認するには、次のコマンドを実行します。

    kubectl get pods
    

    新しいポッドが ContainerCreating から Running に移行するには、30 秒程度かかることがあります。このリストには redis-master とラベルが付けられた Redis ポッドが 1 つと、bigquery-controller というラベルが付けられた BigQuery-to-Redis ポッドが 2 つあることがわかります。

  3. システムに定義されているデプロイと、それぞれに指定されているレプリカの数を確認するには、次のコマンドを実行します。

    kubectl get deployments
    

Twitter-to-Redis デプロイの起動

Redis マスターポッドおよび Redis-to-BigQuery パイプライン ポッドが動作したら、ツイートを pull して Redis キャッシュに追加するデプロイを起動します。kube-redis-bq/redis/bigquery-controller.yaml と同様に、Twitter-to-Redis 仕様ファイルである kube-redis-bq/redis/twitter-stream.yaml はデプロイを定義します。ただし今回は、Twitter で同時に 1 つのストリーミング API 接続しか許可されないため、デプロイで必要になる複製ポッドは 1 つのみです。

  1. デプロイを起動するには、以下のコマンドを実行します。

    kubectl create -f kube-redis-bq/redis/twitter-stream.yaml
    
  2. 次のコマンドを実行して、すべてのポッドが動作していることを確認します。新しいポッドが ContainerCreating から Running に移行するには、再び 30 秒ほどかかることがあります。

    kubectl get pods
    

    前の手順で確認したポッドの他に、twitter-stream というラベルが付けられた新しいポッドが確認できます。これで分析パイプラインが起動して動作するようになりました。

  3. システムに定義されているデプロイと、それぞれに指定されているレプリカの数を確認するには、次のコマンドを実行します。

    kubectl get deployments
    

    次のような出力が表示されます。

    NAME                   DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
    bigquery-controller    2         2         2            2           2m
    twitter-stream         1         1         1            1           2m
    

BigQuery テーブルに対するクエリの実行

BigQuery ウェブ UI を開き、[クエリの作成] をクリックして、新しいクエリの作成書き込みを開始します。これが動作していることを確認するには、次のような単純なクエリを実行します。

SELECT
  text
FROM
  [rtda.tweets]
LIMIT
  1000 IGNORE CASE;

パイプラインでしばらくの間、ツイートを収集します。数時間で十分ですが、収集する時間が長いほど、データセットは充実します。BigQuery テーブルにデータがいくらか集積されたら、興味のあるサンプルクエリを試すことができます。

このサンプルクエリでは、特定の用語にフィルタを実行して(この場合は、「android」)、テーブル内で最もリツイートされたツイートを探し出す方法を示しています。

SELECT
  text,
  MAX(retweeted_status.retweet_count) AS max_retweets,
  retweeted_status.user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'android'
GROUP BY
  text,
  retweeted_status.user.screen_name
ORDER BY
  max_retweets DESC
LIMIT
  1000 IGNORE CASE;

いくつかの用語を使用して収集したツイートにフィルタを設定しても、興味深い結果が得られるかもしれません。次のクエリでは、「Kubernetes」、「BigQuery」、「Redis」、あるいは「Twitter」という単語にフィルタを設定します。

SELECT
  created_at,
  text,
  id,
  retweeted_status.retweet_count,
  user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'kubernetes'
  OR text CONTAINS 'BigQuery'
  OR text CONTAINS 'redis'
  OR text CONTAINS 'twitter'
ORDER BY
  created_at DESC
LIMIT
  1000 IGNORE CASE;

次のクエリでは、一連のツイートにおけるお気に入りの数とリツイートの数の間の関連性を調べます。

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c
FROM [rtda.tweets]
GROUP BY lang
HAVING c > 2000000
ORDER BY 1

さらに深く調べていくと、特定の言語の話者がリツイートよりもお気に入りを好むかどうか、あるいはその逆を調べることもできます。

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c,
AVG(retweeted_status.retweet_count) avg_rt,
AVG(retweeted_status.favorite_count) avg_fv,
AVG(retweeted_status.retweet_count)/AVG(retweeted_status.favorite_count) ratio_rt_fv
FROM [rtda.tweets]
WHERE retweeted_status.retweet_count > 1 AND retweeted_status.favorite_count > 1
GROUP BY lang
HAVING c > 1000000
ORDER BY 1;

クリーンアップ

ラベルを使用すると、停止または削除するリソースを簡単に選択できます。次に例を示します。

kubectl delete deployment -l "name in (twitter-stream, bigquery-controller)"

仕様ファイル名を使用してリソースを削除することもできます。たとえば、次のようにして Redis サービスと複製コントローラを削除します。

kubectl delete -f redis-master.yaml
kubectl delete -f redis-master-service.yaml

クラスタのインスタンスもすべてシャットダウンするには、次のコマンドを実行します。

kubernetes/cluster/kube-down.sh

これによってクラスタ内のすべてのインスタンスが削除されます。

付録: Docker イメージの構築と push

Kubernetes のポッドには、アプリのスクリプトやそのサポート ライブラリが含まれる Docker イメージが必要です。このイメージは、分析パイプラインの一部である Twitter-to-Redis ポッドおよび Redis-to-BigQuery ポッドを起動するために使用します。kube-redis-bq/redis/redis-pipe-image ディレクトリにある Dockerfile では、このイメージを次のように指定しています。

FROM python:2

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install redis
RUN pip install python-dateutil

ADD twitter-to-redis.py /twitter-to-redis.py
ADD redis-to-bigquery.py /redis-to-bigquery.py
ADD controller.py /controller.py
ADD utils.py /utils.py

CMD python controller.py

この Dockerfile では、最初に Docker デーモンに対して、次に示す必須の Python ライブラリを新しいイメージにインストールするよう指示します。

  • Twitter API に対する Python のラッパーである tweepy
  • Redis に対する Python のラッパーである redis-py
  • Google API Python ライブラリ。
  • Python の標準 python-dateutil モジュールの拡張機能である datetime

次に、redis-pipe-image の 4 つのスクリプトを追加します。

  • controller.py:: パイプライン イメージ内の他の 2 つの Python スクリプトに対する共通の実行ポイントを提供します。
  • utils.py: いくつかのヘルパー機能が含まれます。
  • redis-to-bigquery.py:: キャッシュされたデータを Redis から BigQuery に送ります。
  • twitter-to-redis.py:: 新着ツイートを Twitter から Redis に送ります。

  • 関連付けられた Dockerfile を使用してパイプライン イメージを構築するには、次のコマンドを実行します。[PROJECT_ID] は実際のプロジェクト ID に置き換えます。

    sudo docker build -t gcr.io/[PROJECT_ID]/pipeline_image kube-redis-bq/redis/redis-pipe-image
    
  • イメージを構築した後、これを Google Container Registry に push して、Kubernetes からアクセスできるようにします。次のコマンドを実行するときには、[PROJECT_ID] を実際のプロジェクト ID に置き換えます。

    sudo gcloud docker -- push gcr.io/[PROJECT_ID]/pipeline_image
    

カスタム イメージを使用するよう Kubernetes の仕様ファイルを更新する

次に、仕様ファイル内の image の値を更新して、新しいイメージが使用されるようにします。次の 2 つのファイルを更新する必要があります。

kube-redis-bq/redis/twitter-stream.yaml
kube-redis-bq/redis/bigquery-controller.yaml
  1. 各ファイルを編集して、次の行を探します。

    gcr.io/google-samples/redis-bq-pipe:v3
    
  2. your-project をプロジェクト ID に置き換えます。イメージの名前は pipeline_image に置き換えます。

  3. ファイルを保存して閉じます。

次のステップ

  • Google Cloud Platform のその他の機能を試すには、チュートリアルをご覧ください。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...