Análise de dados em tempo real com Kubernetes, Cloud Pub/Sub e BigQuery

Neste tutorial, você fará análises de dados do Twitter em tempo real usando um canal baseado em Google Compute Engine, Kubernetes, Google Cloud Pub/Sub e BigQuery. Esse tipo de análise tem várias aplicações úteis, como:

  • fazer análises de sentimentos;
  • identificar tendências e padrões gerais em dados;
  • monitorar o desempenho e o alcance das campanhas;
  • elaborar respostas em tempo real.

Veja no diagrama a seguir a arquitetura do aplicativo de exemplo.

A arquitetura do aplicativo de exemplo

O aplicativo de exemplo usa o Cloud Pub/Sub para fazer buffer dos tweets recebidos. Um pod replicado, definido por meio de uma implantação usa a Twitter Streaming API para realizar a leitura do fluxo de amostra público do Twitter, publicando os tweets recebidos em um tópico do Cloud Pub/Sub. Dois pods replicados adicionais se inscrevem no tópico. Quando novos dados de tweet são publicados no tópico, esses pods os adicionam ao BigQuery usando a BigQuery API.

Objetivos

  • Fazer o download do código-fonte.
  • Configurar e iniciar um cluster do Kubernetes.
  • Criar um aplicativo do Twitter.
  • Criar um tópico do Cloud Pub/Sub.
  • Iniciar o aplicativo Kubernetes.
  • Usar o BigQuery para consultar dados do Twitter.

Custos

Este tutorial usa vários componentes faturáveis do Google Cloud Platform, como:

  • cinco instâncias de máquina virtual n1-standard-1 do Google Compute Engine;
  • cinco discos permanentes de 10 GB do Google Compute Engine;
  • BigQuery.

O custo de executar este tutorial varia de acordo com o tempo de execução. Use a calculadora de preços para gerar uma estimativa de custo com base no uso previsto.

Novos usuários do Cloud Platform podem se qualificar para uma avaliação gratuita.

Antes de começar

Criar e configurar um novo projeto

Para trabalhar neste exemplo, é preciso ter um projeto com as APIs necessárias ativadas. No console do Google Cloud Platform, crie um projeto novo e depois ative as seguintes APIs:

  • Google Compute Engine API
  • API do Google Pub/Sub

Você receberá uma solicitação para ativar a cobrança caso não tenha feito isso anteriormente.

Este tutorial também usa as seguintes APIs que foram ativadas por padrão quando você criou o novo projeto:

  • BigQuery
  • Google Cloud Storage JSON API

Configurar o Google Cloud SDK

  1. Autentique usando sua conta do Google:

    gcloud auth login
    
  2. Defina o projeto padrão do Cloud SDK como o projeto que você selecionou na seção anterior deste tutorial. Substitua PROJECT_ID pelo seu código:

    gcloud config set project [PROJECT_ID]
    
  3. Atualize os componentes:

    gcloud components update
    
  4. Instale o binário kubectl:

    gcloud components update kubectl
    
  5. Adicione o diretório ao kubectl no caminho:

    export PATH=$PATH:/usr/local/share/google/google-cloud-sdk/bin/
    

Fazer o download do código de exemplo

Há duas maneiras de conseguir o código de amostra:

  • Fazer o download do arquivo zip. Descompacte o código em um diretório chamado kube-pubsub-bq.

  • Clonar o repositório GitHub. Em uma janela de terminal, execute o comando a seguir:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-bigquery-python.git kube-pubsub-bq
    

Fazer o download do Kubernetes

O Kubernetes é um sistema de articulação de código aberto para contêineres do Docker. Ele programa contêineres nas instâncias do cluster do Compute Engine, gerencia cargas de trabalho para garantir que o estado delas corresponda às intenções declaradas e organiza os contêineres por marcador e tipo de tarefa para facilitar o gerenciamento e a descoberta.

  1. Faça o download da versão mais recente do binário do Kubernetes.

  2. Descompacte o arquivo no mesmo diretório pai em que você instalou o código de exemplo. Como o arquivo .tar será descompactado em um diretório chamado kubernetes, não é preciso criar um novo diretório. Por exemplo, digite:

    tar -zxvf kubernetes.tar.gz
    

