Fuentes y receptores personalizados

Los SDK de Dataflow proporcionan una API extensible que puedes usar para crear fuentes de datos o receptores personalizados. Deberás crear una fuente de datos o un receptor personalizado si quieres que tu canalización lea o escriba los datos de una fuente de datos o un receptor para el cual los SDK de Dataflow no proporcionan una compatibilidad nativa.

Para crear una fuente personalizada, extiende las subclases abstractas Source de los SDK de Dataflow como BoundedSource o UnboundedSource. Para crear un receptor personalizado, extiende la clase base Sink abstracta de los SDK de Dataflow. Puedes usar la API extensible para crear fuentes personalizadas que lean datos delimitados (por lotes) o no delimitados (por transmisión), y receptores que solo escriban datos delimitados.

Dataflow pretende admitir receptores que escriban datos no delimitados en una versión futura.

Requisitos básicos de código para fuentes y receptores personalizados

El servicio de Dataflow usa las clases que proporcionas para leer o escribir los datos con varias instancias de trabajador en paralelo. Por lo tanto, el código que proporcionas para las subclases Source y Sink debe cumplir con algunos requisitos básicos:

Serialización

La subclase Source o Sink, ya sea delimitada o no delimitada, debe ser Serializable. El servicio de Dataflow puede crear varias instancias de la subclase Source o Sink para que se envíen a varios trabajadores remotos y así facilitar la lectura o la escritura en paralelo.

Inmutabilidad

La subclase Source o Sink debe ser eficazmente inmutable. Todos los campos privados se deben declarar como final y todas las variables privadas del tipo de recopilación deben ser eficazmente inmutables. Si la clase contiene métodos de definición, esos métodos deben mostrar una copia individual del objeto con el campo pertinente modificado.

Solo debes usar un estado mutable en la subclase Source o Sink si usas una evaluación diferida de cálculos costosos que necesitas para implementar la fuente. En ese caso, debes declarar todas las variables de instancias mutables como transient.

Seguridad para los subprocesos

Si compilas la fuente personalizada para trabajar con la función Rebalanceo dinámico del trabajo de Dataflow, es fundamental que tu código sea seguro para los subprocesos. El SDK de Dataflow para Java proporciona una clase auxiliar para hacerlo más fácil. Consulta Cómo usar BoundedSource con la función Rebalanceo dinámico del trabajo a continuación para obtener más detalles.

Capacidad de realizar pruebas

Es fundamental realizar una prueba de unidades exhaustiva de todas las subclases Source y Sink, especialmente, si compilas las clases para trabajar con funciones avanzadas como Rebalanceo dinámico del trabajo de Dataflow. Un mínimo error de implementación puede provocar daños o la pérdida de datos (como omitir o duplicar registros) que pueden ser difíciles de detectar.

Para ayudar a realizar pruebas en las fuentes, el SDK de Dataflow proporciona la clase SourceTestUtils. SourceTestUtils contiene utilidades para verificar automáticamente algunas de las propiedades de la implementación BoundedSource. Puedes usar SourceTestUtils para aumentar la cobertura de pruebas de la implementación con una gran variedad de entradas que contienen algunas líneas de código.

Cómo crear una fuente personalizada

Si deseas crear una fuente de datos personalizada para la canalización, deberás proporcionar la lógica específica del formato que le indica al servicio de Dataflow cómo leer los datos desde la fuente de entrada y cómo dividir la fuente de datos en varias partes a fin de que varias instancias de trabajador puedan leer los datos en paralelo. Si vas a crear una fuente de datos personalizada que lea datos no delimitados, deberás proporcionar una lógica adicional para administrar la marca de agua y los controles opcionales de la fuente.

