Ecrire sur plusieurs sorties par clé Spark – un travail Spark

Comment pouvez-vous écrire sur plusieurs sorties en fonction de la clé en utilisant Spark dans un seul Job.

Connexe: Écriture de plusieurs sorties par clé Haraop, un travail MapReduce

Par exemple

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption) 

assurerait que le cat prefix/1 est

 a b 

et le cat prefix/2 serait

 c 

Répondre

Pour une réponse exacte avec les importations complètes, le proxénète et le codec de compression, voir https://stackoverflow.com/a/46118044/1586965

Si vous utilisez Spark 1.4+, cela est devenu beaucoup plus facile grâce à l’ API DataFrame . (DataFrames ont été introduits dans Spark 1.3, mais partitionBy() , dont nous avons besoin, a été introduit dans 1.4 .)

Si vous commencez avec un RDD, vous devez d’abord le convertir en un DataFrame:

 val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name") 

En Python, ce même code est:

 people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"]) 

Une fois que vous avez un DataFrame, il est simple d’écrire sur plusieurs sorties en fonction d’une clé particulière. Qui plus est – et c’est la beauté de l’API DataFrame – le code est à peu près le même sur Python, Scala, Java et R:

 people_df.write.partitionBy("number").text("people") 

Et vous pouvez facilement utiliser d’autres formats de sortie si vous le souhaitez:

 people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet") 

Dans chacun de ces exemples, Spark créera un sous-répertoire pour chacune des clés sur lesquelles nous avons partitionné le DataFrame:

 people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh 

Je le ferais comme ça qui est évolutif

 import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: Ssortingng): Ssortingng = key.asInstanceOf[Ssortingng] } object Split { def main(args: Array[Ssortingng]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[Ssortingng], classOf[Ssortingng], classOf[RDDMultipleTextOutputFormat]) spark.stop() } } 

Juste vu la réponse similaire ci-dessus, mais en fait, nous n’avons pas besoin de partitions personnalisées. MultipleTextOutputFormat créera un fichier pour chaque clé. Il est correct que plusieurs enregistrements avec les mêmes clés tombent dans la même partition.

new HashPartitioner (num), où num est le numéro de partition souhaité. Si vous avez un grand nombre de clés différentes, vous pouvez définir le nombre sur grand. Dans ce cas, chaque partition n’ouvrira pas trop de gestionnaires de fichiers hdfs.

Si vous avez potentiellement beaucoup de valeurs pour une clé donnée, je pense que la solution évolutive consiste à écrire un fichier par clé par partition. Malheureusement, il n’y a pas de support intégré pour cela dans Spark, mais nous pouvons faire quelque chose.

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toSsortingng) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To sortinggger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: Ssortingng) { private val writers = collection.mutable.Map[Ssortingng, java.io.PrintWriter]() def write(key: Ssortingng, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) } 

(Remplacez PrintWriter par le choix de l’opération de système de fichiers dissortingbué.)

Cela fait un seul passage sur le RDD et n’effectue aucune lecture aléatoire. Il vous donne un répertoire par clé, avec un certain nombre de fichiers à l’intérieur de chacun.

Cela inclut le codec comme demandé, les importations nécessaires et le proxénète comme demandé.

 import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: Ssortingng, codec: Ssortingng, keyName: Ssortingng = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Une différence subtile avec l’OP est qu’il préfixera = aux noms de répertoire. Par exemple

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Donnerait:

 prefix/key=1/part-00000 prefix/key=2/part-00000 

prefix/my_number=1/part-00000 contiendrait les lignes a et b , et prefix/my_number=2/part-00000 contiendrait la ligne c .

Et

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo") 

Donnerait:

 prefix/foo=1/part-00000 prefix/foo=2/part-00000 

Il devrait être clair comment éditer pour le parquet .

Enfin, voici un exemple pour Dataset , qui est peut-être plus intéressant que Tuples.

 implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: Ssortingng, codec: Ssortingng, field: Ssortingng): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } } 

J’ai un besoin similaire et j’ai trouvé un moyen. Mais il y a un inconvénient (ce qui n’est pas un problème pour moi): vous devez re-partitionner vos données avec une partition par fichier de sortie.

Pour partitionner de cette façon, il faut généralement savoir à l’avance combien de fichiers le travail va générer et trouver une fonction qui mappera chaque clé sur chaque partition.

Commençons par créer notre classe MultipleTextOutputFormat:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } } 

Avec cette classe, Spark recevra une clé d'une partition (la première / dernière, je suppose) et nomme le fichier avec cette clé, il n'est donc pas bon de mélanger plusieurs clés sur la même partition.

Pour votre exemple, vous aurez besoin d'un partitionneur personnalisé. Cela fera le travail:

 import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } } 

Maintenant, mettons tout ensemble:

 val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[Ssortingng], classOf[KeyBasedOutput[Integer, Ssortingng]]) 

Cela générera 3 fichiers sous le préfixe (nommé 1, 2 et 7), traitant tout en une seule fois.

Comme vous pouvez le constater, vous devez connaître vos clés pour pouvoir utiliser cette solution.

