BigQuery I/O

Dataflow SDK には、Google BigQuery テーブルとの間でデータを読み書きできる組み込み Read 変換と Write 変換があります。名前を指定したテーブルの全体を読み取ることもできますし、クエリ文字列を使用して部分的なデータを読み取ることもできます。

BigQuery テーブル名の指定

BigQuery テーブルを対象に読み取りや書き込みを行うには、完全修飾の BigQuery テーブル名を指定する必要があります。完全修飾の BigQuery テーブル名は、3 つの部分から構成されます。

  • プロジェクト ID: Google Cloud プロジェクトの ID。デフォルト値は、パイプライン オプション オブジェクトから取得されます。
  • データセット ID: BigQuery データセット ID(対象の Cloud プロジェクト内で一意のもの)。
  • テーブル ID: 特定のデータセット内で一意のテーブル ID。

Java

BigQueryIO はプロジェクト名を指定しなくても使用できます。省略した場合、BigQueryIOPipelineOptions オブジェクトからデフォルトのプロジェクトを使用します。

BigQuery Java Client APITableReference 型のオブジェクトを受け入れて、ターゲット BigQuery テーブルを識別します。Dataflow SDK for Java の BigQueryIO パッケージには、ヘルパー メソッド(BigQueryIO.parseTableSpec)が含まれています。このメソッドは、BigQuery テーブル名の 3 つの部分を含んだ String から TableReference を作成するために使用できます。

多くの場合、TableReference オブジェクトを明示的に使用する必要はありません。BigQueryIO 変換の静的ファクトリ メソッドが、テーブル名を String として受け取った後、parseTableSpec を内部的に使用して、提供された String から TableReference オブジェクトを作成します。

テーブル名の文字列形式

ターゲットの BigQuery テーブルは、次のいずれかの形式を含んだ文字列を使用して指定できます。

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

ここでも project_id は省略できます。project_id を省略した場合、Cloud Dataflow はパイプライン オプション オブジェクトからデフォルトのプロジェクト ID を使用します。Java では、PipelineOptions.getProject を使用して ID にアクセスできます。

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

BigQuery テーブルの行とスキーマ

Java

BigQueryIO の読み取り変換と書き込み変換は、データを BigQuery TableRow オブジェクトの PCollection として生成、消費します。TableRow は BigQuery Java Client API の一部です(com.google.api.services.bigquery.model.TableRow パッケージに入っています)。

また、BigQuery への書き込みを行う際には、ターゲット テーブルに書き込みたいフィールドの TableSchema オブジェクトを指定する必要があります。その際、BigQuery の TableSchema クラスと TableFieldSchema クラスの両方を使用する必要があります。これらのクラスは、パッケージ com.google.api.services.bigquery.model.TableSchemacom.google.api.services.bigquery.model.TableFieldSchema でそれぞれ定義されています。

BigQuery からの読み取り

Java

BigQuery テーブルからの読み取りを行うには、BigQueryIO.Read 変換を apply します。BigQueryIO.Read は、BigQuery TableRow オブジェクトの PCollection を返します。PCollection 内の各要素は、テーブル内の 1 行を表します。

BigQuery テーブル全体を読み取るには、.from 操作を使用して、BigQueryIO.Read に BigQuery テーブル名を提供します。次のサンプルコードは、BigQueryIO.Read 変換を適用して、BigQuery テーブル全体を読み取る方法を示したものです。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

テーブル全体を読み取りたくない場合は、.fromQuery オペレーションを使用して、BigQueryIO.Read にクエリ文字列を提供できます。次のサンプルコードは、クエリ文字列を使用して、BigQuery テーブルから特定のフィールドを読み取る方法を示したものです。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

または、次の例のように BigQuery の標準 SQL 言語を使用できます。

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

BigQuery からの読み取りを行う際には、TableRow オブジェクト内の整数値が、BigQuery のエクスポート後の JSON 形式に一致するように、String としてエンコードされます。

バッチモードで BigQueryIO.Read 変換を適用すると、Dataflow は BigQuery エクスポート リクエストを呼び出します。この API を Dataflow で使用する場合、BigQuery の割り当てポリシーと料金ポリシーが適用されます。

BigQuery への書き込み

Java

BigQuery テーブルへの書き込みを行うには、BigQueryIO.Write 変換を apply します。その際、変換は PCollection<TableRow>apply する必要があります。

