Kubernetes、Cloud Pub/Sub、BigQuery を使用したリアルタイム データ分析

このチュートリアルでは、Google Compute Engine、Kubernetes、Google Cloud Pub/Sub、BigQuery で構築されたパイプラインを使用して、Twitter データのリアルタイム データ分析を実行します。このような分析には、次のような役に立つ応用例が多数あります。

  • 感情分析の実行
  • データ内の一般的なパターンやトレンドの明確化
  • キャンペーンの成果やリーチのモニタリング
  • リアルタイムでのレスポンスの検討

次の図は、サンプル アプリケーションのアーキテクチャを示しています。

サンプル アプリケーションのアーキテクチャ

このサンプル アプリケーションでは、Cloud Pub/Sub を使用して新着ツイートをバッファします。Deployment を使用して定義された複製ポッドで、Twitter Streaming API を使用して Twitter のパブリック サンプル ストリームを読み取り、新着ツイートを Cloud Pub/Sub トピックとしてパブリッシュします。その他の 2 つの複製ポッドでそのトピックをサブスクライブします。新しいツイートデータがトピックに対してパブリッシュされると、これらのポッドで BigQuery API を使用して BigQuery にデータが追加されます。

目標

  • ソースコードをダウンロードします。
  • Kubernetes クラスタを構成および起動します。
  • Twitter アプリケーションを作成します。
  • Cloud Pub/Sub のトピックを作成します。
  • Kubernetes アプリケーションを起動します。
  • BigQuery を使用して Twitter データに対してクエリを実行します。

費用

このチュートリアルでは、以下のような Google Cloud Platform の課金対象となるコンポーネントを使用します。

  • Compute Engine n1-standard-1 仮想マシン インスタンス x 5
  • Compute Engine 10 GB 永続ディスク x 5
  • BigQuery

このチュートリアルの実行にかかる費用は、実行時間によって変動します。料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。

Cloud Platform を初めて使用する方は、無料トライアルをご利用いただけます。

始める前に

新しいプロジェクトを作成、構成する

この例の手順を実行するには、必要な API が有効に設定されたプロジェクトを用意する必要があります。Google Cloud Platform Console で新しいプロジェクトを作成し、以下の API を有効にします

  • Google Compute Engine API
  • Google Pub/Sub API

課金をまだ有効にしていない場合は、有効にするようメッセージが表示されます。

このチュートリアルでは、次の API も使用します。これらは新しいプロジェクトを作成したときにデフォルトで有効になります。

  • BigQuery
  • Google Cloud Storage JSON API

Google Cloud SDK の設定

  1. Google アカウントを使用して認証します。

    gcloud auth login
    
  2. Cloud SDK のデフォルトのプロジェクトに、このチュートリアルの前のセクションで選択したプロジェクトを設定します。PROJECT_ID は実際のプロジェクト ID に置き換えます。

    gcloud config set project [PROJECT_ID]
    
  3. コンポーネントを更新します。

    gcloud components update
    
  4. kubectl のバイナリをインストールします。

    gcloud components update kubectl
    
  5. kubectl のディレクトリをパスに追加します。

    export PATH=$PATH:/usr/local/share/google/google-cloud-sdk/bin/
    

サンプルコードをダウンロードする

サンプルコードは次の 2 つの方法のいずれかで入手できます。

  • zip アーカイブをダウンロードしますkube-pubsub-bq という名前のディレクトリにコードを解凍します。

  • GitHub レポジトリのクローンを作成します。ターミナル ウィンドウで次のコマンドを実行します。

    git clone https://github.com/GoogleCloudPlatform/kubernetes-bigquery-python.git kube-pubsub-bq
    

Kubernetes をダウンロードする

Kubernetes は Docker コンテナ用のオープンソースのオーケストレーション システムです。Compute Engine のクラスタのインスタンスにコンテナをスケジュールし、ワークロードを管理してその状態が宣言されている意図に合致するようにし、管理や検索が容易になるようにラベルタスクの種類ごとにコンテナを整理します。

  1. Kubernetes のバイナリの最新リリースをダウンロードします

  2. サンプルコードをインストールした場所と同じ親ディレクトリにファイルを展開します。tar ファイルは kubernetes という名前のディレクトリに展開されるため、新しいディレクトリを作成する必要はありません。たとえば次のように入力します。

    tar -zxvf kubernetes.tar.gz
    

