Python を使用したクイックスタート

このページでは、Python 開発環境の設定方法や、Apache Beam SDK for Python を入手してパイプラインの例を実行、変更する方法を説明します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP Console のプロジェクト セレクタのページで、GCP プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Google Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore、Cloud Resource ManagerAPIs を有効にします。

    API を有効にする

  5. 認証情報の設定:
    1. GCP Console で [サービス アカウントキーの作成] ページに移動します。

      [サービス アカウントキーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [役割] リストで、[プロジェクト] > [オーナー] を選択します。

      : [役割] フィールドの設定により、リソースにアクセスするサービス アカウントが承認されます。このフィールドは、後から GCP Console で表示または変更できます。本番環境アプリケーションを開発している場合は、[プロジェクト] > [オーナー] よりも詳細な権限を指定します。詳しくはサービス アカウントへの役割の付与をご覧ください。
    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定してください。

  7. Cloud Storage バケットを作成します。
    1. GCP Console で、Cloud Storage ブラウザページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
      • 名前: 一意のバケット名。バケットの名前空間は、全世界で、誰もが見られるようになっていますので、機密情報をバケット名に含めないようにしてください。
      • デフォルト ストレージ クラス: Standard
      • バケットデータが保存されるロケーション。
    4. [作成] をクリックします。

環境の設定

    このクイックスタートでは、コマンド プロンプトを使用します。すぐに利用できるコマンド プロンプトがない場合は、Cloud Shell を使用できます。 Python のパッケージ マネージャーがインストール済みのため、手順 1 をスキップできます。

  1. Apache Beam SDK for Python では、pip と Python バージョン 2.7、3.5、3.6 または 3.7 が必要です。次のコマンドを実行して、Python と pip が動作していることを確認します。
    python --version
    python -m pip --version
    Python をまだインストールしていない場合は、Python をインストールするページでお使いのオペレーティング システム用のインストール手順をご覧ください。
  2. Cython は不要ですが、インストールされている場合は、バージョン 0.28.1 以上にする必要があります。Cython のバージョンを確認するには、pip show cython を実行します。

  3. 最初の試験運用版として Python 仮想環境をインストールします。virtualenv のバージョンが 13.1.0 以上でない場合は、virtualenv ツールをインストールして使用するでお使いのオペレーティング システム用のインストール手順をご覧ください。

    以下のコマンドを実行して、新しい仮想環境を設定して有効にします。

    python -m virtualenv env
    source env/bin/activate
    このクイックスタートの仮想環境を使用します。このクイックスタートが完了したら、deactivate を実行して仮想環境を無効にできます。
Python 開発環境の設定ページで Google Cloud Platform での Python の使用についてお読みください。

注: 最良の結果を得るため、Apache Beam 2.16.0 以降で Python 3 パイプラインを起動してください。Apache Beam での最新の Python 3 の改善点については、Apache Beam の公開バグトラッカーをご覧ください。

Apache Beam SDK を入手する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Cloud Dataflow などのランナーを選択できます。

仮想環境から次のコマンドを実行して、最新バージョンの Apache Beam SDK for Python をインストールします。

pip install apache-beam[gcp]

WordCount をローカルで実行する

WordCount の例では、次の操作を実行するパイプラインを示します。
  1. テキスト ファイルを入力として使用します。
  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

次のコマンドを使用して、ローカルマシンで apache_beam パッケージから wordcount モジュールを実行します。

python -m apache_beam.examples.wordcount \
  --output outputs
このテキスト ファイルは、リソース名 gs://dataflow-samples/shakespeare/kinglear.txt で Cloud Storage バケットに配置されます。出力を表示するには、次のコマンドを実行します。
more outputs*

終了するには、q キーを押します。

パイプラインをローカルで実行すると、Apache Beam プログラムのテストとデバッグを行えます。 Apache Beam GitHubwordcount.py ソースコードを表示できます。

Cloud Dataflow サービスで WordCount を実行する

runner フィールドで DataflowRunner を指定すると、Cloud Dataflow サービスで apache_beam パッケージから wordcount モジュールを実行できます。

まず、PROJECT 変数と GCS_BUCKET 変数を定義します。

PROJECT=PROJECT_ID
BUCKET=GCS_BUCKET
次のコマンドを実行して、このパイプラインを実行します。
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://$BUCKET/wordcount/outputs \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp/

GCP を使用して結果を表示する

Cloud Dataflow を使用してパイプラインを実行する場合、結果は Cloud Storage バケットに配置されます。

gsutil ツールを使用すると、ターミナルから結果を表示できます。

出力ファイルを一覧表示するには、次のコマンドを実行します。

gsutil ls -lh "gs://$BUCKET/wordcount/outputs*"  
こうしたファイルの結果を表示するには、次のコマンドを実行します。
gsutil cat "gs://$BUCKET/wordcount/outputs*"

Monitoring UI から結果を表示するには:
  1. Cloud Dataflow Monitoring UI を開きます。
    Cloud Dataflow ウェブ UI に移動

    まず、ステータスが [実行中] の wordcount ジョブを確認し、次に [完了] のジョブを確認します。

    ステータスが [完了] の Cloud Dataflow WordCount ジョブ。
  2. Google Cloud Platform Console で Cloud Storage ブラウザを開きます。
    Cloud Storage ブラウザに移動

    wordcount ディレクトリに、実行したジョブで作成された出力ファイルが格納されているのを確認できます。

    WordCount の出力ファイルがある results ディレクトリ。

パイプライン コードの変更

前の例の WordCount パイプラインでは、大文字の単語と小文字の単語が区別されます。 次のチュートリアルでは、WordCount パイプラインで大文字と小文字が区別されないように、パイプラインを変更する方法を示します。
  1. Apache Beam GitHub リポジトリから最新の WordCount コードをダウンロードします。
  2. ローカルマシンでパイプラインを実行します。
    python wordcount.py --output outputs
  3. 次のコマンドを実行して、結果を表示します。
    more outputs*
    終了するには、q キーを押します。
  4. 任意のエディタで wordcount.py ファイルを開きます。
  5. run 関数内でパイプラインのステップを検証します。split の後、行が文字列として単語に分割されます。
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))
  6. split の後、行を小文字の文字列に変更します。
    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'lowercase' >> beam.Map(unicode.lower)     # Add this line to modify the pipeline
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones)) 
    この変更により、str.lower 関数がすべての単語にマッピングされます。この行は beam.Map(lambda word: str.lower(word)) と同じです。
  7. ファイルを保存し、変更された WordCount ジョブをローカルマシンで実行します。
    python wordcount.py --output outputs
  8. 次のコマンドを実行して、変更されたパイプラインの結果を表示します。
    more outputs*
    終了するには、q キーを押します。

クリーンアップ

このチュートリアルで使用したリソースについて GCP アカウントに課金されないようにするために、以下を行ってください。

次のステップ

Apache Beam™ は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。