Administra PCollections múltiples

Algunas transformaciones de SDK de Dataflow pueden tomar objetos PCollection múltiples como entrada o producir objetos PCollection múltiples como salida. Los SDK de Dataflow proporcionan varias maneras diferentes de agrupar objetos PCollection múltiples.

Para objetos PCollection que almacenan los mismos tipos de datos, los SDK de Dataflow también proporcionan las transformaciones Flatten o Partition. Flatten combina objetos PCollection múltiples en una lógica simple PCollection, mientras que Partition divide una PCollection simple en una cantidad fija de colecciones más pequeñas.

PCollections que almacenan el mismo tipo

Java

Puedes encapsular objetos PCollection múltiples que almacenen los mismos tipos de datos (como PCollection<String> en Java) mediante la clase PCollectionList.

PCollectionList representa una colección de objetos PCollection que almacenan el mismo tipo de datos, como String o Integer. La transformación Flatten, por ejemplo, toma una PCollectionList y combina todos los elementos en todas las colecciones en una sola PCollection lógica.

De manera similar, la transformación Partition puede dividir una PCollection simple en una PCollectionList que contiene objetos PCollection múltiples, según una función de partición (si divide la entrada en grupos percentiles, por ejemplo).

En el ejemplo siguiente el código muestra cómo crear una PCollectionList de objetos PCollection individuales que contienen elementos String:

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

Puedes acceder a objetos PCollection individuales en una PCollectionList por índice (el índice se basa en cero), como se muestra en el código de ejemplo siguiente:

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollection List.
  PCollection<String> firstPc = pcs.get(0);

Todos los objetos PCollection en una PCollectionList deben contener el mismo tipo de datos, pero no necesitan tener la misma codificación de datos. Una PCollectionList puede tener dos objetos PCollection<Integer> que usen dos codificadores diferentes (por ejemplo, big endian y little endian).

PCollections que almacenan tipos diferentes

Algunas transformaciones avanzadas toman entradas múltiples y producen salidas múltiples de tipos diferentes. Una transformación ParDo con salidas laterales, por ejemplo, puede producir objetos PCollection múltiples que contienen tipos de datos diferentes.

Java

El SDK de Dataflow para Java usa un sistema de tupla etiquetado a fin de representar una colección de objetos PCollection. Esas tuplas etiquetadas ayudan a mantener la seguridad del tipo. Los objetos PCollection están contenidos en una clase PCollectionTuple. Por cada PCollection en una tupla, debes crear una TupleTag asociada. Usas la TupleTag para indexar, recuperar y luego identificar cada PCollection en la PCollectionTuple.

Nota: Debes usar PCollectionTuple cuando representas un grupo de objetos PCollection que pueden almacenar tipos diferentes, como PCollection<String> y PCollection<Integer>. Si todos tus objetos PCollection contienen el mismo tipo de datos, considera usar PCollectionList.

En el SDK de Dataflow para Java, cada PCollectionTuple está vinculada con una TupleTag. TupleTag es una clase incluida en el SDK de Dataflow para Java con el fin de indexar de manera específica tuplas de tipo heterogéneo. La combinación de clases de tuplas y objetos TupleTag proporciona un grado razonable de seguridad de tipo cuando se usan tuplas de tipo heterogéneo como entradas y salidas de tus transformaciones.

Cuando creas una PCollectionTuple, también tienes que crear una TupleTag por cada PCollection que contenga la tupla. Cada tipo de TupleTag debe coincidir con el tipo para cada PCollection en la tupla.

El tipo TupleTag habilita el seguimiento del tipo estático para cada PCollection en la tupla.

En el ejemplo siguiente, se muestra cómo crear una PCollectionTuple que contenga tres objetos PCollection que, a su vez, contienen valores String, Integer y también Iterable<String>:

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

Para extraer una PCollection en particular, usas el método PCollectionTuple.get y pasas la TupleTag que usaste para esa colección cuando creaste la tupla:

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

Si necesitas crear una tupla vacía, puedes usar el método PCollectionTuple.empty. Este método crea una tupla vacía asociada con una canalización dada:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

Puedes usar el método PCollectionTuple.getAll para obtener un Map de todos los objetos PCollection en una tupla, junto con sus objetos TupleTag asociados:

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

Puedes usar el método PCollectionTuple.has para verificar si una tupla contiene una PCollection asociada con la TupleTag dada.

  TupleTag<String> tag1 = new TupleTag<>();
  boolean hasStringCollection = pcs.has(tag1);

Combina PCollections con Flatten

Si tu canalización tiene objetos PCollection múltiples que contienen el mismo tipo de datos, puedes combinarlos en una sola PCollection lógica con la transformación Flatten.

Aplica una transformación Flatten

Java

Flatten toma una PCollectionList con una cantidad cualquiera de objetos PCollection de un cierto tipo y muestra una sola PCollection que contiene todos los elementos en los objetos de la PCollection en esa lista.

En el código de ejemplo siguiente, se muestra cómo apply una transformación Flatten para combinar objetos PCollection<String> múltiples en una sola PCollection<String>. En el ejemplo, primero se construye una PCollectionList que contiene todos los objetos de la PCollection para combinar.

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

En Java 7, ten en cuenta que, cuando usas el método de fábrica genérico Flatten.pCollections, tienes que especificar un parámetro de tipo que corresponda al tipo de elemento que contiene cada entrada de PCollection.

Codifica datos en colecciones combinadas

Por configuración predeterminada, el codificador para la salida PCollection es el mismo que el codificador para la primera PCollection en la entrada PCollectionList. Sin embargo, cada objeto PCollection de entrada puede usar codificadores diferentes, siempre que todos los objetos contengan el mismo tipo de datos en el lenguaje que elegiste.

Combina colecciones de sistema de ventanas

Cuando usas Flatten para combinar objetos PCollection que tienen una estrategia de sistema de ventanas aplicada, todos los objetos PCollection que quieres combinar deben usar una estrategia de sistema de ventanas compatible y un tamaño de ventanas compatible. Por ejemplo, todas las colecciones que estás combinando deben usar (de manera hipotética) ventanas fijas idénticas de 5 minutos o ventanas deslizantes de 4 minutos que comiencen cada 30 segundos.

Si tu canalización intenta usar Flatten para combinar objetos de PCollection con ventanas incompatibles, Dataflow generará un error IllegalStateException cuando se construya tu canalización.

Divide una PCollection con Partition

Puedes usar la transformación del núcleo de Partition para dividir los elementos de una sola PCollection lógica en N particiones. Cada partición es una PCollection y esas particiones están agrupadas como una lista de objetos PCollection.

Puedes usar Partition para dividir una sola PCollection en grupos lógicos, como grupos percentiles. Esto puede ser útil si tu canalización necesita realizar un procesamiento diferente por cada grupo percentil diferente.

Aplica una transformación de partición

Partition divide los elementos de una PCollection de acuerdo con una función de partición que tú proporciones. La función de partición contiene la lógica que determina cómo dividir los elementos de la entrada de PCollection en cada partición resultante PCollection.

Nota: La cantidad de particiones se debe determinar en el momento de la construcción del grafo. Por ejemplo, puedes pasar la cantidad de particiones como una opción de línea de comandos en el entorno de ejecución (que se usará luego para compilar el grafo de la canalización), pero no puedes determinar la cantidad de particiones en una canalización media (según datos calculados después de que se construyó el grafo de tu canalización).

Java

En el código de ejemplo siguiente, se divide una PCollection de objetos de tipo Student en grupos percentiles:

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          public int partitionFor(Student student, int numPartitions) {
              return student.getPercentile()  // 0..99
                   * numPartitions / 100;
          }}));

Cuando pasas la transformación Partition como un argumento para apply, tienes que proporcionar un valor int con la cantidad de particiones resultantes que quieres y una PartitionFn que represente la función de partición. En el ejemplo, definimos la PartitionFn en línea.

El valor que se muestra de apply es una PCollectionList que contiene cada una de las particiones resultantes como objetos PCollection individuales. Puedes extraer cada partición de la PCollectionList con el método get, como se muestra a continuación:

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.