La plantilla de Cloud Storage a Elasticsearch es una canalización por lotes que lee datos de archivos CSV almacenados en un bucket de Cloud Storage y los escribe en Elasticsearch como documentos de JSON.
Requisitos de la canalización
- El bucket de Cloud Storage debe existir.
- Debe existir un host de Elasticsearch en una instancia de Google Cloud o en Elasticsearch Cloud al que se pueda acceder desde Dataflow.
- Debe existir una tabla de BigQuery para el resultado del error.
Esquema CSV
Si los archivos CSV contienen encabezados, establece el parámetro de plantilla containsHeaders
en true
.
De lo contrario, crea un archivo de esquema JSON que describa los datos. Especifica el URI de Cloud Storage del archivo de esquema en el parámetro de plantilla jsonSchemaPath
. En el siguiente ejemplo, se muestra un esquema JSON:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Como alternativa, puedes proporcionar una función definida por el usuario (UDF) que analice el texto CSV y genere documentos de Elasticsearch.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFileSpec |
El patrón del archivo de Cloud Storage para buscar archivos CSV. Ejemplo: gs://mybucket/test-*.csv . |
connectionUrl |
URL de Elasticsearch en el formato https://hostname:[port] o especificar el CloudID si usas Elastic Cloud. |
apiKey |
Clave de API codificada en Base64 que se usa para la autenticación. |
index |
El índice de Elasticsearch en el que se emitirá la solicitud, como my-index . |
deadletterTable |
La tabla de mensajes no entregados de BigQuery a la que se enviarán las inserciones con errores. Ejemplo: <your-project>:<your-dataset>.<your-table-name> . |
containsHeaders |
Booleano para indicar si los encabezados se incluyen en el CSV (opcional). false (predeterminada) |
delimiter |
El delimitador que usa el CSV (opcional). Ejemplo: , |
csvFormat |
El formato CSV según el formato CSV de Apache Commons (opcional). Valor predeterminado: Default . |
jsonSchemaPath |
La ruta al esquema de JSON (opcional). Valor predeterminado: null . |
largeNumFiles |
Configurado como verdadero si el número de archivos está en las decenas de miles (opcional). Valor predeterminado: false . |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
batchSize |
El tamaño del lote en cantidad de documentos (opcional). Valor predeterminado: 1000 . |
batchSizeBytes |
El tamaño del lote en cantidad de bytes (opcional). Valor predeterminado: 5242880 (5 MB). |
maxRetryAttempts |
La cantidad máxima de reintentos debe ser mayor que 0 (opcional). Predeterminado: sin reintentos. |
maxRetryDuration |
La duración máxima del reintento en milisegundos debe ser superior a 0 (opcional). Predeterminado: sin reintentos. |
csvFileEncoding |
Codificación de archivos CSV (opcional). |
propertyAsIndex |
Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _index para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _index ). Valor predeterminado: none. |
propertyAsId |
Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _id para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _id ). Valor predeterminado: none. |
javaScriptIndexFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _index a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptIndexFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _index que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptIdFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _id a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptIdFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _id que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptTypeFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _type a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptTypeFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _type que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptIsDeleteFnGcsPath |
La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para la función que determinará si el documento se debe borrar en lugar de insertar o actualizar (opcional). La función debe mostrar el valor de string "true" o "false" . Valor predeterminado: none. |
javaScriptIsDeleteFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que determinará si el documento se debe borrar en lugar de insertarse o actualizarse. La función debe mostrar el valor de string "true" o "false" . Valor predeterminado: none. |
usePartialUpdate |
Opcional: Indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar), que permite documentos parciales con solicitudes de Elasticsearch. Valor predeterminado: false . |
bulkInsertMethod |
Opcional: Si se debe usar INDEX (índice, permite inserción) o CREATE (creación, errores en _id duplicado) con solicitudes masivas de Elasticsearch. Valor predeterminado: CREATE . |
Funciones definidas por el usuario
Esta plantilla admite funciones definidas por el usuario (UDF) en varios puntos de la canalización, que se describen a continuación. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.
Función de transformación de texto
Transforma los datos de CSV en un documento de Elasticsearch.
Parámetros de la plantilla:
javascriptTextTransformGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javascriptTextTransformFunctionName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: una sola línea de un archivo CSV de entrada.
- Resultado: Un documento JSON en string para insertar en Elasticsearch.
Función de índice
Muestra el índice al que pertenece el documento.
Parámetros de la plantilla:
javaScriptIndexFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIndexFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una string JSON.
- Resultado: El valor del campo de metadatos
_index
del documento.
Función de ID de documento
Muestra el ID del documento.
Parámetros de la plantilla:
javaScriptIdFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIdFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: El valor del campo de metadatos
_id
del documento.
Función de eliminación de documentos
Especifica si se debe borrar un documento. Para usar esta función, establece el modo de inserción masiva en INDEX
y proporciona una función de ID de documento.
Parámetros de la plantilla:
javaScriptIsDeleteFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptIsDeleteFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: Muestra la cadena
"true"
para borrar el documento o"false"
a fin de actualizar el documento.
Función de tipo de asignación
Muestra el tipo de asignación del documento.
Parámetros de la plantilla:
javaScriptTypeFnGcsPath
: Es el URI de Cloud Storage del archivo JavaScript.javaScriptTypeFnName
: Es el nombre de la función de JavaScript.
Especificación de la función:
- Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
- Resultado: El valor del campo de metadatos
_type
del documento.
Ejecuta la plantilla
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Storage to Elasticsearch template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
REGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
INPUT_FILE_SPEC
: Es el patrón de archivo de Cloud Storage.CONNECTION_URL
: Es tu URL de ElasticsearchAPIKEY
: Es tu clave de API codificada en Base64 para la autenticaciónINDEX
: Es el índice de Elasticsearch.DEADLETTER_TABLE
: Es tu tabla de BigQuery.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
LOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
INPUT_FILE_SPEC
: Es el patrón de archivo de Cloud Storage.CONNECTION_URL
: Es tu URL de ElasticsearchAPIKEY
: Es tu clave de API codificada en Base64 para la autenticaciónINDEX
: Es el índice de Elasticsearch.DEADLETTER_TABLE
: Es tu tabla de BigQuery.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.