webentwicklung-frage-antwort-db.com.de

Funken beim Extrahieren von Werten aus einer Zeile

Ich habe den folgenden Datenrahmen

val transactions_with_counts = sqlContext.sql(
  """SELECT user_id AS user_id, category_id AS category_id,
  COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")

Ich versuche, die Zeilen in Bewertungsobjekte zu konvertieren, aber da x(0) ein Array zurückgibt, schlägt dies fehl

val ratings = transactions_with_counts
  .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))

error: value toInt ist kein Mitglied von Any

27
Sam D

Beginnen wir mit einigen Dummy-Daten:

val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")

val transactions_with_counts = transactions
  .groupBy($"user_id", $"category_id")
  .count

transactions_with_counts.printSchema

// root
// |-- user_id: integer (nullable = false)
// |-- category_id: integer (nullable = false)
// |-- count: long (nullable = false)

Es gibt verschiedene Möglichkeiten, auf Row-Werte zuzugreifen und die erwarteten Typen beizubehalten:

  1. Musterabgleich 

    import org.Apache.spark.sql.Row
    
    transactions_with_counts.map{
      case Row(user_id: Int, category_id: Int, rating: Long) =>
        Rating(user_id, category_id, rating)
    } 
    
  2. Typisierte get*-Methoden wie getInt, getLong:

    transactions_with_counts.map(
      r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
    )
    
  3. getAs Methode, die sowohl Namen als auch Indizes verwenden kann:

    transactions_with_counts.map(r => Rating(
      r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
    ))
    

    Es kann verwendet werden, um benutzerdefinierte Typen einschließlich mllib.linalg.Vector zu extrahieren. Für den Zugriff nach Namen ist offensichtlich ein Schema erforderlich.

  4. Umwandlung in statisch typisierte Dataset (Spark 1.6+/2.0+):

    transactions_with_counts.as[(Int, Int, Long)]
    
54
zero323

Mit Datensätzen können Sie Bewertungen wie folgt definieren:

case class Rating(user_id: Int, category_id:Int, count:Long)

Die Ratingklasse hat hier einen Spaltennamen 'count' anstelle von 'rating', wie von zero323 vorgeschlagen. Somit wird die Bewertungsvariable wie folgt zugewiesen:

val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count

val rating = transactions_with_counts.as[Rating]

Auf diese Weise werden Laufzeitfehler in Spark nicht ausgeführt, da der Name Ihrer Rating-Klassenspalte mit dem von Spark zur Laufzeit generierten Zählspaltennamen identisch ist.

7
user-asterix

Um auf einen Wert einer Zeile von Dataframe zuzugreifen, müssen Sie rdd.collect von Dataframe mit for-Schleife verwenden.

Beachten Sie, dass Ihr Dataframe wie folgt aussieht.

val df = Seq(
      (1,"James"),    
      (2,"Albert"),
      (3,"Pete")).toDF("user_id","name")

Verwenden Sie rdd.collect über Ihrem Dataframe . Die Variable row enthält jede Zeile des Zeilentyps Dataframe von rdd. Um jedes Element aus einer Zeile zu erhalten, verwenden Sie row.mkString(","), das den Wert jeder Zeile in durch Kommas getrennten Werten enthält. Mit der split-Funktion (integrierte Funktion) können Sie auf jeden Spaltenwert der rdd-Zeile mit Index zugreifen.

for (row <- df.rdd.collect)
{   
    var user_id = row.mkString(",").split(",")(0)
    var category_id = row.mkString(",").split(",")(1)       
}

Der obige Code sieht im Vergleich zu dataframe.foreach-Schleifen etwas größer aus, aber Sie erhalten mehr Kontrolle über Ihre Logik, indem Sie den obigen Code verwenden.

0
Sarath Avanavu