BigQuery テーブルを作成する

ツイートを保存する BigQuery テーブルを作成します。BigQuery ではテーブルをデータセットと呼ばれる抽象化レイヤにグループ化します。Cloud SDK に含まれている bq コマンドライン ツールを使用して、rtda という名前の新しい BigQuery データセットを作成します。

  1. ターミナル ウィンドウで次のコマンドを入力します。

    bq mk rtda
    
  2. データセットが適切な位置に作成されました。新着ツイートのための tweets という名前の新しいテーブルを作成します。各 BigQuery テーブルはスキーマで定義する必要があります。時間を節約するために、この例では定義済みのスキーマとして schema.json を用意しています。このスキーマを使用してテーブルを定義できます。定義済みのスキーマを使用してテーブルを作成するには、次のコマンドを入力します。

    bq mk -t rtda.tweets kube-pubsub-bq/bigquery-setup/schema.json
    
  3. 新しいデータセットとテーブルが作成されたことを確認するには、BigQuery ウェブ UI を使用します。左側のサイドバーにデータセットの名前が表示されます。表示されていない場合は、正しいプロジェクトを見ているかどうかを確認してください。データセット名の隣にある矢印をクリックすると、新しいテーブルが表示されます。または、次のコマンドを実行すると、あるプロジェクト内にあるすべてのデータセット、または指定したデータセット内にあるすべてのテーブルを表示することができます。

    bq ls [PROJECT_ID]:
    
    bq ls [DATASET_ID]
    

    この場合、データセット ID は rtda です。

  4. 最後に kube-pubsub-bq/pubsub/bigquery-controller.yaml を編集します。次のフィールドの value を更新して、BigQuery の構成を反映させます。

    • PROJECT_ID: 実際のプロジェクト ID です。
    • BQ_DATASET: 使用するテーブルが含まれる BigQuery のデータセットです(rtda)。
    • BQ_TABLE: 作成した BigQuery のテーブルです(tweets)。
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: bigquery-controller
  labels:
    name: bigquery-controller
spec:
  replicas: 2
  template:
    metadata:
      labels:
        name: bigquery-controller
    spec:
      containers:
      - name: bigquery
        image: gcr.io/google-samples/pubsub-bq-pipe:v5
        env:
        - name: PROCESSINGSCRIPT
          value: pubsub-to-bigquery
        # Change this to your pubsub topic
        - name: PUBSUB_TOPIC
          value: projects/your-project/topics/your-topic
        # Change this to your project ID.
        - name: PROJECT_ID
          value: xxxx
        # Change the following two settings to your dataset and table.
        - name: BQ_DATASET
          value: xxxx
        - name: BQ_TABLE
          value: xxxx

Kubernetes クラスタの構成と起動

Kubernetes は、起動、シャットダウン、構成の管理に一連のシェル スクリプトを使用するポータブルであるため、インストールは不要です。

  1. kubernetes/cluster/gce/config-common.sh を編集します。

  2. bigqueryhttps://www.googleapis.com/auth/pubsub のスコープをファイルの NODE_SCOPES 定義に追加します。

この設定によって、ノードから BigQuery テーブルに書き込みが行えるようになります。構成を保存して、ファイルを閉じます。

クラスタを起動する

クラスタを起動する準備が整いました。

  1. 次のコマンドを入力します。

    kubernetes/cluster/kube-up.sh
    

    クラスタが起動するまでに、しばらく時間がかかることがあります。起動プロセスの際に、Compute Engine 用の新しい SSH 認証鍵を作成することを求めるメッセージが表示される場合があります。すでに作成済みの場合は、SSH 認証鍵のパスフレーズを入力してください。

  2. クラスタが起動したら、次のコマンドを入力して、実行されているインスタンスを確認します。

    kubectl get nodes
    kubectl cluster-info

