Codage des données

Lorsque vous créez ou écrivez en sortie des données de pipeline, vous devez spécifier la manière dont les éléments de votre PCollection sont encodés et décodés vers et depuis des chaînes d'octets. Les chaînes d'octets sont utilisées pour le stockage intermédiaire, ainsi que pour la lecture des sources et l'écriture dans des récepteurs. Les SDK Dataflow utilisent des objets appelés codeurs pour décrire la façon dont les éléments d'une PCollection donnée doivent être encodés et décodés.

Utiliser des codeurs

Vous devez généralement spécifier un codeur lors de la lecture de données dans votre pipeline à partir d'une source externe (ou lors de la création de données de pipeline à partir de données locales), ainsi que lors de la sortie des données de pipeline dans un récepteur externe. Pour plus d'informations, consultez la page E/S de pipeline.

Java

Dans le SDK Dataflow pour Java, le type Coder fournit les méthodes requises pour encoder et décoder des données. Le SDK Dataflow pour Java fournit un certain nombre de sous-classes Coder avec une variété de types Java standard, tels que Integer, Long, Double, StringUtf8, BigQuery, TableRow, etc. Vous pouvez accéder à toutes les sous-classes de Coder disponibles dans le package com.google.cloud.dataflow.sdk.coders.

Lorsque vous lisez des données dans un pipeline, le codeur indique comment interpréter les données d'entrée dans un type spécifique à la langue, tel que Integer ou String. De même, le codeur indique comment les types spécifiques à la langue de votre pipeline doivent être écrits dans des chaînes d'octets pour un récepteur de données en sortie ou pour matérialiser des données intermédiaires dans votre pipeline.

Les SDK Dataflow définissent un codeur pour chaque PCollection d'un pipeline, y compris ceux générés en sortie d'une transformation. La plupart du temps, les SDK Dataflow peuvent déduire automatiquement le codeur approprié pour une PCollection en sortie. Pour plus d'informations, consultez la page Inférer un codeur.

Notez que les codeurs n'ont pas nécessairement une relation 1:1 avec les types. Par exemple, le type Integer peut avoir plusieurs codeurs valides et les données d'entrée et de sortie peuvent utiliser différents codeurs Integer. Une transformation peut contenir des données d'entrée de type Integer utilisant BigEndianIntegerCoder et des données de sortie de type Integer utilisant VarIntCoder.

Java

Vous pouvez définir explicitement un Coder lors de la saisie ou de la sortie d'une PCollection. Vous définissez le Coder en appelant la méthode .withCoder lorsque vous appliquez la transformation Read ou Write à votre pipeline.

En règle générale, vous définissez le Coder lorsque le codeur d'une PCollection ne peut pas être inféré automatiquement, ou lorsque vous voulez utiliser un codeur différent du pipeline par défaut. L'exemple de code suivant définit un ensemble de chiffres dans un fichier texte et définit un Coder de type TextualIntegerCoder pour la PCollection résultante.

  PCollection<Integer> numbers =
     p.begin()
      .apply(TextIO.Read.named("ReadNumbers")
                        .from("gs://my_bucket/path/to/numbers-*.txt")
                        .withCoder(TextualIntegerCoder.of()));

Vous pouvez définir le codeur pour une PCollection existante en utilisant la méthode PCollection.setCoder. Notez que vous ne pouvez pas appeler setCoder sur une PCollection finalisée (par exemple en appelant .apply).

Vous pouvez obtenir le codeur pour une PCollection existante en utilisant la méthode getCoder. Cette méthode échouera avec une IllegalStateException si aucun codeur défini ne peut être inféré pour la PCollection donnée.

Inférence de codeur et codeurs par défaut

Les SDK Dataflow nécessitent un codeur pour chaque PCollection de votre pipeline. La plupart du temps, cependant, vous n'avez pas besoin de spécifier explicitement un codeur. C'est le cas pour une PCollection intermédiaire produite par une transformation au milieu de votre pipeline. Dans ce cas, les SDK Dataflow peuvent inférer un codeur approprié à partir des entrées et des sorties de la transformation utilisée pour produire la PCollection.

