webentwicklung-frage-antwort-db.com.de

Warum so viele Aufgaben in meinem Funkenjob? Standardmäßig 200 Aufgaben abrufen

Ich habe einen Funkenjob, der eine Datei mit 8 Datensätzen von HDFS übernimmt, eine einfache Aggregation durchführt und sie in HDFS zurückspeichert. Ich stelle fest, dass es Hunderte von Aufgaben gibt, wenn ich das tue. 

Ich bin auch nicht sicher, warum es dafür mehrere Jobs gibt? Ich dachte, ein Job wäre eher so, als wäre eine Aktion passiert. Ich kann spekulieren, warum - aber mein Verständnis war, dass es sich in diesem Code um einen Job handeln sollte, und es sollte in Stufen unterteilt werden, nicht in mehrere Jobs. Warum zerlegt es es nicht einfach in Stufen, wie kommt es, dass es in Jobs einbricht?

Bei den mehr als 200 Tasks ist die Anzahl der Daten und die Anzahl der Knoten winzig, und es macht keinen Sinn, dass es für jede Datenzeile etwa 25 Tasks gibt, wenn es nur eine Aggregation und ein paar Filter gibt. Warum sollte es nicht nur eine Aufgabe pro Partition pro atomarer Operation geben?

Hier ist der relevante Scala-Code - 

import org.Apache.spark.sql._
import org.Apache.spark.sql.types._
import org.Apache.spark.SparkContext._
import org.Apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase =  sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x=>x.split(" "))    //file record into array of strings based spaces
      .map(x=>Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))


    /* the below dataframe groups on first letter of first name and counts it*/
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1,1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs*/ 
 aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")

  }

    case class Record(id: Int
      , firstName: String
      , lastName: String
      , quantity:Int)

}

Unten ist der Screenshot, nachdem Sie auf die Anwendung geklickt haben enter image description here

Im Folgenden sehen Sie die Phasen, die angezeigt werden, wenn Sie den spezifischen "Job" der ID 0 anzeigen enter image description here

Unten ist der erste Teil des Bildschirms, wenn Sie auf die Bühne mit über 200 Aufgaben klicken

enter image description here

Dies ist der zweite Teil des Bildschirms innerhalb von stage enter image description here

Klicken Sie unten auf die Registerkarte "Ausführende" enter image description here

Wie angefordert, hier sind die Stufen für Job ID 1

enter image description here

Hier sind die Details für die Stufe in Job ID 1 mit 200 Aufgaben

enter image description here

16
big_mike_boiii

Dies ist eine klassische Spark-Frage. 

Die zwei zum Lesen verwendeten Aufgaben (Stage Id 0 in der zweiten Abbildung) sind die Einstellung defaultMinPartitions, die auf 2 eingestellt ist. Sie können diesen Parameter abrufen, indem Sie den Wert in REPL sc.defaultMinPartitions lesen. Es sollte auch in der Spark-Benutzeroberfläche unter "Umwelt" angezeigt werden. 

Sie können einen Blick auf den code von github werfen, um zu sehen, dass genau das passiert, was gerade passiert. Wenn Sie mehr Partitionen für das Lesen verwenden möchten, fügen Sie sie einfach als Parameter hinzu, z. B. sc.textFile("a.txt", 20).

Nun kommt der interessante Teil von den 200 Partitionen, die sich auf der zweiten Stufe befinden (Stufe Id 1 in der zweiten Abbildung). Bei jedem Shuffle muss Spark entscheiden, wie viele Partitionen die Shuffle-RDD enthalten wird. Wie Sie sich vorstellen können, ist der Standardwert 200. 

Sie können das ändern mit:

sqlContext.setConf("spark.sql.shuffle.partitions", "4”)

Wenn Sie Ihren Code mit dieser Konfiguration ausführen, werden Sie feststellen, dass die 200 Partitionen nicht mehr vorhanden sind. Wie man diesen Parameter einstellt, ist eine Art Kunst. Wählen Sie vielleicht zweimal die Anzahl der Kerne, die Sie haben (oder was auch immer). 

Ich denke, Spark 2.0 hat eine Möglichkeit, automatisch die beste Anzahl von Partitionen für Shuffle-RDDs zu ermitteln. Freue mich darauf!

Die Anzahl der Aufträge, die Sie erhalten, hängt schließlich davon ab, zu wie vielen RDD-Aktionen der resultierende optimierte Dataframe-Code führt. Wenn Sie die Spark-Spezifikationen lesen, heißt es, dass jede RDD-Aktion einen Job auslöst. Wenn Sie ein Dataframe oder SparkSQL verwenden, ermittelt das Catalyst-Optimierungsprogramm einen Ausführungsplan und generiert einen auf RDD basierenden Code, um diesen auszuführen. Es ist schwer zu sagen, warum in Ihrem Fall zwei Aktionen verwendet werden. Möglicherweise müssen Sie sich den optimierten Abfrageplan ansehen, um genau zu erfahren, was getan wird. 

25
marios

Ich habe ein ähnliches Problem. In meinem Szenario enthält die von mir parallelisierte Sammlung jedoch weniger Elemente als die von Spark geplante Anzahl von Aufgaben (wodurch sich Spark manchmal seltsam verhält). Mit der erzwungenen Partitionsnummer konnte ich dieses Problem beheben.

Es war so etwas:

collection = range(10) # In the real scenario it was a complex collection
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario

Dann sah ich im Spark-Log:

INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
1
Enrique Altuna