webentwicklung-frage-antwort-db.com.de

Wie erhalte ich andere Spalten, wenn Sie Spark DataFrame groupby verwenden?

wenn ich DataFrame-Gruppe wie folgt verwende:

df.groupBy(df("age")).agg(Map("id"->"count"))

Ich bekomme nur einen DataFrame mit den Spalten "age" und "count (id)", aber in df gibt es viele andere Spalten wie "name".

Insgesamt möchte ich das Ergebnis wie in MySQL erhalten.

msgstr "Name, Alter, Anzahl (ID) aus der DF-Gruppe nach Alter auswählen"

Was soll ich tun, wenn Sie Groupby in Spark verwenden?

21
Psychevic

Um es kurz zu machen, in der Regel müssen Sie aggregierte Ergebnisse mit der Originaltabelle verknüpfen. Spark SQL folgt der gleichen Prä-SQL: 1999-Konvention wie die meisten großen Datenbanken (PostgreSQL, Oracle, MS SQL Server), die keine zusätzlichen Spalten in Aggregationsabfragen zulässt. 

Da für Aggregationen wie count-Ergebnisse nicht genau definiert sind und das Verhalten in Systemen, die diese Art von Abfragen unterstützen, tendenziell unterschiedlich ist, können Sie einfach zusätzliche Spalten mit beliebigen Aggregaten wie first oder last hinzufügen.

In einigen Fällen können Sie agg durch select durch Fensterfunktionen und nachfolgende where ersetzen, dies kann jedoch je nach Kontext recht teuer sein.

23
zero323

Eine Möglichkeit, alle Spalten nach einem groupBy-Vorgang abzurufen, ist die Join-Funktion.

feature_group = ['name', 'age']
data_counts = df.groupBy(feature_group).count().alias("counts")
data_joined = df.join(data_counts, feature_group)

data_joined enthält jetzt alle Spalten einschließlich der Zählwerte. 

8
Swetha Kannan

Hier ein Beispiel, das mir in einem Funken-Workshop begegnet ist

