webentwicklung-frage-antwort-db.com.de

Zuweisen von eindeutigen fortlaufenden Nummern zu Elementen in einem RDD Spark

Ich habe einen Datensatz von (user, product, review) und möchten es in den ALS-Algorithmus von mllib einspeisen.

Für den Algorithmus müssen Benutzer und Produkte Zahlen sein, während es sich bei meinen Benutzernamen und SKUs um Zeichenfolgen handelt.

Im Moment erhalte ich die unterschiedlichen Benutzer und SKUs und ordne ihnen dann numerische IDs außerhalb von Spark zu.

Ich habe mich gefragt, ob es einen besseren Weg dafür gibt. Der eine Ansatz, den ich mir vorgestellt habe, besteht darin, eine benutzerdefinierte RDD zu schreiben, die im Wesentlichen 1 bis n auflistet und dann Zip für die beiden RDDs aufruft.

46
Dilum Ranatunga

Beginnend mit Spark 1. gibt es zwei Methoden, mit denen Sie dies einfach lösen können:

  • RDD.zipWithIndex ist genau wie Seq.zipWithIndex, es werden fortlaufende Zahlen (Long) hinzugefügt. Dies muss zuerst die Elemente in jeder Partition zählen, damit Ihre Eingabe zweimal ausgewertet wird. Cachen Sie Ihre Eingabe-RDD, wenn Sie dies verwenden möchten.
  • RDD.zipWithUniqueId gibt Ihnen auch eindeutige Long IDs, aber es ist nicht garantiert, dass diese zusammenhängend sind. (Sie sind nur dann zusammenhängend, wenn jede Partition die gleiche Anzahl von Elementen hat.) Der Vorteil ist, dass Sie nichts über die Eingabe wissen müssen, sodass keine doppelte Auswertung erfolgt.
41
Daniel Darabos

Für einen ähnlichen Anwendungsfall habe ich nur die Zeichenfolgenwerte gehasht. Siehe http://blog.cloudera.com/blog/2014/03/why-Apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

Es hört sich so an, als ob Sie bereits so etwas tun, obwohl das Verwalten von Hashing einfacher sein kann.

Matei schlug hier einen Ansatz zur Emulation von zipWithIndex auf einem RDD vor, bei dem es sich um die Zuweisung von IDs innerhalb jeder Partition handelt, die global eindeutig sein werden: https://groups.google.com/forum/# ! topic/spark-users/WxXvcn2gl1E

15
Sean Owen

Wenn Sie DataFrames verwenden und sich nur um die Eindeutigkeit kümmern, können Sie auch die Funktion MonotonicallyIncreasingID verwenden

import org.Apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

Bearbeiten: MonotonicallyIncreasingID wurde verworfen und entfernt seit Spark 2. ; es ist jetzt bekannt als monotonically_increasing_id.

8
radek1st

Die Leute haben bereits monotonically_increasing_id () empfohlen und das Problem erwähnt, dass Longs und nicht Ints erzeugt werden.

Meiner Erfahrung nach (Einschränkung - Spark 1.6)) wird jedoch kein Executor-Präfix verwendet, wenn Sie es auf einem einzelnen Executor verwenden (vorher auf 1 partitionieren), und die Nummer kann sicher umgewandelt werden Offensichtlich müssen Sie weniger als Integer.MAX_VALUE Zeilen haben.

2
Eyal

monoton_steigernd_id () scheint die Antwort zu sein, funktioniert aber leider nicht für ALS, da es 64-Bit-Zahlen und erzeugt ALS erwartet 32-Bit-Versionen (siehe meinen Kommentar unter der Antwort von radek1st für Details).

Die Lösung, die ich gefunden habe, ist zipWithIndex () , wie in Darabos 'Antwort erwähnt. So implementieren Sie es:

Wenn Sie bereits einen einspaltigen DataFrame mit Ihren unterschiedlichen Benutzern namens userids haben, können Sie wie folgt eine Nachschlagetabelle (LUT) erstellen:

# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))

Jetzt kannst du:

  • Verwenden Sie diese LUT, um ALS-freundliche Ganzzahl-IDs abzurufen, die ALS bereitgestellt werden sollen
  • Verwenden Sie diese LUT, um eine Rückwärtssuche durchzuführen, wenn Sie von der ALS-ID zur ursprünglichen ID zurückkehren müssen

Machen Sie das selbe natürlich für Gegenstände.

2
xenocyon