webentwicklung-frage-antwort-db.com.de

Wie füge ich eine dauerhafte Spalte mit Zeilen-IDs zu Spark DataFrame hinzu?

Diese Frage ist nicht neu, jedoch finde ich in Spark ein überraschendes Verhalten. Ich muss einem DataFrame eine Spalte mit Zeilen-IDs hinzufügen. Ich habe die DataFrame-Methode monotonically_increasing_id () verwendet, und es gibt mir eine zusätzliche Spalte von eindeutigen Zeilen-IDs (die übrigens NICHT aufeinander folgend sind, aber eindeutig sind). 

Mein Problem ist, dass beim Filtern des DataFrame die Zeilen-IDs im resultierenden DataFrame neu zugewiesen werden. Die zwei DataFrames werden unten gezeigt. 

  • der erste ist der ursprüngliche DataFrame mit folgenden Zeilen-IDs:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • der zweite DataFrame ist derjenige, der nach dem Filtern des Col P über df.filter(col("P")) erhalten wird. 

Das Problem wird durch die rowId für custId 169 veranschaulicht, die im ursprünglichen DataFrame 5 war. Nach dem Filtern dieser rowId (5) wurde custmId 773 erneut zugewiesen, als custId 169 ausgefiltert wurde. Ich weiß nicht, warum dies das Standardverhalten ist.

Ich möchte, dass rowIds "klebrig" ist; Wenn ich Zeilen aus dem DataFrame entferne, möchte ich nicht, dass ihre IDs "wiederverwendet" werden. Ich möchte, dass sie mit ihren Zeilen mitgehen. Kann man das machen? Ich sehe keine Flags, um dieses Verhalten von der monotonically_increasing_id-Methode anzufordern.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
|      875|[54,120,1...|   true|    4|
|      773|[3136,317...|   true|    5|
27
Kai

Spark 2.0

  • Dieses Problem wurde in Spark 2.0 mit SPARK-14241 behoben.

  • Ein weiteres ähnliches Problem wurde in Spark 2.1 mit SPARK-1439 behoben

Spark 1.x

Das Problem, das Sie haben, ist ziemlich subtil, kann aber auf eine einfache Tatsache reduziert werden. monotonically_increasing_id Ist eine äußerst hässliche Funktion. Es ist eindeutig nicht rein und sein Wert hängt von etwas ab, das völlig außerhalb Ihrer Kontrolle liegt.

Es werden keine Parameter verwendet. Aus Sicht des Optimierers spielt es keine Rolle, wann es aufgerufen wird, und es kann nach allen anderen Vorgängen verschoben werden. Daher das Verhalten, das Sie sehen.

Wenn Sie sich den Code ansehen, werden Sie feststellen, dass dies explizit gekennzeichnet ist, indem Sie den Ausdruck MonotonicallyIncreasingID mit Nondeterministic erweitern.

Ich glaube nicht, dass es eine elegante Lösung gibt, aber eine Möglichkeit, wie Sie damit umgehen können, besteht darin, eine künstliche Abhängigkeit vom gefilterten Wert hinzuzufügen. Zum Beispiel mit einer UDF wie folgt:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf

bound = udf(lambda _, v: v, LongType()) 

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))  
  .where("P"))

Im Allgemeinen könnte es einfacher sein, Indizes mit zipWithIndex zu einem RDD hinzuzufügen und dann wieder in ein DataFrame umzuwandeln.


* Die oben gezeigte Problemumgehung ist keine gültige Lösung mehr (noch erforderlich) in Spark 2.x wobei Python UDFs Gegenstand der Ausführungsplanoptimierungen sind.

16
zero323

Ich konnte das nicht reproduzieren. Ich verwende Spark 2.0, vielleicht hat sich das Verhalten geändert oder ich mache nicht dasselbe wie Sie.

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())

df.show

val df2 = df.filter(col("flag")=== true)

df2.show

df: org.Apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
|  one|    1| true|   0|
|  two|    2|false|   1|
|three|    3| true|   2|
| four|    4| true|   3|
+-----+-----+-----+----+
df2: org.Apache.spark.sql.Dataset[org.Apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
|  one|    1|true|   0|
|three|    3|true|   2|
| four|    4|true|   3|
+-----+-----+----+----+
3
Davos

Ich habe kürzlich an einem ähnlichen Problem gearbeitet. Obwohl monotonically_increasing_id() sehr schnell ist, ist es nicht zuverlässig und gibt Ihnen keine fortlaufenden Zeilennummern, sondern erhöht nur eindeutige ganze Zahlen.

Das Erstellen einer Windows-Partition und die anschließende Verwendung von row_number().over(some_windows_partition) ist sehr zeitaufwändig. 

Die bisher beste Lösung ist die Verwendung von gezippt mit Index und die anschließende Konvertierung der gezippten Datei in den ursprünglichen Datenrahmen, wobei das neue Schema die Indexspalte enthält.

Versuche dies:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

Wo original_dataframe die dataframe ist, müssen Sie einen Index hinzufügen und row_with_index ist das neue Schema mit dem Spaltenindex, den Sie schreiben können 

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

Hier waren calendar_date, year_week_number, year_period_number und realization die Spalten meiner ursprünglichen dataframe. Sie können die Namen durch die Namen Ihrer Spalten ersetzen. Der Index ist der neue Spaltenname, den Sie für die Zeilennummern hinzufügen mussten.

Dieser Prozess ist im Vergleich zur row_number().over(some_windows_partition)-Methode wesentlich effizienter und reibungsloser.

Hoffe das hilft.

2
Shantanu Sharma

Um die Verschiebungsauswertung von monotonically_increasing_id () zu umgehen, können Sie versuchen, den Datenrahmen auf die Festplatte zu schreiben und erneut zu lesen. Dann ist die ID-Spalte jetzt einfach ein Datenfeld, das gerade gelesen wird, und nicht irgendwann in der Pipeline dynamisch berechnet. Obwohl es eine ziemlich hässliche Lösung ist, funktionierte es bei einem schnellen Test.

1
Chris T

Das hat bei mir funktioniert. Erstellt eine weitere Identitätsspalte und verwendet die Fensterfunktion row_number

import org.Apache.spark.sql.functions.{row_number}
import org.Apache.spark.sql.expressions.Window

val df1: DataFrame = df.withColumn("Id",lit(1))

df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
1
Sampad Desai

Um eine bessere Leistung mit der Chris T-Lösung zu erzielen, können Sie versuchen, in einen gemeinsam genutzten Datenrahmen von Apache Ignite zu schreiben, anstatt auf die Festplatte . https://ignite.Apache.org/use-cases/spark/ shared-memory-layer.html

0