Dies sollte einfach sein, aber .... mit Spark 1.6.1 .... Ich habe DataFrame # 1 mit den Spalten A, B, C. Mit Werten:
A B C
1 2 A
2 2 A
3 2 B
4 2 C
Ich erstelle dann einen neuen Datenrahmen mit einer neuen Spalte D, also:
DataFrame df2 = df1.withColumn("D", df1.col("C"));
so weit so gut, aber ich möchte eigentlich, dass der Wert in Spalte D bedingt ist, dh:
// pseudo code
if (col C = "A") the col D = "X"
else if (col C = "B") the col D = "Y"
else col D = "Z"
Ich werde dann die Spalte C ablegen und D in C umbenennen. Ich habe versucht, die Funktionen der Spalte zu betrachten, aber es scheint nichts zu passen. Ich dachte daran, df1.rdd (). Map () zu verwenden und über die Zeilen zu iterieren, aber abgesehen davon, dass ich es nicht wirklich geschafft hatte, es zum Laufen zu bringen, dachte ich irgendwie, dass der Sinn von DataFrames darin bestand, sich von der RDD-Abstraktion zu entfernen.
Leider muss ich das in Java machen (und natürlich ist Spark mit Java nicht optimal !!). Es scheint, als würde mir das Offensichtliche fehlen und ich bin froh, als Idiot dargestellt zu werden, wenn ihm die Lösung präsentiert wird!
Ich glaube, Sie können when
verwenden, um dies zu erreichen. Außerdem können Sie wahrscheinlich die alte Spalte direkt ersetzen. Für Ihr Beispiel würde der Code ungefähr so aussehen:
import static org.Apache.spark.sql.functions.*;
Column newCol = when(col("C").equalTo("A"), "X")
.when(col("C").equalTo("B"), "Y")
.otherwise("Z");
DataFrame df2 = df1.withColumn("C", newCol);
Weitere Informationen zu when
finden Sie im Column
-Javadoc .
Danke an Daniel, ich habe das gelöst :)
Das fehlende Stück war der statische Import der SQL-Funktionen
import static org.Apache.spark.sql.functions.*;
Ich muss eine Million verschiedene Verwendungsweisen ausprobiert haben, aber ich habe Kompilierungsfehler/Laufzeitfehler erhalten, weil ich den Import nicht durchgeführt habe. Nach dem Import war Daniels Antwort genau richtig!
Sie können auch udf's verwenden, um dieselbe Aufgabe zu erledigen. Schreiben Sie einfach eine einfache if-else-Struktur
import org.Apache.spark.sql.functions.udf
val customFunct = udf { d =>
//if then else construct
}
val new_DF= df.withColumn(column_name, customFunct(df("data_column")))