Kubernetes のマスター インスタンスが 1 つと、Kubernetes のノードが 4 つあることを確認します。

Twitter アプリケーションの作成

Twitter からツイートを受信するには、Twitter アプリケーションを作成して、そのキー / シークレットの値およびトークン / シークレットの値を Twitter-to-PubSub Kubernetes ポッドの仕様に追加します。全部で 4 つの値をコピーします。手順は次のとおりです。

  1. 新しい Twitter アプリケーションを作成します。
  2. Twitter の [アプリケーション管理] ページで、[キーとアクセス トークン] タブに移動します。
  3. [アクセス トークンの作成] ボタンをクリックして、新しいアクセス トークンを作成します。
  4. kube-pubsub-bq/pubsub/twitter-stream.yaml を編集します。
  5. 次の値を、使用するコンシューマ キーとコンシューマ シークレットに置き換えます。
    • CONSUMERKEY
    • CONSUMERSECRET
  6. 次の値を、使用するアクセス トークンとアクセス トークン シークレットに置き換えます。
    • ACCESSTOKEN
    • ACCESSTOKENSEC

このサンプル アプリケーションでは、デフォルトで Twitter のパブリック ストリームのランダムなサンプルから読み込みを行います。これを使用せず、キーワードに対するフィルタを設定する方法は次のとおりです。

  1. kube-pubsub-bq/twitter-stream.yaml を編集し、TWSTREAMMODE の値を filter に変更します。
  2. twitter-to-pubsub.py を編集して、track 変数で定義されているキーワードを、フィルタの対象とするキーワードに置き換えます。
  3. コンテナ イメージを再構築します。コンテナ イメージを構築する方法については、付録をご覧ください。

Cloud Pub/Sub トピックを作成する

次に、Kubernetes のポッドがパブリッシュおよびサブスクライブできる Cloud Pub/Sub トピックを作成します。

  1. 試してみましょうセクションに移動します。このセクションは、関連する Cloud Pub/Sub API ページにあります。

  2. [OAuth 2.0 を使用してリクエストを承認] スイッチを有効にして、[承認] をクリックします。

  3. [名前] フィールドに次のパスを入力します。[PROJECT_ID] は実際のプロジェクト ID に置き換えます。

    projects/[PROJECT_ID]/topics/new_tweets
    
  4. [実行] をクリックして、新しいトピックを作成します。[レスポンス] セクションでレスポンス コードを確認して、トピックが作成されたことを確認できます。レスポンス コードが 200 OK であれば、トピックは正しく作成されています。

  5. 次の 2 つのファイルを編集します。

    kube-pubsub-bq/pubsub/twitter-stream.yaml

    kube-pubsub-bq/pubsub/bigquery-controller.yaml

  6. ファイルの PUBSUB_TOPIC の値を新しいトピックに書き換えます。次の行を探してください。

    value: projects/your-project/topics/your-topic
    

    your-project は実際のプロジェクト ID に置き換えます。your-topicnew_tweets に置き換えます。

Kubernetes ポッドの Docker イメージについて

この例では、異なる 2 つの Kubernetes ポッド テンプレートを使用して、分析パイプラインを構築しています。仕様ファイルは構築済みの Docker イメージを指しているため、これを使用するために何か特別なことをする必要はありません。イメージを自分で構築したい場合の手順は、付録のとおりです。

Docker イメージには、このソリューションのための操作を実行するメイン スクリプトが 2 つ含まれています。