Para proporcionar la lógica de la fuente personalizada, crea las siguientes clases:

  • Crea una subclase de BoundedSource si deseas leer un conjunto de datos limitados (por lotes) o una subclase de UnboundedSource si deseas leer un conjunto de datos ilimitados (por transmisión). Estas subclases describen los datos que quieres leer, incluidos la ubicación y los parámetros de los datos (por ejemplo, la cantidad de datos que se deben leer).
  • Crea una subclase de la clase Source.Reader del SDK de Dataflow. Cada Source debe tener un Reader asociado que capture el estado involucrado en la lectura de esa Source. Esto puede incluir controladores de archivos, conexiones de RPC y otros parámetros que dependen de los requisitos específicos del formato de datos que quieres leer.

    La jerarquía de la clase Reader refleja la jerarquía de Source. Si vas a extender BoundedSource, deberás proporcionar un BoundedReader asociado. Si vas a extender UnboundedSource, deberás proporcionar un UnboundedReader asociado.

Cómo implementar la subclase Source

Deberás crear una subclase de BoundedSource o UnboundedSource, según si los datos son limitados (por lotes) o ilimitados (por transmisión). En ambos casos, la subclase Source debe anular los métodos abstractos de la superclase. Cuando usas la fuente de datos personalizada, el servicio de Dataflow utiliza estos métodos a fin de calcular el tamaño del conjunto de datos y dividirlo para leer en paralelo.

La subclase Source también debe administrar la información básica sobre la fuente de datos, como la ubicación. Por ejemplo, la implementación Source de ejemplo de la clase DatastoreIO de Dataflow toma como argumentos el host, el datasetID y la query que se usan para obtener los datos de Datastore.

BoundedSource

BoundedSource representa un conjunto de datos limitados que el servicio de Dataflow puede leer, posiblemente en paralelo. BoundedSource contiene un conjunto de métodos abstractos que el servicio usa con el objetivo de dividir el conjunto de datos para leer con varios trabajadores remotos.

Para implementar una BoundedSource, la subclase debe anular los siguientes métodos abstractos:

  • splitIntoBundles: El servicio de Dataflow usa este método para dividir los datos limitados en conjuntos de un determinado tamaño.
  • getEstimatedSizeBytes: El servicio de Dataflow usa este método para calcular el tamaño total de los datos en bytes.
  • producesSortedKeys: Es un método que le indica al servicio de Dataflow si la fuente genera pares clave-valor en orden de clasificación. Si la fuente no genera estos pares, la implementación de este método debe mostrar false.
  • createReader: Crea el BoundedReader asociado para esta BoundedSource.

Puedes ver un modelo de cómo implementar BoundedSource y los métodos abstractos requeridos en la implementación de DatastoreIO de ejemplo del SDK de Dataflow.

UnboundedSource

UnboundedSource representa una transmisión de datos infinita que el servicio de Dataflow puede leer, posiblemente en paralelo. UnboundedSource contiene un conjunto de métodos abstractos que el servicio usa a fin de admitir lecturas por transmisión en paralelo. Estos métodos incluyen controles para recuperar fallas, ID de registro para prevenir la duplicación de los datos y marcas de agua para calcular la integridad de los datos en las partes posteriores de la canalización.

Para implementar una UnboundedSource, la subclase debe anular los siguientes métodos abstractos:

  • generateInitialSplits: El servicio de Dataflow usa este método para generar una lista de objetos UnboundedSource que representan la cantidad de instancias de subtransmisión que el servicio debe leer en paralelo.
  • getCheckpointMarkCoder: El servicio de Dataflow usa este método a fin de obtener el Codificador para los puntos de control de la fuente (si existe).
  • requiresDeduping: El servicio de Dataflow usa este método para determinar si los datos requieren la eliminación explícita de los registros duplicados. Si este método muestra true, el servicio de Dataflow insertará automáticamente un paso para quitar los registros duplicados de la salida de la fuente.
  • createReader: Crea el UnboundedReader asociado para esta UnboundedSource.

Cómo implementar la subclase Reader

Deberás crear una subclase BoundedReader o UnboundedReader para que el método createReader de la subclase Source la muestre. El servicio de Dataflow usa los métodos de Reader (ya sea delimitado o no delimitado) para realizar la lectura real del conjunto de datos.

BoundedReader y UnboundedReader tienen interfaces básicas similares que deberás definir. Además, existen algunos métodos adicionales específicos de UnboundedReader que deberás implementar para trabajar con datos no delimitados y un método opcional que puedes implementar si quieres que BoundedReader aproveche la función Rebalanceo dinámico del trabajo de Dataflow. También existen algunas diferencias en la semántica de los métodos start() y advance() cuando se usa UnboundedReader.