Como criar a tabela do BigQuery

Crie uma tabela do BigQuery para armazenar seus tweets. O BigQuery agrupa tabelas em camadas de abstração chamadas conjuntos de dados. Use a ferramenta de linha de comando bq, que vem incluída no Cloud SDK, para criar um novo conjunto de dados do BigQuery chamado rtda.

  1. Em uma janela de terminal, digite o seguinte comando:

    bq mk rtda
    
  2. Agora que você já definiu um conjunto de dados, crie uma nova tabela chamada tweets para os tweets recebidos. É necessário que cada tabela do BigQuery seja definida por um esquema. Para poupar tempo, este exemplo fornece um esquema predefinido, schema.json, que você pode usar para definir sua tabela. Para criar a tabela usando o esquema predefinido, digite o seguinte comando:

    bq mk -t rtda.tweets kube-pubsub-bq/bigquery-setup/schema.json
    
  3. Para verificar se o conjunto de dados e a tabela foram criados, use a IU da Web do BigQuery. Você verá o conjunto de dados na barra lateral esquerda. Se ele não estiver lá, verifique se está olhando no projeto certo. Quando você clicar na seta ao lado do nome do conjunto de dados, a nova tabela será exibida. Como alternativa, você pode executar os comandos a seguir para ver todos os conjuntos de dados em um projeto ou todas as tabelas em um determinado conjunto de dados:

    bq ls [PROJECT_ID]:
    
    bq ls [DATASET_ID]
    

    Neste caso, o código do conjunto de dados é rtda.

  4. Por fim, edite kube-pubsub-bq/pubsub/bigquery-controller.yaml. Atualize o value dos seguintes campos para refletir a configuração do BigQuery:

    • PROJECT_ID: o código do projeto.
    • BQ_DATASET: o conjunto de dados do BigQuery que contém sua tabela (rtda).
    • BQ_TABLE: a tabela do BigQuery que você acabou de criar (tweets).
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: bigquery-controller
  labels:
    name: bigquery-controller
spec:
  replicas: 2
  template:
    metadata:
      labels:
        name: bigquery-controller
    spec:
      containers:
      - name: bigquery
        image: gcr.io/google-samples/pubsub-bq-pipe:v5
        env:
        - name: PROCESSINGSCRIPT
          value: pubsub-to-bigquery
        # Change this to your pubsub topic
        - name: PUBSUB_TOPIC
          value: projects/your-project/topics/your-topic
        # Change this to your project ID.
        - name: PROJECT_ID
          value: xxxx
        # Change the following two settings to your dataset and table.
        - name: BQ_DATASET
          value: xxxx
        - name: BQ_TABLE
          value: xxxx

Como configurar e iniciar um cluster do Kubernetes

Não é necessária nenhuma instalação porque o Kubernetes é portátil, ou seja, ele usa um conjunto de scripts de shell para iniciar, encerrar e gerenciar a configuração.

  1. Edite kubernetes/cluster/gce/config-common.sh.

  2. Adicione os escopos do bigquery e do https://www.googleapis.com/auth/pubsub à definição NODE_SCOPES do arquivo.

Essa configuração permite que os nodes gravem em sua tabela do BigQuery. Salve a configuração e feche o arquivo.

Iniciar o cluster

Agora você está pronto para iniciar o cluster.

  1. Insira o seguinte comando:

    kubernetes/cluster/kube-up.sh
    

    Iniciar o cluster pode levar algum tempo. Durante o processo de inicialização, talvez você precise criar uma nova chave SSH para o Google Compute Engine ou, caso já tenha feito isso anteriormente, inserir a senha longa da chave SSH.

  2. Após a inicialização do cluster, insira os seguintes comandos para ver as instâncias em execução:

    kubectl get nodes
    kubectl cluster-info

Será exibida uma instância do Kubernetes mestre e quatro nodes do Kubernetes.

