Comment fonctionne HashPartitioner?

Je lis sur la documentation de HashPartitioner . Malheureusement, rien n’a été expliqué à part les appels API. Je suppose que HashPartitioner partitionne l’ensemble dissortingbué en fonction du hachage des clés. Par exemple si mes données sont comme

 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 

Donc, partitioner mettrait cela dans différentes partitions avec les mêmes clés dans la même partition. Cependant, je ne comprends pas la signification de l’argument constructeur

 new HashPartitoner(numPartitions) //What does numPartitions do? 

Pour le jeu de données ci-dessus, en quoi les résultats seraient-ils différents si

 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 

Alors, comment HashPartitioner fonctionne-t-il?

Eh bien, permet de rendre votre jeu de données légèrement plus intéressant:

 val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8) 

Nous avons six éléments:

 rdd.count 
 Long = 6 

pas de partitionneur:

 rdd.partitioner 
 Option[org.apache.spark.Partitioner] = None 

et huit partitions:

 rdd.partitions.length 
 Int = 8 

Définissons maintenant le petit assistant pour compter le nombre d'éléments par partition:

 import org.apache.spark.rdd.RDD def countByPartition(rdd: RDD[(Int, None.type)]) = { rdd.mapPartitions(iter => Iterator(iter.length)) } 

Comme nous n'avons pas de partitionneur, notre jeu de données est dissortingbué uniformément entre les partitions ( schéma de partitionnement par défaut dans Spark ):

 countByPartition(rdd).collect() 
 Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1) 

distribution initiale

Maintenant, permet de repartitionner notre dataset:

 import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1)) 

Comme le paramètre passé à HashPartitioner définit le nombre de partitions, nous attendons une partition:

 rddOneP.partitions.length 
 Int = 1 

Comme nous n'avons qu'une seule partition, elle contient tous les éléments:

 countByPartition(rddOneP).collect 
 Array[Int] = Array(6) 

hash-partitioner-1

Notez que l'ordre des valeurs après le shuffle est non déterministe.

De même si nous utilisons HashPartitioner(2)

 val rddTwoP = rdd.partitionBy(new HashPartitioner(2)) 

nous aurons 2 partitions:

 rddTwoP.partitions.length 
 Int = 2 

Puisque rdd est partitionné par clé, les données ne seront plus dissortingbuées uniformément:

 countByPartition(rddTwoP).collect() 
 Array[Int] = Array(2, 4) 

Car avec avoir trois clés et seulement deux valeurs différentes de hashCode mod numPartitions il n'y a rien d'inattendu ici:

 (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) 
 scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1)) 

Juste pour confirmer ce qui précède:

 rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect() 
 Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3)) 

hash-partitioner-2

Enfin, avec HashPartitioner(7) nous obtenons sept partitions, trois non vides avec deux éléments chacune:

 val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length 
 Int = 7 
 countByPartition(rddTenP).collect() 
 Array[Int] = Array(0, 2, 2, 2, 0, 0, 0) 

hash-partitioner-7

Résumé et notes

  • HashPartitioner prend un seul argument qui définit le nombre de partitions
  • les valeurs sont assignées aux partitions en utilisant le hash des clés. hash fonction de hash peut varier en fonction de la langue (Scala RDD peut utiliser hashCode , les DataSets utilisent MurmurHash 3, PySpark, portable_hash ).

    Dans ce cas simple, où key est un petit entier, vous pouvez supposer que hash est une identité ( i = hash(i) ).

    Scala API utilise nonNegativeMod pour déterminer la partition basée sur le hachage calculé,

  • Si la dissortingbution des clés n'est pas uniforme, vous pouvez vous retrouver dans des situations où une partie de votre cluster est inactive

  • les clés doivent être lavables. Vous pouvez vérifier ma réponse pour une liste en tant que clé pour réduireByKey de PySpark pour en savoir plus sur les problèmes spécifiques à PySpark. Un autre problème possible est mis en évidence par la documentation de HashPartitioner :

    Les tableaux Java ont des hashCodes basés sur les identités des tableaux plutôt que sur leur contenu. Tenter de partitionner un RDD [Array [ ]] ou RDD [(Array [ ], _)] à l'aide d'un HashPartitioner produira un résultat inattendu ou incorrect.

  • Dans Python 3, vous devez vous assurer que le hachage est cohérent. Voir Que fait Exception: le caractère aléatoire du hachage de chaîne doit être désactivé via la moyenne PYTHONHASHSEED dans pyspark?

  • Hash Partitioner n'est ni injectif ni surjectif. Plusieurs clés peuvent être atsortingbuées à une seule partition et certaines partitions peuvent restr vides.

  • Veuillez noter que les méthodes actuellement basées sur le hachage ne fonctionnent pas dans Scala lorsqu'elles sont combinées avec des classes de cas définies par REPL ( égalité de classe de cas dans Apache Spark ).

  • HashPartitioner (ou tout autre Partitioner ) mélange les données. À moins que le partitionnement ne soit réutilisé entre plusieurs opérations, cela ne réduit pas la quantité de données à mélanger.

RDD est dissortingbué, cela signifie qu’il est divisé en plusieurs parties. Chacune de ces partitions est potentiellement sur une machine différente. Le partitionneur de hachage avec arument numPartitions numPartitions sur quelle partition placer la paire (key, value) dans un sens:

  1. Crée exactement des partitions numPartitions .
  2. Lieux (key, value) dans la partition avec le numéro Hash(key) % numPartitions

La méthode HashPartitioner.getPartition prend une clé en argument et renvoie l’ index de la partition à laquelle appartient la clé. Le partitionneur doit savoir quels sont les index valides, de sorte qu’il renvoie les nombres dans la bonne plage. Le nombre de partitions est spécifié via l’argument du constructeur numPartitions .

L’implémentation renvoie à peu près key.hashCode() % numPartitions . Voir Partitioner.scala pour plus de détails.