Organisation du code Spark et meilleures pratiques

Donc, après avoir passé de nombreuses années dans un monde orienté object avec la réutilisation du code, les schémas de conception et les meilleures pratiques toujours pris en compte, je me heurte quelque peu à l’organisation du code et à la réutilisation du code dans le monde de Spark.

Si j’essaie d’écrire du code de manière réutilisable, cela entraîne presque toujours un coût de performance et je finis par le réécrire en fonction de ce qui est optimal pour mon cas d’utilisation particulier. Cette constante “écrire ce qui est optimal pour ce cas particulier d’utilisation” affecte également l’organisation du code, car diviser le code en différents objects ou modules est difficile lorsque “tout appartient vraiment ensemble” et que chaînes de transformations complexes. En fait, je pense souvent que si j’avais jeté un coup d’oeil à la plupart des codes Spark que j’écrivais maintenant alors que je travaillais dans le monde orienté object, j’aurais grisé et je l’ai rejeté comme “code spaghetti”.

J’ai surfé sur internet en essayant de trouver une sorte d’équivalent aux meilleures pratiques du monde orienté object, mais sans beaucoup de chance. Je peux trouver quelques “meilleures pratiques” pour la functional programming, mais Spark ajoute juste une couche supplémentaire, car la performance est un facteur majeur ici.

Donc, ma question est la suivante: est-ce que certains d’entre vous ont trouvé les meilleures pratiques pour écrire du code Spark que vous pouvez recommander?

MODIFIER

Comme écrit dans un commentaire, je ne m’attendais pas à ce que quelqu’un publie une réponse sur la façon de résoudre ce problème, mais j’espérais plutôt que quelqu’un de cette communauté avait rencontré un type de Martin Fowler qui avait écrit des articles sur la façon de résoudre les problèmes d’organisation du code dans le monde de Spark.

@DanielDarabos a suggéré que je puisse donner un exemple d’une situation où l’organisation du code et les performances sont contradictoires. Bien que je trouve que je rencontre souvent des problèmes avec cela dans mon travail quotidien, je trouve que c’est un peu difficile de le réduire à un bon exemple minimal;) mais je vais essayer.

Dans le monde orienté object, je suis un grand fan du principe de la responsabilité unique. Je m’assurais donc que mes méthodes n’étaient responsables que d’une seule chose. Cela les rend réutilisables et facilement testables. Donc, si je devais calculer la sum de certains nombres dans une liste (correspondant à certains critères) et que je devais calculer la moyenne du même nombre, je créerais très certainement deux méthodes – une qui calculait la sum et une qui calculé la moyenne. Comme ça:

def main(implicit args: Array[Ssortingng]): Unit = { val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)) println("Summed weights for DK = " + summedWeights(list, "DK") println("Averaged weights for DK = " + averagedWeights(list, "DK") } def summedWeights(list: List, country: Ssortingng): Double = { list.filter(_._1 == country).map(_._2).sum } def averagedWeights(list: List, country: Ssortingng): Double = { val filteredByCountry = list.filter(_._1 == country) filteredByCountry.map(_._2).sum/ filteredByCountry.length } 

Je peux bien sûr continuer à honorer SRP à Spark:

 def main(implicit args: Array[Ssortingng]): Unit = { val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight") println("Summed weights for DK = " + summedWeights(df, "DK") println("Averaged weights for DK = " + averagedWeights(df, "DK") } def avgWeights(df: DataFrame, country: Ssortingng, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country) val summedWeight = countrySpecific.agg(avg('weight)) summedWeight.first().getDouble(0) } def summedWeights(df: DataFrame, country: Ssortingng, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country) val summedWeight = countrySpecific.agg(sum('weight)) summedWeight.first().getDouble(0) } 

Mais comme mon df peut contenir des milliards de lignes, je n’aurais pas à effectuer le filter deux fois. En fait, la performance est directement liée au coût du DME, donc je ne veux VRAIMENT pas. Pour le surmonter, je décide donc de violer SRP et de mettre simplement les deux fonctions en une et de m’assurer que j’appelle persister sur le DataFrame filtré par DataFrame , comme ceci:

 def summedAndAveragedWeights(df: DataFrame, country: Ssortingng, sqlContext: SQLContext): (Double, Double) = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER) val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0) val averagedWeights = summedWeights / countrySpecific.count() (summedWeights, averagedWeights) } 

Maintenant, cet exemple est bien sûr une énorme simplification de ce qui se passe dans la vie réelle. Ici, je pourrais simplement le résoudre en filtrant et en persistant df avant de le transférer aux fonctions sum et moy (qui serait également plus SRP), mais dans la réalité, il se peut que plusieurs calculs intermédiaires soient nécessaires à plusieurs resockets. En d’autres termes, la fonction de filter est simplement une tentative de faire un exemple simple de quelque chose qui bénéficiera de la persistance. En fait, je pense que les appels à persist sont un mot-clé ici. Les appels persist accélèreront considérablement mon travail, mais le coût est que je dois coupler étroitement tout le code qui dépend du DataFrame persistant – même s’ils sont logiquement séparés.

    Je pense que vous pouvez vous abonner à Apache Spark , databricks channel sur youtube, écouter plus et en savoir plus, en particulier pour les expériences et les leçons des autres.

    • Apache Spark
    • databricks
    • Centre de technologie Spark

    voici quelques vidéos recommandées:

    • Visualisation SparkUI
    • slide Visualisation SparkUI

    • Spark in Production: Leçons de plus de 100 utilisateurs de production

    • slide Spark in Production: Leçons de plus de 100 utilisateurs de production

    • Optimisation Spark pour les administrateurs système d’entreprise

    • Optimisation Spark pour les administrateurs système d’entreprise

    • Construction, débogage et optimisation des pipelines d’apprentissage Spark Machine – Joseph Bradley (Databricks)

    • slide Construction, débogage et optimisation des pipelines Spark Machine Learning

    • Top 5 des erreurs lors de l’écriture d’applications Spark

    • slide 5 erreurs lors de l’écriture d’applications Spark

    • Réglage et débogage d’Apache Spark

    • slide Réglage et débogage d’Apache Spark

    • Une compréhension approfondie des composants internes Spark – Aaron Davidson (Databricks)

    • slide Une compréhension plus profonde des composants internes Spark – Aaron Davidson (Databricks)

    et j’ai posté et toujours le mettre à jour sur mon github et mon blog:

    • post github
    • article de blog

    J’espère que cela peut vous aider ~