webentwicklung-frage-antwort-db.com.de

Verketten Sie zwei PySpark-Datenrahmen

Ich versuche, zwei PySpark-Datenrahmen mit einigen Spalten zu verketten, die nur auf jeder von ihnen stehen:

from pyspark.sql.functions import randn, Rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+

df_1 = df_1.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

und jetzt möchte ich einen dritten Datenrahmen generieren. Ich möchte etwas wie Pandas concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

#do some concatenation here, how?

df_concat.show()

| id|             uniform|              normal| normal_2   |
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714| None       |
|  1|  0.8642043127063618|  0.3900018344856156| None       |
|  2|  0.8292577771850476|  1.8077401259195247| None       |
|  3|   0.198558705368724| -0.4270585782850261| None       |
|  4|0.012661361966674889|   0.702634599720141| None       |
|  5|  0.8535692890157796|-0.42355804115129153| None       |
|  6|  0.3723296190171911|  1.3789648582622995| None       |
|  7|  0.9529794127670571| 0.16238718777444605| None       |
|  8|  0.9746632635918108| 0.02448061333761742| None       |
|  9|   0.513622008243935|  0.7626741803250845| None       |
| 11|  0.3221262660507942|  None              | 0.123      |
| 12|  0.4030672316912547|  None              |0.12323     |
| 13|  0.9690555459609131|  None              |0.123       |
| 14|0.011913836266515876|  None              |0.18923     |
| 15|  0.9359607054250594|  None              |0.99123     |
| 16| 0.45680471157575453|  None              |0.123       |
| 17|  0.6411908952297819|  None              |1.123       |
| 18|  0.5669232696934479|  None              |0.10023     |
| 19|   0.513622008243935|  None              |0.916332123 |
+---+--------------------+--------------------+------------+

Ist das möglich?

21
Ivan

Vielleicht können Sie die nicht vorhandenen Spalten erstellen und union (unionAll für Spark 1.6 oder niedriger) aufrufen:

cols = ['id', 'uniform', 'normal', 'normal_2']    

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)

result = df_1_new.union(df_2_new)
36
Daniel de Paula
df_concat = df_1.union(df_2)

Die Datenrahmen müssen möglicherweise identische Spalten haben. In diesem Fall können Sie withColumn() verwenden, um normal_1 und normal_2 zu erstellen.

24
David

Hier ist eine Möglichkeit, falls es noch nützlich ist: Ich habe dies in pyspark Shell, Python Version 2.7.12 ausgeführt und meine Spark-Installation war Version 2.0.1.

PS: Ich denke, Sie wollten verschiedene Samen für df_1 df_2 verwenden, und der folgende Code spiegelt dies wider.

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, Rand
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", Rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", Rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))

def get_uniform(df1_uniform, df2_uniform):
    if df1_uniform:
        return df1_uniform
    if df2_uniform:
        return df2_uniform

u_get_uniform = F.udf(get_uniform, FloatType())

df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id"))

Hier sind die Ausgänge, die ich bekomme:

df_1.show()
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0|0.41371264720975787|  0.5888539012978773|
|  1| 0.7311719281896606|  0.8645537008427937|
|  2| 0.1982919638208397| 0.06157382353970104|
|  3|0.12714181165849525|  0.3623040918178586|
|  4| 0.7604318153406678|-0.49575204523675975|
|  5|0.12030715258495939|  1.0854146699817222|
|  6|0.12131363910425985| -0.5284523629183004|
|  7|0.44292918521277047| -0.4798519469521663|
|  8| 0.8898784253886249| -0.8820294772950535|
|  9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+

df_2.show()
+---+-------------------+--------------------+
| id|            uniform|            normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525|  0.3623040918178586|
| 13|0.12030715258495939|  1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+

df_3.show()
+---+-----------+--------------------+--------------------+                     
| id|    uniform|              normal|            normal_2|
+---+-----------+--------------------+--------------------+
|  0| 0.41371265|  0.5888539012978773|                null|
|  1|  0.7311719|  0.8645537008427937|                null|
|  2| 0.19829196| 0.06157382353970104|                null|
|  3| 0.12714182|  0.3623040918178586|                null|
|  4|  0.7604318|-0.49575204523675975|                null|
|  5|0.120307155|  1.0854146699817222|                null|
|  6| 0.12131364| -0.5284523629183004|                null|
|  7| 0.44292918| -0.4798519469521663|                null|
|  8| 0.88987845| -0.8820294772950535|                null|
|  9|0.036507078| -2.1591956435415334|                null|
| 11| 0.19829196|                null| 0.06157382353970104|
| 12| 0.12714182|                null|  0.3623040918178586|
| 13|0.120307155|                null|  1.0854146699817222|
| 14| 0.12131364|                null| -0.5284523629183004|
| 15| 0.44292918|                null| -0.4798519469521663|
| 16| 0.88987845|                null| -0.8820294772950535|
| 17| 0.27310732|                null|-0.15116027592854422|
| 18|  0.7784518|                null| -0.3785563841011868|
| 19| 0.43776396|                null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+
3
user020314

Die obigen Antworten sind sehr elegant. Ich habe diese Funktion vor langer Zeit geschrieben, bei der ich auch Schwierigkeiten hatte, zwei Datenrahmen mit unterschiedlichen Spalten zu verketten. 

Angenommen, Sie haben den Datenrahmen sdf1 und sdf2

from pyspark.sql import functions as F
from pyspark.sql.types import *

def unequal_union_sdf(sdf1, sdf2):
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)

    for i,j in s_df2_schema.difference(s_df1_schema):
        sdf1 = sdf1.withColumn(i,F.lit(None).cast(j))

    for i,j in s_df1_schema.difference(s_df2_schema):
        sdf2 = sdf2.withColumn(i,F.lit(None).cast(j))

    common_schema_colnames = sdf1.columns
    sdk = \
        sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
    return sdk 

sdf_concat = unequal_union_sdf(sdf1, sdf2) 
2
furianpandit

Um es allgemeiner zu gestalten, beide Spalten in df1 Und df2 Zu belassen:

import pyspark.sql.functions as F

# Keep all columns in either df1 or df2
def outter_union(df1, df2):

    # Add missing columns to df1
    left_df = df1
    for column in set(df2.columns) - set(df1.columns):
        left_df = left_df.withColumn(column, F.lit(None))

    # Add missing columns to df2
    right_df = df2
    for column in set(df1.columns) - set(df2.columns):
        right_df = right_df.withColumn(column, F.lit(None))

    # Make sure columns are ordered the same
    return left_df.union(right_df.select(left_df.columns))
1
Yuchen Zhong

Das sollte es für dich tun ...

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, Rand, lit, coalesce, col
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")

df_1.show()
df_2.show()
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
  .select(\
    [coalesce(df_1.id, df_2.id).alias("id")] +\
    [col("df_1." + c) for c in df_1.columns if c != "id"])\
  .sort("id")
df_3.show()
0
Hans