O pipeline de texto do Cloud Storage para o BigQuery com UDF Python é um pipeline em lote que lê ficheiros de texto armazenados no Cloud Storage, transforma-os através de uma função definida pelo utilizador (UDF) Python e anexa o resultado a uma tabela do BigQuery.
Requisitos do pipeline
- Crie um ficheiro JSON que descreva o seu esquema do BigQuery.
Certifique-se de que existe uma matriz JSON de nível superior com o título
BigQuery Schema
e que o respetivo conteúdo segue o padrão{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
.O modelo de lote de texto do Cloud Storage para o BigQuery não suporta a importação de dados para campos de
STRUCT
(registo) na tabela do BigQuery de destino.O JSON seguinte descreve um exemplo de esquema do BigQuery:
{ "BigQuery Schema": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" }, ] }
- Crie um ficheiro Python (
.py
) com a função UDF que fornece a lógica para transformar as linhas de texto. A sua função tem de devolver uma string JSON.Por exemplo, esta função divide cada linha de um ficheiro CSV e devolve uma string JSON após transformar os valores.
import json def process(value): data = value.split(',') obj = { 'name': data[0], 'age': int(data[1]) } return json.dumps(obj)
Parâmetros de modelos
Parâmetro | Descrição |
---|---|
JSONPath |
O caminho gs:// para o ficheiro JSON que define o seu esquema do BigQuery, armazenado no
Cloud Storage. Por exemplo, gs://path/to/my/schema.json . |
pythonExternalTextTransformGcsPath |
O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py .
|
pythonExternalTextTransformFunctionName |
O nome da função definida pelo utilizador (UDF) do Python que quer usar. |
inputFilePattern |
O caminho gs:// para o texto no Cloud Storage que quer processar. Por exemplo, gs://path/to/my/text/data.txt . |
outputTable |
O nome da tabela do BigQuery que quer criar para armazenar os seus dados processados.
Se reutilizar uma tabela do BigQuery existente, os dados são anexados à tabela de destino.
Por exemplo, my-project-name:my-dataset.my-table . |
bigQueryLoadingTemporaryDirectory |
O diretório temporário para o processo de carregamento do BigQuery.
Por exemplo, gs://my-bucket/my-files/temp_dir . |
useStorageWriteApi |
Opcional:
se true , o pipeline usa a
API BigQuery Storage Write. O valor predefinido é false . Para mais informações, consulte o artigo
Usar a API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Opcional:
quando usar a API Storage Write, especifica a semântica de escrita. Para usar a
semântica de, pelo menos, uma vez, defina este parâmetro como true . Para usar a semântica exatamente uma vez,
defina o parâmetro como false . Este parâmetro aplica-se apenas quando
useStorageWriteApi é true . O valor predefinido é false .
|
Função definida pelo utilizador
Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Especificação da função
A FDU tem a seguinte especificação:
- Entrada: uma linha de texto de um ficheiro de entrada do Cloud Storage.
- Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \ --region REGION_NAME \ --parameters \ pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
PYTHON_FUNCTION
: O nome da função definida pelo utilizador (FDU) do Python que quer usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquemaPATH_TO_PYTHON_UDF_FILE
: O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o seu conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
PYTHON_FUNCTION
: O nome da função definida pelo utilizador (FDU) do Python que quer usar.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquemaPATH_TO_PYTHON_UDF_FILE
: O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo,gs://my-bucket/my-udfs/my_file.py
.PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o seu conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.