通常、出力データを BigQuery TableRow オブジェクトのコレクションにフォーマットするには、別の変換(ParDo など)が必要になります。

BigQueryIO.Write 変換を作成する際には、ターゲット テーブルに基づいて追加情報を提供する必要があります。テーブル名に加えて、次の情報が必要です。

  • ターゲット テーブルの CreateDispositionCreateDisposition は、ターゲット テーブルが存在している必要があるかどうかや、書き込みオペレーションによってターゲット テーブルを作成できるかどうかを指定します。
  • ターゲット テーブルの WriteDispositionWriteDisposition は、書き込むデータによって既存のテーブルを置き換えるか、既存のテーブルに行を追加するか、または空のテーブルのみに書き込みを行うかを指定します。

また、書き込みオペレーションによって新しい BigQuery テーブルが作成される場合は、ターゲット テーブルに関するスキーマ情報を指定する必要があります。この場合、書き込みオペレーションに TableSchema オブジェクトを含める必要があります。

CreateDisposition

CreateDisposition は、ターゲット テーブルが存在しない場合に、BigQuery 書き込みオペレーションによってテーブルを作成するかどうかを制御します。CreateDisposition は、BigQueryIO.Write 変換の作成時に .withCreateDisposition メソッドを呼び出して指定します。

CreateDispositionenum で、有効値は次のとおりです。

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: テーブルを作成しないよう指定します。ターゲット テーブルが存在しない場合、書き込みオペレーションは失敗します。
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: テーブルが存在しない場合に、書き込みオペレーションによって新しいテーブルを作成するよう指定します。この値を使用する場合は、.withSchema オペレーションを使用してテーブル スキーマも指定する必要があります。 CREATE_IF_NEEDED がデフォルトの動作です。

CREATE_IF_NEEDEDCreateDisposition と指定し、TableSchema を提供しない場合、ターゲット テーブルが存在しなければ実行時に変換が失敗し、java.lang.IllegalArgumentException が返されます。

WriteDisposition

WriteDisposition は、既存のテーブルに対する書き込みオペレーションの適用方法を制御します。WriteDisposition は、BigQueryIO.Write 変換の作成時に .withWriteDisposition メソッドを呼び出して指定します。

WriteDispositionenum で、有効値は次のとおりです。

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: 書き込みオペレーションによって既存のテーブルを置き換えるよう指定します。ターゲット テーブルに既存の行がある場合、それらは削除され、テーブルに新しい行が追加されます。
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: 書き込みオペレーションによって既存のテーブルの末尾に行を追加するよう指定します。
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: ターゲット テーブルが空でない場合、書き込みオペレーションが実行時に失敗するよう指定します。WRITE_EMPTY がデフォルトの動作です。

WriteDispositionWRITE_EMPTY を使用する場合、ターゲット テーブルが空でないかどうかのチェックは、実際の書き込み操作よりもかなり前に発生する可能性があるので注意してください。また、この種のチェックは、パイプラインがテーブルに排他的にアクセスできることを保証するものではありません。同時に実行されている 2 つのプログラムが WRITE_EMPTYWriteDisposition を使用して、同じ出力テーブルに書き込みを試行した場合、その両方がオペレーションに成功する可能性があります。

新しいテーブルに書き込みを行うための TableSchema の作成

BigQuery の書き込みオペレーションによって新しいテーブルが作成される場合は、スキーマ情報を指定する必要があります。スキーマ情報は、TableSchema オブジェクトを作成して指定します。TableSchema を渡すには、BigQueryIO.Write 変換の作成時に .withSchema 操作を使用します。

TableSchema オブジェクトは、TableFieldSchema 型のオブジェクトを使用して、テーブル内の各フィールドに関する情報を保持します。TableSchema を作成するには、まずテーブル内のフィールドの List を作成します。その後、TableSchema の作成時に .setFields オペレーションを使用してリストを渡します。

次のサンプルコードは、String 型の 2 つのフィールドを使用して、テーブルの TableSchema を作成する方法を示したものです。

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

BigQueryIO.Write 変換の適用

次のサンプルコードは、BigQueryIO.Write 変換を apply して BigQuery テーブルに PCollection<TableRow> を書き込む方法を示したものです。この書き込みオペレーションでは、必要な場合にテーブルが作成されます。テーブルがすでに存在する場合、そのテーブルは置き換えられます。

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™ は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。