Ich versuche, zwei PySpark-Datenrahmen mit einigen Spalten zu verketten, die nur auf jeder von ihnen stehen:
from pyspark.sql.functions import randn, Rand
df_1 = sqlContext.range(0, 10)
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
df_2 = sqlContext.range(11, 20)
+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+
df_1 = df_1.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))
und jetzt möchte ich einen dritten Datenrahmen generieren. Ich möchte etwas wie Pandas concat
:
df_1.show()
+---+--------------------+--------------------+
| id| uniform| normal|
+---+--------------------+--------------------+
| 0| 0.8122802274304282| 1.2423430583597714|
| 1| 0.8642043127063618| 0.3900018344856156|
| 2| 0.8292577771850476| 1.8077401259195247|
| 3| 0.198558705368724| -0.4270585782850261|
| 4|0.012661361966674889| 0.702634599720141|
| 5| 0.8535692890157796|-0.42355804115129153|
| 6| 0.3723296190171911| 1.3789648582622995|
| 7| 0.9529794127670571| 0.16238718777444605|
| 8| 0.9746632635918108| 0.02448061333761742|
| 9| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
df_2.show()
+---+--------------------+--------------------+
| id| uniform| normal_2|
+---+--------------------+--------------------+
| 11| 0.3221262660507942| 1.0269298899109824|
| 12| 0.4030672316912547| 1.285648175568798|
| 13| 0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876| -0.678915153834693|
| 15| 0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17| 0.6411908952297819| 0.9161177183227823|
| 18| 0.5669232696934479| 0.7270125277020573|
| 19| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
#do some concatenation here, how?
df_concat.show()
| id| uniform| normal| normal_2 |
+---+--------------------+--------------------+------------+
| 0| 0.8122802274304282| 1.2423430583597714| None |
| 1| 0.8642043127063618| 0.3900018344856156| None |
| 2| 0.8292577771850476| 1.8077401259195247| None |
| 3| 0.198558705368724| -0.4270585782850261| None |
| 4|0.012661361966674889| 0.702634599720141| None |
| 5| 0.8535692890157796|-0.42355804115129153| None |
| 6| 0.3723296190171911| 1.3789648582622995| None |
| 7| 0.9529794127670571| 0.16238718777444605| None |
| 8| 0.9746632635918108| 0.02448061333761742| None |
| 9| 0.513622008243935| 0.7626741803250845| None |
| 11| 0.3221262660507942| None | 0.123 |
| 12| 0.4030672316912547| None |0.12323 |
| 13| 0.9690555459609131| None |0.123 |
| 14|0.011913836266515876| None |0.18923 |
| 15| 0.9359607054250594| None |0.99123 |
| 16| 0.45680471157575453| None |0.123 |
| 17| 0.6411908952297819| None |1.123 |
| 18| 0.5669232696934479| None |0.10023 |
| 19| 0.513622008243935| None |0.916332123 |
+---+--------------------+--------------------+------------+
Ist das möglich?
Vielleicht können Sie die nicht vorhandenen Spalten erstellen und union
(unionAll
für Spark 1.6 oder niedriger) aufrufen:
cols = ['id', 'uniform', 'normal', 'normal_2']
df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
result = df_1_new.union(df_2_new)
df_concat = df_1.union(df_2)
Die Datenrahmen müssen möglicherweise identische Spalten haben. In diesem Fall können Sie withColumn()
verwenden, um normal_1
und normal_2
zu erstellen.
Hier ist eine Möglichkeit, falls es noch nützlich ist: Ich habe dies in pyspark Shell, Python Version 2.7.12 ausgeführt und meine Spark-Installation war Version 2.0.1.
PS: Ich denke, Sie wollten verschiedene Samen für df_1 df_2 verwenden, und der folgende Code spiegelt dies wider.
from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, Rand
import pyspark.sql.functions as F
df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", Rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))
def get_uniform(df1_uniform, df2_uniform):
if df1_uniform:
return df1_uniform
if df2_uniform:
return df2_uniform
u_get_uniform = F.udf(get_uniform, FloatType())
df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id"))
Hier sind die Ausgänge, die ich bekomme:
df_1.show()
+---+-------------------+--------------------+
| id| uniform| normal|
+---+-------------------+--------------------+
| 0|0.41371264720975787| 0.5888539012978773|
| 1| 0.7311719281896606| 0.8645537008427937|
| 2| 0.1982919638208397| 0.06157382353970104|
| 3|0.12714181165849525| 0.3623040918178586|
| 4| 0.7604318153406678|-0.49575204523675975|
| 5|0.12030715258495939| 1.0854146699817222|
| 6|0.12131363910425985| -0.5284523629183004|
| 7|0.44292918521277047| -0.4798519469521663|
| 8| 0.8898784253886249| -0.8820294772950535|
| 9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+
df_2.show()
+---+-------------------+--------------------+
| id| uniform| normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525| 0.3623040918178586|
| 13|0.12030715258495939| 1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+
df_3.show()
+---+-----------+--------------------+--------------------+
| id| uniform| normal| normal_2|
+---+-----------+--------------------+--------------------+
| 0| 0.41371265| 0.5888539012978773| null|
| 1| 0.7311719| 0.8645537008427937| null|
| 2| 0.19829196| 0.06157382353970104| null|
| 3| 0.12714182| 0.3623040918178586| null|
| 4| 0.7604318|-0.49575204523675975| null|
| 5|0.120307155| 1.0854146699817222| null|
| 6| 0.12131364| -0.5284523629183004| null|
| 7| 0.44292918| -0.4798519469521663| null|
| 8| 0.88987845| -0.8820294772950535| null|
| 9|0.036507078| -2.1591956435415334| null|
| 11| 0.19829196| null| 0.06157382353970104|
| 12| 0.12714182| null| 0.3623040918178586|
| 13|0.120307155| null| 1.0854146699817222|
| 14| 0.12131364| null| -0.5284523629183004|
| 15| 0.44292918| null| -0.4798519469521663|
| 16| 0.88987845| null| -0.8820294772950535|
| 17| 0.27310732| null|-0.15116027592854422|
| 18| 0.7784518| null| -0.3785563841011868|
| 19| 0.43776396| null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+
Die obigen Antworten sind sehr elegant. Ich habe diese Funktion vor langer Zeit geschrieben, bei der ich auch Schwierigkeiten hatte, zwei Datenrahmen mit unterschiedlichen Spalten zu verketten.
Angenommen, Sie haben den Datenrahmen sdf1 und sdf2
from pyspark.sql import functions as F
from pyspark.sql.types import *
def unequal_union_sdf(sdf1, sdf2):
s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)
for i,j in s_df2_schema.difference(s_df1_schema):
sdf1 = sdf1.withColumn(i,F.lit(None).cast(j))
for i,j in s_df1_schema.difference(s_df2_schema):
sdf2 = sdf2.withColumn(i,F.lit(None).cast(j))
common_schema_colnames = sdf1.columns
sdk = \
sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
return sdk
sdf_concat = unequal_union_sdf(sdf1, sdf2)
Um es allgemeiner zu gestalten, beide Spalten in df1
Und df2
Zu belassen:
import pyspark.sql.functions as F
# Keep all columns in either df1 or df2
def outter_union(df1, df2):
# Add missing columns to df1
left_df = df1
for column in set(df2.columns) - set(df1.columns):
left_df = left_df.withColumn(column, F.lit(None))
# Add missing columns to df2
right_df = df2
for column in set(df1.columns) - set(df2.columns):
right_df = right_df.withColumn(column, F.lit(None))
# Make sure columns are ordered the same
return left_df.union(right_df.select(left_df.columns))
Das sollte es für dich tun ...
from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, Rand, lit, coalesce, col
import pyspark.sql.functions as F
df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")
df_1.show()
df_2.show()
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
.select(\
[coalesce(df_1.id, df_2.id).alias("id")] +\
[col("df_1." + c) for c in df_1.columns if c != "id"])\
.sort("id")
df_3.show()