webentwicklung-frage-antwort-db.com.de

Wie wählt man die erste Reihe jeder Gruppe aus?

Ich habe einen DataFrame wie folgt generiert:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))

Die Ergebnisse sehen wie folgt aus:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

Wie Sie sehen, wird der DataFrame nach aufsteigender Reihenfolge nach Hour und absteigend nach TotalValue sortiert.

Ich möchte die oberste Reihe jeder Gruppe auswählen, d. H.

  • aus der Gruppe von Hour == 0 select (0, cat26,30.9)
  • aus der Gruppe von Hour == 1 select (1, cat67,28.5)
  • aus der Gruppe von Hour == 2 select (2, cat56,39.6)
  • und so weiter

Die gewünschte Ausgabe wäre also:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   1|   cat67|      28.5|
|   2|   cat56|      39.6|
|   3|    cat8|      35.6|
| ...|     ...|       ...|
+----+--------+----------+

Es kann praktisch sein, die oberen N Reihen jeder Gruppe auswählen zu können.

Jede Hilfe wird sehr geschätzt.

101
Rami

Fensterfunktionen:

So etwas sollte den Trick tun:

import org.Apache.spark.sql.functions.{row_number, max, broadcast}
import org.Apache.spark.sql.expressions.Window

val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+

Diese Methode ist im Falle eines erheblichen Datenversatzes ineffizient.

Plain SQL-Aggregation gefolgt von join:

Alternativ können Sie mit aggregierten Datenrahmen verbinden:

val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax),
    ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show

// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+

Es werden doppelte Werte beibehalten (wenn pro Stunde mehr als eine Kategorie mit demselben Gesamtwert vorhanden ist). Sie können diese wie folgt entfernen:

dfTopByJoin
  .groupBy($"hour")
  .agg(
    first("category").alias("category"),
    first("TotalValue").alias("TotalValue"))

Bestellung über structs verwenden:

Ein ordentlicher, wenn auch nicht sehr gut getesteter Trick, für den keine Joins oder Fensterfunktionen erforderlich sind:

val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+

Mit DataSet-API (Spark 1.6+, 2.0+):

Spark 1.6:

case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .show

// +---+--------------+
// | _1|            _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+

Spark 2.0 oder höher:

df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)

Die letzten beiden Methoden können die Kombination auf der Kartenseite nutzen und erfordern keine vollständige Umstellung, sodass die meiste Zeit eine bessere Leistung im Vergleich zu Fensterfunktionen und Joins aufweisen sollte. Diese können auch mit Structured Streaming im Ausgabemodus completed verwendet werden.

Verwenden Sie nicht:

df.orderBy(...).groupBy(...).agg(first(...), ...)

Es scheint zu funktionieren (insbesondere im local-Modus), aber es ist unzuverlässig ( SPARK-16207 ). Kredite an Tzach Zohar zur Verknüpfung relevanter JIRA-Ausgabe .

Der gleiche Hinweis gilt für 

df.orderBy(...).dropDuplicates(...)

die intern einen gleichwertigen Ausführungsplan verwendet.

169
zero323

Für Spark 2.0.2 mit Gruppierung nach mehreren Spalten:

import org.Apache.spark.sql.functions.row_number
import org.Apache.spark.sql.expressions.Window

val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc)

val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
11

Dies ist genau das Gleiche wie zero323 's answer , jedoch in SQL-Abfrage.

Angenommen, der Datenrahmen wird erstellt und als registriert 

df.createOrReplaceTempView("table")
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|0   |cat26   |30.9      |
//|0   |cat13   |22.1      |
//|0   |cat95   |19.6      |
//|0   |cat105  |1.3       |
//|1   |cat67   |28.5      |
//|1   |cat4    |26.8      |
//|1   |cat13   |12.6      |
//|1   |cat23   |5.3       |
//|2   |cat56   |39.6      |
//|2   |cat40   |29.7      |
//|2   |cat187  |27.9      |
//|2   |cat68   |9.8       |
//|3   |cat8    |35.6      |
//+----+--------+----------+

Fensterfunktion:

sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn  FROM table) tmp where rn = 1").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

Plain SQL-Aggregation gefolgt von join:

sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " +
  "(select Hour, Category, TotalValue from table tmp1 " +
  "join " +
  "(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " +
  "on " +
  "tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " +
  "group by tmp3.Hour")
  .show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

Verwenden der Reihenfolge über Strukturen:

sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false)
//+----+--------+----------+
//|Hour|Category|TotalValue|
//+----+--------+----------+
//|1   |cat67   |28.5      |
//|3   |cat8    |35.6      |
//|2   |cat56   |39.6      |
//|0   |cat26   |30.9      |
//+----+--------+----------+

DataSets way und do do s sind die gleichen wie in der ursprünglichen Antwort

7
Ramesh Maharjan

Die Lösung unten führt nur eine Gruppe aus. Extrahieren Sie die Zeilen Ihres Datenrahmens, die den maxValue enthalten, in einer Aufnahme. Keine weiteren Joins oder Windows erforderlich.

import org.Apache.spark.sql.Row
import org.Apache.spark.sql.catalyst.encoders.RowEncoder
import org.Apache.spark.sql.DataFrame

//df is the dataframe with Day, Category, TotalValue

implicit val dfEnc = RowEncoder(df.schema)

val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}
1
elghoto

Hier kannst du das machen -

   val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour")

data.withColumnRenamed("_1","Hour").show
1
Shubham Agrawal

Wenn der Datenrahmen nach mehreren Spalten gruppiert werden muss, kann dies hilfreich sein

val keys = List("Hour", "Category");
 val selectFirstValueOfNoneGroupedColumns = 
 df.columns
   .filterNot(keys.toSet)
   .map(_ -> "first").toMap
 val grouped = 
 df.groupBy(keys.head, keys.tail: _*)
   .agg(selectFirstValueOfNoneGroupedColumns)

Hoffe, das hilft jemandem mit ähnlichen Problemen

0
NehaM

Das Muster ist Gruppe durch Tasten => etwas mit jeder Gruppe tun, z. verkleinern => zum Datenrahmen zurückkehren

Ich dachte, dass die Dataframe-Abstraktion in diesem Fall etwas umständlich ist, also habe ich RDD-Funktionalität verwendet

 val rdd: RDD[Row] = originalDf
  .rdd
  .groupBy(row => row.getAs[String]("grouping_row"))
  .map(iterableTuple => {
    iterableTuple._2.reduce(reduceFunction)
  })

val productDf = sqlContext.createDataFrame(rdd, originalDf.schema)
0
Rubber Duck

Eine schöne Möglichkeit, dies mit dem Dataframe-API zu tun, ist die argmax-Logik

  val df = Seq(
    (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
    (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
    (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
    (3,"cat8",35.6)).toDF("Hour", "Category", "TotalValue")

  df.groupBy($"Hour")
    .agg(max(struct($"TotalValue", $"Category")).as("argmax"))
    .select($"Hour", $"argmax.*").show

 +----+----------+--------+
 |Hour|TotalValue|Category|
 +----+----------+--------+
 |   1|      28.5|   cat67|
 |   3|      35.6|    cat8|
 |   2|      39.6|   cat56|
 |   0|      30.9|   cat26|
 +----+----------+--------+
0
randal25