Google Cloud Platform

マルチクラウド : Google Cloud Endpoints と AWS Lambda のインテグレーション

マルチクラウド戦略は、異なるクラウド プロバイダーの長所を使い分けたり、きわめて重要なワークロードを分散させたりすることを可能にします。たとえば、既存のアプリケーションには Amazon Web Services(AWS)を使っているものの、その一方で Google の Cloud Vision や Cloud Video IntelligenceData Loss Prevention といった強力な API を使いたいとか、Google のビッグデータと機械学習の機能によってデータを分析し知見を引き出したいときに役立ちます。

今回は、Google Cloud Platform(GCP)と AWS のワークロードを統合するパターンの 1 つ(他にもたくさんあります)として、Google Cloud Endpoints と AWS Lambdaを使用する方法を紹介します。アーキテクチャを図示すると次のようになります。

32787.png

Cloud Endpoints を使用すると、OpenAPI Specification に基づいて API の開発、デプロイ、保護、モニタリングが行えます。さらに、アプリケーションの API は、Google App EngineGoogle Container EngineGoogle Compute Engine などのバックエンドで実行できます。

AWS Lambda は、他の AWS サービスで発生したイベントへの応答という形でコードを実行できます。たとえば、Amazon S3 バケットにオブジェクトが追加されたときや、Amazon SNS トピックに通知が届いたとき、DynamoDB Stream のレコードを処理したいときに、AWS Lambda 関数が実行されるように設定することが可能です。

注意 : Amazon S3 から Google Cloud にデータを転送するときは、Amazon S3 のリクエストとデータ転送が適用されます。

この投稿では、Cloud Endpoints API を作成し、AWS Lambda 関数からそれを呼び出します。このサンプルの完全なソース コードは GitHub からダウンロードできます。以下では、このソリューションの実装方法を説明します。

Cloud Endpoints のセットアップ

ソリューションを実装するための最初のステップは、App Engine flexible environment への Cloud Endpoints API のデプロイです。GCP のドキュメントには、App Engine flexible environment、Container Engine などに API をデプロイする方法をまとめたクイックスタート ページが用意されています。

GCP プロジェクトをまだ持っていない場合は作成します。

  • GCP Console を使用して、新しい GCP プロジェクトと App Engine アプリケーションを作成します。
  • プロンプトが表示されたら、App Engine アプリケーションを実行するリージョンを選択し、課金を有効にします。
  • あとで必要になるので、プロジェクトの ID をメモしておきます。
  • Google Cloud Shell を起動します。

ソース コードの入手

Google Cloud Shell で次のことを行います。

このサンプルは、API バックエンドを実装するために Python を使っています。

アプリケーション コードの実装

次に、API バックエンドのためのコードを実装します。サンプル プログラムの aeflex-endpoints は Flask アプリケーションです。main.py ファイルには、processmessage エンドポイントが呼び出されたときに実行されるアプリケーション コードを定義します。

  @app.route('/processmessage', methods=['POST'])
     def process():
         """Process messages with information about S3 objects"""
         message = request.get_json().get('inputMessage', '')
         # add other processing as needed
         # for example, add event to PubSub or 
         # download object using presigned URL, save in Cloud Storage, invoke ML APIs
         return jsonify({'In app code for endpoint, received message': message})

Cloud Endpoints API の構成とデプロイ

API の定義は OpenAPI Specification を使って行います。aeflex-endpoints サンプルの場合、API の OpenAPI Specification は openapi.yaml ファイルに格納されています。個々のフィールドの詳しい説明については、Swagger オブジェクトのドキュメントをご覧ください。

openapi.yaml ファイルは、API を処理するホストを宣言します。

     host: "echo-api.endpoints.aeflex-endpoints.cloud.goog"

/processmessage エンドポイントは paths セクションで定義されています。処理する Amazon S3 オブジェクトの詳細情報は inputMessage パラメータに格納されています。

  # This section configures the processmessage endpoint.
       "/processmessage":
         post:
           description: "Process the given message."
           operationId: "processmessage"
           produces:
           - "application/json"
           responses:
             200:
               description: "Return a success response"
               schema:
                 $ref: "#/definitions/successMessage"
           parameters:
           - description: "Message to process"
             in: body
             name: inputMessage
             required: true
             schema:
               $ref: "#/definitions/inputMessage"
           security:
           - api_key: []
     definitions:
       successMessage:
         properties:
           message:
             type: string
       inputMessage:
         # This section contains information about the S3 bucket and object to be processed.
         properties:
           Bucket: 
             type: string
           ObjectKey:
             type: string
           ContentType:
             type: string
           ContentLength:
             type: integer
           ETag:
             type: string
           PresignedUrl:
             type: string

次に、OpenAPI Specification をデプロイします。

  gcloud service-management deploy openapi.yaml

このコマンドは、デプロイされた OpenAPI Specification に関する次のような情報を返します。

  Service Configuration [2017-03-05r2] uploaded for service
     "echo-api.endpoints.aeflex-endpoints.cloud.goog"

app.yaml ファイル内のサービス構成をアップデートします。

  endpoints_api_service:
       # The following values are to be replaced by information from the output of
       # 'gcloud service-management deploy openapi.yaml' command.
       name: echo-api.endpoints.aeflex-endpoints.cloud.goog
       config_id: 2017-03-05r2

API をデプロイします。

  gcloud app deploy

クライアントを認証するための API キーを作ります。

  • GCP Console の Products & services メニューで API Manager > Credentials をクリックします。
  • Create Credentials > API key をクリックし、API キーをメモします。

AWS Lambda 関数のセットアップ