Métodos de Reader comunes para BoundedReader y UnboundedReader

Dataflow usa los siguientes métodos para leer los datos con BoundedReader o UnboundedReader:

  • start: Inicializa Reader y avanza al primer registro que se va a leer. Se llama a este método solo una vez cuando Dataflow comienza a leer los datos, y es el lugar apropiado para ubicar las operaciones costosas que se requieren para la inicialización.
  • advance: Hace avanzar al lector al siguiente registro válido. Este método debe mostrar false si no hay más entradas disponibles. BoundedReader debe detener la lectura una vez que advance muestre falso. Sin embargo, UnboundedReader puede mostrar true en llamadas futuras cuando haya más datos disponibles en la transmisión.
  • getCurrent: Muestra el registro de datos en la posición actual, última lectura con start o advance.
  • getCurrentTimestamp: Muestra la marca de tiempo del registro de datos actual. Solo debes anular getCurrentTimestamp si la fuente lee datos que contienen marcas de tiempo intrínsecas. El servicio de Dataflow usa este valor a fin de establecer la marca de tiempo intrínseca para cada elemento de la PCollection de salida resultante.

Métodos específicos de Reader para UnboundedReader

Además de la interfaz básica de Reader, UnboundedReader tiene algunos métodos adicionales para administrar las lecturas desde una fuente de datos no delimitados:

  • getCurrentRecordId: Muestra un identificador único para el registro actual. El servicio de Dataflow usa estos ID de registro para filtrar los registros duplicados. Si los datos tienen ID lógicos en cada registro, puedes indicarle al método que los muestre; de lo contrario, puedes mostrar un hash del contenido del registro con un hash de 128 bits como mínimo. (No se recomienda usar el Object.hashCode() de Java, ya que un hash de 32 bits generalmente no es suficiente para prevenir colisiones).
  • Nota: La implementación de getCurrentRecordId es opcional si la fuente usa un esquema de control que identifica cada registro de manera específica. Sin embargo, los ID de registro todavía pueden ser útiles si los sistemas anteriores que escriben los datos en la fuente producen ocasionalmente registros duplicados que la fuente podría leer.

  • getWatermark: Muestra una marca de agua que proporciona el Reader. La marca de agua es el límite inferior aproximado de las marcas de tiempo de los elementos que se van a leer con Reader. El servicio de Dataflow usa la marca de agua como una estimación de la integridad de los datos. Las marcas de agua se usan en las funciones de Sistema de ventanas y Activador de Dataflow.
  • getCheckpointMark: El servicio de Dataflow usa este método a fin de crear un punto de control en la transmisión de datos. El punto de control representa el progreso de UnboundedReader, que se puede usar para recuperar fallas. Diferentes transmisiones de datos pueden usar diferentes métodos de control. Algunas fuentes pueden requerir que se confirmen los registros recibidos, mientras que otras pueden usar controles de posición. Deberás personalizar este método para el esquema de control más adecuado. Por ejemplo, puedes indicarle a este método que muestre los registros confirmados más recientes.
  • Nota: getCheckpointMark es opcional; no debes implementar este método si los datos no tienen puntos de control significativos. Sin embargo, si eliges no implementar los controles en la fuente, es posible que detectes una duplicación o pérdida de los datos en la canalización, según si la fuente de datos trata de volver a enviar los registros en caso de errores.

Cómo usar BoundedSource con la función Rebalanceo dinámico del trabajo

Si la fuente proporciona datos delimitados, puedes implementar el método splitAtFraction para indicarle a BoundedReader que trabaje con la función Rebalanceo dinámico del trabajo del servicio de Dataflow. Este servicio puede llamar a splitAtFraction al mismo tiempo con start o advance en un lector determinado para que los datos restantes de Source se puedan dividir y redistribuir a otros trabajadores.

Cuando implementas splitAtFraction, tu código debe producir un conjunto de divisiones mutuamente excluyentes en el que la unión de esas divisiones coincide con el conjunto de datos completo.

