webentwicklung-frage-antwort-db.com.de

Wie erstelle ich ein Thema in Kafka bis Java

Ich möchte ein Thema in Kafka (kafka_2.8.0-0.8.1.1) über Java erstellen. Es funktioniert einwandfrei, wenn ich ein Thema in der Eingabeaufforderung erstelle und wenn ich eine Nachricht über die API Java pushe. Ich möchte jedoch ein Thema über die API Java erstellen. Nach einer langen Suche fand ich unter Code,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Ich habe den obigen Code ausprobiert und es wird angezeigt, dass das Thema erstellt wurde, aber ich kann die Nachricht im Thema nicht pushen. Stimmt etwas mit meinem Code nicht? Oder einen anderen Weg, um das oben genannte zu erreichen?

36
Jaya Ananthram

Ich habe es behoben .. Nach langer Recherche ..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Aus dem obigen Code erstellt ZkClient ein Thema, aber diese Themeninformationen sind nicht für die Kafka bekannt. Wir müssen also ein Objekt für ZkClient auf folgende Weise erstellen:

Importieren Sie zuerst die folgende Anweisung,

import kafka.utils.ZKStringSerializer$;

und erstellen Sie ein Objekt für ZkClient auf folgende Weise:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Edit 1: (für @ajkret Kommentar)

Der obige Code funktioniert nicht für kafka> 0.9, da die API geändert wurde. Verwenden Sie den folgenden Code für kafka> 0.9


import Java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
37
Jaya Ananthram

In der neuesten API (2.1.0) scheint der Prozess ziemlich vereinfacht zu sein. Mit der neuesten API für Kafka 2.1.0 kann dies wie folgt durchgeführt werden

import org.Apache.kafka.clients.admin.AdminClient;
import org.Apache.kafka.clients.admin.CreateTopicsResult;
import org.Apache.kafka.clients.admin.NewTopic;

Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)

List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);

adminClient.createTopics(newTopics);
adminClient.close();

Die Inhalte von kafka.properties Datei sind wie folgt

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.Apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.Apache.kafka.common.serialization.StringDeserializer

Beachten Sie, dass die Instanz des AdminClient geschlossen werden muss, um das neu erstellte Thema wiederzugeben.

Nur ein Hinweis auf jeden, der dies mit einer aktualisierten Version von Kafka (Zum Zeitpunkt des Schreibens verwendete ich Kafka v0. 10.0.0).

Du musst dich ändern;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);

Zu dem Folgendem;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);

Es ist auch eine gute Idee, die Verbindung nach Beendigung zu schließen.

zkClient.close();
7
Richard G

AdminUtils API wird veraltet. Es gibt einen neuen API AdminZkClient, mit dem wir Themen auf Kafka server verwalten können.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

Weitere Informationen finden Sie unter diesem Link: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-Java/

5
Mahesh Mogal

Für diejenigen, die dies in kafka= v0.10.2.1 versuchen und auf Probleme mit dem Serialisierungsfehler 'Java.io.StreamCorruptedException: invalid stream header: 3139322E' Stoßen, folgt ein Beispiel für einen Arbeitscode mit den erforderlichen Importen.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.Apache.kafka.clients.consumer.KafkaConsumer;
import org.Apache.kafka.common.PartitionInfo;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;

public static void createTopic(String topicName, int numPartitions, int numReplication) {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
            //Ref: https://Gist.github.com/jjkoshy/3842975
            zkClient.setZkSerializer(new ZkSerializer() {
                @Override
                public byte[] serialize(Object o) throws ZkMarshallingError {
                    return ZKStringSerializer.serialize(o);
                }

                @Override
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return ZKStringSerializer.deserialize(bytes);
                }
            });

            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
                    RackAwareMode.Enforced$.MODULE$);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
5
Saurabh Mishra