Como criar um aplicativo do Twitter

Para receber tweets do Twitter, você precisa criar um aplicativo do Twitter e adicionar os respectivos valores de chave/segredo e token/segredo à especificação do pod do Kubernetes do Twitter para o PubSub. Ao todo, você copiará quatro valores. Siga estas etapas:

  1. Crie um novo aplicativo do Twitter.
  2. Na página do Twitter Gerenciamento de aplicativos, navegue até a guia Chaves e tokens de acesso.
  3. Clique no botão Criar meu token de acesso para gerar um novo token.
  4. Edite kube-pubsub-bq/pubsub/twitter-stream.yaml.
  5. Substitua os valores a seguir pela chave do cliente e pelo segredo do cliente:
    • CONSUMERKEY
    • CONSUMERSECRET
  6. Substitua os valores a seguir pelo seu token de acesso e pelo segredo do token de acesso:
    • ACCESSTOKEN
    • ACCESSTOKENSEC

Por padrão, o aplicativo de exemplo realiza a leitura com base em uma amostra aleatória da stream pública do Twitter. Em vez disso, para filtrar um conjunto de palavras-chave:

  1. Edite kube-pubsub-bq/twitter-stream.yaml e mude o valor de TWSTREAMMODE para filter.
  2. Edite twitter-to-pubsub.py e substitua as palavras-chave definidas na variável track pelas palavras-chave que você quer usar para filtrar.
  3. Recrie a imagem do contêiner. Para ver instruções sobre como criar a imagem do contêiner, consulte o apêndice.

Como criar um tópico do Cloud Pub/Sub

Em seguida, crie um tópico do Cloud Pub/Sub no qual seus pods do Kubernetes possam publicar e se inscrever.

  1. Visite a seção Testar da página da Cloud Pub/Sub API.

  2. Ative a chave de Solicitações de autorização usando o OAuth 2.0 e clique em Autorizar.

  3. Digite o seguinte caminho no campo nome. Substitua [PROJECT_ID] pelo código do projeto.

    projects/[PROJECT_ID]/topics/new_tweets
    
  4. Clique em Executar para criar o novo tópico. Você pode verificar se o tópico foi criado verificando o código de resposta na seção Resposta. Se o código de resposta for 200 OK, o tópico foi criado com sucesso.

  5. Edite os dois arquivos a seguir:

    kube-pubsub-bq/pubsub/twitter-stream.yaml

    kube-pubsub-bq/pubsub/bigquery-controller.yaml

  6. Atualize os valores PUBSUB_TOPIC nos arquivos com seu novo tópico. Procure esta linha:

    value: projects/your-project/topics/your-topic
    

    Substitua your-project pelo código do projeto. Substitua your-topic por new_tweets.

Noções básicas sobre a imagem Docker para os pods Kubernetes

Este exemplo usa dois modelos diferentes de pods do Kubernetes para criar o canal de análise. Os arquivos de especificação apontam para uma imagem do Docker predefinida. Você não precisa fazer nada especial para usá-la. Caso queira criar a imagem, siga as etapas do apêndice.

A imagem do Docker contém dois scripts principais que realizam o trabalho para esta solução.

