Administra PCollections múltiples

Algunas transformaciones del SDK de Dataflow pueden tomar varios objetos PCollection como entrada o producir varios objetos PCollection como salida. Los SDK de Dataflow proporcionan varias formas diferentes de agrupar distintos objetos PCollection.

Para los objetos PCollection que almacenan el mismo tipo de datos, los SDK de Dataflow también proporcionan las transformaciones Flatten o Partition. Flatten combina varios objetos PCollection en una sola PCollection lógica, mientras que Partition divide una sola PCollection en un número fijo de colecciones más pequeñas.

PCollections que almacenan el mismo tipo

Java

Puedes encapsular varios objetos PCollection que almacenan el mismo tipo de datos (como PCollection<String> en Java) mediante la clase PCollectionList.

PCollectionList representa una colección de objetos PCollection que almacenan el mismo tipo de datos, como String o Integer. La transformación Flatten, por ejemplo, toma una PCollectionList y combina todos los elementos de todas las colecciones en un sola PCollection lógica.

De manera similar, la transformación Partition puede dividir una PCollection simple en una PCollectionList que contenga objetos PCollection múltiples, a partir de una función de partición (si divide la entrada en grupos de percentiles, por ejemplo).

En el siguiente código, se muestra cómo crear una PCollectionList a partir de objetos PCollection individuales que contengan elementos String:

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

Puedes acceder a los objetos PCollection individuales en una PCollectionList por medio de un índice (el índice se basa en cero), como en la siguiente muestra de código:

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollection List.
  PCollection<String> firstPc = pcs.get(0);

Si bien todos los objetos PCollection en una PCollectionList deben contener el mismo tipo de datos, no necesitan tener la misma codificación de datos. Una PCollectionList puede tener dos objetos PCollection<Integer> que usen dos codificadores diferentes (por ejemplo, big endian y little endian).

PCollections que almacenan tipos diferentes

Algunas transformaciones avanzadas toman múltiples entradas y producen varias salidas de tipos diferentes. Una transformación ParDo con salidas laterales, por ejemplo, puede producir varios objetos PCollection que contienen diferentes tipos de datos.

Java

El SDK de Dataflow para Java usa un sistema de tupla etiquetada a fin de representar una colección de objetos PCollection. Esas tuplas etiquetadas ayudan a mantener la seguridad del tipo. Los objetos PCollection están contenidos en una clase PCollectionTuple. Por cada PCollection de la tupla, debes crear una TupleTag asociada. Usa la TupleTag para indexar, identificar y recuperar cada PCollection en la PCollectionTuple.

Nota: Debes usar PCollectionTuple cuando representas un grupo de objetos PCollection que pueden almacenar tipos diferentes, como PCollection<String> y PCollection<Integer>. Si todos tus objetos PCollection contienen el mismo tipo de datos, considera usar PCollectionList.

En el SDK de Dataflow para Java, cada PCollectionTuple está vinculada con una TupleTag. TupleTag es una clase incluida en el SDK de Dataflow para Java con el fin específico de indexar tuplas de tipos diferentes. La combinación de clases de tuplas y objetos TupleTag proporciona un grado razonable de seguridad de los tipos cuando se usan tuplas de tipos diferentes como entradas y salidas en las transformaciones.

Cuando crees una PCollectionTuple, también deberás crear una TupleTag para cada PCollection que contenga la tupla. Cada tipo TupleTag debe coincidir con el tipo de cada PCollection en la tupla.

El tipo TupleTag habilita el seguimiento del tipo estático para cada PCollection en la tupla.

En el siguiente código de ejemplo, se muestra cómo crear una PCollectionTuple que contenga tres objetos PCollection, que tienen los valores String, Integer y Iterable<String>, respectivamente:

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

Para extraer una PCollection en particular de una tupla, usa el método PCollectionTuple.get y pasa la TupleTag que usaste para esa colección cuando creaste la tupla:

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

Si necesitas crear una tupla vacía, puedes usar el método PCollectionTuple.empty. Con este método, se crea una tupla vacía asociada con una canalización determinada:

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

