Ich weiß, wie man eine UDF in Spark SQL schreibt:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Kann ich etwas Ähnliches tun, um eine Aggregatfunktion zu definieren? Wie geht das?
Für den Kontext möchte ich die folgende SQL-Abfrage ausführen:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Es sollte so etwas wie zurückgeben
Row(span1, false, T0)
Ich möchte, dass die Aggregatfunktion mir mitteilt, ob es Werte für opticalReceivePower
in den durch span
und timestamp
definierten Gruppen gibt, die unter dem Schwellenwert liegen. Muss ich meine UDAF anders schreiben als die oben eingefügte UDF?
Spark> = 2.3
Vektorisiertes UdF (nur Python):
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import *
import pandas as pd
df = sc.parallelize([
("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])
def below_threshold(threshold, group="group", power="power"):
@pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
def below_threshold_(df):
df = pd.DataFrame(
df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
df.reset_index(inplace=True, drop=False)
return df
return below_threshold_
Anwendungsbeispiel:
df.groupBy("group").apply(below_threshold(-40)).show()
## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## | b| true|
## | a| false|
## +-----+---------------+
Siehe auch Anwenden von UDFs auf GroupedData in PySpark (mit funktionierendem python Beispiel)
Spark> = 2.0 (optional 1.6, jedoch mit etwas anderer API):
Es ist möglich, Aggregators
für eingegebene Datasets
zu verwenden:
import org.Apache.spark.sql.expressions.Aggregator
import org.Apache.spark.sql.{Encoder, Encoders}
class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean]
with Serializable {
def zero = false
def reduce(acc: Boolean, x: I) = acc | f(x)
def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
def finish(acc: Boolean) = acc
def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}
val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
Funke> = 1,5 :
In Spark 1.5 können Sie UDAF wie folgt erstellen, obwohl es höchstwahrscheinlich ein Overkill ist:
import org.Apache.spark.sql.expressions._
import org.Apache.spark.sql.types._
import org.Apache.spark.sql.Row
object belowThreshold extends UserDefinedAggregateFunction {
// Schema you get as an input
def inputSchema = new StructType().add("power", IntegerType)
// Schema of the row which is used for aggregation
def bufferSchema = new StructType().add("ind", BooleanType)
// Returned type
def dataType = BooleanType
// Self-explaining
def deterministic = true
// zero value
def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
// Similar to seqOp in aggregate
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
}
// Similar to combOp in aggregate
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))
}
// Called on exit to get return value
def evaluate(buffer: Row) = buffer.getBoolean(0)
}
Anwendungsbeispiel:
df
.groupBy($"group")
.agg(belowThreshold($"power").alias("belowThreshold"))
.show
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// | a| false|
// | b| true|
// +-----+--------------+
Problemumgehung für Spark 1.4 :
Ich bin mir nicht sicher, ob ich Ihre Anforderungen richtig verstehe, aber soweit ich das beurteilen kann, sollte hier eine einfache alte Aggregation ausreichen:
val df = sc.parallelize(Seq(
("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")
df
.withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
.groupBy($"group")
.agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
.show
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// | a| false|
// | b| true|
// +-----+--------------+
Funke <= 1.4 :
Soweit ich weiß (Spark 1.4.1) gibt es derzeit keine Unterstützung für UDAF, außer für die Hive. Dies sollte mit Spark 1.5 (siehe SPARK-3947 ) möglich sein.
Intern Spark verwendet eine Reihe von Klassen, einschließlich ImperativeAggregates
und DeclarativeAggregates
.
Sie sind für den internen Gebrauch bestimmt und können sich ohne vorherige Ankündigung ändern. Daher möchten Sie sie wahrscheinlich nicht in Ihrem Produktionscode verwenden. Der Vollständigkeit halber könnte BelowThreshold
mit DeclarativeAggregate
so implementiert werden (getestet mit Spark 2.2-SNAPSHOT):
import org.Apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.Apache.spark.sql.catalyst.expressions._
import org.Apache.spark.sql.types._
case class BelowThreshold(child: Expression, threshold: Expression)
extends DeclarativeAggregate {
override def children: Seq[Expression] = Seq(child, threshold)
override def nullable: Boolean = false
override def dataType: DataType = BooleanType
private lazy val belowThreshold = AttributeReference(
"belowThreshold", BooleanType, nullable = false
)()
// Used to derive schema
override lazy val aggBufferAttributes = belowThreshold :: Nil
override lazy val initialValues = Seq(
Literal(false)
)
override lazy val updateExpressions = Seq(Or(
belowThreshold,
If(IsNull(child), Literal(false), LessThan(child, threshold))
))
override lazy val mergeExpressions = Seq(
Or(belowThreshold.left, belowThreshold.right)
)
override lazy val evaluateExpression = belowThreshold
override def defaultResult: Option[Literal] = Option(Literal(false))
}
Es sollte weiter mit einem Äquivalent von withAggregateFunction
umbrochen werden.