webentwicklung-frage-antwort-db.com.de

Wie definiere und verwende ich eine benutzerdefinierte Aggregatfunktion in Spark SQL?

Ich weiß, wie man eine UDF in Spark SQL schreibt:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)

Kann ich etwas Ähnliches tun, um eine Aggregatfunktion zu definieren? Wie geht das?

Für den Kontext möchte ich die folgende SQL-Abfrage ausführen:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")

Es sollte so etwas wie zurückgeben

Row(span1, false, T0)

Ich möchte, dass die Aggregatfunktion mir mitteilt, ob es Werte für opticalReceivePower in den durch span und timestamp definierten Gruppen gibt, die unter dem Schwellenwert liegen. Muss ich meine UDAF anders schreiben als die oben eingefügte UDF?

36
Rory Byrne

Unterstützte Methoden

Spark> = 2.3

Vektorisiertes UdF (nur Python):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
           df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_

Anwendungsbeispiel:

df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## |    b|           true|
## |    a|          false|
## +-----+---------------+

Siehe auch Anwenden von UDFs auf GroupedData in PySpark (mit funktionierendem python Beispiel)

Spark> = 2.0 (optional 1.6, jedoch mit etwas anderer API):

Es ist möglich, Aggregators für eingegebene Datasets zu verwenden:

import org.Apache.spark.sql.expressions.Aggregator
import org.Apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)

Funke> = 1,5 :

In Spark 1.5 können Sie UDAF wie folgt erstellen, obwohl es höchstwahrscheinlich ein Overkill ist:

import org.Apache.spark.sql.expressions._
import org.Apache.spark.sql.types._
import org.Apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("power", IntegerType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("ind", BooleanType)
    // Returned type
    def dataType = BooleanType
    // Self-explaining 
    def deterministic = true
    // zero value
    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
    // Similar to seqOp in aggregate
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
          buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
    }
    // Similar to combOp in aggregate
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
    }
    // Called on exit to get return value
    def evaluate(buffer: Row) = buffer.getBoolean(0)
}

Anwendungsbeispiel:

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Problemumgehung für Spark 1.4 :

Ich bin mir nicht sicher, ob ich Ihre Anforderungen richtig verstehe, aber soweit ich das beurteilen kann, sollte hier eine einfache alte Aggregation ausreichen:

val df = sc.parallelize(Seq(
    ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Funke <= 1.4 :

Soweit ich weiß (Spark 1.4.1) gibt es derzeit keine Unterstützung für UDAF, außer für die Hive. Dies sollte mit Spark 1.5 (siehe SPARK-3947 ) möglich sein.

Nicht unterstützte/interne Methoden

Intern Spark verwendet eine Reihe von Klassen, einschließlich ImperativeAggregates und DeclarativeAggregates .

Sie sind für den internen Gebrauch bestimmt und können sich ohne vorherige Ankündigung ändern. Daher möchten Sie sie wahrscheinlich nicht in Ihrem Produktionscode verwenden. Der Vollständigkeit halber könnte BelowThreshold mit DeclarativeAggregate so implementiert werden (getestet mit Spark 2.2-SNAPSHOT):

import org.Apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.Apache.spark.sql.catalyst.expressions._
import org.Apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression) 
    extends  DeclarativeAggregate  {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
} 

Es sollte weiter mit einem Äquivalent von withAggregateFunction umbrochen werden.

74
zero323