twitter-to-pubsub.py のコードは、新着ツイートを Twitter から Cloud Pub/Sub に送ります。

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script uses the Twitter Streaming API, via the tweepy library,
to pull in tweets and publish them to a PubSub topic.
"""

import base64
import datetime
import os
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

import utils

# Get your twitter credentials from the environment variables.
# These are set in the 'twitter-stream.json' manifest file.
consumer_key = os.environ['CONSUMERKEY']
consumer_secret = os.environ['CONSUMERSECRET']
access_token = os.environ['ACCESSTOKEN']
access_token_secret = os.environ['ACCESSTOKENSEC']

PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3

def publish(client, pubsub_topic, data_lines):
    """Publish to the given pubsub topic."""
    messages = []
    for line in data_lines:
        pub = base64.urlsafe_b64encode(line)
        messages.append({'data': pub})
    body = {'messages': messages}
    resp = client.projects().topics().publish(
            topic=pubsub_topic, body=body).execute(num_retries=NUM_RETRIES)
    return resp

class StdOutListener(StreamListener):
    """A listener handles tweets that are received from the stream.
    This listener dumps the tweets into a PubSub topic
    """

    count = 0
    twstring = ''
    tweets = []
    batch_size = 50
    total_tweets = 10000000
    client = utils.create_pubsub_client(utils.get_credentials())

    def write_to_pubsub(self, tw):
        publish(self.client, PUBSUB_TOPIC, tw)

    def on_data(self, data):
        """What to do when tweet data is received."""
        self.tweets.append(data)
        if len(self.tweets) >= self.batch_size:
            self.write_to_pubsub(self.tweets)
            self.tweets = []
        self.count += 1
        # if we've grabbed more than total_tweets tweets, exit the script.
        # If this script is being run in the context of a kubernetes
        # replicationController, the pod will be restarted fresh when
        # that happens.
        if self.count > self.total_tweets:
            return False
        if (self.count % 1000) == 0:
            print 'count is: %s at %s' % (self.count, datetime.datetime.now())
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    print '....'
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    print 'stream mode is: %s' % os.environ['TWSTREAMMODE']

    stream = Stream(auth, listener)
    # set up the streaming depending upon whether our mode is 'sample', which
    # will sample the twitter public stream. If not 'sample', instead track
    # the given set of keywords.
    # This environment var is set in the 'twitter-stream.yaml' file.
    if os.environ['TWSTREAMMODE'] == 'sample':
        stream.sample()
    else:
        stream.filter(
                track=['bigdata', 'kubernetes', 'bigquery', 'docker', 'google',
                       'googlecloud', 'golang', 'dataflow',
                       'containers', 'appengine', 'gcp', 'compute',
                       'scalability', 'gigaom', 'news', 'tech', 'apple',
                       'amazon', 'cluster', 'distributed', 'computing',
                       'cloud', 'android', 'mobile', 'ios', 'iphone',
                       'python', 'recode', 'techcrunch', 'timoreilly']
                )

pubsub-to-bigquery.py のコードは、キャッシュされたデータを Cloud Pub/Sub から BigQuery に送ります。

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This script grabs tweets from a PubSub topic, and stores them in BiqQuery
using the BigQuery Streaming API.
"""

import base64
import datetime
import json
import os
import time

import utils

# Get the project ID and pubsub topic from the environment variables set in
# the 'bigquery-controller.yaml' manifest.
PROJECT_ID = os.environ['PROJECT_ID']
PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3

def fqrn(resource_type, project, resource):
    """Returns a fully qualified resource name for Cloud Pub/Sub."""
    return "projects/{}/{}/{}".format(project, resource_type, resource)

def create_subscription(client, project_name, sub_name):
    """Creates a new subscription to a given topic."""
    print "using pubsub topic: %s" % PUBSUB_TOPIC
    name = get_full_subscription_name(project_name, sub_name)
    body = {'topic': PUBSUB_TOPIC}
    subscription = client.projects().subscriptions().create(
            name=name, body=body).execute(num_retries=NUM_RETRIES)
    print 'Subscription {} was created.'.format(subscription['name'])

def get_full_subscription_name(project, subscription):
    """Returns a fully qualified subscription name."""
    return fqrn('subscriptions', project, subscription)