O código em twitter-to-pubsub.py faz streaming dos tweets recebidos do Twitter para o Cloud Pub/Sub.

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script uses the Twitter Streaming API, via the tweepy library,
to pull in tweets and publish them to a PubSub topic.
"""

import base64
import datetime
import os
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

import utils

# Get your twitter credentials from the environment variables.
# These are set in the 'twitter-stream.json' manifest file.
consumer_key = os.environ['CONSUMERKEY']
consumer_secret = os.environ['CONSUMERSECRET']
access_token = os.environ['ACCESSTOKEN']
access_token_secret = os.environ['ACCESSTOKENSEC']

PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3

def publish(client, pubsub_topic, data_lines):
    """Publish to the given pubsub topic."""
    messages = []
    for line in data_lines:
        pub = base64.urlsafe_b64encode(line)
        messages.append({'data': pub})
    body = {'messages': messages}
    resp = client.projects().topics().publish(
            topic=pubsub_topic, body=body).execute(num_retries=NUM_RETRIES)
    return resp

class StdOutListener(StreamListener):
    """A listener handles tweets that are received from the stream.
    This listener dumps the tweets into a PubSub topic
    """

    count = 0
    twstring = ''
    tweets = []
    batch_size = 50
    total_tweets = 10000000
    client = utils.create_pubsub_client(utils.get_credentials())

    def write_to_pubsub(self, tw):
        publish(self.client, PUBSUB_TOPIC, tw)

    def on_data(self, data):
        """What to do when tweet data is received."""
        self.tweets.append(data)
        if len(self.tweets) >= self.batch_size:
            self.write_to_pubsub(self.tweets)
            self.tweets = []
        self.count += 1
        # if we've grabbed more than total_tweets tweets, exit the script.
        # If this script is being run in the context of a kubernetes
        # replicationController, the pod will be restarted fresh when
        # that happens.
        if self.count > self.total_tweets:
            return False
        if (self.count % 1000) == 0:
            print 'count is: %s at %s' % (self.count, datetime.datetime.now())
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    print '....'
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    print 'stream mode is: %s' % os.environ['TWSTREAMMODE']

    stream = Stream(auth, listener)
    # set up the streaming depending upon whether our mode is 'sample', which
    # will sample the twitter public stream. If not 'sample', instead track
    # the given set of keywords.
    # This environment var is set in the 'twitter-stream.yaml' file.
    if os.environ['TWSTREAMMODE'] == 'sample':
        stream.sample()
    else:
        stream.filter(
                track=['bigdata', 'kubernetes', 'bigquery', 'docker', 'google',
                       'googlecloud', 'golang', 'dataflow',
                       'containers', 'appengine', 'gcp', 'compute',
                       'scalability', 'gigaom', 'news', 'tech', 'apple',
                       'amazon', 'cluster', 'distributed', 'computing',
                       'cloud', 'android', 'mobile', 'ios', 'iphone',
                       'python', 'recode', 'techcrunch', 'timoreilly']
                )

O código em pubsub-to-bigquery.py faz streaming de dados armazenados em cache do Cloud Pub/Sub para o BigQuery.

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This script grabs tweets from a PubSub topic, and stores them in BiqQuery
using the BigQuery Streaming API.
"""

import base64
import datetime
import json
import os
import time

import utils

# Get the project ID and pubsub topic from the environment variables set in
# the 'bigquery-controller.yaml' manifest.
PROJECT_ID = os.environ['PROJECT_ID']
PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3

def fqrn(resource_type, project, resource):
    """Returns a fully qualified resource name for Cloud Pub/Sub."""
    return "projects/{}/{}/{}".format(project, resource_type, resource)

def create_subscription(client, project_name, sub_name):
    """Creates a new subscription to a given topic."""
    print "using pubsub topic: %s" % PUBSUB_TOPIC
    name = get_full_subscription_name(project_name, sub_name)
    body = {'topic': PUBSUB_TOPIC}
    subscription = client.projects().subscriptions().create(
            name=name, body=body).execute(num_retries=NUM_RETRIES)
    print 'Subscription {} was created.'.format(subscription['name'])

def get_full_subscription_name(project, subscription):
    """Returns a fully qualified subscription name."""
    return fqrn('subscriptions', project, subscription)

def pull_messages(client, project_name, sub_name):
    """Pulls messages from a given subscription."""
    BATCH_SIZE = 50
    tweets = []
    subscription = get_full_subscription_name(project_name, sub_name)
    body = {
            'returnImmediately': False,
            'maxMessages': BATCH_SIZE
    }
    try:
        resp = client.projects().subscriptions().pull(
                subscription=subscription, body=body).execute(
                        num_retries=NUM_RETRIES)
    except Exception as e:
        print "Exception: %s" % e
        time.sleep(0.5)
        return
    receivedMessages = resp.get('receivedMessages')
    if receivedMessages is not None:
        ack_ids = []
        for receivedMessage in receivedMessages:
                message = receivedMessage.get('message')
                if message:
                        tweets.append(
                            base64.urlsafe_b64decode(str(message.get('data'))))
                        ack_ids.append(receivedMessage.get('ackId'))
        ack_body = {'ackIds': ack_ids}
        client.projects().subscriptions().acknowledge(
                subscription=subscription, body=ack_body).execute(
                        num_retries=NUM_RETRIES)
    return tweets

