webentwicklung-frage-antwort-db.com.de

Spark - Aufteilung () vs Coalesce ()

Laut Learning Spark

Denken Sie daran, dass das Neupartitionieren Ihrer Daten ein ziemlich teurer Vorgang ist. Spark bietet auch eine optimierte Version von repartition () mit dem Namen coalesce (), mit der Datenverschiebungen vermieden werden können, jedoch nur, wenn Sie die Anzahl der RDD-Partitionen verringern.

Ein Unterschied, den ich bekomme, ist, dass mit repartition () die Anzahl der Partitionen erhöht/verringert werden kann, mit coalesce () jedoch nur die Anzahl der Partitionen verringert werden kann.

Wenn die Partitionen auf mehrere Maschinen verteilt sind und coalesce () ausgeführt wird, wie können dann Datenverschiebungen vermieden werden?

151
Praveen Sripati

Es vermeidet ein full shuffle. Wenn bekannt ist, dass die Anzahl abnimmt, kann der Executor die Daten auf der minimalen Anzahl von Partitionen sicher aufbewahren und nur die Daten von den zusätzlichen Knoten auf die Knoten verschieben, die wir behalten haben. 

So würde es ungefähr so ​​aussehen:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Dann coalesce bis zu 2 Partitionen:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Beachten Sie, dass für Knoten 1 und Knoten 3 die ursprünglichen Daten nicht zum Verschieben erforderlich waren.

234
Justin Pihony

Justins Antwort ist großartig und diese Antwort geht tiefer.

Der repartition-Algorithmus führt eine vollständige Umstellung durch und erstellt neue Partitionen mit gleichmäßig verteilten Daten. Erstellen Sie einen DataFrame mit den Zahlen von 1 bis 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf enthält 4 Partitionen auf meinem Rechner.

numbersDf.rdd.partitions.size // => 4

So werden die Daten auf den Partitionen aufgeteilt:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Lassen Sie uns mit der Methode repartition eine vollständige Umstellung durchführen und diese Daten auf zwei Knoten abrufen.

val numbersDfR = numbersDf.repartition(2)

So werden die numbersDfR-Daten auf meinem Computer partitioniert:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

Die repartition-Methode erstellt neue Partitionen und verteilt die Daten gleichmäßig in den neuen Partitionen (die Datenverteilung ist sogar für größere Datensätze gleich).

Unterschied zwischen coalesce und repartition

coalesce verwendet vorhandene Partitionen, um die Datenmenge zu minimieren, die gemischt wird. repartition erstellt neue Partitionen und führt eine vollständige Umstellung durch. coalesce führt zu Partitionen mit unterschiedlichen Datenmengen (manchmal Partitionen mit sehr unterschiedlichen Größen), und repartition führt zu Partitionen mit ungefähr gleicher Größe.

Ist coalesce oder repartition schneller?

coalesce ist möglicherweise schneller als repartition, aber Partitionen ungleicher Größe sind im Allgemeinen langsamer als Partitionen gleicher Größe. Nach dem Filtern eines großen Datensatzes müssen Sie in der Regel Datasets neu partitionieren. Ich habe festgestellt, dass repartition insgesamt schneller ist, da Spark für Partitionen gleicher Größe ausgelegt ist.

Lesen Sie diesen Blogbeitrag wenn Sie noch mehr Details wünschen.

102
Powers

Ein zusätzlicher Punkt, der hier zu beachten ist, ist, da das grundlegende Prinzip von Spark RDD Unveränderlichkeit ist. Durch die Aufteilung oder Zusammenlegung wird eine neue RDD erstellt. Die Basis-RDD bleibt mit ihrer ursprünglichen Anzahl von Partitionen bestehen. Falls der Anwendungsfall verlangt, RDD im Cache zu speichern, muss dies auch für die neu erstellte RDD erfolgen.

scala> pairMrkt.repartition(10)
res16: org.Apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2
14
Harikrishnan Ck

Alle Antworten fügen dieser sehr häufig gestellten Frage ein großes Wissen hinzu.

Der Tradition dieser Zeitachse folgend, sind hier meine 2 Cent.

Ich fand die -Repartition in ganz bestimmten Fällen schneller als coalesce.

Wenn in meiner Anwendung die Anzahl der Dateien, die wir schätzen, niedriger als der bestimmte Schwellenwert ist, funktioniert die Partitionierung schneller. 

Hier ist was ich meine

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

Wenn im obigen Snippet meine Dateien weniger als 20 waren, dauerte es für immer, bis die Koaleszierung abgeschlossen war, während die Partitionierung viel schneller war und der obige Code.

Natürlich hängt diese Anzahl (20) von der Anzahl der Arbeiter und der Datenmenge ab.

Hoffentlich hilft das.

4
Abhishek

Was aus den Code und Code-Dokumenten folgt, ist, dass coalesce(n) dasselbe ist wie coalesce(n, shuffle = false) und repartition(n) dasselbe ist wie coalesce(n, shuffle = true)

Somit können sowohl coalesce als auch repartition verwendet werden, um die Anzahl der Partitionen zu erhöhen

