webentwicklung-frage-antwort-db.com.de

Filtern eines Pyspark DataFrame mit einer SQL-ähnlichen IN-Klausel

Ich möchte einen Pyspark DataFrame mit einer SQL-ähnlichen IN-Klausel wie in filtern

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')

wobei a der Tupel (1, 2, 3) ist. Ich erhalte diesen Fehler:

Java.lang.RuntimeException: [1.67] Fehler: `` ('' erwartet, aber Kennung gefunden

das heißt im Wesentlichen, dass es etwas wie '(1, 2, 3)' anstelle von ..__ erwartete. Das Problem ist, dass ich die Werte nicht manuell in ein schreiben kann, da es aus einem anderen Job extrahiert wird.

Wie würde ich in diesem Fall filtern?

25
mar tin

Zeichenfolge, die Sie an SQLContext übergeben, wird im Rahmen der SQL-Umgebung ausgewertet. Die Schließung wird nicht erfasst. Wenn Sie eine Variable übergeben möchten, müssen Sie dies explizit mit der String-Formatierung tun:

df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
##  2 

Natürlich würden Sie dies aus Sicherheitsgründen nicht in einer "echten" SQL-Umgebung verwenden, aber hier sollte es keine Rolle spielen.

In der Praxis ist DataFrame DSL eine gute Wahl, wenn Sie dynamische Abfragen erstellen möchten:

from pyspark.sql.functions import col

df.where(col("v").isin({"foo", "bar"})).count()
## 2

Es ist einfach zu erstellen und zusammenzustellen und verarbeitet alle Details von HiveQL/Spark SQL für Sie.

46
zero323

wir wiederholen, was @ zero323 oben erwähnt hat: Wir können dasselbe auch mit einer Liste tun (nicht nur set) wie unten

from pyspark.sql.functions import col

df.where(col("v").isin(["foo", "bar"])).count()
16
braj

Nur ein kleiner Zusatz/Update:

choice_list = ["foo", "bar", "jack", "joan"]

Wenn Sie Ihren Datenrahmen "df" filtern möchten, so dass Sie Zeilen basierend auf einer Spalte "v" beibehalten möchten, die nur die Werte aus choice_list übernimmt, dann

df_filtered = df.where( ( col("v").isin (choice_list) ) )
1
shwetabharti

Sie können dies auch für ganzzahlige Spalten tun:

df_filtered = df.filter("field1 in (1,2,3)")

oder dies für String-Spalten:

df_filtered = df.filter("field1 in ('a','b','c')")
0
BICube

Ein etwas anderer Ansatz, der für mich funktioniert hat, ist das Filtern mit einer benutzerdefinierten Filterfunktion.

def filter_func(a):
"""wrapper function to pass a in udf"""
    def filter_func_(col):
    """filtering function"""
        if col in a.value:
            return True

    return False

return udf(filter_func_, BooleanType())

# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \
0
Alex_Gidiotis