webentwicklung-frage-antwort-db.com.de

Was ist los mit "unionAll" von Spark "DataFrame"?

Mit Spark 1.5.0 und dem folgenden Code erwarte ich, dass unionAll DataFrames basierend auf ihrem Spaltennamen vereint. Im Code verwende ich FunSuite, um SparkContext sc zu übergeben:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

Ausgabe:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

Warum enthält das Ergebnis gemischte Spalten "b" und "a" , anstatt Spalten auf Spaltennamen auszurichten? Klingt nach einem ernsten Bug !?

18
Martin Senne

Es sieht überhaupt nicht nach einem Fehler aus. Was Sie sehen, ist ein Standard-SQL-Verhalten und alle wichtigen RDMBS, einschließlich PostgreSQL , MySQL , Oracle und MS SQL , verhalten sich genauso. Sie finden SQL Fiddle Beispiele, die mit Namen verknüpft sind.

Um PostgreSQL-Handbuch zu zitieren:

Um die Vereinigung, den Schnittpunkt oder die Differenz zweier Abfragen zu berechnen, müssen die beiden Abfragen "Vereinigungskompatibel" sein, d. H. Sie geben die gleiche Anzahl von Spalten zurück, und die entsprechenden Spalten weisen kompatible Datentypen auf

Spaltennamen mit Ausnahme der ersten Tabelle in der Set-Operation werden einfach ignoriert.

Dieses Verhalten kommt direkt aus der relationalen Algebra, wo der Grundbaustein ein Tupel ist. Da Tupel geordnet sind, ist eine Vereinigung von zwei Tupelsätzen äquivalent (ignoriert die Duplikatenverarbeitung) der hier angezeigten Ausgabe.

Wenn Sie mit Namen übereinstimmen möchten, können Sie so etwas tun

import org.Apache.spark.sql.DataFrame
import org.Apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

Um sowohl Namen als auch Typen zu überprüfen, sollte es ausreichend sein, columns durch Folgendes zu ersetzen:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq
35
zero323

Dieses Problem wird in spark2.3 behoben. Sie fügen Unterstützung von unionByName in der Datenmenge hinzu. 

https://issues.Apache.org/jira/browse/SPARK-21043

keine Probleme/Fehler - wenn Sie Ihre Fallklasse B genau beobachten, werden Sie sich darüber im Klaren sein . Fallklasse A -> Sie haben die Reihenfolge (a, b) und Fallklasse B -> erwähnt Sie haben die Bestellung erwähnt (b, a) ---> dies wird laut Bestellung erwartet

fallklasse A (a: Int, b: Int) Fallklasse B (b: Int, a: Int)

danke, Subbu

Verwenden Sie unionByName:

Auszug aus der Dokumentation:

def unionByName (other: Dataset [T]): Dataset [T]

Der Unterschied zwischen dieser Funktion und Union besteht darin, dass diese Funktion Spalten nach Namen auflöst (nicht nach Position):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+
0

Wie in SPARK-9813 beschrieben, scheint es so, als würde der Vorgang unionAll funktionieren, solange die Datentypen und die Anzahl der Spalten über Frames hinweg gleich sind. Bitte lesen Sie die Kommentare für weitere Diskussionen.

0
Rohan Aletty