この節では、ファイルが Amazon S3 バケットにアップロードされたときに Lambda 関数が呼び出されるようにセットアップする方法を説明します。このサンプルでは、Lambda 関数は Python で書かれています。Lambda 関数の完全なソース コードは、次のファイルに格納されています。

  blogs/endpointslambda/lambdafunctioninline.py.

ファイルを格納する S3 バケットの作成

AWS の S3 Management Console で images-bucket-rawdata という S3 バケットを作ります。このバケットにファイルが追加されると、Lambda 関数が呼び出されます。

IAM ロールの作成

IAM Management Console で、次のようにして、Lambda 関数が S3 バケット、SQS キュー、CloudWatch Logs にアクセスするためのパーミッションを定義した IAM ロールを作ります。

  • IAM Management Console のナビゲーション ペインにある Roles をクリックします。
  • LambdaExecRole という新しいロールを作ります。
  • AWS Lambda Role Type を選び、ポリシーを選択します。
  • AWSLambdaExecute : このポリシーは、S3 への Put および Get アクセス、CloudWatch Logs へのフルアクセスを Lambda 関数に与えます。
  • AmazonSQSFullAccess : このポリシーは、DLQ(dead-letter-queue)へのメッセージ送信パーミッションを Lambda 関数に与えます。
  • 設定をよく確かめて、ロールを作成します。

SQS キューの作成

  • SQS Management Console で、Lambda 関数の dead-letter queue として機能する IntegrationDLQ という名前の SQS キューを作ります。AWS Lambda は、失敗した非同期呼び出しを自動的に再試行します。また、Lambda 関数は、処理されなかったペイロードを dead-letter queue に転送するように設定できます。

Lambda 関数の作成

Lambda Management Console で、次のようにして Lambda 関数を作ります。

  • Select blueprint ページで、設計図として s3-get-object-python を選択します。
  • Configure triggers ページで次の指定を行います。
  • Bucket : images-bucket-rawdata
  • Event-type : Object-Created (All)
  • トリガを有効にします。
  • Configure function ページで次の指定を行います。
  • Name : CallEndpoint
  • Runtime : Python 2.7
  • Code entry type : Edit code inline
  • Environment variable key : ENDPOINT_API_KEY
  • Environment variable : .(注意 : 本番環境では、AWS の暗号化ヘルパーで API キーを保護することを検討してください)
  • Handler : lambda_function.lambda_handler
  • Role : Choose an existing role
  • Existing role : LambdaExecRole(先ほど作ったもの)
  • Advanced settings の DLQ resource : IntegrationDLQ
  • Lambda 関数のインライン コード エディタで、元のコードを消して次のコードを書き込みます。この Lambda 関数は、イベントからバケットとオブジェクト情報を取得し、オブジェクト メタデータを取得して、オブジェクトの署名付き URL を生成し、最後に Cloud Endpoints API を呼び出します。

  from __future__ import print_function
import boto3
import json
import os
import urllib
import urllib2
print('Loading function')
s3 = boto3.client('s3')
endpoint_api_key = os.environ['ENDPOINT_API_KEY']
endpoint_url = "https://aeflex-endpoints.appspot.com/processmessage"
def lambda_handler(event, context):
    # Get the object information from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        # Retrieve object metadata
        response = s3.head_object(Bucket=bucket, Key=object_key)
        # Generate pre-signed URL for object
        presigned_url = s3.generate_presigned_url('get_object', Params = {'Bucket': bucket, 'Key': object_key}, ExpiresIn = 3600)
        data = {"inputMessage": {
                    "Bucket": bucket,
                    "ObjectKey": object_key,
                    "ContentType": response['ContentType'],
                    "ContentLength": response['ContentLength'],
                    "ETag": response['ETag'],
                    "PresignedUrl": presigned_url
            }
        }
        headers = {"Content-Type": "application/json",
                    "x-api-key": endpoint_api_key
        }
        # Invoke Cloud Endpoints API
        request = urllib2.Request(endpoint_url, data = json.dumps(data), headers = headers)
        response = urllib2.urlopen(request)
        
        print('Response text: {} \nResponse status: {}'.format(response.read(), response.getcode()))
        return response.getcode()
    except Exception as e:
        print(e)
        print('Error integrating lambda function with endpoint for the object {} in bucket {}'.format(object_key, bucket))
        raise e

  • Lambda 関数の設定をよく確かめて、関数を作成します。

インテグレーションのテスト

いよいよ運命のときがやってきました。AWS Lambda 関数と Cloud Endpoints API のインテグレーションのテスト手順は次のとおりです。

  • AWS の S3 Management Console で、images-bucket-rawdata にファイルをアップロードします。
  • Lambda Management Console で、CallEndpoint 関数に移り、Monitoring タブの内容を確認します。CloudWatch グラフが関数呼び出しの成功を示しているはずです。
  • CloudWatch で View Logs をクリックし、関数のログ ストリームの詳細を表示させます。ログには、Lambda 関数が API から成功の応答コード(200)を受け取ったことと、API が Lambda 関数から受け取った S3 イベントの詳細が書かれているはずです。

これで、AWS Lambda と Cloud Endpoints のインテグレーションにより、S3 バケットにアップロードされたファイルをリアルタイムで処理することができました。今回紹介したのは、GCP を使ったマルチクラウド環境の構築に使用できるパターンの 1 つにすぎません。今後は他の例も紹介していきますので、ご期待ください。

* この投稿は米国時間 4 月 24 日、Google Cloud Platform の Technical Curriculum Developer である Sowmya Kannan によって投稿されたもの(投稿はこちら)の抄訳です。

- By Sowmya Kannan, Technical Curriculum Developer, Google Cloud Platform