Comment convertir un fichier csv en rdd

Je suis nouveau pour faire des étincelles. Je veux effectuer certaines opérations sur des données particulières dans un enregistrement CSV.

J’essaie de lire un fichier CSV et de le convertir en RDD. Mes autres opérations sont basées sur l’en-tête fourni dans le fichier CSV.

(De commentaires) Ceci est mon code jusqu’à présent:

final JavaRDD File = sc.textFile(Filename).cache(); final JavaRDD lines = File.flatMap(new FlatMapFunction() { @Override public Iterable call(Ssortingng s) { return Arrays.asList(EOL.split(s)); } }); final Ssortingng heading=lines.first().toSsortingng(); 

Je peux obtenir les valeurs d’en-tête comme ceci. Je veux mapper ceci à chaque enregistrement dans un fichier CSV.

 final Ssortingng[] header=heading.split(" "); 

Je peux obtenir les valeurs d’en-tête comme ceci. Je veux mapper ceci à chaque enregistrement dans un fichier CSV.

En java, j’utilise CSVReader record.getColumnValue(Column header) pour obtenir la valeur particulière. Je dois faire quelque chose de similaire ici.

Une approche simpliste consisterait à conserver l’en-tête.

Disons que vous avez un fichier.csv comme:

 user, topic, hits om, scala, 120 daniel, spark, 80 3754978, spark, 1 

Nous pouvons définir une classe d’en-tête utilisant une version analysée de la première ligne:

 class SimpleCSVHeader(header:Array[Ssortingng]) extends Serializable { val index = header.zipWithIndex.toMap def apply(array:Array[Ssortingng], key:Ssortingng):Ssortingng = array(index(key)) } 

Que nous pouvons utiliser cet en-tête pour traiter les données plus tard:

 val csv = sc.textFile("file.csv") // original file val data = csv.map(line => line.split(",").map(elem => elem.sortingm)) //lines in rows val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line val rows = data.filter(line => header(line,"user") != "user") // filter the header out val users = rows.map(row => header(row,"user") val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt) ... 

Notez que l’en- header n’est pas beaucoup plus qu’une simple carte d’un mnémonique à l’index du tableau. A peu près tout cela pourrait être fait sur la place ordinale de l’élément dans le tableau, comme user = row(0)

PS: Bienvenue à Scala 🙂

Vous pouvez utiliser la bibliothèque spark-csv: https://github.com/databricks/spark-csv

Ceci est directement à partir de la documentation:

 import org.apache.spark.sql.SQLContext SQLContext sqlContext = new SQLContext(sc); HashMap options = new HashMap(); options.put("header", "true"); options.put("path", "cars.csv"); DataFrame df = sqlContext.load("com.databricks.spark.csv", options); 

Tout d’abord, je dois dire que c’est beaucoup plus simple si vous placez vos en-têtes dans des fichiers séparés – c’est la convention dans le Big Data.

Quoi qu’il en soit, la réponse de Daniel est plutôt bonne, mais elle présente un manque d’efficacité et un bug, donc je vais poster mon propre message. L’inefficacité est que vous n’avez pas besoin de vérifier chaque enregistrement pour voir si c’est l’en-tête, il vous suffit de vérifier le premier enregistrement pour chaque partition. Le bogue est que, en utilisant .split(",") vous pouvez obtenir une exception ou obtenir la mauvaise colonne lorsque les entrées sont la chaîne vide et se produisent au début ou à la fin de l’enregistrement – pour corriger que vous devez utiliser .split(",", -1) . Donc, voici le code complet:

 val header = scala.io.Source.fromInputStream( hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration) .open(new hadoop.fs.Path(path))) .getLines.head val columnIndex = header.split(",").indexOf(columnName) sc.textFile(path).mapPartitions(iterator => { val head = iterator.next() if (head == header) iterator else Iterator(head) ++ iterator }) .map(_.split(",", -1)(columnIndex)) 

Points finaux, considérez le Parquet si vous souhaitez seulement pêcher certaines colonnes. Ou du moins envisagez d’implémenter une fonction fractionnée évaluée par paresse si vous avez des lignes larges.

Nous pouvons utiliser le nouveau DataFrameRDD pour lire et écrire les données CSV. Il y a peu d’avantages de DataFrameRDD sur NormalRDD:

  1. DataFrameRDD est un peu plus rapide que NormalRDD, car nous déterminons le schéma et nous aidons à optimiser le fonctionnement et à générer des gains de performances significatifs.
  2. Même si la colonne se décale dans CSV, elle prendra automatiquement la bonne colonne car nous ne codons pas en dur le numéro de colonne présent dans la lecture des données en tant que textFile, puis en le divisant et en utilisant le numéro de colonne pour obtenir les données.
  3. En quelques lignes de code, vous pouvez lire directement le fichier CSV.

Vous devrez avoir cette bibliothèque: Ajouter dans build.sbt

 libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0" 

Code Spark Scala pour cela:

 val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val csvInPath = "/path/to/csv/abc.csv" val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath) //format is for specifying the type of file you are reading //header = true indicates that the first line is header in it 

