Comment faire pour que saveAsTextFile ne divise pas la sortie en plusieurs fichiers?

Lorsque vous utilisez Scala dans Spark, chaque fois que je vide les résultats en utilisant saveAsTextFile , il semble que la sortie soit divisée en plusieurs parties. Je ne fais que lui passer un paramètre (path).

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 
  1. Le nombre de sorties correspond-il au nombre de réducteurs qu’il utilise?
  2. Est-ce que cela signifie que la sortie est compressée?
  3. Je sais que je peux combiner la sortie en utilisant bash, mais existe-t-il une option pour stocker la sortie dans un seul fichier texte, sans fractionnement? J’ai regardé les documents de l’API, mais cela n’en dit pas grand chose.

La raison pour laquelle il enregistre plusieurs fichiers est que le calcul est dissortingbué. Si la sortie est suffisamment petite pour que vous puissiez l’installer sur une machine, vous pouvez terminer votre programme avec

 val arr = year.collect() 

Ensuite, enregistrez le tableau résultant sous forme de fichier. Une autre méthode consisterait à utiliser un partitionneur personnalisé, partitionBy , pour que tout se passe sur une partition, même si cela n’est pas souhaitable car vous n’obtiendrez aucune parallélisation.

Si vous souhaitez que le fichier soit enregistré avec saveAsTextFile vous pouvez utiliser coalesce(1,true).saveAsTextFile() . Cela signifie essentiellement faire le calcul puis coalescer à 1 partition. Vous pouvez également utiliser repartition(1) qui n’est qu’un wrapper pour la coalesce avec l’argument shuffle défini sur true. En parcourant la source de RDD.scala, c’est comme ça que j’ai compris la plupart de ces choses, vous devriez jeter un coup d’oeil.

Vous pouvez appeler coalesce(1) puis saveAsTextFile() , mais cela peut être une mauvaise idée si vous avez beaucoup de données. Comme dans Hadoop, des fichiers séparés par split sont générés pour permettre aux mappeurs et réducteurs séparés d’écrire dans différents fichiers. Avoir un seul fichier de sortie est seulement une bonne idée si vous avez très peu de données, auquel cas vous pourriez aussi faire collecter (), comme l’a dit @aaronman.

Pour ceux qui travaillent avec un dataset plus important :

  • rdd.collect() ne doit pas être utilisé dans ce cas car il collectera toutes les données sous forme de Array dans le pilote, ce qui est le moyen le plus simple de sortir de la mémoire.

  • rdd.coalesce(1).saveAsTextFile() ne devrait pas non plus être utilisé car le parallélisme des étapes en amont sera perdu pour être exécuté sur un seul nœud, où les données seront stockées.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() est la meilleure option simple car elle permet de garder le traitement des tâches en amont en parallèle et de n’effectuer que la rdd.repartition(1).saveAsTextFile() aléatoire sur un seul noeud ( rdd.repartition(1).saveAsTextFile() est un synonyme exact).

  • rdd.saveAsSingleTextFile() comme rdd.saveAsSingleTextFile() ci-dessous permet également de stocker le rdd dans un seul fichier avec un nom spécifique tout en conservant les propriétés de rdd.coalesce(1, shuffle = true).saveAsTextFile() de rdd.coalesce(1, shuffle = true).saveAsTextFile() .

Ce qui peut être gênant avec rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") est qu’il produit effectivement un fichier dont le chemin est path/to/file.txt/part-00000 et pas path/to/file.txt .

