webentwicklung-frage-antwort-db.com.de

Spark-Java-Fehler: Die Größe überschreitet Integer.MAX_VALUE

Ich versuche, Spark für einige einfache maschinelle Lernaufgaben zu verwenden. Ich habe Pyspark und Spark 1.2.0 verwendet, um ein einfaches logistisches Regressionsproblem zu lösen. Ich habe 1,2 Millionen Datensätze für das Training, und ich hasse die Features der Datensätze. Wenn ich die Anzahl der gehashten Features auf 1024 eingestellt habe, funktioniert das Programm gut, aber wenn ich die Anzahl der gehashten Features auf 16384 stelle, schlägt das Programm mehrmals mit dem folgenden Fehler fehl:

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.Apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): Java.lang.RuntimeException: Java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at Sun.nio.ch.FileChannelImpl.map(FileChannelImpl.Java:828)
    at org.Apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.Apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.Apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.Apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.Apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.Apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.Apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.Apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.Java:124)
    at org.Apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.Java:97)
    at org.Apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.Java:91)
    at org.Apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.Java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.Java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.Java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.Java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.Java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.Java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.Java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.Java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.Java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.Java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.Java:116)
    at Java.lang.Thread.run(Thread.Java:745)

    at org.Apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.Java:156)
    at org.Apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.Java:93)
    at org.Apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.Java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.Java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.Java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.Java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.Java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.Java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.Java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.Java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.Java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.Java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.Java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.Java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.Java:116)
    at Java.lang.Thread.run(Thread.Java:745)

Driver stacktrace:
    at org.Apache.spark.scheduler.DAGScheduler.org$Apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.Apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.Apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.Apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.Apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.Apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.Apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.Apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.Apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.Java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.Java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.Java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.Java:107)

Dieser Fehler tritt auf, wenn ich LogisticRegressionWithSGD nach dem Übertragen der Daten in LabeledPoint trainiere.

Hat jemand eine Idee dazu?

Mein Code lautet wie folgt (ich verwende dafür ein IPython Notebook):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array
from sklearn.feature_extraction import FeatureHasher
from pyspark import SparkContext
sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30)
sc = SparkContext(conf=sf)
training_file = sc.textFile("train_small.txt")
def hash_feature(line):
    values = [0, dict()]
    for index, x in enumerate(line.strip("\n").split('\t')):
        if index == 0:
            values[0] = float(x)
        else:
            values[1][str(index)+"_"+x] = 1
    return values
n_feature = 2**14
hasher = FeatureHasher(n_features=n_feature)
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])])
def build_lable_points(line):
    values = [0.0] * n_feature
    for index, value in Zip(line[1].indices, line[1].data):
        values[index] = value
    return LabeledPoint(line[0], values)
parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line))
model = LogisticRegressionWithSGD.train(parsed_training_data)

Der Fehler tritt bei der Ausführung der letzten Zeile auf.

15
peng

Die Integer.MAX_INT-Einschränkung betrifft die Größe einer Datei, die gespeichert wird. 1.2M-Reihen sind keine große Sache, ich bin mir nicht sicher, ob Ihr Problem "die Grenzen des Funkens" ist. Es ist wahrscheinlicher, dass ein Teil Ihrer Arbeit etwas zu Großes schafft, um von einem bestimmten Executor bearbeitet zu werden. 

Ich bin kein Python-Codierer, aber wenn Sie die Features der Datensätze "gehashed" haben, nehmen Sie möglicherweise sehr spärliche Datensätze für ein Sample auf und erstellen ein nicht-spärliches Array. Dies bedeutet viel Speicher für 16384-Funktionen. Insbesondere wenn Sie Zip(line[1].indices, line[1].data) ausführen. Der einzige Grund, aus dem Sie nicht aus dem Speicher kommen, ist die Scheiße, die Sie anscheinend konfiguriert haben (50G).

Eine andere Sache, die helfen könnte, ist die Partitionierung zu erhöhen. Wenn Sie also nicht erreichen können, dass Ihre Zeilen weniger Speicher benötigen, können Sie zumindest versuchen, für eine bestimmte Aufgabe weniger Zeilen zu verwenden. Alle temporären Dateien, die erstellt werden, sind wahrscheinlich davon abhängig. Daher werden die Dateilimits unwahrscheinlicher.


Und völlig unabhängig von dem Fehler, aber relevant für das, was Sie zu tun versuchen: 

16384 ist in der Tat eine große Anzahl von Features. In dem optimistischen Fall, in dem jedes nur ein boolesches Feature ist, haben Sie insgesamt 2 ^ 16384 mögliche Permutationen, von denen Sie lernen können. Dies ist eine große Anzahl (probieren Sie es hier: https: //defuse.ca/big-number-calculator.htm ). 

Es ist sehr, sehr wahrscheinlich, dass kein Algorithmus in der Lage ist, eine Entscheidungsgrenze mit nur 1,2 Millionen Samples zu lernen. Sie würden wahrscheinlich mindestens ein paar Billionen Billionen Beispiele benötigen, um einen solchen Merkmalsraum auszubilden. Maschinelles Lernen hat seine Grenzen, seien Sie also nicht überrascht, wenn Sie keine überdurchschnittliche Genauigkeit erhalten.

Ich würde definitiv empfehlen, zuerst eine Art Dimensionsreduzierung zu versuchen !!

11
Daniel Langdon

Irgendwann versucht es, die Features zu speichern, und 1.2M * 16384 ist größer als Integer.MAX_INT, sodass Sie versuchen, mehr als die maximale Größe der von Spark unterstützten Features zu speichern. 

Sie stoßen wahrscheinlich an die Grenzen von Apache Spark. 

1
Baptiste Wicht

Das Erhöhen der Anzahl der Partitionen kann dazu führen, dass Aktive Tasks eine negative Zahl in der Spark-Benutzeroberfläche sind. Dies bedeutet wahrscheinlich, dass die Anzahl der Partitionen zu hoch ist.

0
gsamaras