Mit shuffle = true können Sie tatsächlich zu einer größeren Anzahl von Partitionen verschmelzen. Dies ist nützlich, wenn Sie eine kleine Anzahl von Partitionen haben, z. B. 100, und möglicherweise einige Partitionen ungewöhnlich groß sind.

Ein weiterer wichtiger Hinweis zur Hervorhebung: Wenn Siedie Anzahl der Partitionen drastisch verringern, sollten Sie erwägen, eine gemischte Version von coalesce (wie repartition in diesem Fall). Auf diese Weise können Ihre Berechnungenparallel auf übergeordneten Partitionen(mehrere Aufgaben) ausgeführt werden.

Wenn Sie jedoch eine drastische Verschmelzung durchführen, z. Bei numPartitions = 1 kann dies dazu führen, dass Ihre Berechnung auf weniger Knoten erfolgt, als Sie möchten (z. B. ein Knoten bei numPartitions = 1). Um dies zu vermeiden, können Sie shuffle = true übergeben. Dies fügt einen Shuffle-Schritt hinzu, bedeutet jedoch, dass die aktuellen Upstream-Partitionen parallel ausgeführt werden (unabhängig von der aktuellen Partitionierung).

Bitte beachten Sie auch die entsprechende Antwort hier

1
kasur

repartition - Es wird empfohlen, die Partitionierung zu verwenden und dabei keine Partitionen zu vergrößern, da dabei alle Daten gemischt werden.

coalesce- Es wird empfohlen, Coalesce zu verwenden, ohne Partitionen zu reduzieren. Wenn Sie beispielsweise über 3 Partitionen verfügen und diese auf 2 Partitionen reduzieren möchten, wird Coalesce die Daten der dritten Partition auf Partition 1 und Partition 2 verschieben. Partition 1 und Partition 2 verbleiben im selben Container zwischen Executor wird hoch sein und es wirkt sich auf die Leistung aus.

Performance weise coalesce Leistung besser als repartition, während keine Partitionen reduziert werden. 

1
Kamalesan C

Auf einfache Weise COALESCE: - ist nur für die Verringerung der Anzahl der Partitionen. Kein Mischen von Daten. Es werden nur die Partitionen komprimiert

REPARTITION: - ist sowohl für das Erhöhen als auch für das Verringern der Anzahl der Partitionen gedacht. Es erfolgt jedoch ein Mischen

Beispiel:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Beides funktioniert gut

Im Allgemeinen entscheiden wir uns jedoch für diese beiden Dinge, wenn wir die Ausgabe in einem Cluster sehen müssen. 

All die großartige Antwort, die ich hinzufügen möchte, ist, dass die Neu-Partitionierung die beste Option ist, um die Parallelisierung von Daten zu nutzen, und Coalesce gibt eine preiswerte Option, um die Partition zu reduzieren, und sehr nützlich, wenn Daten in HDFS oder eine andere Senke geschrieben werden, um die Vorteile von Big zu nutzen schreibt Ich habe dies beim Schreiben von Daten im Parkettformat als nützlich erachtet, um den vollen Nutzen daraus zu ziehen.

0
Ashkrit Sharma

Sie sollten jedoch auch sicherstellen, dass die Daten, die zu Koaleszenzknoten kommen, stark konfiguriert sein sollten, wenn Sie große Datenmengen verwenden. Da alle Daten auf diese Knoten geladen werden, kann dies zu einer Speicherausnahme führen Obwohl Reparaturen teuer sind, ziehe ich sie vor. Da mischt und verteilt sie die Daten gleichmäßig.

Seien Sie weise, zwischen Koaleszenz und Aufteilung zu wählen. 

0
Arun Goudar

Für jemanden, der Probleme hatte, eine einzelne CSV-Datei aus PySpark (AWS EMR) als Ausgabe zu generieren und auf s3 zu speichern, half die Verwendung der Neupartitionierung. Der Grund dafür ist, dass Coalesce kein vollständiges Shuffle ausführen kann, aber eine Neupartitionierung. Grundsätzlich können Sie die Anzahl der Partitionen mithilfe der Neupartitionierung erhöhen oder verringern, jedoch nur die Anzahl der Partitionen (jedoch nicht 1) mithilfe der Zusammenführung verringern. Hier ist der Code für alle, die versuchen, eine CSV von AWS EMR nach S3 zu schreiben:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')
0
user11385784

Ich möchte der Antwort von Justin und Power folgendes hinzufügen: 

"Partitionierung" ignoriert vorhandene Partitionen und erstellt neue Partitionen. So können Sie Datenversatz korrigieren. Sie können Partitionsschlüssel angeben, um die Verteilung zu definieren. Datenversatz ist eines der größten Probleme im "Big Data" -Problemraum.

"coalesce" arbeitet mit vorhandenen Partitionen zusammen und mischt eine Teilmenge davon. Es kann den Datenversatz nicht so sehr beheben, wie dies bei der "Aufteilung" der Fall ist. Selbst wenn es weniger teuer ist, ist es möglicherweise nicht das, was Sie brauchen.

0
Salim