La solution suivante rdd.saveAsSingleTextFile("path/to/file.txt") produira réellement un fichier dont le chemin est path/to/file.txt :

 package com.whatever.package import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.CompressionCodec object SparkHelper { // This is an implicit class so that saveAsSingleTextFile can be attached to // SparkContext and be called like this: sc.saveAsSingleTextFile implicit class RDDExtensions(val rdd: RDD[Ssortingng]) extends AnyVal { def saveAsSingleTextFile(path: Ssortingng): Unit = saveAsSingleTextFileInternal(path, None) def saveAsSingleTextFile(path: Ssortingng, codec: Class[_ <: CompressionCodec]): Unit = saveAsSingleTextFileInternal(path, Some(codec)) private def saveAsSingleTextFileInternal( path: String, codec: Option[Class[_ <: CompressionCodec]] ): Unit = { // The interface with hdfs: val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) // Classic saveAsTextFile in a temporary folder: hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already codec match { case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec) case None => rdd.saveAsTextFile(s"$path.tmp") } // Merge the folder of resulting part-xxxxx into one file: hdfs.delete(new Path(path), true) // to make sure it's not there already FileUtil.copyMerge( hdfs, new Path(s"$path.tmp"), hdfs, new Path(path), true, rdd.sparkContext.hadoopConfiguration, null ) hdfs.delete(new Path(s"$path.tmp"), true) } } } 

qui peut être utilisé de cette façon:

 import com.whatever.package.SparkHelper.RDDExtensions rdd.saveAsSingleTextFile("path/to/file.txt") // Or if the produced file is to be compressed: import org.apache.hadoop.io.compress.GzipCodec rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec]) 

Cet extrait stocke d’abord le rdd.saveAsTextFile("path/to/file.txt") avec rdd.saveAsTextFile("path/to/file.txt") dans un dossier temporaire path/to/file.txt.tmp comme si nous ne voulions pas stocker de données dans un fichier (ce qui permet de conserver le traitement des tâches en amont en parallèle).

Et puis seulement, en utilisant l’ api du système de fichiers hadoop , nous procédons à la fusion ( FileUtil.copyMerge() ) des différents fichiers de sortie pour créer notre fichier unique de sortie path/to/file.txt .

Comme d’autres l’ont mentionné, vous pouvez collecter ou regrouper vos données pour forcer Spark à produire un seul fichier. Mais cela limite également le nombre de tâches Spark pouvant fonctionner sur votre jeu de données en parallèle. Je préfère le laisser créer une centaine de fichiers dans le répertoire HDFS de sortie, puis utiliser hadoop fs -getmerge /hdfs/dir /local/file.txt pour extraire les résultats dans un seul fichier du système de fichiers local. Cela prend tout son sens lorsque votre résultat est un rapport relativement petit, bien sûr.

Vous pourrez le faire dans la prochaine version de Spark, dans la version 1.0.0 actuelle, mais ce n’est pas possible à moins que vous ne le fassiez manuellement, par exemple, comme vous l’avez mentionné, avec un appel de script bash.

Je tiens également à mentionner que la documentation indique clairement que les utilisateurs doivent faire preuve de prudence lorsqu’ils appellent la fusion avec un nombre réellement limité de partitions. Cela peut amener les partitions en amont à hériter de ce nombre de partitions.

Je ne recommanderais pas l’utilisation de coalesce (1) à moins que ce soit vraiment nécessaire.

Dans Spark 1.6.1, le format est le suivant. Il crée un fichier de sortie unique. Il est recommandé de l’utiliser si la sortie est suffisamment petite pour être manipulée. En gros, il renvoie un nouveau RDD réduit en partitions numPartitions. Si vous effectuez une fusion drastique, Par exemple, pour numPartitions = 1, votre calcul peut avoir lieu sur moins de nœuds que vous ne le souhaitez (par exemple, un nœud dans le cas de numPartitions = 1)

 pair_result.coalesce(1).saveAsTextFile("/app/data/") 

Vous pouvez appeler repartition() et suivre cette procédure:

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) var repartitioned = year.repartition(1) repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

entrer la description de l'image ici

Voici ma réponse pour sortir un seul fichier. Je viens d’append coalesce(1)

 val year = sc.textFile("apat63_99.txt") .map(_.split(",")(1)) .flatMap(_.split(",")) .map((_,1)) .reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 

Code:

 year.coalesce(1).saveAsTextFile("year")