Dériver plusieurs colonnes d’une seule colonne dans un Spark DataFrame

J’ai un DF avec une métadonnée analysable énorme comme une colonne de chaîne unique dans un Dataframe, appelons-le DFA, avec ColmnA.

Je voudrais diviser cette colonne, ColmnA en plusieurs colonnes à travers une fonction, ClassXYZ = Func1 (ColmnA). Cette fonction retourne une classe ClassXYZ, avec plusieurs variables, et chacune de ces variables doit maintenant être mappée sur une nouvelle colonne, telle que ColmnA1, ColmnA2, etc.

Comment ferais-je une telle transformation de 1 Dataframe à un autre avec ces colonnes supplémentaires en appelant ce Func1 une seule fois, sans devoir le répéter pour créer toutes les colonnes.

Il est facile à résoudre si je devais appeler cette énorme fonction à chaque fois pour append une nouvelle colonne, mais que je souhaite éviter.

Veuillez s’il vous plaît aviser avec un code de travail ou de pseudo.

Merci

Sanjay

De manière générale, ce que vous voulez n’est pas directement possible. UDF ne peut renvoyer qu’une seule colonne à la fois. Vous pouvez surmonter cette limitation de deux manières différentes:

  1. Renvoie une colonne de type complexe. La solution la plus générale est un StructType mais vous pouvez également considérer ArrayType ou MapType .

     import org.apache.spark.sql.functions.udf val df = Seq( (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c") ).toDF("x", "y", "z") case class Foobar(foo: Double, bar: Double) val foobarUdf = udf((x: Long, y: Double, z: Ssortingng) => Foobar(x * y, z.head.toInt * y)) val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z")) df1.show // +---+----+---+------------+ // | x| y| z| foobar| // +---+----+---+------------+ // | 1| 3.0| a| [3.0,291.0]| // | 2|-1.0| b|[-2.0,-98.0]| // | 3| 0.0| c| [0.0,0.0]| // +---+----+---+------------+ df1.printSchema // root // |-- x: long (nullable = false) // |-- y: double (nullable = false) // |-- z: ssortingng (nullable = true) // |-- foobar: struct (nullable = true) // | |-- foo: double (nullable = false) // | |-- bar: double (nullable = false) 

    Cela peut être facilement aplati plus tard, mais cela n’est généralement pas nécessaire.

  2. Basculer vers RDD, remodeler et reconstruire DF:

     import org.apache.spark.sql.types._ import org.apache.spark.sql.Row def foobarFunc(x: Long, y: Double, z: Ssortingng): Seq[Any] = Seq(x * y, z.head.toInt * y) val schema = StructType(df.schema.fields ++ Array(StructField("foo", DoubleType), StructField("bar", DoubleType))) val rows = df.rdd.map(r => Row.fromSeq( r.toSeq ++ foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[Ssortingng]("z")))) val df2 = sqlContext.createDataFrame(rows, schema) df2.show // +---+----+---+----+-----+ // | x| y| z| foo| bar| // +---+----+---+----+-----+ // | 1| 3.0| a| 3.0|291.0| // | 2|-1.0| b|-2.0|-98.0| // | 3| 0.0| c| 0.0| 0.0| // +---+----+---+----+-----+ 

Supposons qu’après votre fonction, il y aura une séquence d’éléments, donnant un exemple comme ci-dessous:

 val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age") df.show +------------------+---+ | infoComb|age| +------------------+---+ |Mike,1986,Toronto| 30| | Andre,1980,Ottawa| 36| | jill,1989,London| 27| +------------------+---+ 

Maintenant, ce que vous pouvez faire avec cette infoComb, c’est que vous pouvez commencer à diviser la chaîne et obtenir plus de colonnes avec:

 df.select(expr("(split(infoComb, ','))[0]").cast("ssortingng").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("ssortingng").as("city"), $"age").show +-----+----------+-------+---+ | name|yearOfBorn| city|age| +-----+----------+-------+---+ |Mike| 1986|Toronto| 30| |Andre| 1980| Ottawa| 36| | jill| 1989| London| 27| +-----+----------+-------+---+ 

J’espère que cela t’aides.

Si vos colonnes résultantes ont la même longueur que celles d’origine, vous pouvez créer de nouvelles colonnes avec la fonction withColumn et en appliquant un udf. Après cela, vous pouvez déposer votre colonne d’origine, par exemple:

  val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn"))) .withColumn("newCol2", myFun2(myDf("originalColumn")) .drop(myDf("originalColumn")) 

où myFun est un udf défini comme ceci:

  def myFun= udf( (originalColumnContent : Ssortingng) => { // do something with your original column content and return a new one } ) 

J’ai opté pour créer une fonction pour aplatir une colonne puis l’appeler simultanément avec le fichier udf.

Définissez d’abord ceci:

 implicit class DfOperations(df: DataFrame) { def flattenColumn(col: Ssortingng) = { def addColumns(df: DataFrame, cols: Array[Ssortingng]): DataFrame = { if (cols.isEmpty) df else addColumns( df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)), cols.tail ) } val field = df.select(col).schema.fields(0) val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name) addColumns(df, newCols).drop(col) } def withColumnMany(colName: Ssortingng, col: Column) = { df.withColumn(colName, col).flattenColumn(colName) } } 

Alors l’utilisation est très simple:

 case class MyClass(a: Int, b: Int) val df = sc.parallelize(Seq( (0), (1) )).toDF("x") val f = udf((x: Int) => MyClass(x*2,x*3)) df.withColumnMany("test", f($"x")).show() // +---+------+------+ // | x|test_a|test_b| // +---+------+------+ // | 0| 0| 0| // | 1| 2| 3| // +---+------+------+ 

Cela peut être facilement réalisé en utilisant la fonction pivot

 df4.groupBy("year").pivot("course").sum("earnings").collect()