Apache Spark: carte vs mapPartitions?

Quelle est la différence entre une map RDD et la méthode mapPartitions ? Et flatMap se comporte-t-il comme une map ou comme mapPartitions ? Merci.

(edit) c’est à dire quelle est la différence (sémantiquement ou en terme d’exécution) entre

  def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, preservesPartitioning = true) } 

Et:

  def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.map(fn) } 

Quelle est la différence entre une carte RDD et la méthode mapPartitions?

La méthode map convertit chaque élément du RDD source en un seul élément du résultat RDD en appliquant une fonction. mapPartitions convertit chaque partition du RDD source en plusieurs éléments du résultat (éventuellement aucun).

Et flatMap se comporte-t-il comme une carte ou comme mapPartitions?

FlatMap ne fonctionne pas sur un seul élément (en tant que map ) et produit plusieurs éléments du résultat (comme mapPartitions ).

Lutin. POINTE :

Chaque fois que vous avez une initialisation lourde qui doit être effectuée une fois pour plusieurs éléments RDD plutôt qu’une fois par élément RDD , et si cette initialisation, telle que la création d’objects à partir d’une bibliothèque tierce, ne peut pas être sérialisée cluster aux nœuds de travail), utilisez mapPartitions() au lieu de map() . mapPartitions() permet que l’initialisation soit effectuée une fois par tâche / thread / partition au lieu d’une fois par élément de données RDD , par exemple: voir ci-dessous.

 val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator }) 

Q2. flatMap que flatMap se comporte comme une carte ou comme mapPartitions ?

Oui. s’il vous plaît voir l’exemple 2 de flatmap .. son auto explicatif.

Q1. Quelle est la différence entre une map RDD et des mapPartitions

map fonctionne la fonction utilisée à un niveau par élément alors que mapPartitions exerce la fonction au niveau de la partition.

Exemple de scénario : si nous avons 100K éléments dans une partition RDD particulière, nous déclencherons la fonction utilisée par la transformation de mappage 100K fois lorsque nous utiliserons map .

Inversement, si nous utilisons mapPartitions nous n’appellerons la fonction particulière qu’une seule fois, mais nous passerons tous les enregistrements de 100 Ko et récupérerons toutes les réponses dans un appel de fonction.

Il y aura un gain de performance car la map fonctionne sur une fonction particulière à de nombreuses resockets, en particulier si la fonction fait quelque chose de cher à chaque fois qu’elle ne le ferait pas si nous passions tous les éléments en même temps (en cas de mappartitions ).

carte

Applique une fonction de transformation sur chaque élément du RDD et renvoie le résultat sous la forme d’un nouveau RDD.

Variantes de cotation

def map [U: ClassTag] (f: T => U): RDD [U]

Exemple :

 val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(Ssortingng, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Ceci est une carte spécialisée qui est appelée une seule fois pour chaque partition. L’intégralité du contenu des partitions respectives est disponible sous forme de stream séquentiel de valeurs via l’argument d’entrée (Iterarator [T]). La fonction personnalisée doit renvoyer un autre iterator [U]. Les iterators de résultats combinés sont automatiquement convertis en un nouveau RDD. Veuillez noter que les tuples (3,4) et (6,7) sont absents du résultat suivant en raison du partitionnement choisi.

preservesPartitioning indique si la fonction d’entrée conserve le partitionneur, qui doit être false sauf s’il s’agit d’une paire RDD et que la fonction d’entrée ne modifie pas les clés.

Variantes de cotation

def mapPartitions [U: ClassTag] (f: Iterator [T] => Itérateur [U], conserveesPartitioning: Boolean = false): RDD [U]

Exemple 1

 val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Exemple 2

 val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Le programme ci-dessus peut également être écrit en utilisant flatMap comme suit.

Exemple 2 en utilisant une carte

 val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusion :

mapPartitions transformation de mapPartitions est plus rapide que map car elle appelle votre fonction une fois / partition, pas une fois / element.

Carte :

  1. Il traite une ligne à la fois, très similaire à la méthode map () de MapReduce.
  2. Vous revenez de la transformation après chaque ligne.

MapPartitions

  1. Il traite la partition complète en une seule fois.
  2. Vous ne pouvez revenir de la fonction qu’une seule fois après avoir traité la partition entière.
  3. Tous les résultats intermédiaires doivent être conservés en mémoire jusqu’à ce que vous traitiez toute la partition.
  4. Fournit la fonction setup () map () et cleanup () de MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/