def write_to_bq(pubsub, sub_name, bigquery):
    """Write the data to BigQuery in small chunks."""
    tweets = []
    CHUNK = 50  # The size of the BigQuery insertion batch.
    # If no data on the subscription, the time to sleep in seconds
    # before checking again.
    WAIT = 2
    tweet = None
    mtweet = None
    count = 0
    count_max = 50000
    while count < count_max:
        while len(tweets) < CHUNK:
            twmessages = pull_messages(pubsub, PROJECT_ID, sub_name)
            if twmessages:
                for res in twmessages:
                    try:
                        tweet = json.loads(res)
                    except Exception, bqe:
                        print bqe
                    # First do some massaging of the raw data
                    mtweet = utils.cleanup(tweet)
                    # We only want to write tweets to BigQuery; we'll skip
                    # 'delete' and 'limit' information.
                    if 'delete' in mtweet:
                        continue
                    if 'limit' in mtweet:
                        continue
                    tweets.append(mtweet)
            else:
                # pause before checking again
                print 'sleeping...'
                time.sleep(WAIT)
        response = utils.bq_data_insert(bigquery, PROJECT_ID, os.environ['BQ_DATASET'],
                             os.environ['BQ_TABLE'], tweets)
        tweets = []
        count += 1
        if count % 25 == 0:
            print ("processing count: %s of %s at %s: %s" %
                   (count, count_max, datetime.datetime.now(), response))

if __name__ == '__main__':
    topic_info = PUBSUB_TOPIC.split('/')
    topic_name = topic_info[-1]
    sub_name = "tweets-%s" % topic_name
    print "starting write to BigQuery...."
    credentials = utils.get_credentials()
    bigquery = utils.create_bigquery_client(credentials)
    pubsub = utils.create_pubsub_client(credentials)
    try:
        # TODO: check if subscription exists first
        subscription = create_subscription(pubsub, PROJECT_ID, sub_name)
    except Exception, e:
        print e
    write_to_bq(pubsub, sub_name, bigquery)
    print 'exited write loop'

Como iniciar o aplicativo Kubernetes

Depois de criar e enviar por push a imagem do Docker (opcionalmente), você pode iniciar o aplicativo Kubernetes.

No Kubernetes, os pods {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} (e não os contêineres de aplicativos individuais) são as menores unidades implementáveis que podem ser criadas, programadas e gerenciadas.

Um conjunto de réplicas {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} garante que um número específico de réplicas de pods esteja em execução em determinado momento. Se houver muitas delas, ele encerrará algumas. Se houver muito poucas, ele iniciará mais. Em vez de apenas criar pods de singleton ou até criar pods em massa, um conjunto de réplicas substitui os pods que são excluídos ou encerrados por qualquer motivo, como no caso de falha do node.

Uma implantação {: target='k8s' track-type='tutorial' track-name='externalLink' track-metadata-position='body'} fornece atualizações declarativas para pods e conjuntos de réplicas. Basta descrever o estado que quiser em um objeto de implantação. O controlador de implantação alterará o estado real para o estado desejado, a uma taxa controlada.

Você usará as implementações para iniciar seu aplicativo do Kubernetes.

Iniciar a implantação do Cloud-Pub/Sub para o BigQuery

Comece iniciando a implantação dos pods que se inscreverão no tópico do Cloud Pub/Sub e transmitirão tweets para a tabela do BigQuery conforme eles se tornem disponíveis.

