Cloud Pub/Sub と Compute Engine を使用したメディア処理

このチュートリアルでは、長時間実行される可能性のあるタスクを調整するキューイング システムとして Google Cloud Pub/Sub を使用する方法をご紹介します。このチュートリアルの処理タスクでは、アップロードした音声ファイルを分析して発声した音声のトランスクリプトを作成してから、後で分析するためにトランスクリプトを Google BigQuery に保存します。

次の図にシステムの概要を示します。

ファイルのアップロードと処理の概要手順は次のテキストで説明しています。

このシステムでは以下を行います。

  1. ユーザーが音声ファイルを Google Cloud Storage バケットにアップロードします。
  2. 新しいファイルが追加されると、Cloud Storage によりオブジェクト変更通知が Google App Engine で実行しているウォッチャー アプリケーションに送信されます。
  3. ウォッチャー アプリケーションで、Cloud Pub/Sub サブスクリプションで管理されるキューに新しいメディアを push するタスクが追加されます。
  4. Google Compute Engine で実行しているワーカー アプリケーションによってタスクがキューから pull され、Google Speech API を使用して音声がテキストに変換されます。
  5. 音声ファイルがテキストに変換されると、メッセージと信頼情報が BigQuery テーブルに追加されます。

このチュートリアルで使われるアーキテクチャの完全な説明については、長時間実行されるタスクにおける Cloud Pub/Sub の使用をご覧ください。

目標

このチュートリアルでは、以下の手順を紹介します。

  • データを記録する BigQuery データセットとテーブルを作成する。
  • Cloud Storage バケットを作成する。
  • Cloud Pub/Sub でタスクキューを管理するトピックとサブスクリプションを設定する。
  • オブジェクト変更通知を使用して新しいメディア ファイルが入力バケットに追加されたことを検出し、タスクをキューに追加する App Engine にウォッチャー アプリをデプロイする。
  • キューからタスクを pull するワーカーアプリを Compute Engine で起動し、Speech API を使用してメディア ファイルを処理する。

費用

このチュートリアルでは、以下を含む、Google Cloud Platform の課金対象となるコンポーネントを使用します。

  • BigQuery
  • Cloud Storage 標準ストレージ
  • Cloud Pub/Sub
  • App Engine
  • Compute Engine

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。表示されたリンクは、このチュートリアルで使用されるプロダクトの費用見積もりを示します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP Console プロジェクトをセットアップします。

    プロジェクトをセットアップする

    クリックして、以下を行います。

    • プロジェクトを作成または選択します。
    • プロジェクトにCloud Pub/Sub API を有効にします。
    • サービス アカウントを作成します。
    • JSON として秘密鍵をダウンロードします。

    これらのリソースは、GCP Console でいつでも表示および管理できます。

  3. Cloud SDK をインストールし、初期化します。
    SDK のインストール

サンプルコードのクローンを作成する

次のコマンドを使用して GitHub からサンプルコードのクローンを作成します。

git clone https://github.com/GoogleCloudPlatform/pubsub-media-processing/

BigQuery テーブルを作成する

  1. bq ツールがインストールされていることを確認します。
  2. ターミナル ウィンドウで、media_processing という名前のデータセットを作成します。

    bq mk media_processing
    
  3. transcript 列と confidence 列が含まれる speech という名前のテーブルを作成します。

    bq mk --schema transcript:string,confidence:float \ -t media_processing.speech

Cloud Storage バケットを作成する

このチュートリアルではアップロードされたファイルを受け取る Cloud Storage バケットが必要です。

バケットに名前を付ける場合は [PREFIX]-mediap-dropzone のような名前を使用し、[PREFIX] を好みの値で置き換えます。

次の手順でバケットを作成します。

  1. GCP Console で、Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

  2. [バケットを作成] をクリックします。
  3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
  4. [作成] をクリックします。

Cloud Pub/Sub トピックとサブスクリプションを作成する

