webentwicklung-frage-antwort-db.com.de

wie man durch jede Zeile von dataFrame in Pyspark geht

Z.B

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

Die obige Anweisung druckt die gesamte Tabelle auf dem Terminal, aber ich möchte auf jede Zeile in dieser Tabelle mit for oder while zugreifen, um weitere Berechnungen durchzuführen.

24
Arti Berde

Sie würden eine benutzerdefinierte Funktion definieren und eine Karte verwenden.

def customFunction(row):

   return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

oder

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

Die benutzerdefinierte Funktion wird dann auf jede Zeile des Datenrahmens angewendet. Beachten Sie, dass sample2 ein RDD ist, kein Datenframe.

Map ist erforderlich, wenn Sie komplexere Berechnungen durchführen möchten. Wenn Sie nur eine abgeleitete Spalte hinzufügen müssen, können Sie withColumn verwenden und einen Datenrahmen zurückgeben.

sample3 = sample.withColumn('age2', sample.age + 2)
33
David

Das kannst du einfach nicht. DataFrames ist wie andere verteilte Datenstrukturen nicht iterierbar und kann nur mit dedizierten Funktionen höherer Ordnung und/oder SQL-Methoden aufgerufen werden.

Sie können natürlich collect 

for row in df.rdd.collect():
    do_something(row)

oder toLocalIterator konvertieren 

for row in df.rdd.toLocalIterator():
    do_something(row)

und iterieren Sie lokal wie oben gezeigt, aber es übertrifft alle Verwendungszwecke von Spark.

21
zero323

Mit List Comprehensions in Python können Sie eine ganze Spalte mit nur zwei Zeilen in einer Liste sammeln:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

Im obigen Beispiel geben wir eine Liste von Tabellen in der Datenbank "default" zurück. Sie können diese jedoch anpassen, indem Sie die in sql () verwendete Abfrage ersetzen.

Oder mehr abgekürzt:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

Für Ihr Beispiel mit drei Spalten können wir eine Liste von Wörterbüchern erstellen und diese dann in einer for-Schleife durchlaufen.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))
6
aaronsteers

Wenn Sie für jede Zeile in einem DataFrame-Objekt etwas tun möchten, verwenden Sie map. Dadurch können Sie in jeder Zeile weitere Berechnungen durchführen. Es ist das Äquivalent einer Schleife über die gesamte Datenmenge von 0 bis len(dataset)-1

Beachten Sie, dass hierdurch ein PipelinedRDD und kein DataFrame zurückgegeben wird. 

2
Katya Handler

Gib einen Versuch so

result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]); for f in result.collect(): print (f.col_name)
1
Bala cse

über

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

sollte sein 

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

für name sind age und city keine Variablen, sondern lediglich Schlüssel des Wörterbuchs.

1
ten2the6