webentwicklung-frage-antwort-db.com.de

Wie wählt man die letzte Zeile aus und wie kann auf den PySpark-Datenrahmen nach Index zugegriffen werden?

Wie aus einem PySpark SQL-Datenrahmen 

name age city
abc   20  A
def   30  B

So erhalten Sie die letzte Zeile (wie bei df.limit (1) kann ich die erste Zeile des Datenrahmens in den neuen Datenrahmen bringen).

Und wie kann ich über Index auf die Datenrahmenzeilen zugreifen? 12 oder 200.

In Pandas kann ich das machen

df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]

Ich bin nur neugierig, wie man auf solche oder alternative Weise auf den Pyspark-Datenrahmen zugreift.

Vielen Dank

7
Satya

Wie bekomme ich die letzte Reihe?.

Langer und hässlicher Weg, der davon ausgeht, dass alle Spalten auffindbar sind:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max(struct("_id", *df.columns))
    .alias("tmp")).select(col("tmp.*"))
    .drop("_id"))

Wenn nicht alle Spalten bestellt werden können, können Sie Folgendes versuchen:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")

Hinweis. Es gibt eine last-Funktion in pyspark.sql.functions`o.a.s.sql.functions, aber unter Berücksichtigung von Beschreibung der entsprechenden Ausdrücke ist hier keine gute Wahl.

wie kann ich über index.like auf die Datenrahmenzeilen zugreifen?

Du kannst nicht. Spark DataFrame und über Index zugänglich. Sie können Indizes mit zipWithIndex hinzufügen und später filtern. Denken Sie nur an diese _/O(N) - Operation. 

5
zero323

Wie bekomme ich die letzte Reihe?.

Wenn Sie über eine Spalte verfügen, die Sie zum Bestellen von Dataframes verwenden können, z. B. "Index", können Sie den letzten Datensatz mit SQL einfach abrufen: 1) Ihre Tabelle in absteigender Reihenfolge und .__ 1. Wert aus dieser Bestellung

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()

Und wie kann ich über Index auf die Datenrahmenzeilen zugreifen? 12 oder 200.

Auf ähnliche Weise können Sie in jeder Zeile aufnehmen

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()

Wenn Sie keine "Index" -Spalte haben, können Sie diese mit erstellen

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())
3
from pyspark.sql import functions as F

expr = [F.last(col).alias(col) for col in df.columns]

df.groupBy().agg(*expr)

Nur ein Tipp: Sie haben anscheinend immer noch die Denkweise einer Person, die mit Pandas oder R. arbeitet. Spark ist ein anderes Paradigma in der Art, wie wir mit Daten arbeiten. Sie greifen nicht mehr auf Daten in einzelnen Zellen zu, jetzt arbeiten Sie mit ganzen Brocken davon. Wenn Sie ständig Sachen sammeln und Aktionen durchführen, wie Sie gerade getan haben, verlieren Sie das gesamte Konzept des Parallelismus, das der Funke bietet. Sehen Sie sich das Konzept von Transformationen und Aktionen in Spark an.

2

Verwenden Sie das Folgende, um eine Indexspalte abzurufen, die monoton steigende, eindeutige,undaufeinanderfolgende Ganzzahlen enthält, die nicht wie monotonically_increasing_id() funktioniert. Die Indizes werden in der gleichen Reihenfolge wie colName Ihres DataFrame aufsteigend angezeigt.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)

df = df\
 .withColumn('int', F.lit(1))\
 .withColumn('index', F.sum('int').over(window))\
 .drop('int')\

Verwenden Sie den folgenden Code, um das Ende oder das letzte rownums des DataFrame anzuzeigen.

rownums = 10
df.where(F.col('index')>df.count()-rownums).show()

Verwenden Sie den folgenden Code, um die Zeilen von start_row bis end_row dem DataFrame anzuzeigen. 

start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()

zipWithIndex() ist eine RDD-Methode, die monoton steigende, eindeutige und fortlaufende Ganzzahlen zurückgibt, die Implementierung scheint jedoch viel langsamer zu sein, so dass Sie zu Ihrem ursprünglichen DataFrame mit einer ID-Spalte zurückkehren können.

0
Clay