Java

Chaque objet Pipeline possède un CoderRegistry. CoderRegistry représente une mise en correspondance de types Java avec les codeurs par défaut que le pipeline devrait utiliser pour les PCollection de chaque type.

Par défaut, le SDK Dataflow pour Java infère automatiquement le Coder des éléments d'une sortie PCollection à l'aide du paramètre type de l'objet fonction de la transformation, tel que DoFn. Dans le cas de ParDo, par exemple, un objet fonction DoFn<Integer, String> accepte un élément d'entrée de type Integer et génère un élément de sortie de type String. Dans un tel cas, le SDK Dataflow pour Java infère automatiquement la valeur par défaut Coder pour la sortie PCollection<String> (dans le pipeline par défaut CoderRegistry, il s'agit de StringUtf8Coder).

Codeurs par défaut et objet CoderRegistry

Chaque objet Pipeline possède un objet CoderRegistry, qui met en correspondance les types de langue avec le codeur par défaut que le pipeline doit utiliser pour ces types. Vous pouvez utiliser le CoderRegistry vous-même pour rechercher le codeur par défaut d'un type donné ou pour enregistrer un nouveau codeur par défaut pour un type donné.

Java

CoderRegistry contient une mise en correspondance par défaut de Coder avec les types Java standards pour tout Pipeline créé à l'aide du SDK Dataflow pour Java. Le tableau suivant montre la mise en correspondance standard :

Type Java Coder par défaut
Double DoubleCoder
Instant InstantCoder
Integer VarIntCoder
Iterable IterableCoder
KV KvCoder
List ListCoder
Map MapCoder
Long VarLongCoder
String StringUtf8Coder
TableRow TableRowJsonCoder
Void VoidCoder
byte[] ByteArrayCoder
TimestampedValue TimestampedValueCoder

Vous pouvez utiliser la méthode CoderRegistry.registerStandardCoders pour définir les mises en correspondance par défaut pour un CoderRegistry donné.

Recherche d'un codeur par défaut

Java

Vous pouvez utiliser la méthode CoderRegistry.getDefaultCoder pour déterminer la valeur par défaut de Coder pour un type Java. Vous pouvez accéder au CoderRegistry pour un Pipeline donné en utilisant la méthode Pipeline.getCoderRegistry. Cela vous permet de déterminer (ou de définir) le Coder par défaut pour un type Java par pipeline : c'est-à-dire "pour ce pipeline, vérifiez que les valeurs Integer sont codées à l'aide de BigEndianIntegerCoder".

Définir le codeur par défaut pour un type

Java

Pour définir le Coder par défaut d'un type Java pour un pipeline particulier, obtenez et modifiez le CoderRegistry du pipeline. Vous utilisez la méthode Pipeline.getCoderRegistry pour obtenir l'objet CoderRegistry, puis la méthode CoderRegistry.registerCoder pour enregistrer un nouveau Coder pour le type Java cible.

Le code exemple suivant montre comment définir un Coder par défaut, ici BigEndianIntegerCoder, en tant que valeurs Integer pour un pipeline.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  CoderRegistry cr = p.getCoderRegistry();
  cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);

Annoter un type de données personnalisé avec un codeur par défaut

Java

Si votre programme de pipeline définit un type de données personnalisé, vous pouvez utiliser l'annotation @DefaultCoder pour spécifier le codeur à utiliser avec ce type. Par exemple, supposons que vous ayez un type de données personnalisé pour lequel vous voulez utiliser SerializableCoder. Vous pouvez utiliser l'annotation @DefaultCoder comme suit :

  @DefaultCoder(AvroCoder.class)
  public class MyCustomDataType {
    ...
  }

Si vous avez créé un codeur personnalisé correspondant à votre type de données et que vous souhaitez utiliser l'annotation @DefaultCoder, votre classe de codeur doit implémenter une méthode par défaut Coder.of(Class<T>) statique.

  public class MyCustomCoder implements Coder {
    public static Coder<T> of(Class<T> clazz) {...}
    ...
  }

  @DefaultCoder(MyCustomCoder.class)
  public class MyCustomDataType {
    ...
  }