BigQuery I/O

Dataflow SDK には、Google BigQuery テーブルとの間でデータを読み書きできる組み込み Read 変換と Write 変換があります。名前を指定したテーブルの全体を読み取ることもできますし、クエリ文字列を使用して部分的なデータを読み取ることもできます。

BigQuery テーブル名の指定

BigQuery テーブルを対象に読み取りや書き込みを行うには、完全修飾の BigQuery テーブル名を指定する必要があります。完全修飾の BigQuery テーブル名は、3 つの部分から構成されます。

  • プロジェクト ID: Google Cloud プロジェクトの ID。デフォルト値は、パイプライン オプション オブジェクトから取得されます。
  • データセット ID: BigQuery データセット ID(対象の Cloud プロジェクト内で一意のもの)。
  • テーブル ID: 特定のデータセット内で一意のテーブル ID。

Java

ただし、BigQueryIO はプロジェクト名を指定しなくても使用できます。省略した場合、BigQueryIOPipelineOptions オブジェクトからデフォルトのプロジェクトを使用します。

BigQuery Java Client APITableReference 型のオブジェクトを受け入れて、ターゲット BigQuery テーブルを識別します。Dataflow SDK for Java の BigQueryIO パッケージには、ヘルパー メソッド(BigQueryIO.parseTableSpec)が含まれています。このメソッドは、BigQuery テーブル名の 3 つの部分を含んだ String から TableReference を作成するために使用できます。

ほとんどの場合、TableReference オブジェクトを明示的に使用する必要はありません。BigQueryIO 変換の静的ファクトリ メソッドが、テーブル名を String として受け取った後、parseTableSpec を内部的に使用して、指定された String から TableReference を作成します。

テーブル名の文字列形式

ターゲットの BigQuery テーブルは、次のいずれかの形式を含んだ文字列を使用して指定できます。

  [project_id]:[dataset_id].[table_id]
  Example: "clouddataflow-readonly:samples.weather_stations"

Java

ここでも project_id は省略可能です。project_id を省略した場合、Cloud Dataflow はパイプライン オプション オブジェクトのデフォルトのプロジェクト ID を使用します。Java では、この ID は PipelineOptions.getProject を使用してアクセスできます。

  [dataset_id].[table_id]
  Example: "samples.weather_stations"

BigQuery テーブルの行とスキーマ

Java

BigQueryIO の読み取りと書き込みの変換では、データを BigQuery の TableRow オブジェクトの PCollection として生成、消費します。TableRowcom.google.api.services.bigquery.model.TableRow パッケージ内の BigQuery Java Client API の一部です。

また、BigQuery に書き込む場合は、ターゲット テーブルに書き込むフィールドの TableSchema オブジェクトを指定する必要があります。BigQuery の TableSchema クラスと TableFieldSchema クラスの両方を使用する必要があります。これらのクラスは、それぞれ com.google.api.services.bigquery.model.TableSchema パッケージと com.google.api.services.bigquery.model.TableFieldSchema パッケージ内で定義されます。

BigQuery からの読み取り

Java

BigQuery テーブルから読み取るには、BigQueryIO.Read 変換を apply します。BigQueryIO.Read は、BigQuery TableRow オブジェクトの PCollection を返します。PCollection 内の各要素は、テーブル内の単一の行を表します。

BigQuery テーブル全体を読み取るには、.from オペレーションを使用して BigQuery テーブル名を BigQueryIO.Read に提供します。次のサンプルコードは、BigQueryIO.Read 変換を適用して、BigQuery テーブル全体を読み取る方法を示したものです。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadWeatherStations")
         .from("clouddataflow-readonly:samples.weather_stations"));

テーブル全体を読み取りたくない場合は、.fromQuery オペレーションを使用して、BigQueryIO.Read にクエリ文字列を提供できます。次のサンプルコードは、クエリ文字列を使用して、BigQuery テーブルから特定のフィールドを読み取る方法を示したものです。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read
         .named("ReadYearAndTemp")
         .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]");

または、次の例のように BigQuery の標準 SQL 言語を使用できます。

PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read
    .named("ReadYearAndTemp")
    .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
    .usingStandardSql();

なお、BigQuery からの読み取りを行う際には、TableRow オブジェクト内の整数値が、BigQuery のエクスポート後の JSON 形式に一致するように、String としてエンコードされます。

バッチモードで BigQueryIO.Read 変換を適用すると、Dataflow は BigQuery エクスポート リクエストを呼び出します。この API を Dataflow で使用する場合、BigQuery の割り当てポリシーと料金ポリシーが適用されます。

BigQuery への書き込み

Java

BigQuery テーブルに書き込むには、BigQueryIO.Write 変換を apply します。変換は PCollection<TableRow>apply する必要があります。

通常、出力データを BigQuery TableRow オブジェクトのコレクションにフォーマットするには、別の変換(ParDo など)が必要になります。

