Tâche non sérialisable: java.io.NotSerializableException lors de l’appel de la fonction hors fermeture uniquement sur les classes non les objects

Obtenir un comportement étrange lors de l’appel de la fonction en dehors d’une fermeture:

  • quand la fonction est dans un object tout fonctionne
  • lorsque la fonction est dans une classe, obtenez:

Tâche non sérialisable: java.io.NotSerializableException: testing

Le problème est que j’ai besoin de mon code dans une classe et non d’un object. Une idée de ce qui se passe? Un object Scala est-il sérialisé (par défaut?)?

Ceci est un exemple de code de travail:

object working extends App { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) //calling function outside closure val after = rddList.map(someFunc(_)) def someFunc(a:Int) = a+1 after.collect().map(println(_)) } 

Voici l’exemple non fonctionnel:

 object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 } 

Je ne pense pas que l’autre réponse soit tout à fait correcte. Les RDD sont en effet sérialisables , ce n’est donc pas la cause de l’échec de votre tâche.

Spark est un moteur de calcul dissortingbué et sa principale abstraction est un dataset dissortingbué résilient ( RDD ), qui peut être considéré comme une collection dissortingbuée. Fondamentalement, les éléments de RDD sont partitionnés entre les nœuds du cluster, mais Spark les éloigne de l’utilisateur, ce qui permet à l’utilisateur d’interagir avec le RDD (collection) comme s’il était local.

Ne pas entrer dans trop de détails, mais lorsque vous exécutez différentes transformations sur un RDD ( map , flatMap , filter et autres), votre code de transformation (fermeture) est le suivant:

  1. sérialisé sur le nœud du pilote,
  2. livré aux noeuds appropriés du cluster,
  3. désérialisé,
  4. et finalement exécuté sur les nœuds

Vous pouvez bien sûr l’exécuter localement (comme dans votre exemple), mais toutes ces phases (à l’exception de l’envoi par le réseau) se produisent toujours. [Cela vous permet d’attraper des bogues avant même de les déployer en production]

Ce qui se passe dans votre deuxième cas, c’est que vous appelez une méthode, définie dans le testing de classe depuis la fonction map. Spark voit cela et puisque les méthodes ne peuvent pas être sérialisées par elles-mêmes, Spark essaie de sérialiser toute la classe de testing , de sorte que le code fonctionne toujours lorsqu’il est exécuté dans une autre JVM. Vous avez deux possibilités:

Soit vous faites que le test de classe soit sérialisable, donc toute la classe peut être sérialisée par Spark:

 import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test extends java.io.Serializable { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } def someFunc(a: Int) = a + 1 } 

ou vous faites la fonction someFunc au lieu d’une méthode (les fonctions sont des objects dans Scala), de sorte que Spark pourra le sérialiser:

 import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } val someFunc = (a: Int) => a + 1 } 

Un problème similaire, mais pas le même avec la sérialisation des classes, peut vous intéresser et vous pouvez le lire dans cette présentation de Spark Summit 2013 .

En parallèle, vous pouvez réécrire rddList.map(someFunc(_)) en rddList.map(someFunc) , elles sont exactement les mêmes. Habituellement, le second est préféré car il est moins verbeux et plus propre à lire.

EDIT (2015-03-15): SPARK-5307 introduit SerializationDebugger et Spark 1.3.0 est la première version à l’utiliser. Il ajoute un chemin de sérialisation à une exception NotSerializableException . Lorsqu’une exception NotSerializableException est rencontrée, le débogueur visite le graphe d’objects pour trouver le chemin vers l’object qui ne peut pas être sérialisé et construit des informations pour aider l’utilisateur à trouver l’object.

Dans le cas de OP, c’est ce qui est imprimé sur stdout:

 Serialization stack: - object not serializable (class: testing, value: testing@2dfe2f00) - field (class: testing$$anonfun$1, name: $outer, type: class testing) - object (class testing$$anonfun$1, ) 

La réponse de Grega est excellente pour expliquer pourquoi le code d’origine ne fonctionne pas et deux façons de résoudre le problème. Cependant, cette solution n’est pas très flexible. Considérez le cas où votre fermeture inclut un appel de méthode sur une classe non Serializable laquelle vous n’avez aucun contrôle. Vous ne pouvez ni append la balise Serializable à cette classe ni modifier l’implémentation sous-jacente pour transformer la méthode en fonction.

