BigQuery から Dataflow に読み込む

このドキュメントでは、Apache Beam BigQuery I/O コネクタを使用して BigQuery から Dataflow にデータを読み取る方法について説明します。

概要

BigQuery I/O コネクタでは、BigQuery からの読み取りを行うために 2 つのオプションがサポートされています。

  • テーブルの直接読み取り。BigQuery Storage Read API を使用するため、これは最速のオプションとなります。
  • エクスポート ジョブ。このオプションでは、BigQuery はテーブルデータを Cloud Storage に書き込むエクスポート ジョブを実行します。その後、コネクタがエクスポートされたデータを Cloud Storage から読み取ります。このオプションはエクスポート ステップを必要とするため、効率的ではありません。

デフォルトのオプションはエクスポート ジョブです。直接読み取りを指定するには、withMethod(Method.DIRECT_READ) を呼び出します。

コネクタは、テーブルデータを PCollection にシリアル化します。PCollection 内の各要素は 1 つのテーブル行を表します。コネクタは、次のシリアル化方法をサポートしています。

並列処理

このコネクタの並列処理は読み取り方法によって異なります。

  • 直接読み取り: I/O コネクタは、エクスポート リクエストのサイズに基づいて動的な数のストリームを生成します。これらのストリームを BigQuery から並行して読み取ります。

  • エクスポート ジョブ: BigQuery は、Cloud Storage に書き込むファイルの数を決定します。ファイルの数は、クエリとデータ量によって異なります。I/O コネクタは、エクスポートされたファイルを並行して読み取ります。

パフォーマンス

次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
ストレージ読み取り 120 MBps 88,000 要素/秒
Avro エクスポート 105 MBps 78,000 要素/秒
JSON のエクスポート 110 MBps 81,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

ベスト プラクティス

  • 通常は、テーブルの直接読み取り(Method.DIRECT_READ)を使用することをおすすめします。Storage Read API は、データをエクスポートする中間手順を必要としないため、エクスポート ジョブよりもデータ パイプラインに適しています。

  • 直接読み取りを使用すると、Storage Read API の使用量に対して料金が発生します。BigQuery の料金ページのデータ抽出の料金をご覧ください。

  • エクスポート ジョブに追加料金はかかりません。ただし、エクスポート ジョブには上限があります。タイミングを優先し、コストを調整できる大規模なデータ移動の場合は、直接読み取りをおすすめします。

  • Storage Read API には割り当て上限があります。Google Cloud の指標を使用して、割り当ての使用状況をモニタリングします。

  • Storage Read API を使用すると、次のようなリースの有効期限エラーとセッション タイムアウト エラーがログに表示されることがあります。

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    これらのエラーは、オペレーションがタイムアウトよりも時間がかかる場合に発生することがあります。通常は、実行時間が 6 時間を超えるパイプラインで発生します。この問題を軽減するには、ファイル エクスポートに切り替えます。

  • Java SDK を使用する場合は、BigQuery テーブルのスキーマを表すクラスを作成することを検討してください。次に、パイプラインで useBeamSchema を呼び出して、Apache Beam Row 型と BigQuery の TableRow 型を自動的に変換します。スキーマクラスの例については、ExampleModel.java をご覧ください。

このセクションのコードサンプルでは、テーブルを直接読み取ります。

エクスポート ジョブを使用する場合は、withMethod の呼び出しを省略するか、Method.EXPORT を指定します。次に、--tempLocation パイプライン オプションを設定して、エクスポートされたファイルの Cloud Storage バケットを指定します。

これらのコード例では、ソーステーブルに次の列があることを前提としています。

  • name(文字列)
  • age(整数)

JSON スキーマ ファイルとして指定。

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Avro 形式のレコードを読み取る

BigQuery データを Avro 形式のレコードに読み取るには、read(SerializableFunction) メソッドを使用します。このメソッドは、SchemaAndRecord オブジェクトを解析し、カスタムデータ型を返すアプリケーション定義関数を使用します。コネクタからの出力はカスタムデータ型の PCollection です。

次のコードは、BigQuery テーブルから PCollection<MyData> を読み取ります。ここで、MyData はアプリケーション定義クラスです。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read メソッドは SerializableFunction<SchemaAndRecord, T> インターフェースを使用します。このインターフェースには、Avro レコードからカスタム データクラスに変換する関数が定義されています。上のコードサンプルでは、MyData.apply メソッドがこの変換関数を実装しています。このサンプル関数は、Avro レコードの name フィールドと age フィールドを解析し、MyData インスタンスを返します。

読み取る BigQuery テーブルを指定するには、前の例のように from メソッドを呼び出します。詳細については、BigQuery I/O コネクタ ドキュメントのテーブル名をご覧ください。

TableRow オブジェクトを読み取る

readTableRows メソッドは、BigQuery データを TableRow オブジェクトの PCollection に読み込みます。各 TableRow は、テーブルデータの単一行を保持する Key-Value ペアのマップです。from メソッドを呼び出して、読み取る BigQuery テーブルを指定します。

次のコードは、BigQuery テーブルから PCollection<TableRows> を読み取ります。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

また、この例には、TableRow ディクショナリの値にアクセスする方法も示されています。整数値は、BigQuery でエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。

列の射影とフィルタリング

直接読み取り(Method.DIRECT_READ)を使用する場合、BigQuery から読み取られてネットワーク経由で送信されるデータの量を減らすことで、読み取りオペレーションを効率化できます。

  • 列射影: withSelectedFields を呼び出して、テーブルから列のサブセットを読み取ります。この方法では、テーブルに多数の列がある場合に効率的に読み取ることができます。
  • 行フィルタリング: withRowRestriction を呼び出して、サーバー側でデータをフィルタリングする述語を指定します。

フィルタの述語は確定的である必要があります。また、集計はサポートされていません。

次の例では、"user_name" 列と "age" 列を射影し、述語 "age > 18" と一致しない行を除外しています。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

クエリ結果から読み取る

上記の例は、テーブルから行を読み取る方法を示しています。fromQuery を呼び出すと、SQL クエリの結果から読み取ることもできます。この方法では、計算作業の一部を BigQuery に移行します。 また、このメソッドを使用してビューにクエリを実行し、BigQuery ビューまたはマテリアライズド ビューから読み取ることもできます。

次の例では、BigQuery 一般公開データセットに対してクエリを実行し、結果を読み取ります。パイプラインの実行後、BigQuery ジョブ履歴でクエリジョブを確認できます。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

次のステップ