Plantilla de Cloud Storage a Elasticsearch

La plantilla de Cloud Storage a Elasticsearch es una canalización por lotes que lee datos de archivos CSV almacenados en un segmento de Cloud Storage y escribe los datos en Elasticsearch como documentos JSON.

Requisitos del flujo de procesamiento

  • El segmento de Cloud Storage debe existir.
  • Debe haber un host de Elasticsearch en una instancia de Google Cloud Platform o en Elasticsearch Cloud al que pueda acceder Dataflow.
  • Debe existir una tabla de BigQuery para la salida de errores.

Esquema CSV

Si los archivos CSV contienen encabezados, define el parámetro de plantilla containsHeaders como true.

De lo contrario, crea un archivo de esquema JSON que describa los datos. Especifique el URI de Cloud Storage del archivo de esquema en el parámetro de plantilla jsonSchemaPath. En el siguiente ejemplo se muestra un esquema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

También puede proporcionar una función definida por el usuario (UDF) que analice el texto CSV y genere documentos de Elasticsearch.

Parámetros de plantilla

Parámetros obligatorios

  • deadletterTable la tabla de mensajes fallidos de BigQuery a la que se enviarán las inserciones fallidas. Por ejemplo, your-project:your-dataset.your-table-name.
  • inputFileSpec el patrón de archivo de Cloud Storage para buscar archivos CSV. Por ejemplo, gs://mybucket/test-*.csv.
  • connectionUrl la URL de Elasticsearch en el formato https://hostname:[port]. Si usas Elastic Cloud, especifica el CloudID. Por ejemplo, https://elasticsearch-host:9200.
  • apiKey la clave de API codificada en Base64 que se usará para la autenticación.
  • index: el índice de Elasticsearch al que se envían las solicitudes. Por ejemplo, my-index.

Parámetros opcionales

  • inputFormat el formato del archivo de entrada. El valor predeterminado es CSV.
  • containsHeaders los archivos CSV de entrada contienen un registro de encabezado (true/false). Solo es necesario si se leen archivos CSV. Valor predeterminado: false.
  • Delimitador: delimitador de columnas de los archivos de texto de entrada. Valor predeterminado: ,. Por ejemplo, ,.
  • csvFormat especificación del formato CSV que se va a usar para analizar los registros. El valor predeterminado es Default. Consulta https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html para obtener más información. Deben coincidir exactamente con los nombres de formato que se encuentran en https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath la ruta al esquema JSON. El valor predeterminado es null. Por ejemplo, gs://path/to/schema.
  • largeNumFiles se asigna el valor true si el número de archivos es de decenas de miles. El valor predeterminado es false.
  • csvFileEncoding formato de codificación de caracteres del archivo CSV. Los valores permitidos son US-ASCII, ISO-8859-1, UTF-8 y UTF-16. El valor predeterminado es UTF-8.
  • logDetailedCsvConversionErrors se define como true para habilitar el registro de errores detallado cuando falla el análisis de CSV. Ten en cuenta que esto puede exponer datos sensibles en los registros (por ejemplo, si el archivo CSV contiene contraseñas). Valor predeterminado: false.
  • elasticsearchUsername: nombre de usuario de Elasticsearch para autenticarte. Si se especifica, se ignora el valor de apiKey.
  • elasticsearchPassword la contraseña de Elasticsearch para autenticarse. Si se especifica, se ignora el valor de apiKey.
  • batchSize el tamaño del lote en número de documentos. El valor predeterminado es 1000.
  • batchSizeBytes tamaño del lote en número de bytes. El valor predeterminado es 5242880 (5 MB).
  • maxRetryAttempts número máximo de reintentos. Debe ser mayor que cero. El valor predeterminado es no retries.
  • maxRetryDuration la duración máxima de los reintentos en milisegundos. Debe ser mayor que cero. El valor predeterminado es no retries.
  • propertyAsIndex la propiedad del documento que se está indexando cuyo valor especifica los metadatos de _index que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una función definida por el usuario _index. El valor predeterminado es none.
  • javaScriptIndexFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript que especifica los metadatos de _index que se incluirán con el documento en las solicitudes masivas. El valor predeterminado es none.
  • javaScriptIndexFnName nombre de la función JavaScript de UDF que especifica los metadatos de _index que se incluirán con el documento en las solicitudes masivas. El valor predeterminado es none.
  • propertyAsId una propiedad del documento que se está indexando cuyo valor especifica los metadatos de _id que se incluirán con el documento en las solicitudes masivas. Tiene prioridad sobre una función definida por el usuario _id. El valor predeterminado es none.
  • javaScriptIdFnGcsPath la ruta de Cloud Storage a la fuente de la función UDF de JavaScript que especifica los metadatos de _id que se incluirán con el documento en las solicitudes masivas. El valor predeterminado es none.
  • javaScriptIdFnName nombre de la función JavaScript de UDF que especifica los metadatos _id que se incluirán con el documento en las solicitudes masivas. El valor predeterminado es none.
  • javaScriptTypeFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript de una función que especifica los metadatos de _type que se deben incluir con los documentos en las solicitudes masivas. El valor predeterminado es none.
  • javaScriptTypeFnName nombre de la función JavaScript de la UDF que especifica los metadatos de _type que se incluirán con el documento en las solicitudes masivas. El valor predeterminado es none.
  • javaScriptIsDeleteFnGcsPath la ruta de Cloud Storage al origen de la función UDF de JavaScript que determina si se debe eliminar el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de true o false. El valor predeterminado es none.
  • javaScriptIsDeleteFnName el nombre de la función JavaScript de la FDU que determina si se debe eliminar el documento en lugar de insertarlo o actualizarlo. La función devuelve un valor de cadena de true o false. El valor predeterminado es none.
  • usePartialUpdate indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar, lo que permite documentos parciales) con las solicitudes de Elasticsearch. El valor predeterminado es false.
  • bulkInsertMethod indica si se debe usar INDEX (índice, permite upserts) o CREATE (crear, errores en _id duplicados) con solicitudes masivas de Elasticsearch. El valor predeterminado es CREATE.
  • trustSelfSignedCerts indica si se debe confiar en los certificados autofirmados. Una instancia de Elasticsearch instalada puede tener un certificado autofirmado. Habilita esta opción para omitir la validación del certificado SSL. El valor predeterminado es false.
  • disableCertificateValidation si es true, confía en el certificado SSL autofirmado. Una instancia de Elasticsearch puede tener un certificado autofirmado. Para omitir la validación del certificado, asigna el valor true a este parámetro. El valor predeterminado es false.
  • apiKeyKMSEncryptionKey la clave de Cloud KMS para descifrar la clave de API. Este parámetro es obligatorio si apiKeySource tiene el valor KMS. Si se proporciona este parámetro, debe incluir una cadena apiKey cifrada. Encripta los parámetros con el endpoint de encriptación de la API KMS. Para la clave, usa el formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulta https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. Por ejemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId el ID de secreto de Secret Manager de la apiKey. Si apiKeySource tiene el valor SECRET_MANAGER, proporcione este parámetro. Usa el formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource la fuente de la clave de API. Los valores permitidos son PLAINTEXT, KMS y SECRET_MANAGER. Este parámetro es obligatorio cuando se usa Secret Manager o KMS. Si apiKeySource se define como KMS, se deben proporcionar apiKeyKMSEncryptionKey y la clave de API cifrada. Si apiKeySource se define como SECRET_MANAGER, se debe proporcionar apiKeySecretId. Si apiKeySource se define como PLAINTEXT, se debe proporcionar apiKey. El valor predeterminado es PLAINTEXT.
  • socketTimeout si se define, sobrescribe el tiempo de espera máximo de reintento predeterminado y el tiempo de espera de socket predeterminado (30.000 ms) en Elastic RestClient.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funciones definidas por el usuario

