Comment convertir l’object rdd en dataframe en spark

Comment convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] ) en un fichier Dataframe org.apache.spark.sql.DataFrame . J’ai converti un dataframe en rdd en utilisant .rdd . Après le traitement, je le souhaite de nouveau dans dataframe. Comment puis-je faire ceci ?

    SqlContext a un certain nombre de méthodes createDataFrame qui créent un DataFrame un RDD . J’imagine que l’un d’eux fonctionnera pour votre contexte.

    Par exemple:

     def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame 

    Crée un DataFrame à partir d’un RDD contenant des lignes en utilisant le schéma donné.

    En supposant que votre [ligne] RDD s’appelle rdd, vous pouvez utiliser:

     val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF() 

    Ce code fonctionne parfaitement depuis Spark 2.x avec Scala 2.11

    Importer les classes nécessaires

     import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DoubleType, SsortingngType, StructField, StructType} 

    Créer SparkSession object SparkSession , voici son spark

     val spark: SparkSession = SparkSession.builder.master("local").getOrCreate val sc = spark.sparkContext // Just used to create test RDDs 

    Faisons un RDD pour le rendre DataFrame

     val rdd = sc.parallelize( Seq( ("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5)) ) ) 

    Méthode 1

    Utilisation de SparkSession.createDataFrame(RDD obj) .

     val dfWithoutSchema = spark.createDataFrame(rdd) dfWithoutSchema.show() +------+--------------------+ | _1| _2| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

    Méthode 2

    Utilisation de SparkSession.createDataFrame(RDD obj) et spécification des noms de colonne.

     val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals") dfWithSchema.show() +------+--------------------+ | id| vals| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

    Méthode 3 (réponse réelle à la question)

    De cette façon, l’entrée rdd doit être du type RDD[Row] .

     val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) ) 

    créer le schéma

     val schema = new StructType() .add(StructField("id", SsortingngType, true)) .add(StructField("val1", DoubleType, true)) .add(StructField("val2", DoubleType, true)) 

    Appliquez maintenant rowsRdd et schema à createDataFrame()

     val df = spark.createDataFrame(rowsRdd, schema) df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+ 

    Supposons que vous ayez un DataFrame et que vous souhaitiez modifier les données des champs en le convertissant en RDD[Row] .

     val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[Ssortingng]]("role").head)) 

    Pour reconvertir en DataFrame partir de RDD il faut définir le type de structure du RDD .

    Si le type de données était Long il deviendrait alors LongType dans la structure.

    If Ssortingng puis SsortingngType dans la structure.

     val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",SsortingngType,nullable = true))) 

    Vous pouvez maintenant convertir le RDD en DataFrame en utilisant la méthode createDataFrame .

     val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct) 

    Note: Cette réponse a été postée ici

    J’affiche cette réponse parce que j’aimerais partager des détails supplémentaires sur les options disponibles que je n’ai pas trouvées dans les autres réponses


    Pour créer un DataFrame à partir d’un RDD de lignes, il existe deux options principales:

    1) Comme déjà indiqué, vous pouvez utiliser toDF() qui peut être importé par import sqlContext.implicits._ . Cependant, cette approche ne fonctionne que pour les types de DDR suivants:

    • RDD[Int]
    • RDD[Long]
    • RDD[Ssortingng]
    • RDD[T <: scala.Product]

    (source: Scaladoc de l'object SQLContext.implicits )

    La dernière signature signifie en fait qu’elle peut fonctionner pour un RDD de tuples ou un RDD de classes de cas (car les tuples et les classes de cas sont des sous-classes de scala.Product ).

    Donc, pour utiliser cette approche pour un RDD[Row] , vous devez le mapper sur un RDD[T <: scala.Product] . Cela peut être fait en mappant chaque ligne à une classe de cas personnalisée ou à un tuple, comme dans les extraits de code suivants:

     val df = rdd.map({ case Row(val1: Ssortingng, ..., valN: Long) => (val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

    ou

     case class MyClass(val1: Ssortingng, ..., valN: Long = 0L) val df = rdd.map({ case Row(val1: Ssortingng, ..., valN: Long) => MyClass(val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

    Le principal inconvénient de cette approche (à mon avis) est que vous devez définir explicitement le schéma du DataFrame résultant dans la fonction map, colonne par colonne. Peut-être que cela peut être fait par programme si vous ne connaissez pas le schéma à l'avance, mais les choses peuvent devenir un peu désordonnées. Donc, alternativement, il y a une autre option:


    2) Vous pouvez utiliser createDataFrame(rowRDD: RDD[Row], schema: StructType) comme dans la réponse acceptée, qui est disponible dans l'object SQLContext . Exemple de conversion d'un RDD d'un ancien DataFrame:

     val rdd = oldDF.rdd val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema) 

    Notez qu'il n'est pas nécessaire de définir explicitement une colonne de schéma. Nous réutilisons le schéma de l'ancien StructType DF, qui est de classe StructType et peut être facilement étendu. Cependant, cette approche n'est parfois pas possible et, dans certains cas, peut être moins efficace que la première.

    Voici un exemple simple de conversion de votre liste en RDD Spark, puis conversion de ce RDD Spark en Dataframe.

    Veuillez noter que j’ai utilisé scala REPL de Spark-shell pour exécuter le code suivant, Here sc est une instance de SparkContext qui est implicitement disponible dans Spark-shell. J’espère que ça répond à votre question.

     scala> val numList = List(1,2,3,4,5) numList: List[Int] = List(1, 2, 3, 4, 5) scala> val numRDD = sc.parallelize(numList) numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at :28 scala> val numDF = numRDD.toDF numDF: org.apache.spark.sql.DataFrame = [_1: int] scala> numDF.show +---+ | _1| +---+ | 1| | 2| | 3| | 4| | 5| +---+ 

    Méthode 1: (Scala)

     val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z") 

    Méthode 2: (Scala)

     case class temp(val1: Ssortingng,val3 : Double) val rdd = sc.parallelize(Seq( Row("foo", 0.5), Row("bar", 0.0) )) val rows = rdd.map({case Row(val1:Ssortingng,val3:Double) => temp(val1,val3)}).toDF() rows.show() 

    Méthode 1: (Python)

     from pyspark.sql import Row l = [('Alice',2)] Person = Row('name','age') rdd = sc.parallelize(l) person = rdd.map(lambda r:Person(*r)) df2 = sqlContext.createDataFrame(person) df2.show() 

    Méthode 2: (Python)

     from pyspark.sql.types import * l = [('Alice',2)] rdd = sc.parallelize(l) schema = StructType([StructField ("name" , SsortingngType(), True) , StructField("age" , IntegerType(), True)]) df3 = sqlContext.createDataFrame(rdd, schema) df3.show() 

    Extraction de la valeur de l’object de ligne et application de la classe de cas pour convertir rdd en DF

     val temp1 = atsortingb1.map{case Row ( key: Int ) => s"$key" } val temp2 = atsortingb2.map{case Row ( key: Int) => s"$key" } case class RLT (id: Ssortingng, atsortingb_1 : Ssortingng, atsortingb_2 : Ssortingng) import hiveContext.implicits._ val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF 

    Sur les nouvelles versions de spark (2.0+). Cela fonctionnera également même sans sqlcontext disponible.

     import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ val spark = SparkSession .builder() .getOrCreate() import spark.implicits._ val dfSchema = Seq("col1", "col2", "col3") rdd.toDF(dfSchema: _*) 
     One needs to create a schema, and attach it to the Rdd. 

    En supposant que val spark est le produit d’un SparkSession.builder …

      import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ /* Lets gin up some sample data: * As RDD's and dataframes can have columns of differing types, lets make our * sample data a three wide, two tall, rectangle of mixed types. * A column of Ssortingngs, a column of Longs, and a column of Doubules */ val arrayOfArrayOfAnys = Array.ofDim[Any](2,3) arrayOfArrayOfAnys(0)(0)="aSsortingng" arrayOfArrayOfAnys(0)(1)=0L arrayOfArrayOfAnys(0)(2)=3.14159 arrayOfArrayOfAnys(1)(0)="bSsortingng" arrayOfArrayOfAnys(1)(1)=9876543210L arrayOfArrayOfAnys(1)(2)=2.71828 /* The way to convert an anything which looks rectangular, * (Array[Array[Ssortingng]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to * throw it into sparkContext.parallelize. * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows * the parallelize definition as * def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys. * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. */ val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys) /* We'll be using the sqlContext.createDataFrame to add a schema our RDD. * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have. * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq) * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. */ val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=> Row.fromSeq(f.toSeq) ) /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe. * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as * case class StructField(name: Ssortingng, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty) * Will leave the two default values in place for each of the columns: * nullability as true, * metadata as an empty Map[Ssortingng,Any] * */ val schema = StructType( StructField("colOfSsortingngs", SsortingngType) :: StructField("colOfLongs" , LongType ) :: StructField("colOfDoubles", DoubleType) :: Nil ) val df=spark.sqlContext.createDataFrame(rddOfRows,schema) /* * +------------+----------+------------+ * |colOfSsortingngs|colOfLongs|colOfDoubles| * +------------+----------+------------+ * | aSsortingng| 0| 3.14159| * | bSsortingng|9876543210| 2.71828| * +------------+----------+------------+ */ df.show 

    Mêmes étapes, mais avec moins de déclarations val:

      val arrayOfArrayOfAnys=Array( Array("aSsortingng",0L ,3.14159), Array("bSsortingng",9876543210L,2.71828) ) val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq)) /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata: * Consider constructing the schema from an Array[StructField]. This would allow looping over * the columns, with a match statement applying the appropriate sql datatypes as the second * StructField arguments. */ val sf=new Array[StructField](3) sf(0)=StructField("colOfSsortingngs",SsortingngType) sf(1)=StructField("colOfLongs" ,LongType ) sf(2)=StructField("colOfDoubles",DoubleType) val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList)) df.show 

    Pour convertir un tableau [Row] en DataFrame ou Dataset, les éléments suivants fonctionnent de manière élégante:

    Disons que le schéma est le StructType pour la ligne, puis

     val rows: Array[Row]=... implicit val encoder = RowEncoder.apply(schema) import spark.implicits._ rows.toDS