Cloud Pub/Sub トピックとサブスクリプションにより、受信リクエストの処理を調整するキューにプログラムからアクセスできるようになります。次の手順でトピックとサブスクリプションを作成します。

  1. Google Cloud Platform Console で Pub/Sub ページに移動します。

    PUB/SUB ページに移動

  2. [トピックを作成] を選択し、[名前] のテキスト末尾に「mediap-topic」と入力します。

  3. [作成] をクリックします。

  4. [トピック] リストで新しいトピックの右にある矢印をクリックします。

  5. [新しいサブスクリプション] を選択します。

  6. [サブスクリプション名] で、テキスト フィールドのテキスト末尾に「mediap-sub」を追加します。
  7. [配信タイプ] を [pull] に設定します。
  8. [その他のオプション] をクリックします。
  9. [確認応答期限] を 60 秒に設定します。

  10. [作成] をクリックします。

ウォッチャー アプリケーションを設定する

このウォッチャー アプリケーションは、新しいメディア ファイルが [PREFIX]-mediap-dropzone バケットにアップロードされると、オブジェクト変更通知を受け取ります。アプリケーションは Cloud Pub/Sub のキューにタスクを追加することで通知に応答します。

ウォッチャー アプリケーションを設定するにはいくつかの手順があります。

  1. ウォッチャー アプリケーションが実行されるプロジェクトにサービス アカウントを作成します。
  2. サービス アカウントを使用するように gsutil コマンドライン ツールを設定します。
  3. ウォッチャー アプリケーションのドメインをホワイトリストに登録します。
  4. ウォッチャー アプリケーションをデプロイします。

このセクションの後半では各手順の詳細を説明します。

