webentwicklung-frage-antwort-db.com.de

Fügen Sie die Spaltensumme als neue Spalte im PySpark-Datenrahmen hinzu

Ich verwende PySpark und habe ein Spark-Datenfeld mit einer Reihe numerischer Spalten. Ich möchte eine Spalte hinzufügen, die die Summe aller anderen Spalten ist.

Angenommen, mein Datenrahmen hatte die Spalten "a", "b" und "c". Ich weiß ich kann das:

df.withColumn('total_col', df.a + df.b + df.c)

Das Problem ist, dass ich nicht jede Spalte einzeln eingeben und hinzufügen möchte, besonders wenn ich viele Spalten habe. Ich möchte in der Lage sein, dies automatisch zu tun, oder indem ich eine Liste mit Spaltennamen angibt, die ich hinzufügen möchte. Gibt es eine andere Möglichkeit, dies zu tun?

17
plam

Das war nicht offensichtlich. Ich sehe keine zeilenbasierte Summe der in der Spark-Dataframes-API definierten Spalten.

Version 2

Dies kann auf relativ einfache Weise erfolgen:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns wird von pyspark als Liste von Zeichenfolgen bereitgestellt, die alle Spaltennamen im Spark-Datenframe enthält. Für eine andere Summe können Sie stattdessen eine andere Liste von Spaltennamen angeben.

Ich habe dies nicht als erste Lösung versucht, weil ich nicht sicher war, wie es sich verhalten würde. Aber es funktioniert.

Version 1

Das ist zu kompliziert, funktioniert aber auch.

Du kannst das:

  1. verwenden Sie df.columns, um eine Liste der Namen der Spalten zu erhalten
  2. verwenden Sie diese Namensliste, um eine Liste der Spalten zu erstellen
  3. Übergeben Sie diese Liste an etwas, das die überladene Add-Funktion der Spalte in einer Falz-Funktionsweise aufruft.

Mit python's verkleinern , etwas Wissen über die Funktionsweise der Überladung von Operatoren und dem Pyspark-Code für die Spalten hier wird das zu:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

Beachten Sie, dass dies eine Python-Reduzierung ist, keine Funken-RDD-Reduzierung, und der Klammerausdruck in dem zweiten Parameter zur Reduzierung erfordert die Klammer, da es sich um einen Listengeneratorausdruck handelt. 

Getestet, funktioniert!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
32
Paul

Mein Problem war dem obigen ähnlich (etwas komplexer), da ich aufeinanderfolgende Spaltensummen als neue Spalten im PySpark-Datenrahmen hinzufügen musste. Dieser Ansatz verwendet Code aus Pauls Version 1 oben: 

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
                              ,(6,1,-4),(0,2,-2),(6,4,1)\
                              ,(4,5,2),(5,-3,-5),(6,4,-1)]\
                              ,schema=['x1','x2','x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames=df.columns

fügen Sie neue Spalten hinzu, die kumulative Summen sind (fortlaufend):

for i in range(0,len(colnames)):
    colnameLst= colnames[0:i+1]
    colname = 'cm'+ str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show ()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

"Summensummen" hinzugefügt:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
0
Grant Shannon

Die Lösung 

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

gepostet von @Paul works. Trotzdem bekam ich den Fehler, so viele andere, wie ich gesehen habe.

TypeError: 'Column' object is not callable

Nach einiger Zeit fand ich das Problem (zumindest in meinem Fall). Das Problem ist, dass ich zuvor einige Pyspark-Funktionen mit der Leitung importiert habe

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

die Zeile importierte also den Befehl sum pyspark, während df.withColumn('total', sum(df[col] for col in df.columns)) die normale Python-Funktion sum verwenden soll.

Sie können die Referenz der Pyspark-Funktion mit del sum löschen.

Ansonsten habe ich in meinem Fall den Import geändert 

import pyspark.sql.functions as F

und verwies die Funktionen dann als F.sum.

0
Francesco Boi

Am einfachsten geht es mit der Funktion expr

from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
0
Jonathan