webentwicklung-frage-antwort-db.com.de

Erstellen eines neuen Spark DataFrame mit einem neuen Spaltenwert basierend auf einer Spalte im ersten Dataframe Java

Dies sollte einfach sein, aber .... mit Spark 1.6.1 .... Ich habe DataFrame # 1 mit den Spalten A, B, C. Mit Werten:

A  B  C
1  2  A
2  2  A
3  2  B
4  2  C

Ich erstelle dann einen neuen Datenrahmen mit einer neuen Spalte D, also:

DataFrame df2 = df1.withColumn("D", df1.col("C"));

so weit so gut, aber ich möchte eigentlich, dass der Wert in Spalte D bedingt ist, dh:

// pseudo code
if (col C = "A") the col D = "X"
else if (col C = "B") the col D = "Y"
else col D = "Z"

Ich werde dann die Spalte C ablegen und D in C umbenennen. Ich habe versucht, die Funktionen der Spalte zu betrachten, aber es scheint nichts zu passen. Ich dachte daran, df1.rdd (). Map () zu verwenden und über die Zeilen zu iterieren, aber abgesehen davon, dass ich es nicht wirklich geschafft hatte, es zum Laufen zu bringen, dachte ich irgendwie, dass der Sinn von DataFrames darin bestand, sich von der RDD-Abstraktion zu entfernen. 

Leider muss ich das in Java machen (und natürlich ist Spark mit Java nicht optimal !!). Es scheint, als würde mir das Offensichtliche fehlen und ich bin froh, als Idiot dargestellt zu werden, wenn ihm die Lösung präsentiert wird!

11
user1128482

Ich glaube, Sie können when verwenden, um dies zu erreichen. Außerdem können Sie wahrscheinlich die alte Spalte direkt ersetzen. Für Ihr Beispiel würde der Code ungefähr so ​​aussehen:

import static org.Apache.spark.sql.functions.*;

Column newCol = when(col("C").equalTo("A"), "X")
    .when(col("C").equalTo("B"), "Y")
    .otherwise("Z");

DataFrame df2 = df1.withColumn("C", newCol);

Weitere Informationen zu when finden Sie im Column-Javadoc .

18
Daniel de Paula

Danke an Daniel, ich habe das gelöst :)

Das fehlende Stück war der statische Import der SQL-Funktionen 

import static org.Apache.spark.sql.functions.*;

Ich muss eine Million verschiedene Verwendungsweisen ausprobiert haben, aber ich habe Kompilierungsfehler/Laufzeitfehler erhalten, weil ich den Import nicht durchgeführt habe. Nach dem Import war Daniels Antwort genau richtig!

2
user1128482

Sie können auch udf's verwenden, um dieselbe Aufgabe zu erledigen. Schreiben Sie einfach eine einfache if-else-Struktur

import org.Apache.spark.sql.functions.udf
val customFunct = udf { d =>
      //if then else construct
    }

val new_DF= df.withColumn(column_name, customFunct(df("data_column")))
1
sudeepgupta90