def pull_messages(client, project_name, sub_name):
    """Pulls messages from a given subscription."""
    BATCH_SIZE = 50
    tweets = []
    subscription = get_full_subscription_name(project_name, sub_name)
    body = {
            'returnImmediately': False,
            'maxMessages': BATCH_SIZE
    }
    try:
        resp = client.projects().subscriptions().pull(
                subscription=subscription, body=body).execute(
                        num_retries=NUM_RETRIES)
    except Exception as e:
        print "Exception: %s" % e
        time.sleep(0.5)
        return
    receivedMessages = resp.get('receivedMessages')
    if receivedMessages is not None:
        ack_ids = []
        for receivedMessage in receivedMessages:
                message = receivedMessage.get('message')
                if message:
                        tweets.append(
                            base64.urlsafe_b64decode(str(message.get('data'))))
                        ack_ids.append(receivedMessage.get('ackId'))
        ack_body = {'ackIds': ack_ids}
        client.projects().subscriptions().acknowledge(
                subscription=subscription, body=ack_body).execute(
                        num_retries=NUM_RETRIES)
    return tweets

def write_to_bq(pubsub, sub_name, bigquery):
    """Write the data to BigQuery in small chunks."""
    tweets = []
    CHUNK = 50  # The size of the BigQuery insertion batch.
    # If no data on the subscription, the time to sleep in seconds
    # before checking again.
    WAIT = 2
    tweet = None
    mtweet = None
    count = 0
    count_max = 50000
    while count < count_max:
        while len(tweets) < CHUNK:
            twmessages = pull_messages(pubsub, PROJECT_ID, sub_name)
            if twmessages:
                for res in twmessages:
                    try:
                        tweet = json.loads(res)
                    except Exception, bqe:
                        print bqe
                    # First do some massaging of the raw data
                    mtweet = utils.cleanup(tweet)
                    # We only want to write tweets to BigQuery; we'll skip
                    # 'delete' and 'limit' information.
                    if 'delete' in mtweet:
                        continue
                    if 'limit' in mtweet:
                        continue
                    tweets.append(mtweet)
            else:
                # pause before checking again
                print 'sleeping...'
                time.sleep(WAIT)
        response = utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
                             os.environ['BQ_TABLE'], tweets)
        tweets = []
        count += 1
        if count % 25 == 0:
            print ("processing count: %s of %s at %s: %s" %
                   (count, count_max, datetime.datetime.now(), response))

if __name__ == '__main__':
    topic_info = PUBSUB_TOPIC.split('/')
    topic_name = topic_info[-1]
    sub_name = "tweets-%s" % topic_name
    print "starting write to BigQuery...."
    credentials = utils.get_credentials()
    bigquery = utils.create_bigquery_client(credentials)
    pubsub = utils.create_pubsub_client(credentials)
    try:
        # TODO: check if subscription exists first
        subscription = create_subscription(pubsub, PROJECT_ID, sub_name)
    except Exception, e:
        print e
    write_to_bq(pubsub, sub_name, bigquery)
    print 'exited write loop'

Kubernetes アプリの起動

オプションで Docker イメージを作成して push すると、Kubernetes アプリの起動を開始できるようになります。

Kubernetes では、作成、スケジューリング、管理を行える最小のデプロイ可能なユニットは個々のアプリケーション コンテナではなく、ポッド {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} です。

レプリカセット {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} によって、指定した数のポッドレプリカがどの時点でも確実に実行されるようになります。数が多すぎる場合、一部がシャットダウンされます。レプリカが少なすぎる場合は、さらにレプリカが起動されます。単にシングルトン ポッドを作成したり、ポッドを一括で作成したりする場合とは対照的に、レプリカセットは、ノード障害など、なんらかの理由で削除または終了されたポッドを置き換えます。

Deployment {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} では、ポッドとレプリカセットに宣言型の更新が行われます。目的の状態を Deployment オブジェクトに記述するだけで、Deployment コントローラによって自動制御された速度で実際の状態が目的の状態に変更されます。

Deployment を使用して Kubernetes アプリを起動します。

Cloud-Pub/Sub-to-BigQuery Deployment の起動

Cloud Pub/Sub トピックをサブスクライブするポッドに対する Deployment の起動を開始し、使用可能になれば、BigQuery テーブルにツイートを送ります。