Clases básicas Source y Reader de conveniencia

El SDK de Dataflow contiene algunas clases básicas abstractas convenientes para ayudarte a crear las clases Source y Reader que funcionan con formatos comunes de almacenamiento de datos, como archivos.

FileBasedSource

Si la fuente de datos usa archivos, puedes derivar las clases Source y Reader desde las clases básicas abstractas FileBasedSource y FileBasedReader del SDK de Dataflow para Java. FileBasedSource es una subclase de la fuente delimitada que implementa un código que es común a las fuentes de Dataflow que interactúan con archivos, por ejemplo:

  • Expansión del patrón del archivo
  • Lectura de registro secuencial
  • Puntos de división

XmlSource

Si la fuente de datos usa archivos con formato XML, puedes derivar la clase Source a partir de la clase básica abstracta XmlSource del SDK de Dataflow para Java. XmlSource extiende FileBasedSource y proporciona métodos adicionales para analizar los archivos XML, por ejemplo, configurar los elementos XML que designan la raíz del archivo y los registros individuales del archivo.

Cómo leer desde una fuente personalizada

Para leer los datos desde una fuente personalizada en la canalización, aplica la transformación Read genérica del SDK y pasa la fuente personalizada como un parámetro mediante la operación .from:

Java

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

Cómo crear un receptor personalizado

Si deseas crear un receptor de datos personalizado para la canalización, deberás proporcionar la lógica específica del formato que le indica al servicio de Dataflow cómo escribir datos delimitados desde las PCollection de la canalización en un receptor de salida, como un directorio o sistema de archivos, una tabla de una base de datos, etc. El servicio de Dataflow escribe conjuntos de datos en paralelo mediante varios trabajadores.

Nota: Actualmente, Dataflow solo admite la escritura de datos delimitados en un receptor de salida personalizado.

Para proporcionar la lógica de escritura, crea las siguientes clases:

  • Una subclase de la clase básica abstracta Sink del SDK de Dataflow. Sink describe una ubicación o un recurso en el que la canalización puede escribir en paralelo. La subclase Sink puede contener campos como el recurso o la ubicación del archivo, el nombre de la tabla de una base de datos y demás.
  • Una subclase de Sink.WriteOperation. Sink.WriteOperation representa el estado de una operación de escritura única en paralelo en la ubicación de salida que se describe en Sink. La subclase WriteOperation debe definir los procesos de inicio y finalización de la escritura en paralelo.
  • Una subclase de Sink.Writer. Sink.Writer escribe un conjunto de elementos desde una PCollection de entrada en el receptor de datos designado.

Cómo implementar las subclases Sink

La subclase Sink describe la ubicación o el recurso en el que la canalización escribe su salida. Esto podría incluir la ubicación de un sistema de archivos, el nombre de la tabla de una base de datos o de un conjunto de datos, etc. La subclase Sink debe validar si se puede escribir en la ubicación de salida y debe crear las WriteOperation que definen cómo escribir los datos en esa ubicación de salida.

Para implementar un Sink, la subclase debe anular los siguientes métodos abstractos:

  • validate: Este método garantiza que se pueda escribir en la ubicación de salida de los datos de la canalización y que la ubicación sea válida. validate debe garantizar que el archivo se pueda abrir, el directorio de salida exista, el usuario tenga permisos de acceso a la tabla de una base de datos, etc. El servicio de Dataflow llama a validate en el momento de la creación de la canalización.
  • createWriteOperation: Este método crea un objeto Sink.WriteOperation que define cómo escribir en la ubicación de salida.

Cómo implementar la subclase WriteOperation

La subclase WriteOperation define cómo escribir un conjunto de elementos en la ubicación de salida que se define en Sink. La WriteOperation realiza el proceso de inicio y finalización necesario para lograr una escritura en paralelo.

