Diese Frage ist nicht neu, jedoch finde ich in Spark ein überraschendes Verhalten. Ich muss einem DataFrame eine Spalte mit Zeilen-IDs hinzufügen. Ich habe die DataFrame-Methode monotonically_increasing_id () verwendet, und es gibt mir eine zusätzliche Spalte von eindeutigen Zeilen-IDs (die übrigens NICHT aufeinander folgend sind, aber eindeutig sind).
Mein Problem ist, dass beim Filtern des DataFrame die Zeilen-IDs im resultierenden DataFrame neu zugewiesen werden. Die zwei DataFrames werden unten gezeigt.
der erste ist der ursprüngliche DataFrame mit folgenden Zeilen-IDs:
df.withColumn("rowId", monotonically_increasing_id())
der zweite DataFrame ist derjenige, der nach dem Filtern des Col P über df.filter(col("P"))
erhalten wird.
Das Problem wird durch die rowId für custId 169 veranschaulicht, die im ursprünglichen DataFrame 5 war. Nach dem Filtern dieser rowId (5) wurde custmId 773 erneut zugewiesen, als custId 169 ausgefiltert wurde. Ich weiß nicht, warum dies das Standardverhalten ist.
Ich möchte, dass rowIds
"klebrig" ist; Wenn ich Zeilen aus dem DataFrame entferne, möchte ich nicht, dass ihre IDs "wiederverwendet" werden. Ich möchte, dass sie mit ihren Zeilen mitgehen. Kann man das machen? Ich sehe keine Flags, um dieses Verhalten von der monotonically_increasing_id
-Methode anzufordern.
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Spark 2.0
Dieses Problem wurde in Spark 2.0 mit SPARK-14241 behoben.
Ein weiteres ähnliches Problem wurde in Spark 2.1 mit SPARK-1439 behoben
Spark 1.x
Das Problem, das Sie haben, ist ziemlich subtil, kann aber auf eine einfache Tatsache reduziert werden. monotonically_increasing_id
Ist eine äußerst hässliche Funktion. Es ist eindeutig nicht rein und sein Wert hängt von etwas ab, das völlig außerhalb Ihrer Kontrolle liegt.
Es werden keine Parameter verwendet. Aus Sicht des Optimierers spielt es keine Rolle, wann es aufgerufen wird, und es kann nach allen anderen Vorgängen verschoben werden. Daher das Verhalten, das Sie sehen.
Wenn Sie sich den Code ansehen, werden Sie feststellen, dass dies explizit gekennzeichnet ist, indem Sie den Ausdruck MonotonicallyIncreasingID
mit Nondeterministic
erweitern.
Ich glaube nicht, dass es eine elegante Lösung gibt, aber eine Möglichkeit, wie Sie damit umgehen können, besteht darin, eine künstliche Abhängigkeit vom gefilterten Wert hinzuzufügen. Zum Beispiel mit einer UDF wie folgt:
from pyspark.sql.types import LongType from pyspark.sql.functions import udf bound = udf(lambda _, v: v, LongType()) (df .withColumn("rn", monotonically_increasing_id()) # Due to nondeterministic behavior it has to be a separate step .withColumn("rn", bound("P", "rn")) .where("P"))
Im Allgemeinen könnte es einfacher sein, Indizes mit zipWithIndex
zu einem RDD
hinzuzufügen und dann wieder in ein DataFrame
umzuwandeln.
* Die oben gezeigte Problemumgehung ist keine gültige Lösung mehr (noch erforderlich) in Spark 2.x wobei Python UDFs Gegenstand der Ausführungsplanoptimierungen sind.
Ich konnte das nicht reproduzieren. Ich verwende Spark 2.0, vielleicht hat sich das Verhalten geändert oder ich mache nicht dasselbe wie Sie.
val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag")=== true)
df2.show
df: org.Apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
| one| 1| true| 0|
| two| 2|false| 1|
|three| 3| true| 2|
| four| 4| true| 3|
+-----+-----+-----+----+
df2: org.Apache.spark.sql.Dataset[org.Apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
| one| 1|true| 0|
|three| 3|true| 2|
| four| 4|true| 3|
+-----+-----+----+----+
Ich habe kürzlich an einem ähnlichen Problem gearbeitet. Obwohl monotonically_increasing_id()
sehr schnell ist, ist es nicht zuverlässig und gibt Ihnen keine fortlaufenden Zeilennummern, sondern erhöht nur eindeutige ganze Zahlen.
Das Erstellen einer Windows-Partition und die anschließende Verwendung von row_number().over(some_windows_partition)
ist sehr zeitaufwändig.
Die bisher beste Lösung ist die Verwendung von gezippt mit Index und die anschließende Konvertierung der gezippten Datei in den ursprünglichen Datenrahmen, wobei das neue Schema die Indexspalte enthält.
Versuche dies:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
Wo original_dataframe
die dataframe
ist, müssen Sie einen Index hinzufügen und row_with_index
ist das neue Schema mit dem Spaltenindex, den Sie schreiben können
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
Hier waren calendar_date
, year_week_number
, year_period_number
und realization
die Spalten meiner ursprünglichen dataframe
. Sie können die Namen durch die Namen Ihrer Spalten ersetzen. Der Index ist der neue Spaltenname, den Sie für die Zeilennummern hinzufügen mussten.
Dieser Prozess ist im Vergleich zur row_number().over(some_windows_partition)
-Methode wesentlich effizienter und reibungsloser.
Hoffe das hilft.
Um die Verschiebungsauswertung von monotonically_increasing_id () zu umgehen, können Sie versuchen, den Datenrahmen auf die Festplatte zu schreiben und erneut zu lesen. Dann ist die ID-Spalte jetzt einfach ein Datenfeld, das gerade gelesen wird, und nicht irgendwann in der Pipeline dynamisch berechnet. Obwohl es eine ziemlich hässliche Lösung ist, funktionierte es bei einem schnellen Test.
Das hat bei mir funktioniert. Erstellt eine weitere Identitätsspalte und verwendet die Fensterfunktion row_number
import org.Apache.spark.sql.functions.{row_number}
import org.Apache.spark.sql.expressions.Window
val df1: DataFrame = df.withColumn("Id",lit(1))
df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
Um eine bessere Leistung mit der Chris T-Lösung zu erzielen, können Sie versuchen, in einen gemeinsam genutzten Datenrahmen von Apache Ignite zu schreiben, anstatt auf die Festplatte . https://ignite.Apache.org/use-cases/spark/ shared-memory-layer.html