webentwicklung-frage-antwort-db.com.de

Besser kann ein Zeichenfolgefeld in Spark in einen Zeitstempel umgewandelt werden

Ich habe eine CSV, in der ein Feld datetime in einem bestimmten Format vorliegt. Ich kann es nicht direkt in mein Dataframe importieren, da es sich um einen Zeitstempel handeln muss. Also importiere ich es als String und konvertiere es in eine Timestamp wie folgt

import Java.sql.Timestamp
import Java.text.SimpleDateFormat
import Java.util.Date
import org.Apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else {
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

Gibt es eine bessere und prägnantere Möglichkeit, dies mit der Dataframe-API oder spark-sql zu tun? Die obige Methode erfordert die Erstellung einer RDD und das erneute Bereitstellen des Schemas für das Dataframe.

18
user568109

Spark> = 2.2

Seit 2.2 können Sie Formatstring direkt angeben:

import org.Apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#[email protected]#@#             |null               |
// +---+-------------------+-------------------+

Spark> = 1,6, <2,2

Sie können Datumsverarbeitungsfunktionen verwenden, die in Spark 1.5 eingeführt wurden. Angenommen, Sie haben folgende Daten:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts")

Sie können unix_timestamp verwenden, um Zeichenfolgen zu analysieren und in Zeitstempel umzuwandeln

import org.Apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#[email protected]#@#             |null                 |
// +---+-------------------+---------------------+

Wie Sie sehen, umfasst es sowohl die Analyse als auch die Fehlerbehandlung. Die Formatzeichenfolge sollte mit Java SimpleDateFormat kompatibel sein.

Spark> = 1,5, <1,6 

Sie müssen etwas wie folgt verwenden:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

oder

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

wegen SPARK-11724 .

Spark <1,5

sie sollten diese mit expr und HiveContext verwenden können.

42
zero323

Ich habe noch nicht mit Spark SQL gespielt, aber ich denke, dies wäre mehr idiomatische Skala (Nullnutzung wird nicht als eine gute Praxis betrachtet):

def getTimestamp(s: String) : Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}

Ich gehe davon aus, dass Sie Row-Elementtypen im Voraus kennen (wenn Sie sie aus einer CSV-Datei lesen, sind sie alle String). Deshalb verwende ich einen geeigneten Typ wie String und nicht Any (alles ist der Untertyp von Any).

Es hängt auch davon ab, wie Sie mit Ausnahmebedingungen arbeiten möchten. In diesem Fall wird bei Auftreten einer Parsing-Ausnahme einfach eine None zurückgegeben.

Sie könnten es weiter verwenden mit:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
6
jarandaf

Ich habe einen Zeitstempel nach ISO8601 in meinem Datensatz und musste ihn in das Format "JJJJ-MM-TT" konvertieren. Das habe ich gemacht:

import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

Und Sie können die UDF einfach in Ihrer Spark-SQL-Abfrage verwenden.

1
zengr

Ich würde https://github.com/databricks/spark-csv verwenden

Dies wird Zeitstempel für Sie ableiten.

import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df : DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)
0
mark

Ich möchte die von Ihnen geschriebene getTimeStamp-Methode in rdd's mapPartitions verschieben und GenericMutableRow zwischen Zeilen in einem Iterator wiederverwenden:

val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}

Und Sie sollten immer noch ein Schema verwenden, um einen DataFrame zu generieren

val df = sqlContext.createDataFrame(rowRdd, tableSchema)

Die Verwendung von GenericMutableRow innerhalb einer Iterator-Implementierung kann in Aggregate Operator , InMemoryColumnarTableScan , ParquetTableOperations etc. gefunden werden.

0
Yijie Shen

Ich hatte einige Probleme mit to_timestamp, wo eine leere Zeichenfolge zurückgegeben wurde. Nach langem Ausprobieren konnte ich es umgehen, indem ich als Zeitstempel und dann als Zeichenkette zurückwirbelte. Ich hoffe, das hilft allen anderen, die das gleiche Problem haben:

df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
})
0
ashwin319