Para implementar una WriteOperation, la subclase debe anular los siguientes métodos abstractos:

  • initialize: Este método realiza cualquier proceso de inicio necesario antes de escribir en la ubicación de salida. El servicio de Dataflow llama a este método antes de que comience la escritura. Por ejemplo, puedes usar initialize para crear un directorio de salida temporal.
  • finalize: Este método controla los resultados de una escritura que realiza la clase Writer. La implementación de finalize debe realizar una limpieza de escrituras con errores o escrituras que se volvieron a realizar correctamente, y debe poder ubicar cualquier salida temporal o parcial escrita con errores.

    Dado que se puede llamar al método finalize varias veces en caso de falla o reintento, se recomienda que la implementación de finalize sea atómica. Si esto no es posible, debes hacer que la implementación de finalize sea idempotente.
  • createWriter: Este método crea un objeto Sink.Writer que escribe un conjunto de datos en la ubicación de salida que se define en Sink.

Cómo implementar la subclase Writer

La subclase Writer implementa la lógica para escribir un conjunto de registros simple en la ubicación de salida que se define en Sink. El servicio de Dataflow puede crear varias instancias de Writer en diferentes subprocesos del mismo trabajador; por lo tanto, el acceso a cualquier miembro o método estático debe ser seguro para los subprocesos.

Para implementar un Writer, la subclase debe anular los siguientes métodos abstractos:

  • open: Este método realiza cualquier proceso de inicio para que se escriba el conjunto de registros, por ejemplo, crear un archivo temporal de escritura. El servicio de Dataflow llama a este método solo una vez al inicio de la escritura y lo pasa a un ID de conjunto único para que se escriba el conjunto de registros.
  • write: Este método escribe un registro único en la ubicación de salida. El servicio de Dataflow llama a write para cada valor del conjunto.
  • close: Este método finaliza la escritura y cierra los recursos utilizados para escribir el conjunto. close debe mostrar un resultado de escritor que la WriteOperation delimitante usará con el objetivo de identificar las escrituras correctas. El servicio de Dataflow llama a este método solo una vez al final de la escritura.

Cómo controlar los ID del conjunto

Cuando el servicio llama a Writer.open, pasará un ID de conjunto único para que se escriban los registros. El elemento Writer debe usar este ID de conjunto para asegurarse de que su salida no interfiera en la de otras instancias de Writer que pueden crearse en paralelo. Esto resulta especialmente importante dado que el servicio de Dataflow puede reintentar las escrituras varias veces en caso de producirse fallas.

Por ejemplo, si la salida de Sink se basa en archivos, la clase Writer puede usar el ID del conjunto como un sufijo de nombre de archivo para garantizar que Writer escriba sus registros en un archivo de salida único que no sea utilizado por otros Writer. Puedes indicarle al método close de Writer que muestre la ubicación de ese archivo como parte del resultado de la escritura.

Puedes ver un modelo de cómo implementar Sink, WriteOperation y Writer junto con sus métodos abstractos requeridos en la implementación de DatastoreIO de ejemplo del SDK de Dataflow.

Clases básicas Sink y Writer de conveniencia

El SDK de Dataflow contiene algunas clases básicas abstractas convenientes para ayudarte a crear las clases Source y Reader que funcionan con formatos comunes de almacenamiento de datos, como archivos.

FileBasedSink

Si la fuente de datos usa archivos, puedes derivar las clases Sink, WriteOperation y Writer desde las clases básicas abstractas FileBasedSink, FileBasedWriteOperation y FileBasedWriter del SDK de Dataflow para Java. Estas clases implementan un código que es común a las fuentes de Dataflow que interactúan con archivos, por ejemplo:

  • Configuración de encabezados y pies de página del archivo
  • Escritura de registro secuencial
  • Configuración del tipo de MIME de salida

FileBasedSink y sus subclases admiten la escritura tanto en archivos locales como en archivos de Google Cloud Storage. Para obtener más información, consulta la implementación de FileBasedSink de ejemplo llamada XmlSink del SDK de Dataflow para Java.

Cómo escribir en un receptor personalizado

Para escribir los datos en un receptor personalizado en la canalización, aplica la transformación Write genérica del SDK y pasa el receptor personalizado como un parámetro mediante la operación .to:

Java

  p.apply("WriteResults", Write.to(new MySink()));
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.