Pour convertir en RDD normal en prenant certaines des colonnes et

 val rddData = df.map(x=>Row(x.getAs("colA"))) //Do other RDD operation on it 

Enregistrement du RDD au format CSV:

 val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",SsortingngType,true)))) aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp") 

Comme l’en-tête est défini sur true, nous obtiendrons le nom de l’en-tête dans tous les fichiers de sortie.

Voici un autre exemple d’utilisation de Spark / Scala pour convertir un fichier CSV en fichier RDD . Pour une description plus détaillée, voir cet article .

 def main(args: Array[Ssortingng]): Unit = { val csv = sc.textFile("/path/to/your/file.csv") // split / clean data val headerAndRows = csv.map(line => line.split(",").map(_.sortingm)) // get header val header = headerAndRows.first // filter out header (eh. just check if the first val matches the first header name) val data = headerAndRows.filter(_(0) != header(0)) // splits to map (header/value pairs) val maps = data.map(splits => header.zip(splits).toMap) // filter out the user "me" val result = maps.filter(map => map("user") != "me") // print result result.foreach(println) } 

Je recommande de lire l’en-tête directement du pilote, pas via Spark. Deux raisons à cela: 1) C’est une seule ligne. Il n’y a aucun avantage à une approche dissortingbuée. 2) Nous avons besoin de cette ligne dans le pilote, pas dans les noeuds de travail.

Ca fait plutot comme ca:

 // Ridiculous amount of code to read one line. val uri = new java.net.URI(filename) val conf = sc.hadoopConfiguration val fs = hadoop.fs.FileSystem.get(uri, conf) val path = new hadoop.fs.Path(filename) val stream = fs.open(path) val source = scala.io.Source.fromInputStream(stream) val header = source.getLines.head 

Maintenant, lorsque vous créez le RDD, vous pouvez supprimer l’en-tête.

 val csvRDD = sc.textFile(filename).filter(_ != header) 

Ensuite, nous pouvons créer un RDD à partir d’une colonne, par exemple:

 val idx = header.split(",").indexOf(columnName) val columnRDD = csvRDD.map(_.split(",")(idx)) 

Vous pouvez également utiliser la méthode mapPartitionsWithIndex pour obtenir le numéro d’index de partition et une liste de toutes les lignes de cette partition. La partition 0 et la ligne 0 seront l’en-tête

 val rows = sc.textFile(path) .mapPartitionsWithIndex({ (index: Int, rows: Iterator[Ssortingng]) => val results = new ArrayBuffer[(Ssortingng, Int)] var first = true while (rows.hasNext) { // check for first line if (index == 0 && first) { first = false rows.next // skip the first row } else { results += rows.next } } results.toIterator }, true) rows.flatMap { row => row.split(",") } 

Que dis-tu de ça?

 val Delimeter = "," val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter)) 

Pour les scala d’allumage, j’utilise généralement quand je ne peux pas utiliser les paquets spark csv …

 val sqlContext = new org.apache.spark.sql.SQLContext(sc) val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv") val header = rawdata.first() val tbldata = rawdata.filter(_(0) != header(0)) 

Je vous suggère d’essayer

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

 JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( new Function() { public Person call(Ssortingng line) throws Exception { Ssortingng[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].sortingm())); return person; } }); 

Vous devez avoir une classe dans cette personne exemple avec les spécifications de votre en-tête de fichier et associer vos données au schéma et appliquer des critères comme dans mysql .. pour obtenir le résultat souhaité

Je pense que vous pouvez essayer de charger ce csv dans un RDD, puis créer un dataframe à partir de ce RDD, voici le document de création de dataframe à partir de rdd: http://spark.apache.org/docs/latest/sql-programming-guide .html # interopérer-avec-rdds

À partir de Spark 2.0, CSV peut être lu directement dans un DataFrame .

Si le fichier de données n’a pas de ligne d’en-tête, alors ce serait:

 val df = spark.read.csv("file://path/to/data.csv") 

Cela va charger les données, mais donnez à chaque colonne des noms génériques comme _c0 , _c1 , etc.

S’il existe des en-têtes, l’ajout de .option("header", "true") utilisera la première ligne pour définir les colonnes dans le DataFrame :

 val df = spark.read .option("header", "true") .csv("file://path/to/data.csv") 

Pour un exemple concret, disons que vous avez un fichier avec le contenu:

 user,topic,hits om,scala,120 daniel,spark,80 3754978,spark,1 

Ensuite, le total des résultats sera groupé par sujet:

 import org.apache.spark.sql.functions._ import spark.implicits._ val rawData = spark.read .option("header", "true") .csv("file://path/to/data.csv") // specifies the query, but does not execute it val grouped = rawData.groupBy($"topic").agg(sum($"hits)) // runs the query, pulling the data to the master node // can fail if the amount of data is too much to fit // into the master node's memory! val collected = grouped.collect // runs the query, writing the result back out // in this case, changing format to Parquet since that can // be nicer to work with in Spark grouped.write.parquet("hdfs://some/output/directory/") // runs the query, writing the result back out // in this case, in CSV format with a header and // coalesced to a single file. This is easier for human // consumption but usually much slower. grouped.coalesce(1) .write .option("header", "true") .csv("hdfs://some/output/directory/")