val populationDF = spark.read
                .option("infer-schema", "true")
                .option("header", "true")
                .format("csv").load("file:///databricks/driver/population.csv")
                .select('name, regexp_replace(col("population"), "\\s", "").cast("integer").as("population"))

val maxPopulationDF = populationDF.agg(max('population).as("populationmax"))

Um andere Spalten zu erhalten, führe ich eine einfache Verknüpfung zwischen dem ursprünglichen DF und dem aggregierten aus

populationDF.join(maxPopulationDF,populationDF.col("population") === maxPopulationDF.col("populationmax")).select('name, 'populationmax).show()
0
Mohamed Hosni

Diese Lösung kann hilfreich sein.

from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql import Window

    name_list = [(101, 'abc', 24), (102, 'cde', 24), (103, 'efg', 22), (104, 'ghi', 21),
                 (105, 'ijk', 20), (106, 'klm', 19), (107, 'mno', 18), (108, 'pqr', 18),
                 (109, 'rst', 26), (110, 'tuv', 27), (111, 'pqr', 18), (112, 'rst', 28), (113, 'tuv', 29)]

age_w = Window.partitionBy("age")
name_age_df = sqlContext.createDataFrame(name_list, ['id', 'name', 'age'])

name_age_count_df = name_age_df.withColumn("count", F.count("id").over(age_w)).orderBy("count")
name_age_count_df.show()

Ausgabe:

+---+----+---+-----+
| id|name|age|count|
+---+----+---+-----+
|109| rst| 26|    1|
|113| tuv| 29|    1|
|110| tuv| 27|    1|
|106| klm| 19|    1|
|103| efg| 22|    1|
|104| ghi| 21|    1|
|105| ijk| 20|    1|
|112| rst| 28|    1|
|101| abc| 24|    2|
|102| cde| 24|    2|
|107| mno| 18|    3|
|111| pqr| 18|    3|
|108| pqr| 18|    3|
+---+----+---+-----+
0

Aggregatfunktionen reduzieren die Zeilenwerte für bestimmte Spalten in der Gruppe. Wenn Sie andere Zeilenwerte beibehalten möchten, müssen Sie eine Reduzierungslogik implementieren, die eine Zeile angibt, von der jeder Wert stammt. Behalten Sie zum Beispiel alle Werte der ersten Zeile mit dem Höchstalter bei. Zu diesem Zweck können Sie eine UDAF (benutzerdefinierte Aggregatfunktion) verwenden, um Zeilen innerhalb der Gruppe zu reduzieren.

import org.Apache.spark.sql._
import org.Apache.spark.sql.functions._


object AggregateKeepingRowJob {

  def main (args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName(this.getClass.getName.replace("$", ""))
      .master("local")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("ERROR")

    import sparkSession.sqlContext.implicits._

    val rawDf = Seq(
      (1L, "Moe",  "Slap",  2.0, 18),
      (2L, "Larry",  "Spank",  3.0, 15),
      (3L, "Curly",  "Twist", 5.0, 15),
      (4L, "Laurel", "Whimper", 3.0, 15),
      (5L, "Hardy", "Laugh", 6.0, 15),
      (6L, "Charley",  "Ignore",   5.0, 5)
    ).toDF("id", "name", "requisite", "money", "age")

    rawDf.show(false)
    rawDf.printSchema

    val maxAgeUdaf = new KeepRowWithMaxAge

    val aggDf = rawDf
      .groupBy("age")
      .agg(
        count("id"),
        max(col("money")),
        maxAgeUdaf(
          col("id"),
          col("name"),
          col("requisite"),
          col("money"),
          col("age")).as("KeepRowWithMaxAge")
      )

    aggDf.printSchema
    aggDf.show(false)

  }


}

Die UDAF:

import org.Apache.spark.sql.Row
import org.Apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.Apache.spark.sql.types._

class KeepRowWithMaxAmt extends UserDefinedAggregateFunction {
// This is the input fields for your aggregate function.
override def inputSchema: org.Apache.spark.sql.types.StructType =
  StructType(
    StructField("store", StringType) ::
    StructField("prod", StringType) ::
    StructField("amt", DoubleType) ::
    StructField("units", IntegerType) :: Nil
  )

// This is the internal fields you keep for computing your aggregate.
override def bufferSchema: StructType = StructType(
  StructField("store", StringType) ::
  StructField("prod", StringType) ::
  StructField("amt", DoubleType) ::
  StructField("units", IntegerType) :: Nil
)


// This is the output type of your aggregation function.
override def dataType: DataType =
  StructType((Array(
    StructField("store", StringType),
    StructField("prod", StringType),
    StructField("amt", DoubleType),
    StructField("units", IntegerType)
  )))

override def deterministic: Boolean = true

// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
  buffer(0) = ""
  buffer(1) = ""
  buffer(2) = 0.0
  buffer(3) = 0
}

// This is how to update your buffer schema given an input.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

  val amt = buffer.getAs[Double](2)
  val candidateAmt = input.getAs[Double](2)

  amt match {
    case a if a < candidateAmt =>
      buffer(0) = input.getAs[String](0)
      buffer(1) = input.getAs[String](1)
      buffer(2) = input.getAs[Double](2)
      buffer(3) = input.getAs[Int](3)
    case _ =>
  }
}

// This is how to merge two objects with the bufferSchema type.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

  buffer1(0) = buffer2.getAs[String](0)
  buffer1(1) = buffer2.getAs[String](1)
  buffer1(2) = buffer2.getAs[Double](2)
  buffer1(3) = buffer2.getAs[Int](3)
}

// This is where you output the final value, given the final value of your bufferSchema.
override def evaluate(buffer: Row): Any = {
  buffer
}
}
0
Rubber Duck