Nilesh présente une excellente solution pour cela, mais la solution peut être plus concise et générale:

 def genMapper[A, B](f: A => B): A => B = { val locker = com.twitter.chill.MeatLocker(f) x => locker.get.apply(x) } 

Ce sérialiseur de fonctions peut ensuite être utilisé pour emballer automatiquement des fermetures et des appels de méthode:

 rdd map genMapper(someFunc) 

Cette technique a également l’avantage de ne pas nécessiter les dépendances supplémentaires de Shark pour accéder à KryoSerializationWrapper , puisque le Chill de Twitter est déjà KryoSerializationWrapper par le kernel Spark.

Discussion complète expliquant le problème, qui propose un excellent moyen de changer de paradigme pour éviter ces problèmes de sérialisation: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

La réponse la plus eloquent suggère en gros d’abandonner une fonctionnalité de langue entière – qui n’utilise plus de méthodes et utilise uniquement des fonctions. En effet, dans les méthodes de functional programming, les classes doivent être évitées, mais les transformer en fonctions ne résout pas le problème de conception (voir le lien ci-dessus).

Comme solution rapide dans cette situation particulière, vous pouvez simplement utiliser l’annotation @transient pour lui dire de ne pas essayer de sérialiser la valeur incriminée (ici, Spark.ctx est une classe personnalisée et non celle de Spark suivant l’OP):

 @transient val rddList = Spark.ctx.parallelize(list) 

Vous pouvez également restructurer le code pour que rddList vive ailleurs, mais c’est également désagréable.

L’avenir est probablement les spores

À l’avenir, Scala inclura ce qu’on appelle les «spores», qui devraient nous permettre de mieux contrôler le grain, ce qui est ou n’est pas exactement entraîné par une fermeture. En outre, cela devrait transformer toutes les erreurs de tirage accidentel en types non sérialisables (ou toute autre valeur indésirable) en erreurs de compilation plutôt que maintenant, ce qui est une horrible exception à l’exécution / memory leaks.

http://docs.scala-lang.org/sips/pending/spores.html

Un conseil sur la sérialisation de Kryo

Lorsque vous utilisez kyro, faites en sorte que l’enregistrement soit nécessaire, cela signifie que vous obtenez des erreurs au lieu de memory leaks:

“Enfin, je sais que kryo a kryo.setRegistrationOptional (true) mais j’ai beaucoup de mal à essayer de l’utiliser. Lorsque cette option est activée, kryo semble toujours renvoyer des exceptions si je ne me suis pas enregistré Des classes.”

Stratégie pour enregistrer des classes avec kryo

Bien sûr, cela ne vous donne que le contrôle au niveau du type, pas le contrôle au niveau de la valeur.

… plus d’idées à venir.

J’ai résolu ce problème en utilisant une approche différente. Il vous suffit de sérialiser les objects avant de passer par la fermeture et de désérialiser par la suite. Cette approche ne fonctionne que même si vos classes ne sont pas sérialisables, car elle utilise Kryo en coulisse. Tout ce dont vous avez besoin est du curry. 😉

Voici un exemple de la façon dont je l’ai fait:

 def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } } 

N’hésitez pas à rendre Blah aussi compliqué que vous le souhaitez, classe, object compagnon, classes nestedes, références à plusieurs bibliothèques tierces.

KryoSerializationWrapper fait référence à: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Je ne suis pas tout à fait certain que cela s’applique à Scala mais, en Java, j’ai résolu le NotSerializableException en refactorisant mon code pour que la fermeture n’accède pas à un champ final non sérialisable.

J’ai fait face à un problème similaire et ce que je comprends de la réponse de Grega est

 object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 } 

votre méthode doIT essaie de sérialiser la méthode someFunc (_) , mais comme la méthode n’est pas sérialisable, elle essaie de sérialiser les tests de classe qui ne sont pas encore sérialisables.

Donc, faites fonctionner votre code, vous devez définir someFunc dans la méthode doIT . Par exemple:

 def doIT = { def someFunc(a:Int) = a+1 //function definition } val after = rddList.map(someFunc(_)) after.collect().map(println(_)) } 

Et si plusieurs fonctions apparaissent dans l’image, toutes ces fonctions doivent être disponibles dans le contexte parent.