Apache Spark logging dans Scala

Je cherche une solution pour pouvoir enregistrer des données supplémentaires lors de l’exécution du code sur les nœuds Apache Spark, ce qui pourrait aider à enquêter ultérieurement sur certains problèmes susceptibles d’apparaître lors de l’exécution. Essayer d’utiliser une solution traditionnelle comme par exemple com.typesafe.scalalogging.LazyLogging échoue car l’instance de journal ne peut pas être sérialisée dans un environnement dissortingbué tel qu’Apache Spark.

J’ai étudié ce problème et pour le moment la solution que j’ai trouvée consistait à utiliser le trait org.apache.spark.Logging comme ceci:

 class SparkExample with Logging { val someRDD = ... someRDD.map { rddElement => logInfo(s"$rddElement will be processed.") doSomething(rddElement) } } 

Cependant, il semble que le trait de journalisation ne soit pas une solution permanente pour Apache Spark car il est marqué comme @DeveloperApi et la documentation de la classe mentionne:

Cela sera probablement modifié ou supprimé dans les versions futures.

Je me demande – est-ce que je peux utiliser une solution de journalisation connue et que je pourrai enregistrer les données lorsque les RDD sont exécutés sur des nœuds Apache Spark?

@Later Edit : Certains des commentaires ci-dessous suggèrent d’utiliser Log4J. J’ai essayé d’utiliser Log4J mais j’ai toujours des problèmes lors de l’utilisation de Logger d’une classe Scala (et non d’un object Scala). Voici mon code complet:

 import org.apache.log4j.Logger import org.apache.spark._ object Main { def main(args: Array[Ssortingng]) { new LoggingTestWithRDD().doTest() } } class LoggingTestWithRDD extends Serializable { val log = Logger.getLogger(getClass.getName) def doTest(): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest") val spark = new SparkContext(conf) val someRdd = spark.parallelize(List(1, 2, 3)) someRdd.map { element => log.info(s"$element will be processed") element + 1 } spark.stop() } 

}

L’exception que je vois est:

Exception dans le thread “main” org.apache.spark.SparkException: Tâche non sérialisable -> Causée par: java.io.NotSerializableException: org.apache.log4j.Logger

Vous pouvez utiliser la solution proposée par Akhil dans
https://www.mail-archive.com/[email protected]/msg29010.html . J’ai utilisé par moi-même et ça marche.

Akhil Das lun. 25 mai 2015 08:20:40 -0700
Essayez de cette façon:

 object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) } 
 val log = Logger.getLogger(getClass.getName), 

Vous pouvez utiliser “log” pour écrire des journaux. Aussi, si vous avez besoin de modifier les propriétés du journal, vous devez avoir log4j.properties dans le dossier / conf. Par défaut, nous aurons un modèle à cet endroit.

Utilisez Log4j 2.x. L’enregistreur principal a été rendu sérialisable. Problème résolu.

Discussion Jira: https://issues.apache.org/jira/browse/LOG4J2-801

“org.apache.logging.log4j”% “log4j-api”% “2.xx”

“org.apache.logging.log4j”% “log4j-core”% “2.xx”

“org.apache.logging.log4j” %% “log4j-api-scala”% “2.xx”

Voici ma solution:

J’utilise SLF4j (avec la liaison Log4j), dans ma classe de base de chaque travail d’étincelle, j’ai quelque chose comme ça:

 import org.slf4j.LoggerFactory val LOG = LoggerFactory.getLogger(getClass) 

Juste avant l’endroit où j’utilise LOG dans le code fonctionnel dissortingbué, je copie la référence de l’enregistreur à une constante locale.

 val LOG = this.LOG 

Cela a fonctionné pour moi!

Si vous avez besoin de code à exécuter avant et après une map , un filter ou une autre fonction RDD , essayez d’utiliser mapPartition , où l’iterator sous-jacent est transmis explicitement.

Exemple:

 val log = ??? // this gets captured and produced serialization error rdd.map { x => log.info(x) x+1 } 

Devient:

 rdd.mapPartition { it => val log = ??? // this is freshly initialized in worker nodes it.map { x => log.info(x) x + 1 } } 

Chaque fonction RDD base est toujours implémentée avec une mapPartition .

Veillez à manipuler le partitionneur de manière explicite et à ne pas le perdre: voir Scaladoc, paramètre preservesPartitioning , ce qui est essentiel pour les performances.

Ceci est un ancien message mais je veux fournir ma solution de travail que je viens de recevoir après avoir beaucoup lutté et peut encore être utile pour les autres:

Je veux imprimer le contenu rdd dans la fonction rdd.map mais obtenir “tâche pas erreur sérialisable”. Ceci est ma solution pour ce problème en utilisant un object statique Scala qui étend java.io.Serializable:

import org.apache.log4j.Level

object MyClass étend Serializable {

val log = org.apache.log4j.LogManager.getLogger (“nom de mon journal d’étincelles”)

log.setLevel (Level.INFO)

def main (args: Array [Ssortingng]) {

rdd.map (t =>

// Utilisation de l’enregistreur d’object ici

val log = MyClass.log

log.INFO (“count” + rdd.count))}

}