webentwicklung-frage-antwort-db.com.de

Alle Datenframes in (pyspark) werden nicht beibehalten

Ich bin eine Funkenanwendung mit mehreren Punkten, an denen ich den aktuellen Status beibehalten möchte. Dies ist normalerweise nach einem großen Schritt oder dem Zwischenspeichern eines Zustands, den ich mehrmals verwenden möchte. Es scheint, als würde eine neue Kopie im Arbeitsspeicher zwischengespeichert, wenn ich den Cache ein zweites Mal in meinem Dataframe aufrufen. In meiner Anwendung führt dies zu Speicherproblemen beim Skalieren. Obwohl ein gegebenes Datenframe in meinen aktuellen Tests maximal etwa 100 MB beträgt, wächst die kumulierte Größe der Zwischenergebnisse über den zugewiesenen Speicher des Executors hinaus. Unten finden Sie ein kleines Beispiel, das dieses Verhalten zeigt.

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
Hive_context = HiveContext(spark_context)

df = (Hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Wenn Sie sich die Anwendungs-Benutzeroberfläche ansehen, gibt es eine Kopie des ursprünglichen Datenrahmens, zusätzlich zu der mit der neuen Spalte. Ich kann die Originalkopie entfernen, indem Sie df.unpersist() vor der withColumn-Zeile aufrufen. Ist dies die empfohlene Methode, um zwischengespeicherte Zwischenergebnisse zu entfernen (d. H. Vor jeder cache()-Anweisung unpersistieren).

Ist es auch möglich, alle zwischengespeicherten Objekte zu löschen. In meiner Anwendung gibt es natürliche Haltepunkte, an denen ich einfach den gesamten Speicher löschen und zur nächsten Datei übergehen kann. Ich möchte dies tun, ohne für jede Eingabedatei eine neue Funkenanwendung zu erstellen. 

Danke im Voraus!

13
bjack3

Spark 2.x

Sie können Catalog.clearCache verwenden:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()

Spark 1.x

Sie können die SQLContext.clearCache - Methode verwenden 

Entfernt alle zwischengespeicherten Tabellen aus dem Cache-Speicher.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
23
zero323

Wir benutzen das oft

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))

dabei ist sc eine sparkContext-Variable.

4
Tagar

kann alle df's einzeln aufheben:

firstDF.unpersist()
secondDF.unpersist()
0
Grant Shannon