Esta plantilla admite funciones definidas por el usuario (UDF) en varios puntos del flujo de procesamiento, que se describen a continuación. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Función de transformación de texto

Transforma los datos CSV en un documento de Elasticsearch.

Parámetros de plantilla:

  • javascriptTextTransformGcsPath: el URI de Cloud Storage del archivo JavaScript.
  • javascriptTextTransformFunctionName: el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: una sola línea de un archivo CSV de entrada.
  • Salida: un documento JSON convertido en cadena para insertarlo en Elasticsearch.

Función de índice

Devuelve el índice al que pertenece el documento.

Parámetros de plantilla:

  • javaScriptIndexFnGcsPath: el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIndexFnName: el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Salida: el valor del campo de metadatos _index del documento.

Función de ID de documento

Devuelve el ID del documento.

Parámetros de plantilla:

  • javaScriptIdFnGcsPath: el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIdFnName: el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Salida: el valor del campo de metadatos _id del documento.

Función de eliminación de documentos

Especifica si se debe eliminar un documento. Para usar esta función, define el modo de inserción masiva en INDEX y proporciona una función de ID de documento.

Parámetros de plantilla:

  • javaScriptIsDeleteFnGcsPath: el URI de Cloud Storage del archivo JavaScript.
  • javaScriptIsDeleteFnName: el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Salida: devuelve la cadena "true" para eliminar el documento o "false" para insertarlo o actualizarlo.

Función de tipo de asignación

Devuelve el tipo de asignación del documento.

Parámetros de plantilla:

  • javaScriptTypeFnGcsPath: el URI de Cloud Storage del archivo JavaScript.
  • javaScriptTypeFnName: el nombre de la función de JavaScript.

Especificación de la función:

  • Entrada: el documento de Elasticsearch, serializado como una cadena JSON.
  • Salida: el valor del campo de metadatos _type del documento.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Storage to Elasticsearch template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • INPUT_FILE_SPEC: el patrón de archivo de Cloud Storage.
  • CONNECTION_URL: la URL de Elasticsearch.
  • APIKEY: tu clave de API codificada en base64 para la autenticación.
  • INDEX: tu índice de Elasticsearch.
  • DEADLETTER_TABLE: tu tabla de BigQuery.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • INPUT_FILE_SPEC: el patrón de archivo de Cloud Storage.
  • CONNECTION_URL: la URL de Elasticsearch.
  • APIKEY: tu clave de API codificada en base64 para la autenticación.
  • INDEX: tu índice de Elasticsearch.
  • DEADLETTER_TABLE: tu tabla de BigQuery.

Siguientes pasos