webentwicklung-frage-antwort-db.com.de

Spark Streaming - Lesen und Schreiben am Kafka Thema

Ich verwende Spark Streaming, um Daten zwischen zwei Kafka Warteschlangen zu verarbeiten, aber ich kann anscheinend keinen guten Weg finden, um über Kafka von Spark. Ich habe Folgendes versucht:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.Apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.Apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

und es funktioniert wie beabsichtigt, aber die Instanzierung eines neuen KafkaProducer für jede Nachricht ist in einem realen Kontext eindeutig undurchführbar und ich versuche, es zu umgehen.

Ich möchte für jeden Prozess einen Verweis auf eine einzelne Instanz behalten und darauf zugreifen, wenn ich eine Nachricht senden muss. Wie kann ich Kafka von Spark Streaming schreiben?

33
Chobeat

Mein erster Rat wäre, zu versuchen, eine neue Instanz in foreachPartition zu erstellen und zu messen, ob dies für Ihre Anforderungen schnell genug ist (die offizielle Dokumentation schlägt vor, schwere Objekte in foreachPartition zu instanziieren).

Eine andere Option ist die Verwendung eines Objektpools, wie in diesem Beispiel dargestellt:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

Ich fand es jedoch schwierig, bei der Verwendung von Checkpointing zu implementieren.

Eine andere Version, die für mich gut funktioniert, ist eine Factory, wie im folgenden Blog-Beitrag beschrieben. Sie müssen nur überprüfen, ob sie genügend Parallelität für Ihre Anforderungen bietet (siehe Abschnitt "Kommentare"):

http://allegro.tech/2015/08/spark-kafka-integration.html

18
Marius Soutier

Ja, leider ist es mit Spark (1.x, 2.x) nicht einfach, effizient an Kafka zu schreiben.

Ich würde den folgenden Ansatz vorschlagen:

  • Verwenden Sie eine KafkaProducer -Instanz pro Executor-Prozess/JVM (und verwenden Sie sie erneut).

Hier ist das übergeordnete Setup für diesen Ansatz:

  1. Zunächst müssen Sie Kafkas KafkaProducer "umbrechen", da er, wie Sie bereits erwähnt haben, nicht serialisierbar ist. Wenn Sie es einpacken, können Sie es an die Executoren "versenden". Die Schlüsselidee hier ist die Verwendung eines lazy val, so dass Sie die Instantiierung des Produzenten bis zu seiner ersten Verwendung verzögern, was praktisch eine Problemumgehung darstellt, sodass Sie sich keine Sorgen machen müssen, dass KafkaProducer nicht serialisierbar ist.
  2. Sie "versenden" den verpackten Produzenten an jeden Executor, indem Sie eine Broadcast-Variable verwenden.
  3. Innerhalb Ihrer eigentlichen Verarbeitungslogik greifen Sie über die Broadcast-Variable auf den umschlossenen Produzenten zu und schreiben die Verarbeitungsergebnisse an Kafka zurück.

Die folgenden Codefragmente funktionieren mit Spark Streaming ab Spark 2.0.

Schritt 1: KafkaProducer einwickeln

import Java.util.concurrent.Future

import org.Apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: Java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Schritt 2: Verwenden Sie eine Broadcast-Variable, um jedem Executor eine eigene umschlossene KafkaProducer -Instanz zuzuweisen.

import org.Apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Schritt 3: Schreiben von Spark Streaming an Kafka, Wiederverwendung derselben umschlossenen KafkaProducer -Instanz (für jeden Executor)

import Java.util.concurrent.Future
import org.Apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

Hoffe das hilft.

28
Michael G. Noll

Es gibt einen Streaming Kafka von Cloudera gepflegten Writer (eigentlich aus einem gesponnen Spark JIRA [1] ). Es schafft im Grunde Ein Produzent pro Partition, der den Zeitaufwand für die Erstellung von "schweren" Objekten über eine (hoffentlich große) Sammlung von Elementen amortisiert.

Den Writer finden Sie hier: https://github.com/cloudera/spark-kafka-writer

8
maasg

Mit Spark> = 2.2

Sowohl Lese- als auch Schreibvorgänge sind auf Kafka mit der Structured Streaming-API möglich

Erstellen Sie einen Stream aus Kafka topic

// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

Lesen Sie den Schlüssel und den Wert und wenden Sie das Schema für beide an. Der Einfachheit halber konvertieren wir beide in den Typ String.

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Da dsStruc das Schema hat, akzeptiert es alle SQL-Operationen wie filter, agg, select ..etc.

Schreibe einen Stream in Kafka topic

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

Mehr Konfiguration für Kafka Integration zum Lesen oder Schreiben

Wichtige Artefakte, die der Anwendung hinzugefügt werden sollen

 "org.Apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.Apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.Apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
7
mrsrinivas

Ich hatte das gleiche Problem und fand diesen Beitrag .

Der Autor löst das Problem, indem er 1 Produzenten pro Executor erstellt. Anstatt den Produzenten selbst zu senden, sendet er nur ein „Rezept“, wie ein Produzent in einem Executor durch Senden erstellt wird.

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

Er benutzt einen Wrapper, der den Produzenten träge erschafft:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

Der Wrapper kann serialisiert werden, da der Produzent Kafka= direkt vor der ersten Verwendung auf einem Executor initialisiert wird. Der Treiber behält den Verweis auf den Wrapper bei und der Wrapper sendet die Nachrichten unter Verwendung des Produzenten jedes Executors:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }
7
gcaliari

Warum ist es unmöglich? Grundsätzlich wird jede Partition jeder RDD unabhängig ausgeführt (und kann auch auf einem anderen Clusterknoten ausgeführt werden), sodass Sie die Verbindung wiederherstellen müssen (und Synchronisation) zu Beginn der Aufgabe jeder Partition. Wenn der Overhead dafür zu hoch ist, sollten Sie die Stapelgröße in Ihrem StreamingContext erhöhen, bis sie akzeptabel wird (dies verursacht natürlich Latenzkosten).

(Wenn Sie nicht Tausende von Nachrichten in jeder Partition verarbeiten, sind Sie sicher, dass Sie überhaupt Spark-Streaming benötigen? Würden Sie es besser mit einer eigenständigen Anwendung machen?)

3
lmm

Möglicherweise möchten Sie dies tun. Sie erstellen grundsätzlich einen Produzenten für jede Partition von Datensätzen.

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.Apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.Apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
) 

Hoffentlich hilft das

2
sainath reddy