Puedes usar el método PCollectionTuple.getAll para obtener un Map de todos los objetos PCollection en una tupla, junto con sus objetos TupleTag asociados:

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

Puedes usar el método PCollectionTuple.has para comprobar si una tupla contiene una PCollection asociada con una TupleTag determinada.

  TupleTag<String> tag1 = new TupleTag<>();
  boolean hasStringCollection = pcs.has(tag1);

Combina PCollections con Flatten

Si tu canalización tiene varios objetos PCollection que contienen el mismo tipo de datos, puedes combinarlos en una sola PCollection lógica mediante la transformación Flatten.

Aplica una transformación Flatten

Java

Flatten toma una PCollectionList con cualquier cantidad de objetos PCollection de un tipo determinado y muestra una sola PCollection que contiene todos los elementos en los objetos PCollection de esa lista.

En el siguiente código de ejemplo, se muestra cómo apply una transformación Flatten para combinar varios objetos PCollection<String> en una sola PCollection<String>. En el ejemplo, primero se crea una PCollectionList que contiene todos los objetos PCollection que se combinarán.

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

En Java 7, ten en cuenta que cuando usas el método de fábrica genérico Flatten.pCollections, debes especificar un parámetro de tipo que corresponda al tipo de elemento que conserva cada entrada de PCollection.

Codifica datos en colecciones combinadas

De forma predeterminada, el codificador para la PCollection de salida es el mismo que el codificador de la primera PCollection en la PCollectionList de entrada. Sin embargo, cada objeto PCollection de entrada puede usar codificadores diferentes, siempre que contengan el mismo tipo de datos en el lenguaje que elegiste.

Combina colecciones con ventanas

Cuando usas Flatten para combinar objetos PCollection que tienen una estrategia de sistema de ventanas aplicada, todos los objetos PCollection que quieres combinar deben usar una estrategia de sistema de ventanas compatible y un tamaño de ventana compatible. Por ejemplo, todas las colecciones que deseas combinar deben usar (de manera hipotética) ventanas fijas idénticas de 5 minutos o ventanas variables de 4 minutos que comiencen cada 30 segundos.

Si tu canalización intenta usar Flatten para combinar objetos PCollection con ventanas incompatibles, Dataflow generará un error IllegalStateException cuando se cree la canalización.

Divide una PCollection mediante la partición

Puedes usar la transformación principal Partition para dividir los elementos de una sola PCollection lógica en N particiones. Cada partición resultante es una PCollection, y esas particiones se agrupan como una lista de objetos PCollection.

Puedes usar Partition para dividir una sola PCollection en grupos lógicos, como grupos de percentiles. Esto puede resultar útil si tu canalización necesita realizar un procesamiento diferente para cada grupo de percentiles distinto.

Aplica una transformación Partition

Partition divide los elementos de una PCollection de acuerdo con una función de partición que tú proporcionas. La función de partición contiene la lógica que determina cómo dividir los elementos de la PCollection de entrada en cada PCollection de partición resultante.

Nota: La cantidad de particiones se debe determinar en el momento de la construcción del grafo. Por ejemplo, puedes pasar la cantidad de particiones como una opción de línea de comandos en el entorno de ejecución (que se usará luego para compilar el grafo de la canalización), pero no puedes determinar la cantidad de particiones en una canalización media (según datos calculados después de que se construyó el grafo de tu canalización).

Java

En el siguiente código de ejemplo, se divide una PCollection de objetos de tipo Student en grupos de percentiles:

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          public int partitionFor(Student student, int numPartitions) {
              return student.getPercentile()  // 0..99
                   * numPartitions / 100;
          }}));

Cuando pases la transformación Partition como un argumento a apply, deberás proporcionar un valor int con la cantidad de particiones que deseas en el resultado y un PartitionFn que represente la función de partición. En el ejemplo, definimos PartitionFn en línea.

El valor que da como resultado la operación apply es una PCollectionList que contiene cada una de las particiones resultantes como objetos PCollection individuales. Puedes extraer cada partición de la PCollectionList mediante el método get, de la siguiente manera:

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);