BigQueryIO.Write 変換を作成する際には、ターゲット テーブルに基づいて追加情報を提供する必要があります。テーブル名に加えて、次の情報が必要です。

  • ターゲット テーブルの CreateDispositionCreateDisposition は、ターゲット テーブルが存在している必要があるかどうかや、書き込みオペレーションによって抽出先テーブルを作成できるかどうかを指定します。
  • ターゲット テーブルの WriteDispositionWriteDisposition は、書き込むデータによって既存のテーブルを置き換えるか、既存のテーブルに行を追加するか、または空のテーブルのみに書き込みを行うかを指定します。

また、書き込みオペレーションによって新しい BigQuery テーブルが作成される場合は、ターゲット テーブルに関するスキーマ情報を指定する必要があります。この場合は、書き込みオペレーションに TableSchema オブジェクトを含める必要があります。

CreateDisposition

CreateDisposition は、ターゲット テーブルが存在しない場合に BigQuery 書き込みオペレーションでテーブルを作成するかどうかを制御します。CreateDisposition は、BigQueryIO.Write 変換の作成時に .withCreateDisposition メソッドを呼び出すことで指定します。

CreateDispositionenum で、有効値は次のとおりです。

  • BigQueryIO.Write.CreateDisposition.CREATE_NEVER: テーブルを作成しないよう指定します。ターゲット テーブルが存在しない場合、書き込みオペレーションは失敗します。
  • BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED: テーブルが存在しない場合に、書き込みオペレーションによって新しいテーブルを作成することを指定します。この値を使用する場合は、.withSchema オペレーションを使用してテーブル スキーマも指定する必要があります。CREATE_IF_NEEDED がデフォルトの動作です。

ただし、CreateDisposition として CREATE_IF_NEEDED を指定し、TableSchema を指定しなかった場合、ターゲット テーブルが存在していないと、実行時に変換が失敗して java.lang.IllegalArgumentException が返される可能性があります。

WriteDisposition

WriteDisposition は、BigQuery の書き込みオペレーションが既存のテーブルに適用される方法を制御します。WriteDisposition は、BigQueryIO.Write 変換の作成時に .withWriteDisposition メソッドを呼び出すことで指定します。

WriteDispositionenum で、有効値は次のとおりです。

  • BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE: 書き込みオペレーションによって既存のテーブルを置き換えるよう指定します。ターゲット テーブルに既存の行がある場合、それらは削除され、テーブルに新しい行が追加されます。
  • BigQueryIO.Write.WriteDisposition.WRITE_APPEND: 書き込みオペレーションによって、既存のテーブルの末尾に行を追加するよう指定します。
  • BigQueryIO.Write.WriteDisposition.WRITE_EMPTY: 抽出先テーブルが空でない場合、書き込みオペレーションが実行時に失敗するよう指定します。WRITE_EMPTY がデフォルトの動作です。

WRITE_EMPTYWriteDisposition に使用する場合、実際の書き込みオペレーションのかなり前に、ターゲット テーブルが空であるかどうかの確認が行われる可能性があることに注意してください。また、この種のチェックは、パイプラインがテーブルに排他的にアクセスできることを保証するものではありません。同時に実行されている 2 つのプログラムが、WRITE_EMPTYWriteDisposition を使用して、同じ出力テーブルに書き込みを試行した場合、両方が成功する可能性があります。

新しいテーブルに書き込みを行うための TableSchema の作成

BigQuery の書き込みオペレーションによって新しいテーブルが作成される場合は、スキーマ情報を指定する必要があります。スキーマ情報は、TableSchema オブジェクトを作成して指定します。TableSchema を渡すには、BigQueryIO.Write 変換の作成時に .withSchema オペレーションを使用します。

TableSchema オブジェクトには、TableFieldSchema 型のオブジェクトを使用してテーブル内の各フィールドに関する情報が格納されます。TableSchema を作成するには、最初にテーブル内のフィールドの List を作成します。次に、TableSchema の作成時に .setFields オペレーションを使用してリストを渡します。

次のサンプルコードは、String 型の 2 つの型のフィールドを使用して、テーブルの TableSchema を作成する方法を示したものです。

  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("source").setType("STRING"));
  fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
  TableSchema schema = new TableSchema().setFields(fields);

BigQueryIO.Write 変換の適用

次のサンプルコードは、BigQueryIO.Write 変換を apply して BigQuery テーブルに PCollection<TableRow> を書き込む方法を示したものです。この書き込みオペレーションでは、必要な場合にテーブルが作成されます。テーブルがすでに存在する場合、そのテーブルは置き換えられます。

  PCollection<TableRow> quotes = ...;

  quotes.apply(BigQueryIO.Write
      .named("Write")
      .to("my-project:output.output_table")
      .withSchema(schema)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Apache Beam™ は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。