webentwicklung-frage-antwort-db.com.de

So lösen Sie die AnalysisException: aufgelöste Attribute in Spark

val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")

Die Verknüpfungsoperation funktioniert einwandfrei Wenn ich jedoch die df2 wiederverwende, stehe ich vor ungelösten Attributfehlern

val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")

FEHLER: org.Apache.spark.sql.AnalysisException: Aufgelöstes Attribut (e) ID # 426

11
Rajita

Wie in meinem Kommentar erwähnt, ist er mit https://issues.Apache.org/jira/browse/SPARK-10925 und insbesondere mit https://issues.Apache.org/jira/browse verwandt/SPARK-14948 . Durch die Wiederverwendung der Referenz wird Mehrdeutigkeit bei der Benennung erzeugt. Sie müssen also das df klonen - siehe letzten Kommentar in https://issues.Apache.org/jira/browse/SPARK-14948 für ein Beispiel.

15

Wenn Sie df1 und df2 von df1 abgeleitet haben, versuchen Sie, alle Spalten in df2 umzubenennen, sodass nach dem Join keine zwei Spalten einen identischen Namen haben. Also vor dem Join:

also anstelle von df1.join(df2...

tun

# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')

# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
0
Tomer Ben David

Aus meiner Erfahrung haben wir 2 Lösungen 1) Klonen DF 2) Umbenennen von mehrdeutigen Spalten vor dem Verbinden von Tabellen. (Vergessen Sie nicht, den duplizierten Join-Schlüssel zu löschen.)

Persönlich bevorzuge ich die zweite Methode, da das Klonen DF in der ersten Methode Zeit in Anspruch nimmt, insbesondere wenn die Datenmenge groß ist.

0
Zhenyi Lin

Ich habe das gleiche Problem, als ich versuchte, einen DataFrame in zwei aufeinanderfolgenden Joins zu verwenden.

Hier ist das Problem: DataFrame A hat 2 Spalten (nennen wir sie x und y) und DataFrame B hat auch 2 Spalten (nennen wir sie w und z). Ich muss A mit B auf x = z verbinden und sie dann auf y = z zusammenfügen.

(A join B on A.x=B.z) as C join B on C.y=B.z

Ich habe genau die Fehlermeldung erhalten, dass im zweiten Join " aufgelöste Attribute (s) B.z # 1234 ... " beschwert wurden.

Nach den Links @Erik und einigen anderen Blogs und Fragen erfuhr ich, dass ich einen Klon von B brauche.

Folgendes habe ich getan:

val aDF = ...
val bDF = ...
val bCloned = spark.createDataFrame(bDF.rdd, bDF.schema)
aDF.join(bDF, aDF("x") === bDF("z")).join(bCloned, aDF("y") === bCloned("z"))
0
Iraj Hedayati

Es wird funktionieren, wenn Sie das unten tun.

angenommen, Sie haben einen Datenrahmen. df1 und wenn Sie denselben Datenrahmen überqueren möchten, können Sie den folgenden Befehl verwenden

df1.toDF("ColA","ColB").as("f_df").join(df1.toDF("ColA","ColB").as("t_df"), 
   $"f_df.pcmdty_id" === 
   $"t_df.assctd_pcmdty_id").select($"f_df.pcmdty_id",$"f_df.assctd_pcmdty_id")
0
dharani sugumar

Versuchen Sie für Java-Entwickler, diese Methode aufzurufen:

private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
    List<Column> filterColumns = new ArrayList<>();
    List<String> filterColumnsNames = new ArrayList<>();
    scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
    while (it.hasNext()) {
        String columnName = it.next().name();
        filterColumns.add(ds.col(columnName));
        filterColumnsNames.add(columnName);
    }
    ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
    return ds;
}

bei beiden Datensätzen kurz vor dem Zusammenfügen werden die Datensätze in neue kopiert:

df1 = cloneDataset(df1); 
df2 = cloneDataset(df2);
Dataset<Row> join = df1.join(df2, col("column_name"));
// if it didn't work try this
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq)); 
0