Pub/Sub Proto 到 BigQuery 範本

「將 Pub/Sub 原型設計資料傳送至 BigQuery」範本是一種串流管道,可將 Pub/Sub 訂閱項目中的原型設計資料擷取至 BigQuery 資料表。寫入 BigQuery 資料表時發生的所有錯誤都會串流至未處理的 Pub/Sub 主題。

您可以提供 JavaScript 使用者定義函式 (UDF) 來轉換資料,執行 UDF 時發生的錯誤可以傳送至另一個 Pub/Sub 主題,也可以傳送至與 BigQuery 錯誤相同的未處理主題。

針對這個情境執行 Dataflow 管道之前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。

管道相關規定

  • 輸入 Pub/Sub 訂閱項目必須已存在。
  • Proto 記錄的結構定義檔案必須存在於 Cloud Storage。
  • 輸出 Pub/Sub 主題必須存在。
  • 輸出 BigQuery 資料集必須已存在。
  • 如果 BigQuery 資料表已存在,則無論 createDisposition 值為何,都必須具備與 Proto 資料相符的結構定義。

範本參數

必要參數

  • protoSchemaPath (Proto 結構定義檔案的 Cloud Storage 路徑):Cloud Storage 路徑,指向獨立的描述元集檔案。例如:gs://MyBucket/schema.pb。只要在編譯 proto 的 protoc 指令中加入 --descriptor_set_out=schema.pb,即可產生 schema.pb--include_imports 標記可用來確保檔案是獨立的。
  • fullMessageName (完整 Proto 訊息名稱):完整訊息名稱 (例如:package.name.MessageName)。如果訊息巢狀內嵌於另一則訊息中,請使用「.」分隔符號納入所有訊息 (例如:package.name.OuterMessage.InnerMessage)。「package.name」應來自 package 陳述式,而非 java_package 陳述式。
  • inputSubscription (Pub/Sub 輸入訂閱):要從中讀取輸入內容的 Pub/Sub 訂閱,格式為「projects/your-project-id/subscriptions/your-subscription-name」(範例:projects/your-project-id/subscriptions/your-subscription-name)。
  • outputTableSpec (BigQuery 輸出資料表):用於寫入輸出內容的 BigQuery 資料表位置。名稱的格式應為 <project>:<dataset>.<table_name>。資料表的結構定義必須與輸入物件相符。
  • outputTopic (輸出 Pub/Sub 主題):要發布資料的主題名稱,格式為「projects/your-project-id/topics/your-topic-name」(範例:projects/your-project-id/topics/your-topic-name)。

選用參數

  • preserveProtoFieldNames (保留 Proto 欄位名稱):用來控制是否應保留 Proto 欄位名稱,或轉換為 lowerCamelCase。如果資料表已存在,則應根據與資料表結構定義相符的內容。否則,系統會判斷所建立資料表的資料欄名稱。如要保留 Proto snake_case,請設為 True。False 會將欄位轉換為 lowerCamelCase。(預設值:false)。
  • bigQueryTableSchemaPath (BigQuery 資料表結構定義路徑):BigQuery 結構定義 JSON 檔案的 Cloud Storage 路徑。如果未設定,系統會從 Proto 結構定義推斷結構定義。(例如:gs://MyBucket/bq_schema.json)。
  • udfOutputTopic (UDF 失敗的 Pub/Sub 輸出主題):可選用的輸出主題,用於傳送 UDF 失敗訊息。如未設定這個選項,失敗訊息會寫入與 BigQuery 失敗訊息相同的主題。(例如:projects/your-project-id/topics/your-topic-name)。
  • writeDisposition (用於 BigQuery 的寫入配置):BigQuery WriteDisposition。例如 WRITE_APPEND、WRITE_EMPTY 或 WRITE_TRUNCATE。預設值為 WRITE_APPEND。
  • createDisposition (建立要用於 BigQuery 的處置):BigQuery CreateDisposition。例如 CREATE_IF_NEEDED、CREATE_NEVER。預設值為 CREATE_IF_NEEDED。
  • javascriptTextTransformGcsPath (JavaScript UDF 來源的 Cloud Storage 路徑):包含使用者定義函式的 JavaScript 程式碼的 Cloud Storage 路徑模式。(例如:gs://your-bucket/your-function.js)。
  • javascriptTextTransformFunctionName (UDF JavaScript 函式名稱):要從 JavaScript 檔案呼叫的函式名稱。只能使用英文字母、數字和底線,(例如「transform」或「transform_udf1」)。
  • javascriptTextTransformReloadIntervalMinutes (JavaScript UDF 自動重新載入間隔 (分鐘)):定義工作人員檢查 JavaScript UDF 變更以重新載入檔案的間隔。預設值為 0。
  • useStorageWriteApi (使用 BigQuery Storage Write API):如果為 true,管道會在將資料寫入 BigQuery 時使用 Storage Write API (請參閱 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api)。預設值為 false。在「正好一次」模式下使用 Storage Write API 時,您必須設定下列參數:「BigQuery Storage Write API 的串流數量」和「BigQuery Storage Write API 的觸發頻率 (以秒為單位)」。如果您啟用 Dataflow「至少一次」模式,或將 useStorageWriteApiAtLeastOnce 參數設為 true,則不需要設定串流數量或觸發頻率。
  • useStorageWriteApiAtLeastOnce (在 BigQuery Storage Write API 中使用至少一次語意):只有在啟用「使用 BigQuery Storage Write API」時,這個參數才會生效。如果啟用,系統會對 Storage Write API 使用至少一次語意,否則會使用單次語意。預設值為 false。
  • numStorageWriteApiStreams (BigQuery Storage Write API 的串流數量):串流數量會定義 BigQueryIO 的 Write 轉換並行程度,大致對應於管道使用的 Storage Write API 串流數量。如要瞭解建議值,請參閱 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api。預設值為 0。
  • storageWriteApiTriggeringFrequencySec (BigQuery Storage Write API 的觸發頻率,以秒為單位):觸發頻率會決定資料在 BigQuery 中可供查詢的時間。如需建議值,請參閱 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:Pub/Sub 訊息資料欄位, 序列化為 JSON 字串。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入不重複的工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub Proto to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    更改下列內容:

    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • SCHEMA_PATH:Proto 結構定義檔案的 Cloud Storage 路徑 (例如 gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME:Proto 訊息名稱 (例如 package.name.MessageName)
    • SUBSCRIPTION_NAME:Pub/Sub 輸入訂閱項目名稱
    • BIGQUERY_TABLE:BigQuery 輸出資料表名稱
    • UNPROCESSED_TOPIC:用於未處理佇列的 Pub/Sub 主題

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • SCHEMA_PATH:Proto 結構定義檔案的 Cloud Storage 路徑 (例如 gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME:Proto 訊息名稱 (例如 package.name.MessageName)
    • SUBSCRIPTION_NAME:Pub/Sub 輸入訂閱項目名稱
    • BIGQUERY_TABLE:BigQuery 輸出資料表名稱
    • UNPROCESSED_TOPIC:用於未處理佇列的 Pub/Sub 主題

    後續步驟