Antes de iniciar a implantação, pode ser útil examinar mais atentamente o kube-pubsub-bq/pubsub/bigquery-controller.yaml. Esse arquivo define a implantação e especifica um conjunto de réplicas com duas réplicas de pod. As características desses pods replicados são definidas no arquivo por meio de um modelo de pod.

  1. Execute o comando a seguir para iniciar os pods replicados:

    kubectl create -f kube-pubsub-bq/pubsub/bigquery-controller.yaml
    
  2. Para verificar se os pods estão sendo executados, digite este comando:

    kubectl get pods
    

    Pode demorar cerca de 30 segundos para que os novos pods sejam movidos de ContainerCreating para Running. Na lista, você verá dois pods do Cloud Pub/Sub para o BigQuery com o marcador bigquery-controller.

  3. Você pode executar:

    kubectl get deployments

para ver as implementações definidas pelo sistema e a quantidade de réplicas especificada para cada uma.

Iniciar a implementação do Twitter para o Cloud Pub/Sub

Depois que os pods de canal do Cloud-Pub/Sub para o BigQuery estiverem funcionando, inicie a implantação do pod que resgata os tweets e os publica no tópico do Cloud Pub/Sub. Assim como o kube-pubsub-bq/pubsub/bigquery-controller.yaml, o arquivo de especificação do Twitter para o Cloud Pub/Sub, kube-pubsub-bq/pubsub/twitter-stream.yaml, também define uma implantação. No entanto, desta vez, a implantação pede apenas um pod replicado, já que o Twitter só permite uma conexão de API de streaming ao mesmo tempo.

  1. Para iniciar a implantação:

    kubectl create -f kube-pubsub-bq/pubsub/twitter-stream.yaml
    
  2. Digite o comando a seguir para verificar se todos os pods estão sendo executados. Mais uma vez, pode demorar cerca de 30 segundos para que o novo pod seja movido de ContainerCreating para Running.

    kubectl get pods
    

Além dos pods mencionados na etapa anterior, será exibido um novo pod chamado twitter-stream. Parabéns! O canal de análise está funcionando.

  1. Você pode executar novamente:

    kubectl get deployments

para ver as implementações definidas pelo sistema e a quantidade de réplicas especificada para cada uma. O resultado será semelhante a:

  NAME                   DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
  bigquery-controller    2         2         2            2           2m
  twitter-stream         1         1         1            1           2m

Como consultar a tabela do BigQuery

Abra a IU da Web do BigQuery e clique em Escrever consulta para começar a escrever uma nova consulta. Para verificar se está funcionando, basta executar uma consulta simples da seguinte maneira:

SELECT
  text
FROM
  [rtda.tweets]
LIMIT
  1000 IGNORE CASE;

Deixe o canal coletar tweets por um tempo. Algumas horas devem bastar, mas quanto mais você deixá-lo funcionando, mais rico será o conjunto de dados. Depois que você tiver mais dados na tabela do BigQuery, tente executar algumas consultas de amostra interessantes.

Este exemplo de consulta demonstra como encontrar os tweets na tabela que foram reenviados mais vezes, filtrando por um termo específico (neste caso, "android"):

SELECT
  text,
  MAX(retweeted_status.retweet_count) AS max_retweets,
  retweeted_status.user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'android'
GROUP BY
  text,
  retweeted_status.user.screen_name
ORDER BY
  max_retweets DESC
LIMIT
  1000 IGNORE CASE;

Talvez seja interessante filtrar os tweets coletados por um conjunto de termos. Esta consulta filtra as palavras "Kubernetes", "BigQuery", "Cloud Pub/Sub" ou "Twitter":

SELECT
  created_at,
  text,
  id,
  retweeted_status.retweet_count,
  user.screen_name
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'kubernetes'
  OR text CONTAINS 'BigQuery'
  OR text CONTAINS 'cloud pub/sub'
  OR text CONTAINS 'twitter'
ORDER BY
  created_at DESC
LIMIT
  1000 IGNORE CASE;

Esta consulta procura uma correlação entre o número de favoritos e o número de tweets reenviados em seu conjunto de tweets:

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c
FROM [rtda.tweets]
GROUP BY lang
HAVING c > 2000000
ORDER BY 1

