Python を使用したクイックスタート

このページでは、Python 開発環境の設定方法や、Apache Beam SDK for Python を入手してパイプラインの例を実行、変更する方法を説明します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 詳しくは、課金を有効にする方法をご覧ください。

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Google Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore、Cloud Resource Manager API を有効にします。

    APIを有効にする

  5. 認証情報の設定:
    1. GCP Console で [サービス アカウントキーの作成] ページに移動します。

      [サービス アカウントキーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [役割] リストで、[プロジェクト] > [オーナー] を選択します。

      : [役割] フィールドの設定により、リソースにアクセスするサービス アカウントが承認されます。このフィールドは、後から GCP Console で表示または変更できます。本番環境アプリケーションを開発している場合は、[プロジェクト] > [オーナー] よりも詳細な権限を指定します。詳しくはサービス アカウントへの役割の付与をご覧ください。
    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定してください。

  7. Cloud Storage バケットを作成します。
    1. GCP Console で、Cloud Storage ブラウザページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
      • 名前: 一意のバケット名。バケットの名前空間は、全世界で、誰もが見られるようになっていますので、機密情報をバケット名に含めないようにしてください。
      • デフォルト ストレージ クラス: Standard
      • バケットデータが保存されるロケーション。
    4. [作成] をクリックします。

環境を設定する

    このクイックスタートでは、コマンド プロンプトを使用します。すぐに利用できるコマンド プロンプトがない場合は、Cloud Shell を使用できます。Python のパッケージ マネージャーがインストール済みのため、手順 1 をスキップできます。

  1. Apache Beam SDK for Python では、Python バージョン 2.7.x および pip が必要です。次のコマンドを実行して、Python 2.7.x および pip が動作していることを確認します。
    python2.7 --version
    python2.7 -m pip --version
    Python をまだインストールしていない場合は、Python をインストールするページでお使いのオペレーティング システム用のインストール手順をご覧ください。
  2. Cython は不要ですが、インストールされている場合は、バージョン 0.28.1 以上にする必要があります。Cython のバージョンを確認するには、pip show cython を実行します。

  3. 最初の試験運用版として Python 仮想環境をインストールします。virtualenv のバージョンが 13.1.0 以上でない場合は、virtualenv ツールをインストールして使用するでお使いのオペレーティング システム用のインストール手順をご覧ください。

    以下のコマンドを実行して、新しい仮想環境を設定して有効にします。

    python2.7 -m virtualenv env
    source env/bin/activate
    このクイックスタートの仮想環境を使用します。このクイックスタートが完了したら、deactivate を実行して仮想環境を無効にできます。
Python 開発環境の設定ページで Google Cloud Platform での Python の使用についてお読みください。

注: Python 3 を使用して、このクイックスタートを実行できます。Apache Beam SDK for Python 3 の使用のサポートはプレリリース版(アルファ版)の状態であり、変更されたりサポートが制限されたりする可能性があります。

Apache Beam SDK を入手する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Cloud Dataflow などのランナーを選択できます。

仮想環境から次のコマンドを実行して、最新バージョンの Apache Beam SDK for Python をインストールします。

pip install apache-beam[gcp]

WordCount をローカルで実行する

WordCount の例では、次の操作を実行するパイプラインを示します。
  1. テキスト ファイルを入力として使用します。
  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

次のコマンドで、ローカルマシンで apache_beam パッケージから wordcount モジュールを実行します。

python -m apache_beam.examples.wordcount \
  --output outputs
このテキスト ファイルは、リソース名 gs://dataflow-samples/shakespeare/kinglear.txt で Cloud Storage バケットに配置されます。出力を表示するには、次のコマンドを実行します。
more outputs*

終了するには、q キーを押します。

パイプラインをローカルで実行すると、Apache Beam プログラムのテストとデバッグを行えます。Apache Beam GitHubwordcount.py ソースコードを表示できます。

Cloud Dataflow サービスで WordCount を実行する

runner フィールドで DataflowRunner を指定すると、Cloud Dataflow サービスで apache_beam パッケージから wordcount モジュールを実行できます。

まず、PROJECT 変数と GCS_BUCKET 変数を定義します。

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
次のコマンドを実行して、このパイプラインを実行します。
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/wordcount/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/

GCP を使用して結果を表示する

Cloud Dataflow を使用してパイプラインを実行する場合、結果は Cloud Storage バケットに配置されます。

gsutil ツールを使用すると、ターミナルから結果を表示できます。

出力ファイルを一覧表示するには、次のコマンドを実行します。

gsutil ls -lh "gs://$BUCKET/wordcount/outputs*"  
こうしたファイルの結果を表示するには、次のコマンドを実行します。
gsutil cat "gs://$BUCKET/wordcount/outputs*"

Monitoring UI から結果を表示するには、次の手順を行います。
  1. Cloud Dataflow Monitoring UI を開きます。
    Cloud Dataflow ウェブ UI に移動

    まず、ステータスが [実行中] の wordcount ジョブを確認し、次に [完了] のジョブを確認します。

    ステータスが [完了] の Cloud Dataflow WordCount ジョブ。
  2. Google Cloud Platform Console で Cloud Storage ブラウザを開きます。
    Cloud Storage ブラウザに移動

    wordcount ディレクトリに、実行したジョブで作成された出力ファイルが格納されているのを確認できます。

    WordCount の出力ファイルがある results ディレクトリ。

パイプライン コードの変更

前の例の WordCount パイプラインでは、大文字の単語と小文字の単語が区別されます。次のチュートリアルでは、WordCount パイプラインで大文字と小文字が区別されないように、パイプラインを変更する方法を示します。
  1. Apache Beam GitHub リポジトリから最新の WordCount コードをダウンロードします。
  2. ローカルマシンでパイプラインを実行します。
    python wordcount.py --output outputs
  3. 次のコマンドを実行して、結果を表示します。
    more outputs*
    終了するには、q キーを押します。
  4. 任意のテキスト エディタでファイル wordcount.py を開きます。
  5. run 関数内でパイプラインのステップを検証します。split の後、行が文字列として単語に分割されます。
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. split の後、行を小文字の文字列に変更します。
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    この変更により、str.lower 関数がすべての単語にマッピングされます。この行は beam.Map(lambda word: str.lower(word)) と同様です。
  7. ファイルを保存し、変更された WordCount ジョブをローカルマシンで実行します。
    python wordcount.py --output outputs
  8. 次のコマンドを実行して、変更されたパイプラインの結果を表示します。
    more outputs*
    終了するには、q キーを押します。

クリーンアップ

このチュートリアルで使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。

  1. GCP Console で、Cloud Storage ブラウザページに移動します。

    Cloud Storage ブラウザページに移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケット削除するには、[削除]()をクリックします。

次のステップ

Apache Beam™ は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。