Méthode optimale pour créer un pipeline ml dans Apache Spark pour un jeu de données avec un nombre élevé de colonnes

Je travaille avec Spark 2.1.1 sur un jeu de données avec ~ 2000 fonctionnalités et j’essaie de créer un pipeline ML de base, composé de certains transformateurs et d’un classificateur.

Supposons, pour simplifier, que le pipeline sur lequel je travaille se compose d’un VectorAssembler, d’un SsortingngIndexer et d’un classificateur, ce qui serait un cas assez courant.

// Pipeline elements val assmbleFeatures: VectorAssembler = new VectorAssembler() .setInputCols(featureColumns) .setOutputCol("featuresRaw") val labelIndexer: SsortingngIndexer = new SsortingngIndexer() .setInputCol("TARGET") .setOutputCol("indexedLabel") // Train a RandomForest model. val rf: RandomForestClassifier = new RandomForestClassifier() .setLabelCol("indexedLabel") .setFeaturesCol("featuresRaw") .setMaxBins(30) // add the params, unique to this classifier val paramGrid = new ParamGridBuilder() .addGrid(rf.numTrees, Array(5)) .addGrid(rf.maxDepth, Array(5)) .build() // Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages. val evaluator = new BinaryClassificationEvaluator() .setMesortingcName("areaUnderROC") .setLabelCol("indexedLabel") 

Si les étapes du pipeline sont séparées en un pipeline de transformateur (VectorAssembler + SsortingngIndexer) et un second pipeline de classificateur, et si les colonnes inutiles sont placées entre les deux pipelines, la formation aboutit. Ce moyen de réutiliser les modèles, deux PipelineModels doit être enregistré après la formation et une étape de prétraitement intermédiaire doit être introduite.

 // Split indexers and forest in two Pipelines. val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain) // Transform data and drop all columns, except those needed for training val dfTrainT = prePipeline.transform(dfTrain) val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col)) val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*) val mainPipeline = new Pipeline().setStages(Array(rf)) val cv = new CrossValidator() .setEstimator(mainPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel] 

La solution (beaucoup plus propre) consisterait à fusionner toutes les étapes du pipeline en un seul pipeline.

 val pipeline = new Pipeline() .setStages(Array(labelIndexer, assmbleFeatures, rf)) val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // This will fail! val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel] 

Cependant, placer tous les PipelineStages dans un seul pipeline conduit à l’exception suivante, probablement due au problème que ce PR résoudra éventuellement:

ERROR CodeGenerator: impossible de comstackr: org.codehaus.janino.JaninoRuntimeException: pool constant pour la classe org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection a dépassé la limite JVM de 0xFFFF

La raison en est que VectorAssembler double efficacement (dans cet exemple) la quantité de données dans le DataFrame, car aucun transformateur ne peut supprimer les colonnes inutiles. (Voir l’ assembleur de vecteur de pipeline d’étincelle déposer d’autres colonnes )

L’exemple fonctionne sur le jeu de données golub et les étapes de prétraitement suivantes sont nécessaires:

 import org.apache.spark.sql.types.DoubleType import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature._ import org.apache.spark.sql._ import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100) // Those steps are necessary, otherwise training would fail either way val colsToDrop = df.columns.take(5000) val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*) // Split df in train and test sets val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3)) // Feature columns are columns except "TARGET" val featureColumns = dfTrain.columns.filter(col => col != "TARGET") 

Comme je suis nouveau sur Spark, je ne suis pas sûr de ce qui serait le meilleur moyen de résoudre ce problème. Voulez-vous suggérer …

  1. créer un nouveau transformateur, qui laisse tomber les colonnes et qui peut être incorporé dans le pipeline?
  2. diviser les deux pipelines et introduire l’étape intermédiaire
  3. rien d’autre? 🙂

Ou est-ce que je manque quelque chose d’important (étapes du pipeline, PR, etc.) qui résoudrait ce problème?


Modifier:

J’ai implémenté un nouveau Transformer DroppingVectorAssembler , qui supprime les colonnes inutiles, mais la même exception est levée.

En outre, définir spark.sql.codegen.wholeStage sur false ne résout pas le problème.

L’erreur janino est due au nombre de variables constantes créées lors du processus d’optimisation. La limite maximale des variables constantes autorisées dans la machine virtuelle Java est ((2 ^ 16) -1). Si cette limite est dépassée, vous obtenez le Constant pool for class ... has grown past JVM limit of 0xFFFF

Le JIRA qui résoudra ce problème est SPARK-18016 , mais il est toujours en cours à ce stade.

Votre code est probablement défaillant au cours de l’étape VectorAssembler , lorsqu’il doit effectuer des performances sur des milliers de colonnes au cours d’une tâche d’optimisation unique.

La solution de contournement que j’ai développée pour ce problème consiste à créer un “vecteur de vecteurs” en travaillant sur des sous-ensembles de colonnes, puis en regroupant les résultats à la fin pour créer un vecteur d’entité singulier. Cela empêche toute tâche d’optimisation unique de dépasser la limite constante JVM. Ce n’est pas élégant, mais je l’ai utilisé sur des ensembles de données atteignant la gamme de 10 000 colonnes.

Cette méthode vous permet également de conserver un seul pipeline, même si des étapes supplémentaires sont nécessaires pour le faire fonctionner (création des sous-vecteurs). Après avoir créé le vecteur d’entités à partir des sous-vecteurs, vous pouvez, si vous le souhaitez, supprimer les colonnes source d’origine.

Exemple de code:

 // IMPORT DEPENDENCIES import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column} import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.{Pipeline, PipelineModel} // Create first example dataframe val exampleDF = spark.createDataFrame(Seq( (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5), (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8), (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6), (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5), (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3), (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4) )).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "colA", "colB", "colC", "colD", "colE", "colF", "colG", "colH", "colI", "colJ", "colK") // Create multiple column lists using the sliding method val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray // Create a vector assembler for each column list val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec") val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec") val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec") val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec") // Create a vector assembler using column list vectors as input val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features") // Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler)) // Fit and transform the data val featuresDF = pipeline.fit(exampleDF).transform(exampleDF) // Get the number of features in "features" vector val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs")) // Print number of features in "features vector" print(featureLength) 

(Note: La méthode de création des listes de colonnes doit être faite de manière programmée, mais j’ai gardé cet exemple simple pour comprendre le concept.)

L’erreur janino que vous obtenez est due au fait que, en fonction de l’ensemble de fonctionnalités, le code généré augmente.

Je séparerais les étapes en différents pipelines et supprimerais les fonctionnalités inutiles, enregistrerais les modèles intermédiaires comme SsortingngIndexer et OneHotEncoder et les chargerais pendant la phase de prédiction, ce qui est également utile car les transformations seraient plus rapides pour les données à prévoir.

Enfin, vous n’avez pas besoin de conserver les colonnes d’ VectorAssembler après avoir exécuté l’étape VectorAssembler car elles transforment les entités en une colonne de feature vector entités et d’ label et c’est tout ce dont vous avez besoin pour exécuter des prédictions.

Exemple de pipeline en Scala avec sauvegarde des étapes intermédiaires- (API à étincelle plus ancienne)

De plus, si vous utilisez une ancienne version d’étincelle comme 1.6.0, vous devez vérifier la version corrigée, c’est-à-dire 2.1.1 ou 2.2.0 ou 1.6.4, sinon vous Janino erreur Janino avec environ 400 colonnes d’ Janino .