Indo ainda mais fundo, você também pode investigar se os falantes de um determinado idioma preferem adicionar aos favoritos em vez de reenviar o tweet ou vice-versa:

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  lang,
COUNT(*) c,
AVG(retweeted_status.retweet_count) avg_rt,
AVG(retweeted_status.favorite_count) avg_fv,
AVG(retweeted_status.retweet_count)/AVG(retweeted_status.favorite_count) ratio_rt_fv
FROM [rtda.tweets]
WHERE retweeted_status.retweet_count > 1 AND retweeted_status.favorite_count > 1
GROUP BY lang
HAVING c > 1000000
ORDER BY 1;

Como fazer a limpeza

Os marcadores facilitam a seleção dos recursos que você quer parar ou excluir. Por exemplo:

kubectl delete deployment -l "name in (twitter-stream, bigquery-controller)"

Caso queira encerrar as instâncias do cluster, execute este comando:

kubernetes/cluster/kube-down.sh

Isso excluirá todas as instâncias do cluster.

Apêndice: como criar e enviar a imagem do Docker

Os pods do Kubernetes exigem uma imagem do Docker que inclua os scripts do aplicativo e as respectivas bibliotecas de suporte. Essa imagem é usada para iniciar os pods do Twitter para o Cloud Pub/Sub e do Cloud Pub/Sub para o BigQuery que fazem parte do canal de análise. O Dockerfile localizado no diretório kube-pubsub-bq/pubsub/pubsub-pipe-image especifica essa imagem da seguinte maneira:

FROM python:2

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install python-dateutil

ADD twitter-to-pubsub.py /twitter-to-pubsub.py
ADD pubsub-to-bigquery.py /pubsub-to-bigquery.py
ADD controller.py /controller.py
ADD utils.py /utils.py

CMD python controller.py

Este Dockerfile primeiro instrui o daemon do Docker a instalar algumas bibliotecas Python necessárias para a nova imagem:

  • tweepy, um wrapper Python para a Twitter API.
  • As bibliotecas Python da Google API.
  • python-dateutil, uma extensão para o módulo datetime padrão de Python.

Em seguida, são adicionados quatro scripts em pubsub-pipe-image:

  • controller.py fornece um ponto de execução comum para os outros dois scripts Python na imagem do canal.
  • utils.py: contém algumas funções auxiliares.
  • pubsub-to-bigquery.py faz streaming de dados armazenados em cache do Cloud Pub/Sub para o BigQuery.
  • twitter-to-pubsub.py faz streaming de tweets recebidos do Twitter para o Cloud Pub/Sub.

  • Para criar a imagem do canal usando o Dockerfile associado, execute o comando a seguir. Substitua [PROJECT_ID] pelo código do seu projeto.

    sudo docker build -t gcr.io/[PROJECT_ID]/pubsub_pipeline kube-pubsub-bq/pubsub/pubsub-pipe-image
    
  • Depois de criar a imagem, envie-a por push ao Google Container Registry para que o Kubernetes possa acessá-la. Substitua [PROJECT_ID] pelo código do seu projeto.

    sudo gcloud docker -- push gcr.io/[PROJECT_ID]/pubsub_pipeline
    

Atualizar os arquivos de especificação do Kubernetes para usar uma imagem personalizada

Em seguida, atualize os valores de image nos arquivos de especificação para usar sua nova imagem. É preciso atualizar estes dois arquivos:

kube-pubsub-bq/pubsub/twitter-stream.yaml
kube-pubsub-bq/pubsub/bigquery-controller.yaml
  1. Edite cada arquivo e procure esta linha:

    gcr.io/google-samples/pubsub-bq-pipe:v3
    
  2. Substitua google-samples pelo código do projeto. Substitua pubsub-bq-pipe:v3 por pubsub_pipeline.

  3. Salve e feche os arquivos.

Próximas etapas

  • Conheça outros recursos do Google Cloud Platform. Veja nossos tutoriais.
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…