Deployment を起動する前に、kube-pubsub-bq/pubsub/bigquery-controller.yaml を確認しておくと役立つ場合があります。このファイルには Deployment が定義されており、2 つのポッドレプリカを含むレプリカセットが指定されています。複製されたこれらのポッドの特性は、ポッド テンプレートを使用してファイルに定義されています。

  1. 複製されたポッドを起動するには、次のコマンドを実行します。

    kubectl create -f kube-pubsub-bq/pubsub/bigquery-controller.yaml
    
  2. ポッドが動作していることを確認するには、次のコマンドを実行します。

    kubectl get pods
    

    新しいポッドが ContainerCreating から Running に移行するには、30 秒程度かかることがあります。このリストには bigquery-controller とラベルが付けられた BigQuery-to-Cloud-Pub/Sub ポッドが 2 つあることがわかります。

  3. 次のコマンドを実行します。

    kubectl get deployments

これにより、システムに定義されている Deployment と、それぞれに指定されているレプリカの数を確認できます。

Twitter-to-Cloud-Pub/Sub Deployment の起動

Cloud-Pub/Sub-to-BigQuery パイプライン ポッドが起動して動作するようになったら、ツイートを受信して Cloud Pub/Sub トピックにパブリッシュする Deployment を起動します。kube-pubsub-bq/pubsub/bigquery-controller.yaml と同様に、Twitter-to-Cloud-Pub/Sub 仕様ファイルである kube-pubsub-bq/pubsub/twitter-stream.yaml は Deployment を定義します。ただし今回は、Twitter で同時に 1 つのストリーミング API 接続しか許可されないため、Deployment で必要になる複製ポッドは 1 つのみです。

  1. Deployment を起動するには、次のコマンドを実行します。

    kubectl create -f kube-pubsub-bq/pubsub/twitter-stream.yaml
    
  2. 次のコマンドを実行して、すべてのポッドが動作していることを確認します。新しいポッドが ContainerCreating から Running に移行するには、再び 30 秒ほどかかることがあります。

    kubectl get pods
    

前の手順で確認したポッドの他に、twitter-stream というラベルが付けられた新しいポッドが確認できます。これで分析パイプラインが起動して動作するようになりました。

  1. もう一度、次のコマンドを実行します。

    kubectl get deployments

これにより、システムに定義されている Deployment と、それぞれに指定されているレプリカの数を確認できます。次のような出力が表示されます。

  NAME                   DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
  bigquery-controller    2         2         2            2           2m
  twitter-stream         1         1         1            1           2m

BigQuery テーブルに対するクエリの実行

BigQuery ウェブ UI を開き、[クエリの作成] をクリックして、新しいクエリの作成書き込みを開始します。これが動作していることを確認するには、次のような単純なクエリを実行します。

SELECT
  text
FROM
  [rtda.tweets]
LIMIT
  1000 IGNORE CASE;

パイプラインでしばらくの間、ツイートを収集します。数時間で十分ですが、収集する時間が長いほど、データセットは充実します。BigQuery テーブルにデータがいくらか集積されたら、興味のあるサンプルクエリを試すことができます。

このクエリの例では、特定の用語でフィルタをかけて(この場合は、「android」)、テーブル内で最もリツイートされたツイートを探し出す方法を示しています。

SELECT
  text,
  MAX(retweeted_status.retweet_count) AS max_retweets,
  retweeted_status.user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'android'
GROUP BY
  text,
  retweeted_status.user.screen_name
ORDER BY
  max_retweets DESC
LIMIT
  1000 IGNORE CASE;

いくつかの用語を使用して収集したツイートにフィルタを設定しても、興味深い結果が得られるかもしれません。次のクエリでは、「Kubernetes」、「BigQuery」、「Cloud Pub/Sub」、「Twitter」という単語にフィルタを設定します。

SELECT
  created_at,
  text,
  id,
  retweeted_status.retweet_count,
  user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'kubernetes'
  OR text CONTAINS 'BigQuery'
  OR text CONTAINS 'cloud pub/sub'
  OR text CONTAINS 'twitter'
ORDER BY
  created_at DESC
LIMIT
  1000 IGNORE CASE;

次のクエリでは、一連のツイートにおけるお気に入りの数とリツイートの数の間の関連性を調べます。

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c
FROM [rtda.tweets]
GROUP BY lang
HAVING c > 2000000
ORDER BY 1

