(Pourquoi) devons-nous appeler le cache ou persister sur un RDD

Lorsqu’un dataset dissortingbué résilient (RDD) est créé à partir d’un fichier texte ou d’une collection (ou d’un autre RDD), devons-nous appeler “cache” ou “persist” explicitement pour stocker les données RDD en mémoire? Ou les données RDD sont-elles stockées de manière dissortingbuée dans la mémoire par défaut?

val textFile = sc.textFile("/user/emp.txt") 

D’après ce que j’ai compris, après l’étape ci-dessus, textFile est un RDD disponible dans tout ou partie de la mémoire du nœud.

Si oui, pourquoi devons-nous alors appeler “cache” ou “persist” sur textFile RDD?

La plupart des opérations RDD sont paresseuses. Considérez un RDD comme une description d’une série d’opérations. Un RDD n’est pas une donnée. Donc cette ligne:

 val textFile = sc.textFile("/user/emp.txt") 

Ça ne fait rien Il crée un RDD qui dit “nous devrons charger ce fichier”. Le fichier n’est pas chargé à ce stade.

Les opérations RDD nécessitant l’observation du contenu des données ne peuvent pas être paresseuses. (Ils sont appelés actions .) Un exemple est RDD.count – pour vous dire le nombre de lignes dans le fichier, le fichier doit être lu. Donc, si vous écrivez textFile.count , à ce stade, le fichier sera lu, les lignes seront comptées et le compte sera retourné.

Et si vous appelez textFile.count nouveau? La même chose: le fichier sera lu et compté à nouveau. Rien n’est stocké. Un RDD n’est pas une donnée.

Alors, que fait RDD.cache ? Si vous ajoutez textFile.cache au code ci-dessus:

 val textFile = sc.textFile("/user/emp.txt") textFile.cache 

Ça ne fait rien RDD.cache est également une opération paresseuse. Le fichier n’est toujours pas lu. Mais maintenant, le RDD dit “lisez ce fichier puis mettez le contenu en cache”. Si vous exécutez ensuite textFile.count la première fois, le fichier sera chargé, mis en cache et compté. Si vous appelez textFile.count une seconde fois, l’opération utilisera le cache. Il suffit de prendre les données du cache et de compter les lignes.

Le comportement du cache dépend de la mémoire disponible. Si le fichier ne rentre pas dans la mémoire, par exemple, textFile.count reprendra le comportement habituel et relira le fichier.

Je pense que la question serait mieux formulée comme suit:

Quand devons-nous appeler le cache ou persister sur un RDD?

Les processus d’étincelle sont paresseux, c’est-à-dire que rien ne se passera jusqu’à ce qu’il soit nécessaire. Pour répondre rapidement à la question, après que val textFile = sc.textFile("/user/emp.txt") est émis, rien ne se passe dans les données, seul un HadoopRDD est construit, en utilisant le fichier comme source.

Disons que nous transformons un peu ces données:

 val wordsRDD = textFile.flatMap(line => line.split("\\W")) 

Encore une fois, rien ne se passe avec les données. Maintenant, il y a un nouveau RDD wordsRDD qui contient une référence à testFile et une fonction à appliquer en cas de besoin.

Seulement quand une action est appelée sur un RDD, comme wordsRDD.count , la chaîne RDD, appelée lignage sera exécutée. En d’autres flatMap , les données, ventilées en partitions, seront chargées par les exécuteurs du cluster Spark, la fonction flatMap sera appliquée et le résultat sera calculé.

Sur une lignée linéaire, comme celle de cet exemple, le cache() n’est pas nécessaire. Les données seront chargées dans les exécuteurs, toutes les transformations seront appliquées et le count sera finalement calculé, tout en mémoire – si les données sont en mémoire.

cache est utile lorsque le lignage des RDD est sorti. Disons que vous voulez filtrer les mots de l’exemple précédent dans un compte pour les mots positifs et négatifs. Vous pourriez faire ça comme ça:

 val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

Ici, chaque twig émet un rechargement des données. L’ajout d’une instruction de cache explicite garantira que le traitement effectué précédemment est préservé et réutilisé. Le travail ressemblera à ceci:

 val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("\\W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

Pour cette raison, le cache est dit «casser la lignée» car il crée un sharepoint contrôle qui peut être réutilisé pour un traitement ultérieur.

Règle de base: utilisez le cache lorsque le lignage de votre RDD se déroule ou lorsqu’un RDD est utilisé plusieurs fois comme dans une boucle.

Avons-nous besoin d’appeler “cache” ou “persist” pour stocker les données RDD en mémoire?

Oui, seulement si nécessaire.

Les données RDD stockées de manière dissortingbuée dans la mémoire par défaut?

Non!

Et ce sont les raisons pour lesquelles:

  • Spark prend en charge deux types de variables partagées: les variables de diffusion, qui peuvent être utilisées pour mettre en cache une valeur en mémoire sur tous les noeuds, et les accumulateurs, qui ne sont que «ajoutés», tels que les compteurs et les sums.

  • Les RDD prennent en charge deux types d’opérations: les transformations, qui créent un nouveau jeu de données à partir d’un ensemble existant, et les actions, qui renvoient une valeur au programme pilote après avoir exécuté un calcul sur le jeu de données. Par exemple, map est une transformation qui transmet chaque élément de jeu de données via une fonction et renvoie un nouveau RDD représentant les résultats. D’autre part, réduire est une action qui regroupe tous les éléments du RDD en utilisant une fonction et renvoie le résultat final au programme du pilote (bien qu’il existe également un ReduceByKey parallèle qui renvoie un dataset dissortingbué).

  • Toutes les transformations dans Spark sont paresseuses, en ce sens qu’elles ne calculent pas immédiatement leurs résultats. Au lieu de cela, ils se souviennent simplement des transformations appliquées à certains jeux de données de base (par exemple, un fichier). Les transformations ne sont calculées que lorsqu’une action nécessite que le résultat soit renvoyé au programme du pilote. Cette conception permet à Spark de fonctionner plus efficacement – par exemple, nous pouvons réaliser qu’un jeu de données créé par carte sera utilisé dans une réduction et renverra uniquement le résultat de la réduction au pilote, plutôt que le plus grand jeu de données mappé.

  • Par défaut, chaque RDD transformé peut être recalculé à chaque fois que vous exécutez une action sur celui-ci. Cependant, vous pouvez également persister un RDD en mémoire à l’aide de la méthode persist (ou cache), auquel cas Spark conservera les éléments sur le cluster pour un access beaucoup plus rapide la prochaine fois que vous l’interrogerez. Il existe également un support pour les RDD persistants sur disque ou répliqués sur plusieurs nœuds.

Pour plus de détails, veuillez consulter le guide de programmation Spark .

Ajout d’une autre raison pour append (ou append temporairement) un appel de méthode de cache .

pour les problèmes de mémoire de débogage

avec cache méthode cache , spark donnera des informations de débogage concernant la taille du RDD. Ainsi, dans l’interface utilisateur intégrée, vous obtiendrez des informations sur la consommation de mémoire RDD. et cela s’est avéré très utile pour diagnostiquer les problèmes de mémoire.

Voici les trois situations dans lesquelles vous devez mettre en cache vos DDR:

en utilisant un RDD plusieurs fois

effectuer plusieurs actions sur le même RDD

pour de longues chaînes de transformations (ou très coûteuses)