Die häufigsten Probleme beim Erstellen und Bereitstellen von Spark-Anwendungen sind:
Java.lang.ClassNotFoundException
.object x is not a member of package y
-Kompilierungsfehler.Java.lang.NoSuchMethodError
Wie können diese gelöst werden?
Beim Erstellen und Bereitstellen von Spark-Anwendungen sind für alle Abhängigkeiten kompatible Versionen erforderlich.
Scala-Version . Alle Pakete müssen dieselbe Scala-Version (2.10, 2.11, 2.12) verwenden.
Betrachten Sie folgendes (falsches) build.sbt
:
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.Apache.spark" % "spark-core_2.11" % "2.0.1",
"org.Apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.Apache.bahir" % "spark-streaming-Twitter_2.11" % "2.0.1"
)
Wir verwenden spark-streaming
für Scala 2.10, während die restlichen Pakete für Scala 2.11 sind. Eine gültige Datei könnte sein
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.Apache.spark" % "spark-core_2.11" % "2.0.1",
"org.Apache.spark" % "spark-streaming_2.11" % "2.0.1",
"org.Apache.bahir" % "spark-streaming-Twitter_2.11" % "2.0.1"
)
es ist jedoch besser, die Version global anzugeben und %%
zu verwenden:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.Apache.spark" %% "spark-core" % "2.0.1",
"org.Apache.spark" %% "spark-streaming" % "2.0.1",
"org.Apache.bahir" %% "spark-streaming-Twitter" % "2.0.1"
)
Ähnlich in Maven:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.Apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.Apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.Apache.bahir</groupId>
<artifactId>spark-streaming-Twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark-Version Alle Pakete müssen dieselbe Spark-Version (1.6, 2.0, 2.1, ...) verwenden.
Beachten Sie folgendes (falsches) build.sbt:
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.Apache.spark" % "spark-core_2.11" % "1.6.1",
"org.Apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.Apache.bahir" % "spark-streaming-Twitter_2.11" % "2.0.1"
)
Wir verwenden spark-core
1.6, während die verbleibenden Komponenten in Spark 2.0 enthalten sind. Eine gültige Datei könnte sein
name := "Simple Project"
version := "1.0"
libraryDependencies ++= Seq(
"org.Apache.spark" % "spark-core_2.11" % "2.0.1",
"org.Apache.spark" % "spark-streaming_2.10" % "2.0.1",
"org.Apache.bahir" % "spark-streaming-Twitter_2.11" % "2.0.1"
)
es ist jedoch besser, eine Variable zu verwenden:
name := "Simple Project"
version := "1.0"
val sparkVersion = "2.0.1"
libraryDependencies ++= Seq(
"org.Apache.spark" % "spark-core_2.11" % sparkVersion,
"org.Apache.spark" % "spark-streaming_2.10" % sparkVersion,
"org.Apache.bahir" % "spark-streaming-Twitter_2.11" % sparkVersion
)
Ähnlich in Maven:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.Apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.Apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.Apache.bahir</groupId>
<artifactId>spark-streaming-Twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Die in Spark-Abhängigkeiten verwendete Spark-Version muss mit der Spark-Version der Spark-Installation übereinstimmen. Wenn Sie beispielsweise 1.6.1 im Cluster verwenden, müssen Sie 1.6.1 verwenden, um Jars zu erstellen. Nicht übereinstimmende Versionen werden nicht immer akzeptiert.
Die zum Erstellen von Jar verwendete Scala-Version muss mit der Scala-Version übereinstimmen, die zum Erstellen von bereitgestelltem Spark verwendet wird. Standardmäßig (herunterladbare Binärdateien und Standard-Builds):
Zusätzliche Pakete sollten auf den Arbeiterknoten verfügbar sein, sofern sie im Fettbehälter enthalten sind. Es gibt eine Reihe von Optionen, darunter:
--jars
-Argument für spark-submit
- zur Verteilung lokaler jar
-Dateien.--packages
-Argument für spark-submit
- zum Abrufen von Abhängigkeiten aus dem Maven-Repository.Beim Senden im Cluster-Knoten sollten Sie die Anwendung jar
in --jars
angeben.
Der Klassenpfad von Apache Spark wird dynamisch erstellt (um den anwendungsspezifischen Benutzercode zu berücksichtigen), wodurch er für solche Probleme anfällig wird. Die Antwort von @ user7337271 ist richtig, aber es gibt noch einige Bedenken, abhängig vom Cluster-Manager ("master") du benutzt.
Erstens besteht eine Spark -Anwendung aus diesen Komponenten (jede ist eine separate JVM und enthält daher möglicherweise unterschiedliche Klassen in ihrem Klassenpfad):
SparkSession
(oder SparkContext
) und erstellt Herstellen einer Verbindung zu einem Cluster-Manager, um die eigentliche Arbeit auszuführenDie Beziehung zwischen diesen ist in diesem Diagramm aus Apache Spark's Cluster-Modus-Übersicht beschrieben:
Nun - welche Klassen sollten sich in jeder dieser Komponenten befinden?
Dies kann durch das folgende Diagramm beantwortet werden:
Analysieren wir das langsam:
Spark-Code sind Spark-Bibliotheken. Sie sollten in [~ # ~] allen [~ # ~] drei Komponenten vorhanden sein, da sie den Klebstoff enthalten, mit dem Spark die Kommunikation zwischen ihnen durchgeführt wird. Übrigens - Die Autoren von Spark haben eine Designentscheidung getroffen, Code für ALLE Komponenten in ALLE Komponenten aufzunehmen (z. B. Code, der nur in Executor im Treiber ausgeführt werden soll), um dies zu vereinfachen Versionen bis 1.6) oder "archive" (in 2.0, Details unten) enthalten den erforderlichen Code für alle Komponenten und sollten in allen von ihnen verfügbar sein.
Nur-Treiber-Code Hierbei handelt es sich um Benutzercode, der nichts enthält, was auf Executors verwendet werden sollte, dh Code, der in keiner Transformation auf der RDD/DataFrame/Dataset. Dies muss nicht unbedingt vom verteilten Benutzercode getrennt werden, kann es aber sein.
Distributed Code Dies ist Benutzercode, der mit Treibercode kompiliert wird, aber auch auf den Executors ausgeführt werden muss - alles, was die eigentlichen Transformationen verwenden, muss enthalten sein dieses (diese) Glas (e).
Nachdem wir das klargestellt haben, wie bringen wir die Klassen dazu, in jede Komponente richtig geladen zu werden, und welche Regeln sollten sie befolgen?
Spark Code : Wie in den vorherigen Antworten angegeben, müssen Sie das gleiche Scala verwenden. und Spark Versionen in allen Komponenten.
1.1 Im Standalone-Modus gibt es eine "bereits vorhandene" Spark Installation, mit der sich Anwendungen (Treiber) verbinden können. Das bedeutet, dass alle Treiber dieselbe Spark - Version verwenden müssen, die auf dem Master und den Executoren ausgeführt wird.
1.2 In YARN/Mesos kann jede Anwendung eine andere Spark - Version verwenden, aber alle Komponenten derselben Anwendung müssen dieselbe verwenden. Das bedeutet, wenn Sie Ihre Treiberanwendung mit Version X kompiliert und gepackt haben, sollten Sie beim Starten der SparkSession dieselbe Version bereitstellen (z. B. über die Parameter spark.yarn.archive
Oder spark.yarn.jars
Bei Verwendung von YARN). Die von Ihnen bereitgestellten JARS/Archive sollten alle Spark - Abhängigkeiten ( einschließlich der transitiven Abhängigkeiten enthalten. Sie werden vom Cluster-Manager an alle ausgeliefert Executor, wenn die Anwendung gestartet wird.
Treibercode : Das liegt ganz bei Ihnen. Der Treibercode kann als Bund Gläser oder als "Fettglas" geliefert werden, solange er alle Spark Abhängigkeiten + aller Benutzercode
Verteilter Code : Dieser Code muss nicht nur auf dem Treiber vorhanden sein, sondern auch an die Ausführenden gesendet werden (ebenfalls mit all seinen transitiven Abhängigkeiten). Dies geschieht mit dem Parameter spark.jars
.
Um zusammenzufassen, hier ein empfohlener Ansatz zum Erstellen und Bereitstellen einer Spark - Anwendung (in diesem Fall mit YARN):
spark.jars
, Wenn Sie SparkSession
starten.lib/
Der heruntergeladenen Spark - Binärdateien als Wert von spark.yarn.archive
.Zusätzlich zu der sehr umfangreichen Antwort, die bereits von user7337271 gegeben wurde, können Sie, wenn das Problem auf fehlende externe Abhängigkeiten zurückzuführen ist, eine Dose mit Ihren Abhängigkeiten mit z. maven Assembly Plugin
Markieren Sie in diesem Fall alle Kernfunkenabhängigkeiten in Ihrem Build-System als "bereitgestellt" und stellen Sie, wie bereits erwähnt, sicher, dass sie mit Ihrer Laufzeitfunkenversion korrelieren.
Abhängigkeitsklassen Ihrer Anwendung müssen in der Option application-jar Ihres Startbefehls angegeben werden.
Weitere Details finden Sie in der Spark-Dokumentation
Aus der Dokumentation entnommen:
application-jar: Pfad zu einer gebündelten jar einschließlich Ihrer Anwendung und alle Abhängigkeiten. Die URL muss in Ihrem .__ global sichtbar sein. Cluster, zum Beispiel ein hdfs: // Pfad oder eine Datei: // Pfad, der .__ ist. auf allen Knoten vorhanden
Fügen Sie alle JAR-Dateien aus spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars im Projekt hinzu. Der spark-2.4.0-bin-hadoop2.7 kann von https://spark.Apache.org/downloads.html heruntergeladen werden.
Ich habe folgendes build.sbt
lazy val root = (project in file(".")).
settings(
name := "spark-samples",
version := "1.0",
scalaVersion := "2.11.12",
mainClass in Compile := Some("StreamingExample")
)
libraryDependencies ++= Seq(
"org.Apache.spark" %% "spark-core" % "2.4.0",
"org.Apache.spark" %% "spark-streaming" % "2.4.0",
"org.Apache.spark" %% "spark-sql" % "2.4.0",
"com.couchbase.client" %% "spark-connector" % "2.2.0"
)
// META-INF discarding
assemblyMergeStrategy in Assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
Ich habe mit sbt Assembly Plugin ein fettes Glas meiner Appliction erstellt, aber bei der Ausführung von spark-submit schlägt der Fehler fehl:
Java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)
Ich kann sehen, dass die Klasse in meinem fetten Glas existiert:
jar tf target/scala-2.11/spark-samples-Assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class
nicht sicher, was ich hier vermisse, irgendwelche Hinweise?
Ich denke, dieses Problem muss ein Assembly-Plugin lösen. Sie müssen ein fettes Glas bauen. Zum Beispiel in sbt:
$PROJECT_ROOT/project/Assembly.sbt
mit Code addSbtPlugin("com.eed3si9n" % "sbt-Assembly" % "0.14.0")
hinzufügenadded some libraries
libraryDependencies ++ = Seq ("com.some.company" %% "some-lib"% "1.0.0") ` Weitere Informationen finden Sie unter https://github.com/sbt/sbt-Assembly