Ich habe einen DataFrame wie folgt generiert:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
Die Ergebnisse sehen wie folgt aus:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Wie Sie sehen, wird der DataFrame nach aufsteigender Reihenfolge nach Hour
und absteigend nach TotalValue
sortiert.
Ich möchte die oberste Reihe jeder Gruppe auswählen, d. H.
Die gewünschte Ausgabe wäre also:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
Es kann praktisch sein, die oberen N Reihen jeder Gruppe auswählen zu können.
Jede Hilfe wird sehr geschätzt.
Fensterfunktionen:
So etwas sollte den Trick tun:
import org.Apache.spark.sql.functions.{row_number, max, broadcast}
import org.Apache.spark.sql.expressions.Window
val df = sc.parallelize(Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Diese Methode ist im Falle eines erheblichen Datenversatzes ineffizient.
Plain SQL-Aggregation gefolgt von join
:
Alternativ können Sie mit aggregierten Datenrahmen verbinden:
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
val dfTopByJoin = df.join(broadcast(dfMax),
($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
.drop("max_hour")
.drop("max_value")
dfTopByJoin.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Es werden doppelte Werte beibehalten (wenn pro Stunde mehr als eine Kategorie mit demselben Gesamtwert vorhanden ist). Sie können diese wie folgt entfernen:
dfTopByJoin
.groupBy($"hour")
.agg(
first("category").alias("category"),
first("TotalValue").alias("TotalValue"))
Bestellung über structs
verwenden:
Ein ordentlicher, wenn auch nicht sehr gut getesteter Trick, für den keine Joins oder Fensterfunktionen erforderlich sind:
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
.groupBy($"hour")
.agg(max("vs").alias("vs"))
.select($"Hour", $"vs.Category", $"vs.TotalValue")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Mit DataSet-API (Spark 1.6+, 2.0+):
Spark 1.6:
case class Record(Hour: Integer, Category: String, TotalValue: Double)
df.as[Record]
.groupBy($"hour")
.reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
.show
// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+
Spark 2.0 oder höher:
df.as[Record]
.groupByKey(_.Hour)
.reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
Die letzten beiden Methoden können die Kombination auf der Kartenseite nutzen und erfordern keine vollständige Umstellung, sodass die meiste Zeit eine bessere Leistung im Vergleich zu Fensterfunktionen und Joins aufweisen sollte. Diese können auch mit Structured Streaming im Ausgabemodus completed
verwendet werden.
Verwenden Sie nicht:
df.orderBy(...).groupBy(...).agg(first(...), ...)
Es scheint zu funktionieren (insbesondere im local
-Modus), aber es ist unzuverlässig ( SPARK-16207 ). Kredite an Tzach Zohar zur Verknüpfung relevanter JIRA-Ausgabe .
Der gleiche Hinweis gilt für
df.orderBy(...).dropDuplicates(...)
die intern einen gleichwertigen Ausführungsplan verwendet.
Für Spark 2.0.2 mit Gruppierung nach mehreren Spalten:
import org.Apache.spark.sql.functions.row_number
import org.Apache.spark.sql.expressions.Window
val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc)
val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
Dies ist genau das Gleiche wie zero323 's answer , jedoch in SQL-Abfrage.
Angenommen, der Datenrahmen wird erstellt und als registriert
df.createOrReplaceTempView("table")
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|0 |cat26 |30.9 |
//|0 |cat13 |22.1 |
//|0 |cat95 |19.6 |
//|0 |cat105 |1.3 |
//|1 |cat67 |28.5 |
//|1 |cat4 |26.8 |
//|1 |cat13 |12.6 |
//|1 |cat23 |5.3 |
//|2 |cat56 |39.6 |
//|2 |cat40 |29.7 |
//|2 |cat187 |27.9 |
//|2 |cat68 |9.8 |
//|3 |cat8 |35.6 |
//+----+--------+----------+
Fensterfunktion:
sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn FROM table) tmp where rn = 1").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1 |cat67 |28.5 |
//|3 |cat8 |35.6 |
//|2 |cat56 |39.6 |
//|0 |cat26 |30.9 |
//+----+--------+----------+
Plain SQL-Aggregation gefolgt von join:
sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " +
"(select Hour, Category, TotalValue from table tmp1 " +
"join " +
"(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " +
"on " +
"tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " +
"group by tmp3.Hour")
.show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1 |cat67 |28.5 |
//|3 |cat8 |35.6 |
//|2 |cat56 |39.6 |
//|0 |cat26 |30.9 |
//+----+--------+----------+
Verwenden der Reihenfolge über Strukturen:
sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1 |cat67 |28.5 |
//|3 |cat8 |35.6 |
//|2 |cat56 |39.6 |
//|0 |cat26 |30.9 |
//+----+--------+----------+
DataSets way und do do s sind die gleichen wie in der ursprünglichen Antwort
Die Lösung unten führt nur eine Gruppe aus. Extrahieren Sie die Zeilen Ihres Datenrahmens, die den maxValue enthalten, in einer Aufnahme. Keine weiteren Joins oder Windows erforderlich.
import org.Apache.spark.sql.Row
import org.Apache.spark.sql.catalyst.encoders.RowEncoder
import org.Apache.spark.sql.DataFrame
//df is the dataframe with Day, Category, TotalValue
implicit val dfEnc = RowEncoder(df.schema)
val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}
Hier kannst du das machen -
val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour")
data.withColumnRenamed("_1","Hour").show
Wenn der Datenrahmen nach mehreren Spalten gruppiert werden muss, kann dies hilfreich sein
val keys = List("Hour", "Category");
val selectFirstValueOfNoneGroupedColumns =
df.columns
.filterNot(keys.toSet)
.map(_ -> "first").toMap
val grouped =
df.groupBy(keys.head, keys.tail: _*)
.agg(selectFirstValueOfNoneGroupedColumns)
Hoffe, das hilft jemandem mit ähnlichen Problemen
Das Muster ist Gruppe durch Tasten => etwas mit jeder Gruppe tun, z. verkleinern => zum Datenrahmen zurückkehren
Ich dachte, dass die Dataframe-Abstraktion in diesem Fall etwas umständlich ist, also habe ich RDD-Funktionalität verwendet
val rdd: RDD[Row] = originalDf
.rdd
.groupBy(row => row.getAs[String]("grouping_row"))
.map(iterableTuple => {
iterableTuple._2.reduce(reduceFunction)
})
val productDf = sqlContext.createDataFrame(rdd, originalDf.schema)
Eine schöne Möglichkeit, dies mit dem Dataframe-API zu tun, ist die argmax-Logik
val df = Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6)).toDF("Hour", "Category", "TotalValue")
df.groupBy($"Hour")
.agg(max(struct($"TotalValue", $"Category")).as("argmax"))
.select($"Hour", $"argmax.*").show
+----+----------+--------+
|Hour|TotalValue|Category|
+----+----------+--------+
| 1| 28.5| cat67|
| 3| 35.6| cat8|
| 2| 39.6| cat56|
| 0| 30.9| cat26|
+----+----------+--------+