サービス アカウントを作成する

  1. Cloud Platform Console で [サービス アカウント] のページを開きます。

    [サービス アカウント] ページに移動

  2. [サービス アカウントを作成] を選択します。

  3. [サービス アカウント名] に、「mediap-service-account」と入力します。
  4. [役割] に対して [プロジェクト] > [オーナー] を選択します。

  5. サービス アカウント ID を書き留めておきます。この値は gsutil の設定で使用します。サービス アカウント ID の形式は [SERVICE_ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com にします。[SERVICE_ACCOUNT_NAME] を追加の文字に置き換え、一意の値にすることができます。

  6. [新しい秘密鍵の提供] をオンにします。
  7. [キーのタイプ] に [P12] を選択します。
  8. [作成] をクリックします。

    コンソールにより秘密鍵がパソコンにダウンロードされます。次のセクションで使用するのでファイル名とパスを書き留めておきます。

  9. [閉じる] をクリックします。

サービス アカウントを使用するように gsutil を設定する

Google Cloud SDK を使用してサービス アカウントを認証情報アカウントとして追加できます。認証情報を追加すると、その後のすべての gsutil コマンドでサービス アカウント認証情報が使用されるようになります。

  1. Cloud SDK を更新します。

    gcloud components update
    
  2. サービス アカウントを追加します。

    gcloud auth activate-service-account [SERVICE_ACCOUNT_ID] --key-file [PATH_TO_KEY]
    
    • [SERVICE_ACCOUNT_ID] は作成したサービス アカウントの ID です。

    • [PATH_TO_KEY] はダウンロードした P12 キーへのフルパスとファイル名です。

  3. サービス アカウントがアクティブな認証情報アカウントであることを確認します。

    gcloud auth list
    

サービス アカウント認証情報の横にテキスト (active) が表示されるはずです。

ウォッチャー アプリケーションをホワイトリストに登録してデプロイする

オブジェクト変更通知を受信するには、ウォッチャー アプリケーションの URL が通知チャンネルのプロジェクトによってホワイトリストに登録される必要があります。手順は次のとおりです。

  1. Search Console を開きます。

    Search Console を開く

  2. [プロパティの追加] を選択します。

  3. プルダウン リストを [ウェブサイト] に設定したままにします。

  4. テキスト フィールドに以下の URL を入力します。

    https://[PROJECT_ID].appspot.com/

始める前にで選択または作成した Cloud Platform プロジェクトの識別子で [PROJECT_ID] を置き換えます。

  1. HTML 確認ファイルをダウンロードし、pubsub-media-processing/01_watcher フォルダに保存します。

  2. ファイル pubsub-media-processing/01_watcher/app.yaml を開き、すべての文字列 google[SOME_STRING].html を HTML 確認ファイルのファイル名に変更します。

  3. ウォッチャー アプリをデプロイし、ファイルをパブリッシュします。pubsub-media-processing/01_watcher/ ディレクトリで以下のコマンドを実行します。[PROJECT_ID] は、始める前にで選択または作成した Cloud Platform プロジェクトの識別子です。

    gcloud app deploy 01_watcher/app.yaml --project [PROJECT_ID]

  4. Search Console の [ブラウザで [LINK] にアクセスして、アップロードが正しく行われたことを確認します] 行でリンクをクリックします。

  5. [私はロボットではありません] の横にあるチェックボックスをオンにします。

  6. [確認] を選択します。

  7. Cloud Platform Console で [ドメインの確認] ページを開きます。

    [ドメインの確認] ページを開く

  8. [ドメインの追加] を選択します。

  9. [ドメイン] で「[PROJECT_ID].appspot.com」と入力します。[PROJECT_ID] は、始める前にで選択または作成した Cloud Platform プロジェクトの識別子です。

  10. [ドメインを追加] をクリックします。

ウォッチャー アプリケーションを入力バケットにリンクする

問題がなければ、URL https://[PROJECT_ID].appspot.com/media-processing-hook から webhook にアクセスできるようになるはずです。webhook は POST リクエストで呼び出されます。

オブジェクト変更通知を設定する最後の手順では、gsutil コマンドライン ユーティリティを使用して通知を開始します。

次のコマンドを実行します。このとき、始める前にで選択または作成した Cloud Platform プロジェクトの識別子で [PROJECT_ID] を置き換え、Cloud Storage バケットを作成するで一意のバケット名を作成するために指定した接頭辞で [PREFIX] を置き換えます。

  <pre class="devsite-click-to-copy">gsutil notification watchbucket https://[PROJECT-ID].appspot.com/media-processing-hook gs://[PREFIX]-mediap-dropzone</pre>

メディア ファイルを処理するワーカーを作成する

次の手順では、Cloud Pub/Sub キューからタスクを pull してメディア ファイルを処理するワーカーを作成し、設定します。

インスタンス テンプレートを作成する

起動スクリプトを使って必要なコンポーネントをインストールするインスタンス テンプレートを作成します。

起動スクリプトによって次のアクションが実行されます。

  1. ローカルに作業フォルダを作成します。
  2. 必要なライブラリをダウンロードします。
  3. ワーカー スクリプトを実行する仮想環境をインストールします。
  4. 次の 2 つの Python スクリプトをダウンロードして実行します。

    • worker.py はキューからメッセージを pull し、関連するメディア ファイルを処理します。
    • healthor.py はインスタンス グループのヘルスチェックで使用します。ヘルスチェックはこの後の手順で設定します。
  5. Cloud Platform Console で [インスタンス テンプレート] ページを開きます。

    [インスタンス テンプレート] ページを開く

  6. [インスタンス テンプレートを作成] を選択します。

  7. [名前] でデフォルト値を mediap-tpl で置き換えます。

  8. [マシンタイプ] と [ブートディスク] の値はデフォルト値のままにします。本番レベルのメディア処理アプリケーションの場合、CPU の量を増やしておくことをおすすめします。

  9. [ID と API へのアクセス] で [すべての Cloud API に完全アクセス権を許可] を選択します。これは Speech API を使用するために必要です。

  10. [管理、ディスク、ネットワーキング、SSH 認証鍵] を選択します。

  11. [自動化] で以下のスクリプトを [起動スクリプト] テキスト ボックスにコピーします。このとき、Cloud Storage バケットを作成するで一意のバケット名を作成するために指定した接頭辞で [PREFIX] を置き換えます。

      #! /bin/bash
      mkdir /tmp/mediap
      cd /tmp/mediap
      wget -q
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/02_workers/requirements.txt
      wget -q
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/02_workers/mediator.py
      wget -q
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/02_workers/worker.py
      wget -q
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/02_workers/recurror.py
      curl https://bootstrap.pypa.io/get-pip.py | sudo python
      sudo pip install virtualenv
      virtualenv venv
      source venv/bin/activate
      venv/bin/pip install -r requirements.txt
      python worker.py --subscription mediap-sub --dataset_id=media_processing --table_id=speech
      

インスタンス グループを作成する

インスタンス テンプレートの作成が完了したのでインスタンス グループを作成できます。

  1. Cloud Platform Console で [インスタンス グループ] ページを開きます。

    [インスタンス グループ] ページを開く

  2. [インスタンス グループを作成] を選択します。

  3. [名前] に mediap-igr を設定します。
  4. [インスタンス定義] を選択し、mediap-tpl テンプレートを選択します。
  5. [インスタンス数] を 3 に設定します。

  6. [作成] を選択します。

自動スケーリングをワーカーに追加する(省略可)

自動スケーリングを追加すると、キューに追加されたタスクを処理するだけの十分なワーカーが確保されます。

次のコマンドを実行してオートスケーラーを作成します。

gcloud alpha \
compute instance-groups managed set-autoscaling \
mediap-igr --zone us-central1-f \
--min-num-replicas 0 --max-num-replicas 10 \
--queue-scaling-cloud-pub-sub \
 topic=mediap-topic,subscription=mediap-sub \
--queue-scaling-acceptable-backlog-per-instance 5 \
--target-cpu-utilization 1

メディア ファイルを活発に処理するインスタンスが自動スケーリングによって終了しないようにするには、--queue-scaling-acceptable-backlog-per-instance--target-cpu-utilization の 2 つのパラメータが重要になります。これらのパラメータは Cloud Pub/Sub キューのサイズと各 VM の平均 CPU をそれぞれ指定します。

このチュートリアルのメディア処理は高速なので、キューの最大バックログ メッセージ数である 5 は十分足りるはずです。このソリューションを拡張して、動画編集などのより複雑な処理を使用する場合は、queue-scaling-acceptable-backlog-per-instance の値を下げることを検討してください。

システムをテストする

メディア処理システムのテスト用に、Speech API の要件を満たす指定の音声ファイルを使用できます。次のように、Google Cloud の公開バケットからお使いの Dropzone バケットにファイルをコピーし、デモを開始できます。[YOUR_BUCKET_ID] を正しい値で置き換えます。

gsutil -m cp -r gs://solutions-public-assets/media-processor/audio_samples/* gs://[YOUR_BUCKET_ID]

バックグラウンドで以下の処理が行われます。

  1. gsutil により、公開バケットから Dropzone バケットにファイルがコピーされます。
  2. バケットは App Engine webhook を呼び出すオブジェクト通知を使用して監視されます。
  3. コピーされた各ファイル用の Cloud Pub/Sub メッセージが webhook で作成されます。
  4. 1 つのワーカーで 1 つのメッセージが選択され、ファイルがテキストに変換されて、そのテキストが BigQuery に保存されます。
  5. この処理は Cloud Pub/Sub キューのすべてのメッセージに対して繰り返されます。

音声ファイルの処理が完了すると、以下の例のような追加の行が BigQuery テーブルに表示されます。

BigQuery テーブルには、音声文字変換および信頼値が表示されます。

コードを確認する

以下のセクションでは、このチュートリアルに付属のサンプルコードの詳細について説明します。

メッセージの形式

オブジェクト変更通知と Google Cloud Functions のどちらを使用してもパブリッシュされたメッセージのコンテンツは同じであり、バケット名、メディアリンク、メディア名、メディア MIME タイプなどのフィールドが含まれます。

このソリューションで使用するメッセージ形式を以下のスニペットに示します。

# Publish message to topic
message = json.dumps({
    'selfLink': data['selfLink'],
    'bucket'  : data['bucket'],
    'name'    : data['name'],
    'type'    : data['contentType']
})
logging.debug(message)

新しいメディア ファイルを監視する

このキューのワークフローは以下のとおりです。

  1. パブリッシュ。ウォッチャーによってメッセージが Cloud Pub/Sub トピックにパブリッシュされます。

    ウォッチャー アプリケーションの次のコード スニペットによって、新しいメディア ファイルの詳細を含む Cloud Pub/Sub にメッセージがパブリッシュされます。

    body = {
        'messages': [
            {'data': base64.b64encode(message)}
        ]
    }
    resp = pubsub_client.projects().topics().publish(
        topic='projects/%s/topics/%s' % (PROJECT_ID, PUBSUB_TOPIC),
        body=body
    ).execute()

  2. pull。すべてのワーカーにより次のメッセージが pull されます。先に取得したものから処理されます。

    Compute Engine で実行中のワーカー アプリケーションの次のコード スニペットにより、次のメッセージがキューから pull ダウンされます。

    # Pull the data when available.
    resp = pubsub_client.projects().subscriptions().pull(
        subscription=sub,
        body={
            "maxMessages": toprocess
        }
    ).execute()

  3. 確認応答期限。メディア ファイルの処理にかかる期間は正確に把握できないため、確認応答期限を設定するのは困難です。

    • 時間制限を長く、たとえば 3 時間に設定すると、ワーカーがメッセージに失敗するたびにメッセージが反映されるまで 3 時間待機することになります。

    • 時間制限を短く、たとえば 10 秒に設定すると、メディアが処理されている最中にメッセージが削除される可能性があります。この状態でワーカーが失敗すると永久にメッセージは失われます。

    メッセージが迅速に処理され、失われないようにするために、ワーカーを使ってメディアの処理中に確認応答期限を定期的に更新できます。

    ワーカー アプリケーションの次のコード スニペットは、確認応答期限を延長します。

    #Increment the ackDeadLine to make sure that file has time to be processed
    pubsub_client.projects().subscriptions().modifyAckDeadline(
        subscription=sub,
        body={
            'ackIds': ack_ids,
            'ackDeadlineSeconds': refresh
        }
    ).execute()

  4. メッセージの確認応答。メディアが処理されると、Cloud Pub/Sub によってメッセージがキューから削除されるようにワーカーはメッセージの確認応答を行います。

    # Delete the message in the queue by acknowledging it.
    pubsub_client.projects().subscriptions().acknowledge(
        subscription=sub,
        body={
            'ackIds': [ack_id]
        }
    ).execute()

以下の図は、処理が正常に完了した場合を示しています。

処理が正常に完了しました。

次の図は、ワーカーが失敗した場合を示しています。

処理に失敗しました。

メディア ファイルを処理する

次のコードはワーカーによるメディア ファイルの処理に使用されます。

Speech API を呼び出す

Google Speech API で音声入力が分析されると、結果の信頼度値とともに 1 つ以上のトランスクリプトが返されます。

speech_body={
    'config': {
        'encoding': 'FLAC',
        'sampleRate': 16000,
        'languageCode': self.filename.split('_')[0]
    },
    'audio': {
        'uri': "gs://{0}/{1}".format(self.dropzone_bucket, self.filename)
    }
}

BigQuery テーブルにレスポンスを追加する

次の snippet は BigQuery のテーブルにレスポンスを追加する方法を示しています。

def write_to_bq(self, transcript, confidence):
    """Write to BigQuery"""
    Logger.log_writer("Writing - {} - to BigQuery".format(transcript))
    body = {
        "rows":[{
            "json": {
                "transcript": transcript,
                "confidence": confidence
            }
        }]
    }

    response = self.bq_client.tabledata().insertAll(
        projectId=self.project_id,
        datasetId=self.dataset_id,
        tableId=self.table_id,
        body=body
    ).execute()

クリーンアップ

メディア処理のチュートリアルが終了したら、Google Cloud Platform で作成したリソースについて今後料金が発生しないようにクリーンアップすることができます。以下のセクションで、このようなリソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除する手順は次のとおりです。

  1. GCP Console で [プロジェクト] ページに移動します。

    プロジェクト ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

インスタンスを削除する

Compute Engine インスタンスを削除する手順は次のとおりです。

  1. GCP Console の [VM インスタンス] ページに移動します。

    [VM インスタンス] ページに移動

  2. 削除したいインスタンスの隣のチェックボックスを選択します。
  3. ページの上部にある、[削除] ボタンをクリックし、インスタンスを削除します。

デフォルトの App Engine アプリを停止する

デフォルト バージョンの App Engine アプリを削除するには、お使いのプロジェクトを削除するしかありません。ただし、GCP Console でデフォルト バージョンを停止することができます。このアクションによって、バージョンに関連付けられているすべてのインスタンスがシャットダウンします。これらのインスタンスは、必要に応じてあとで再起動できます。

App Engine スタンダード環境では、お使いのアプリに手動スケーリングまたは基本スケーリングがある場合、デフォルト バージョンを停止させることができます。

Cloud Storage バケットを削除する

Cloud Storage バケットを削除する手順は次のとおりです。

  1. GCP Console で、Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

  2. 削除したいバケットの隣にあるチェックボックスをクリックします。
  3. ページの上部にある [削除] ボタンをクリックし、バケットを削除します。

BigQuery でデータセットを削除する

BigQuery でデータセットを削除するには:

  1. BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI

  2. ナビゲーションで、作成したデータセットの名前の上にカーソルを合わせます。

  3. ナビゲーションのデータセット名の横にある下向き矢印アイコン 下矢印の画像 をクリックし、[Delete dataset] をクリックします。

  4. [Delete dataset] ダイアログ ボックスで、データセットの名前を入力し、[OK] をクリックして削除コマンドを確認します。

次のステップ

このサンプルで以下を拡張およびビルドできます。

  • モニタリングの追加。本番環境では Cloud Pub/Sub キューのサイズ、ワーカーの自動スケーリング、その他の指標をモニタリングすることをおすすめします。詳細については、PubSub 用 Cloud Monitoring APIカスタム指標用 Cloud Monitoringクラウド オートスケーラーをご覧ください。

  • 自動修復の追加ヘルスチェックはロードバランサとインスタンス グループの両方に適用できます。ロードバランサで失敗したヘルスチェックが検出されると、トラフィックは正常なインスタンスにリダイレクトされます。インスタンス グループ マネージャで失敗したヘルスチェックが検出されると、インスタンスはマネージャによって終了され、再起動されます。

  • 元のメディア ファイルのバックアップ。アーカイブ目的や後で追加の処理を行うために、元のメディア ファイルを保存しておくことをおすすめします。Cloud Storage のライフサイクル管理機能を使用して、メディア ファイルを標準から Nearline Storage に移動し、ストレージ費用を削減できます。

  • 音声処理の拡張Google Natural Language API などの NLP ライブラリを使用して、追加処理を音声ファイルに追加できます。

  • その他のメディアタイプの処理。このサンプルでは音声を処理しますが、このアーキテクチャを使用して画像や動画などのその他のメディアタイプも処理できます。

  • マルチスレッドの追加。Google Cloud Speech API の呼び出しは高速なのでシングルスレッドのワーカー アプリケーションで問題なく行うことができます。処理の作業負荷が高い場合、マルチスレッド化したアプリケーションを使用してパフォーマンスを向上できます。

  • Google Cloud Functions の使用。App Engine で実行中のウォッチャー アプリケーションを置き換えて Cloud Functions を代わりに使用できます。Cloud Functions はさまざまなイベントでトリガー可能な(この場合は、特定のバケットにおける変更)ロジックのデプロイを簡略化します。Cloud Functions では、Node.js を使用してメッセージを Cloud Pub/Sub トピックにパブリッシュできます。この方法を表すコードは、このソリューションに関連する GitHub レポジトリにあります。Cloud Functions は現在クローズド アルファ版であることにご注意ください。

  • Google Cloud Platform のその他の機能を試すには、チュートリアルをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...