このページでは、Datastream を使用してソース データベースからデータをストリーミングするために BigQuery の宛先を構成する方法について説明します。
宛先データセットを構成する
BigQuery の宛先のデータセットを構成する際には、次のいずれかのオプションを選択できます。
スキーマごとのデータセット: データセットは、ソースのスキーマ名に基づいて、指定された BigQuery のロケーションで選択または作成されます。そのため、Datastream は、ソースのスキーマごとに自動的に BigQuery にデータセットを作成します。
このオプションを選択すると、Datastream は、ストリームを含むプロジェクトにデータセットを作成します。
たとえば、MySQL ソースがあり、このソースのデータベース内に
mydbデータベースとemployeesテーブルがある場合、Datastream は BigQuery にmydbデータセットとemployeesテーブルを作成します。すべてのスキーマに単一のデータセット: ストリームに BigQuery データセットを選択できます。Datastream は、すべてのデータをこのデータセットにストリーミングします。選択したデータセットに対して、Datastream はすべてのテーブルを
<schema>_<table>として作成します。たとえば、MySQL ソースがあり、このソースのデータベース内に
mydbデータベースとemployeesテーブルがある場合、Datastream は選択したデータセットにmydb_employeesテーブルを作成します。
書き込みの動作
BigQuery にデータをストリーミングする際のイベントの最大サイズは 20 MB です。
ストリームを構成するときに、Datastream が変更データを BigQuery に書き込む方法を選択できます。詳細については、書き込みモードを構成するをご覧ください。
書き込みモードを設定する
BigQuery へのデータの書き込み方法を定義するには、次の 2 つのモードを使用できます。
- マージ: デフォルトの書き込みモードです。選択すると、BigQuery はソース データベースでのデータの保存方法を反映します。つまり、Datastream はデータのすべての変更を BigQuery に書き込み、BigQuery は変更を既存のデータと統合して、ソーステーブルのレプリカである最終的なテーブルを作成します。マージ モードでは、変更イベントの過去の記録は保持されません。たとえば、行を挿入してから更新すると、BigQuery には更新されたデータのみが保持されます。その後、ソーステーブルから行を削除すると、BigQuery はその行のレコードを保持しなくなります。
- 追加専用: 追加専用の書き込みモードでは、変更のストリーム(
INSERT、UPDATE-INSERT、UPDATE-DELETE、DELETEイベント)として BigQuery にデータを追加できます。データの過去の状態を保持する必要がある場合は、このモードを使用します。追記専用書き込みモードの詳細を理解するために、次のシナリオについて考えてみましょう。- 最初のバックフィル: 最初のバックフィルの後、すべてのイベントは同じタイムスタンプ、Universally Unique Identifier(UUID)、変更シーケンス番号を持つ
INSERTタイプのイベントとして BigQuery に書き込まれます。 - 主キーの更新: 主キーが変更されると、2 つの行が BigQuery に書き込まれます。
- 元の主キーを含む
UPDATE-DELETE行 - 新しい主キーを含む
UPDATE-INSERT行
- 元の主キーを含む
- 行の更新: 行を更新すると、単一の
UPDATE-INSERT行が BigQuery に書き込まれます。 - 行の削除: 行を削除すると、単一の
DELETE行が BigQuery に書き込まれます。
- 最初のバックフィル: 最初のバックフィルの後、すべてのイベントは同じタイムスタンプ、Universally Unique Identifier(UUID)、変更シーケンス番号を持つ
テーブルのメタデータ
Datastream は、BigQuery の宛先に書き込まれる各テーブルに datastream_metadata という名前の STRUCT 列を追加します。
マージ書き込みモード
ソースのテーブルに主キーがある場合、列には次のフィールドが含まれます。
UUIDこのフィールドにはSTRINGデータ型が含まれます。SOURCE_TIMESTAMPこのフィールドにはINTEGERデータ型が含まれます。
テーブルに主キーがない場合、列には追加のフィールド IS_DELETED が含まれます。このフィールドには BOOLEAN データ型があり、Datastream が宛先にストリーミングするデータがソースの DELETE オペレーションに関連付けられているかどうかを示します。主キーのないテーブルは追加専用です。
追記専用の書き込みモード
datastream_metadata 列には、主キーがあるテーブルと主キーがないテーブルで同じフィールドが含まれます。
UUIDこのフィールドにはSTRINGデータ型が含まれます。SOURCE_TIMESTAMPこのフィールドにはINTEGERデータ型が含まれます。CHANGE_SEQUENCE_NUMBERこのフィールドにはSTRINGデータ型が含まれます。 これは、Datastream が各変更イベントに使用する内部シーケンス番号です。CHANGE_TYPEこのフィールドにはSTRINGデータ型が含まれます。変更イベントのタイプ(INSERT、UPDATE-INSERT、UPDATE-DELETE、DELETE)を示します。SORT_KEYS: このフィールドには、STRING値の配列が含まれます。この値を使用して、変更イベントを並べ替えることができます。
max_staleness オプションで BigQuery テーブルを使用する
ほぼリアルタイムの取り込みの一環として、Datastream は、データの更新、挿入、削除などの upsert オペレーションに対する BigQuery の組み込みサポートを使用します。upsert オペレーションを使用すると、行の追加、変更、削除に応じて BigQuery の宛先を動的に更新できます。Datastream は、BigQuery の Storage Write API を使用して、これらの upsert オペレーションを宛先テーブルにストリーミングします。
データの未更新に関する制限事項を指定する
BigQuery は、構成されたデータ未更新の制限に従って、ソースの変更をバックグラウンドで継続的に、またはクエリの実行時に適用します。Datastream が BigQuery で新しいテーブルを作成する場合、テーブルの max_staleness オプションは、ストリームの現在のデータ未更新の上限値に従って設定されます。
max_staleness オプション付きで BigQuery テーブルを使用する方法については、テーブルの未更新をご覧ください。
BigQuery の費用を管理する
BigQuery の費用は Datastream とは別に請求されます。BigQuery の費用を管理する方法については、BigQuery CDC の料金をご覧ください。
データ型のマッピング
次の表に、サポートされているソース データベースから BigQuery の宛先へのデータ型の変換を示します。
| ソース データベース | ソースのデータ型 | BigQuery のデータ型 |
|---|---|---|
| MySQL | BIGINT(size) |
LONG |
| MySQL | BIGINT (unsigned) |
DECIMAL |
| MySQL | BINARY(size) |
STRING (hex encoded) |
| MySQL | BIT(size) |
INT64 |
| MySQL | BLOB(size) |
STRING (hex encoded) |
| MySQL | BOOL |
INT64 |
| MySQL | CHAR(size) |
STRING |
| MySQL | DATE |
DATE |
| MySQL | DATETIME(fsp) |
DATETIME |
| MySQL | DECIMAL(precision, scale) |
精度値が 38 以下で、小数点以下の桁数が 9 以下の場合は NUMERIC です。それ以外の場合 BIGNUMERIC |
| MySQL | DOUBLE(size, d) |
FLOAT64 |
| MySQL | ENUM(val1, val2, val3, ...) |
STRING |
| MySQL | FLOAT(precision) |
FLOAT64 |
| MySQL | FLOAT(size, d) |
FLOAT64 |
| MySQL | INTEGER(size) |
INT64 |
| MySQL | INTEGER (unsigned) |
LONG |
| MySQL |
|
JSON |
| MySQL | LONGBLOB |
STRING (hex encoded) |
| MySQL | LONGTEXT |
STRING |
| MySQL | MEDIUMBLOB |
STRING (hex encoded) |
| MySQL | MEDIUMINT(size) |
INT64 |
| MySQL | MEDIUMTEXT |
STRING |
| MySQL | SET(val1, val2, val3, ...) |
STRING |
| MySQL | SMALLINT(size) |
INT64 |
| MySQL | TEXT(size) |
STRING |
| MySQL | TIME(fsp) |
INTERVAL |
| MySQL | TIMESTAMP(fsp) |
TIMESTAMP |
| MySQL | TINYBLOB |
STRING (hex encoded) |
| MySQL | TINYINT(size) |
INT64 |
| MySQL | TINYTEXT |
STRING |
| MySQL | VARBINARY(size) |
STRING (hex encoded) |
| MySQL | VARCHAR |
STRING |
| MySQL | YEAR |
INT64 |
| Oracle | ANYDATA |
UNSUPPORTED |
| Oracle | BFILE |
STRING |
| Oracle | BINARY DOUBLE |
FLOAT64 |
| Oracle | BINARY FLOAT |
FLOAT64 |
| Oracle | BLOB |
BYTES |
| Oracle | CHAR |
STRING |
| Oracle | CLOB |
STRING |
| Oracle | DATE |
DATETIME
|
| Oracle | DOUBLE PRECISION |
FLOAT64 |
| Oracle | FLOAT(p) |
FLOAT64 |
| Oracle | INTERVAL DAY TO SECOND |
UNSUPPORTED |
| Oracle | INTERVAL YEAR TO MONTH |
UNSUPPORTED |
| Oracle | LONG/LONG RAW |
STRING |
| Oracle | NCHAR |
STRING |
| Oracle | NCLOB |
STRING |
| Oracle | NUMBER(precision, scale>0) |
0<p=77 の場合、パラメータ化された小数型にマッピング。p>=77 の場合、STRING にマッピング |
| Oracle | NVARCHAR2 |
STRING |
| Oracle | RAW |
STRING |
| Oracle | ROWID |
STRING |
| Oracle | SDO_GEOMETRY |
UNSUPPORTED |
| Oracle | SMALLINT |
INT64 |
| Oracle | TIMESTAMP |
TIMESTAMP
|
| Oracle | TIMESTAMP WITH TIME ZONE |
TIMESTAMP
|
| Oracle | UDT (user-defined type) |
UNSUPPORTED |
| Oracle | UROWID |
STRING |
| Oracle | VARCHAR |
STRING |
| Oracle | VARCHAR2 |
STRING |
| Oracle | XMLTYPE |
UNSUPPORTED |
| PostgreSQL | ARRAY |
JSON
|
| PostgreSQL | BIGINT |
INT64 |
| PostgreSQL | BIT |
BYTES |
| PostgreSQL | BIT_VARYING |
BYTES |
| PostgreSQL | BOOLEAN |
BOOLEAN |
| PostgreSQL | BOX |
UNSUPPORTED |
| PostgreSQL | BYTEA |
BYTES |
| PostgreSQL | CHARACTER |
STRING |
| PostgreSQL | CHARACTER_VARYING |
STRING |
| PostgreSQL | CIDR |
STRING |
| PostgreSQL | CIRCLE |
UNSUPPORTED |
| PostgreSQL | DATE |
DATE |
| PostgreSQL | DOUBLE_PRECISION |
FLOAT64 |
| PostgreSQL | ENUM |
STRING |
| PostgreSQL | INET |
STRING |
| PostgreSQL | INTEGER |
INT64 |
| PostgreSQL | INTERVAL |
INTERVAL |
| PostgreSQL | JSON |
JSON |
| PostgreSQL | JSONB |
JSON |
| PostgreSQL | LINE |
UNSUPPORTED |
| PostgreSQL | LSEG |
UNSUPPORTED |
| PostgreSQL | MACADDR |
STRING |
| PostgreSQL | MONEY |
FLOAT64 |
| PostgreSQL | NUMERIC |
精度 = -1 の場合は、STRING(BigQuery NUMERIC 型には固定精度が必要です)。それ以外の場合は BIGNUMERIC/NUMERIC。詳細については、PostgreSQL ドキュメントの任意精度の数値をご覧ください。 |
| PostgreSQL | OID |
INT64 |
| PostgreSQL | PATH |
UNSUPPORTED |
| PostgreSQL | POINT |
UNSUPPORTED |
| PostgreSQL | POLYGON |
UNSUPPORTED |
| PostgreSQL | REAL |
FLOAT64 |
| PostgreSQL | SMALLINT |
INT64 |
| PostgreSQL | SMALLSERIAL |
INT64 |
| PostgreSQL | SERIAL |
INT64 |
| PostgreSQL | TEXT |
STRING |
| PostgreSQL | TIME |
TIME |
| PostgreSQL | TIMESTAMP |
TIMESTAMP |
| PostgreSQL | TIMESTAMP_WITH_TIMEZONE |
TIMESTAMP |
| PostgreSQL | TIME_WITH_TIMEZONE |
TIME |
| PostgreSQL | TSQUERY |
STRING |
| PostgreSQL | TSVECTOR |
STRING |
| PostgreSQL | TXID_SNAPSHOT |
STRING |
| PostgreSQL | UUID |
STRING |
| PostgreSQL | XML |
STRING |
| SQL Server | BIGINT |
INT64 |
| SQL Server | BINARY |
BYTES |
| SQL Server | BIT |
BOOL |
| SQL Server | CHAR |
STRING |
| SQL Server | DATE |
DATE |
| SQL Server | DATETIME2 |
DATETIME |
| SQL Server | DATETIME |
DATETIME |
| SQL Server | DATETIMEOFFSET |
TIMESTAMP |
| SQL Server | DECIMAL |
BIGNUMERIC |
| SQL Server | FLOAT |
FLOAT64 |
| SQL Server | IMAGE |
BYTES |
| SQL Server | INT |
INT64 |
| SQL Server | MONEY |
BIGNUMERIC |
| SQL Server | NCHAR |
STRING |
| SQL Server | NTEXT |
STRING |
| SQL Server | NUMERIC |
BIGNUMERIC |
| SQL Server | NVARCHAR |
STRING |
| SQL Server | NVARCHAR(MAX) |
STRING |
| SQL Server | REAL |
FLOAT64 |
| SQL Server | SMALLDATETIME |
DATETIME |
| SQL Server | SMALLINT |
INT64 |
| SQL Server | SMALLMONEY |
NUMERIC |
| SQL Server | TEXT |
STRING |
| SQL Server | TIME |
TIME |
| SQL Server | TIMESTAMP/ROWVERSION |
BYTES |
| SQL Server | TINYINT |
INT64 |
| SQL Server | UNIQUEIDENTIFIER |
STRING |
| SQL Server | VARBINARY |
BYTES |
| SQL Server | VARBINARY(MAX) |
BYTES |
| SQL Server | VARCHAR |
STRING |
| SQL Server | VARCHAR(MAX) |
STRING |
| SQL Server | XML |
STRING |
PostgreSQL の配列を BigQuery 配列データ型として クエリする
PostgreSQL 配列を BigQuery ARRAY データ型としてクエリする場合は、BigQuery の JSON_VALUE_ARRAY 関数を使用することで JSON の値を BigQuery の列に変換できます。:
SELECT ARRAY(SELECT CAST(element AS TYPE) FROM UNNEST(JSON_VALUE_ARRAY(BQ_COLUMN_NAME,'$')) AS element)AS array_col
以下を置き換えます。
TYPE: PostgreSQL ソース配列の要素型に一致する BigQuery 型。たとえば、ソースタイプが
BIGINT値の配列の場合は、TYPE をINT64に置き換えます。データ型をマッピングする方法の詳細については、データ型をマッピングするをご覧ください。
BQ_COLUMN_NAME: BigQuery テーブル内の関連する列の名前。
値の変換方法には 2 つの例外があります。
ソース列に
BIT、BIT_VARYING、またはBYTEA値の配列がある場合は、次のクエリを実行します。SELECT ARRAY(SELECT FROM_BASE64(element) FROM UNNEST(JSON_VALUE_ARRAY(BQ_COLUMN_NAME,'$')) AS element)
AS array_of_bytes ソース列に
JSON値またはJSONB値の配列がある場合は、JSON_QUERY_ARRAY関数を使用します。SELECT ARRAY(SELECT element FROM UNNEST(JSON_QUERY_ARRAY(BQ_COLUMN_NAME,'$')) AS element)
AS array_of_jsons
既知の制限事項
宛先として BigQuery を使用する場合の既知の制限事項は次のとおりです。
- データを複製できるのは、Datastream ストリームと同じ Google Cloud プロジェクトにある BigQuery データセットに限られます。
- デフォルトでは、Datastream は、主キーなしで BigQuery にすでに複製されているテーブルに主キーを追加したり、主キーを使用して BigQuery に複製されているテーブルから主キーを削除したりすることはできません。このような変更を行う必要がある場合は、Google サポートにお問い合わせください。主キーがすでにあるソーステーブルの主キー定義を変更する方法については、問題を診断するをご覧ください。
BigQuery の主キーは、次のいずれかのデータ型にする必要があります。
DATEBOOLGEOGRAPHYINT64NUMERICBIGNUMERICSTRINGTIMESTAMPDATETIME
サポートされていないデータ型の主キーを含むテーブルは、Datastream によって複製されません。
BigQuery では、
.、$、/、@、+の文字を含むテーブル名はサポートされていません。Datastream は、宛先テーブルの作成時にこのような文字をアンダースコアに置き換えます。たとえば、ソース データベースの
table.nameは BigQuery ではtable_nameになります。BigQuery のテーブル名の詳細については、テーブルの命名をご覧ください。
- BigQuery は、4 つを超えるクラスタリング列はサポートしていません。4 つを超える主キー列を持つテーブルを複製する場合、Datastream は 4 つの主キー列をクラスタリング列として使用します。
- Datastream は、PostgreSQL 無限大型などの範囲外の日時リテラルを次の値にマッピングします。
- 正の
DATEを9999-12-31の値に - 負の
DATEを0001-01-01の値に - 正の
TIMESTAMPを9999-12-31 23:59:59.999000 UTCの値に - 負の
TIMESTAMPを0001-01-01 00:00:00 UTCの値に
- 正の
- BigQuery では、
FLOATまたはREALデータ型の主キーを持つストリーミング テーブルはサポートされていません。このようなテーブルは複製されません。
BigQuery の日付型と範囲の詳細については、データ型をご覧ください。