Pour moi, c'était plus facile car j'avais besoin d'un fichier de sortie pour chaque hachage de clé et le nombre de fichiers était sous mon contrôle, donc je pouvais utiliser le stockeur HashPartitioner pour faire le tour.

saveAsText () et saveAsHadoop (…) sont implémentés sur la base des données RDD, en particulier par la méthode: PairRDD.saveAsHadoopDataset qui prend les données du PairRdd où elles sont exécutées. Je vois deux options possibles: Si vos données sont relativement petites, vous pourriez gagner du temps d’implémentation en regroupant les RDD, en créant un nouveau RDD à partir de chaque collection et en utilisant ce RDD pour écrire les données. Quelque chose comme ça:

 val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k} 

Notez que cela ne fonctionnera pas pour les jeux de données volumineux b / c, la matérialisation de l’iterator à v.toSeq pourrait ne pas tenir dans la mémoire.

L’autre option que je vois, et en fait celle que je recommande dans ce cas est la suivante: lancez votre propre appel en appelant directement le hadop / hdfs api.

Voici une discussion que j’ai entamée tout en recherchant cette question: Comment créer des RDD à partir d’un autre RDD?

J’avais besoin de la même chose en Java. Publier ma traduction de la réponse Scala de Zhang Zhan aux utilisateurs de l’API Spark Java:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected Ssortingng generateFileNameForKeyValue(A key, B value, Ssortingng name) { return key.toSsortingng(); } } public class Main { public static void main(Ssortingng[] args) { SparkConf conf = new SparkConf() .setAppName("Split Job") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); Ssortingng[] ssortingngs = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"}; sc.parallelize(Arrays.asList(ssortingngs)) // The first character of the ssortingng is the key .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s)) .saveAsHadoopFile("output/", String.class, String.class, RDDMultipleTextOutputFormat.class); sc.stop(); } } 

J’ai eu un cas d’utilisation similaire où j’ai divisé le fichier d’entrée sur Hadoop HDFS en plusieurs fichiers basés sur une clé (1 fichier par clé). Voici mon code scala pour spark

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:Ssortingng, records:Iterable[Ssortingng]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line <- records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2)) 

J’ai regroupé les enregistrements en fonction de la clé. Les valeurs de chaque clé sont écrites dans un fichier séparé.

bonne nouvelle pour l’utilisateur de python dans le cas où vous avez plusieurs colonnes et que vous voulez enregistrer toutes les autres colonnes non partitionnées au format csv, qui échoueront si vous utilisez la méthode “text” comme suggestion de Nick Chammas.

 people_df.write.partitionBy("number").text("people") 

Le message d’erreur est “AnalysisException: la source de données u’Text ne prend en charge qu’une seule colonne et vous avez 2 colonnes. ‘”

Dans spark 2.0.0 (mon environnement de test est hdp spark 2.0.0), le package “com.databricks.spark.csv” est maintenant intégré et il nous permet de sauvegarder le fichier texte partitionné par une seule colonne, voir l’exemple ci-dessous:

 people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [root@namenode people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlie 

Dans mon environnement spark 1.6.1, le code n’a généré aucune erreur, mais il n’y a qu’un seul fichier généré. il n’est pas partitionné par deux dossiers.

J’espère que cela peut aider.

J’ai eu un cas d’utilisation similaire. Je l’ai résolu en Java en écrivant deux classes personnalisées RecordWriter MultipleTextOutputFormat et RecordWriter .

Mon entrée était un JavaPairRDD> et je voulais le stocker dans un fichier nommé par sa clé, avec toutes les lignes contenues dans sa valeur.

Voici le code de mon implémentation MultipleTextOutputFormat

 class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected Ssortingng generateFileNameForKeyValue(K key, V value, Ssortingng name) { return key.toSsortingng(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected Ssortingng generateLeafFileName(Ssortingng name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected Ssortingng getInputFileBasedOutputFileName(JobConf job, Ssortingng name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, Ssortingng name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, Ssortingng name, final Progressable arg3) throws IOException { final Ssortingng myName = this.generateLeafFileName(name); return new MyRecordWriter(this, fs, job, arg3, myName); } } 

Voici le code de mon implémentation RecordWriter .

 class MyRecordWriter implements RecordWriter { private RDDMultipleTextOutputFormat rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private Ssortingng myName; TreeMap> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, Ssortingng myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { Ssortingng keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); Ssortingng finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List lines = (List) actualValue; for (Ssortingng line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } } 

La majeure partie du code est exactement la même que dans FileOutputFormat . La seule différence est ces quelques lignes

 List lines = (List) actualValue; for (Ssortingng line : lines) { rw.write(null, line); } 

Ces lignes m’ont permis d’écrire chaque ligne de ma List entrée List sur le fichier. Le premier argument de la fonction write est défini sur null afin d’éviter d’écrire la clé sur chaque ligne.

Pour finir, il suffit de faire cet appel pour écrire mes fichiers

 javaPairRDD.saveAsHadoopFile(path, Ssortingng.class, List.class, RDDMultipleTextOutputFormat.class);