Die Spark-Dokumentation zeigt, wie ein DataFrame aus einer RDD erstellt wird, wobei Fallklassen von Scala zum Ableiten eines Schemas verwendet werden. Ich versuche, dieses Konzept mit sqlContext.createDataFrame(RDD, CaseClass)
zu reproduzieren, aber mein DataFrame ist leer. Hier ist mein Scala-Code:
// sc is the SparkContext, while sqlContext is the SQLContext.
// Define the case class and raw data
case class Dog(name: String)
val data = Array(
Dog("Rex"),
Dog("Fido")
)
// Create an RDD from the raw data
val dogRDD = sc.parallelize(data)
// Print the RDD for debugging (this works, shows 2 dogs)
dogRDD.collect().foreach(println)
// Create a DataFrame from the RDD
val dogDF = sqlContext.createDataFrame(dogRDD, classOf[Dog])
// Print the DataFrame for debugging (this fails, shows 0 dogs)
dogDF.show()
Die Ausgabe, die ich sehe, ist:
Dog(Rex)
Dog(Fido)
++
||
++
||
||
++
Was vermisse ich?
Vielen Dank!
Alles was Sie brauchen ist nur
val dogDF = sqlContext.createDataFrame(dogRDD)
Der zweite Parameter ist Teil der Java-API und erwartet, dass Ihre Klasse der Java-Beans-Konvention folgt (Getter/Setter). Ihre Fallklasse folgt dieser Konvention nicht, daher wird keine Eigenschaft erkannt, die zu leeren DataFrame ohne Spalten führt.
Sie können eine DataFrame
direkt aus einer Seq
von Fallklasseninstanzen erstellen, indem Sie toDF
folgendermaßen verwenden:
val dogDf = Seq(Dog("Rex"), Dog("Fido")).toDF
Case Class Approach funktioniert nicht im Cluster-Modus. Es gibt ClassNotFoundException
an die von Ihnen definierte Fallklasse.
Konvertieren Sie einen RDD[Row]
und definieren Sie das Schema Ihrer RDD
mit StructField
und dann createDataFrame
val rdd = data.map { attrs => Row(attrs(0),attrs(1)) }
val rddStruct = new StructType(Array(StructField("id", StringType, nullable = true),StructField("pos", StringType, nullable = true)))
sqlContext.createDataFrame(rdd,rddStruct)
toDF()
funktioniert auch nicht