さらに深く調べていくと、特定の言語の話者がリツイートよりもお気に入りを好むかどうか、あるいはその逆を調べることもできます。

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c,
AVG(retweeted_status.retweet_count) avg_rt,
AVG(retweeted_status.favorite_count) avg_fv,
AVG(retweeted_status.retweet_count)/AVG(retweeted_status.favorite_count) ratio_rt_fv
FROM [rtda.tweets]
WHERE retweeted_status.retweet_count > 1 AND retweeted_status.favorite_count > 1
GROUP BY lang
HAVING c > 1000000
ORDER BY 1;

クリーンアップ

ラベルを使用すると、停止または削除するリソースを簡単に選択できます。次に例を示します。

kubectl delete deployment -l "name in (twitter-stream, bigquery-controller)"

クラスタのインスタンスもすべてシャットダウンするには、次のコマンドを実行します。

kubernetes/cluster/kube-down.sh

これによってクラスタ内のすべてのインスタンスが削除されます。

付録: Docker イメージの構築と push

Kubernetes のポッドには、アプリのスクリプトやそのサポート ライブラリが含まれる Docker イメージが必要です。このイメージは、分析パイプラインの一部である Twitter-to-Cloud-Pub/Sub ポッドおよび Cloud-Pub/Sub-to-BigQuery ポッドを起動するために使用します。kube-pubsub-bq/pubsub/pubsub-pipe-image ディレクトリにある Dockerfile では、このイメージを次のように指定しています。

FROM python:2

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install python-dateutil

ADD twitter-to-pubsub.py /twitter-to-pubsub.py
ADD pubsub-to-bigquery.py /pubsub-to-bigquery.py
ADD controller.py /controller.py
ADD utils.py /utils.py

CMD python controller.py

この Dockerfile では、最初に Docker デーモンに対して、次に示す必須の Python ライブラリを新しいイメージにインストールするよう指示します。

  • Twitter API の Python ラッパーである tweepy
  • Google API Python ライブラリ。
  • Python の標準 datetime モジュールの拡張機能である python-dateutil

次に、pubsub-pipe-image の 4 つのスクリプトを追加します。

  • controller.py: パイプライン イメージ内の他の 2 つの Python スクリプトに対する共通の実行ポイントを提供します。
  • utils.py: いくつかのヘルパー機能が含まれます。
  • pubsub-to-bigquery.py: キャッシュされたデータを Cloud Pub/Sub から BigQuery に送ります。
  • twitter-to-pubsub.py: 新着ツイートを Twitter から Cloud Pub/Sub に送ります。

  • 関連付けられた Dockerfile を使用してパイプライン イメージを構築するには、次のコマンドを実行します。[PROJECT_ID] は実際のプロジェクト ID に置き換えます。

    sudo docker build -t gcr.io/[PROJECT_ID]/pubsub_pipeline kube-pubsub-bq/pubsub/pubsub-pipe-image
    
  • イメージを構築した後、これを Google Container Registry に push して、Kubernetes からアクセスできるようにします。[PROJECT_ID] は実際のプロジェクト ID に置き換えます。

    sudo gcloud docker -- push gcr.io/[PROJECT_ID]/pubsub_pipeline
    

カスタム イメージを使用するよう Kubernetes の仕様ファイルを更新する

次に、仕様ファイル内の image の値を更新して、新しいイメージが使用されるようにします。次の 2 つのファイルを更新する必要があります。

kube-pubsub-bq/pubsub/twitter-stream.yaml
kube-pubsub-bq/pubsub/bigquery-controller.yaml
  1. 各ファイルを編集して、次の行を探します。

    gcr.io/google-samples/pubsub-bq-pipe:v3
    
  2. google-samples をプロジェクト ID に置き換えます。pubsub-bq-pipe:v3pubsub_pipeline に置き換えます。

  3. ファイルを保存して閉じます。

次のステップ

  • Google Cloud Platform のその他の機能を試すには、チュートリアルをご覧ください。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...