Ich möchte einen Pyspark DataFrame mit einer SQL-ähnlichen IN
-Klausel wie in filtern
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
wobei a
der Tupel (1, 2, 3)
ist. Ich erhalte diesen Fehler:
Java.lang.RuntimeException: [1.67] Fehler: `` ('' erwartet, aber Kennung gefunden
das heißt im Wesentlichen, dass es etwas wie '(1, 2, 3)' anstelle von ..__ erwartete. Das Problem ist, dass ich die Werte nicht manuell in ein schreiben kann, da es aus einem anderen Job extrahiert wird.
Wie würde ich in diesem Fall filtern?
Zeichenfolge, die Sie an SQLContext
übergeben, wird im Rahmen der SQL-Umgebung ausgewertet. Die Schließung wird nicht erfasst. Wenn Sie eine Variable übergeben möchten, müssen Sie dies explizit mit der String-Formatierung tun:
df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
## 2
Natürlich würden Sie dies aus Sicherheitsgründen nicht in einer "echten" SQL-Umgebung verwenden, aber hier sollte es keine Rolle spielen.
In der Praxis ist DataFrame
DSL eine gute Wahl, wenn Sie dynamische Abfragen erstellen möchten:
from pyspark.sql.functions import col
df.where(col("v").isin({"foo", "bar"})).count()
## 2
Es ist einfach zu erstellen und zusammenzustellen und verarbeitet alle Details von HiveQL/Spark SQL für Sie.
wir wiederholen, was @ zero323 oben erwähnt hat: Wir können dasselbe auch mit einer Liste tun (nicht nur set
) wie unten
from pyspark.sql.functions import col
df.where(col("v").isin(["foo", "bar"])).count()
Nur ein kleiner Zusatz/Update:
choice_list = ["foo", "bar", "jack", "joan"]
Wenn Sie Ihren Datenrahmen "df" filtern möchten, so dass Sie Zeilen basierend auf einer Spalte "v" beibehalten möchten, die nur die Werte aus choice_list übernimmt, dann
df_filtered = df.where( ( col("v").isin (choice_list) ) )
Sie können dies auch für ganzzahlige Spalten tun:
df_filtered = df.filter("field1 in (1,2,3)")
oder dies für String-Spalten:
df_filtered = df.filter("field1 in ('a','b','c')")
Ein etwas anderer Ansatz, der für mich funktioniert hat, ist das Filtern mit einer benutzerdefinierten Filterfunktion.
def filter_func(a):
"""wrapper function to pass a in udf"""
def filter_func_(col):
"""filtering function"""
if col in a.value:
return True
return False
return udf(filter_func_, BooleanType())
# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \