Spark: quelle est la meilleure stratégie pour rejoindre un RDD à 2 tuples avec une clé unique RDD?

J’ai deux RDD que je veux rejoindre et ils ressemblent à ceci:

val rdd1:RDD[(T,U)] val rdd2:RDD[((T,W), V)] 

Il se trouve que les valeurs de clé de rdd1 sont uniques et que les valeurs de clé de rdd2 de rdd2 sont uniques. Je voudrais rejoindre les deux ensembles de données pour obtenir le disque suivant:

 val rdd_joined:RDD[((T,W), (U,V))] 

Quel est le moyen le plus efficace pour y parvenir? Voici quelques idées auxquelles j’ai pensé.

Option 1:

 val m = rdd1.collectAsMap val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))}) 

Option 2:

 val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2) 

L’option 1 collectera toutes les données à maîsortingser, n’est-ce pas? Donc, cela ne semble pas être une bonne option si rdd1 est grand (il est relativement grand dans mon cas, bien qu’un ordre de grandeur inférieur à rdd2). L’option 2 fait un produit moche distinct et cartésien, ce qui semble également très inefficace. Une autre possibilité qui m’a traversé l’esprit (mais que je n’ai pas encore essayée) est de faire l’option 1 et de diffuser la carte, même s’il serait préférable de diffuser de manière «intelligente» les clés de la carte clés de rdd2 .

Quelqu’un a-t-il déjà rencontré ce genre de situation? Je serais heureux d’avoir vos pensées.

Merci!

Une option consiste à effectuer une jointure de diffusion en collectant rdd1 sur le pilote et en le diffusant à tous les mappeurs; fait correctement, cela nous évitera un rdd2 coûteux du gros rdd2 RDD:

 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) val joined = rdd2.mapPartitions({ iter => val m = rdd1Broadcast.value for { ((t, w), u) <- iter if m.contains(t) } yield ((t, w), (u, m.get(t).get)) }, preservesPartitioning = true) 

Le preservesPartitioning = true indique à Spark que cette fonction de carte ne modifie pas les clés de rdd2 ; Cela permettra à Spark d'éviter de re-partitionner rdd2 pour toute opération ultérieure qui se joindrait à la clé (t, w) .

Cette diffusion pourrait être inefficace car elle implique un goulot d'étranglement des communications chez le conducteur. En principe, il est possible de diffuser un RDD à un autre sans impliquer le conducteur; J'en ai un prototype que je voudrais généraliser et append à Spark.

Une autre option consiste à rdd2 les clés de rdd2 et à utiliser la méthode de join Spark; cela impliquera un mélange complet de rdd2 (et éventuellement de rdd1 ):

 rdd1.join(rdd2.map { case ((t, w), u) => (t, (w, u)) }).map { case (t, (v, (w, u))) => ((t, w), (u, v)) }.collect() 

Sur mon exemple d'entrée, ces deux méthodes produisent le même résultat:

 res1: Array[((Int, java.lang.Ssortingng), (Int, java.lang.Ssortingng))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

Une troisième option serait de restructurer rdd2 pour que t soit sa clé, puis effectuez la jointure ci-dessus.

Une autre façon de le faire est de créer un partitionneur personnalisé, puis d’utiliser les zipPartitions pour joindre vos RDD.

 import org.apache.spark.HashPartitioner class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { override def getPartition(key: Any): Int = key match { case k: Tuple2[Int, Ssortingng] => super.getPartition(k._1) case _ => super.getPartition(key) } } val numSplits = 8 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) val result = rdd2.zipPartitions(rdd1)( (iter2, iter1) => { val m = iter1.toMap for { ((t: Int, w), u) <- iter2 if m.contains(t) } yield ((t, w), (u, m.get(t).get)) } ).partitionBy(new HashPartitioner(numSplits)) result.glom.collect