Ich möchte ein Thema in Kafka (kafka_2.8.0-0.8.1.1) über Java erstellen. Es funktioniert einwandfrei, wenn ich ein Thema in der Eingabeaufforderung erstelle und wenn ich eine Nachricht über die API Java pushe. Ich möchte jedoch ein Thema über die API Java erstellen. Nach einer langen Suche fand ich unter Code,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Ich habe den obigen Code ausprobiert und es wird angezeigt, dass das Thema erstellt wurde, aber ich kann die Nachricht im Thema nicht pushen. Stimmt etwas mit meinem Code nicht? Oder einen anderen Weg, um das oben genannte zu erreichen?
Ich habe es behoben .. Nach langer Recherche ..
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Aus dem obigen Code erstellt ZkClient ein Thema, aber diese Themeninformationen sind nicht für die Kafka bekannt. Wir müssen also ein Objekt für ZkClient auf folgende Weise erstellen:
Importieren Sie zuerst die folgende Anweisung,
import kafka.utils.ZKStringSerializer$;
und erstellen Sie ein Objekt für ZkClient auf folgende Weise:
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Der obige Code funktioniert nicht für kafka> 0.9, da die API geändert wurde. Verwenden Sie den folgenden Code für kafka> 0.9
import Java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}
In der neuesten API (2.1.0) scheint der Prozess ziemlich vereinfacht zu sein. Mit der neuesten API für Kafka 2.1.0 kann dies wie folgt durchgeführt werden
import org.Apache.kafka.clients.admin.AdminClient;
import org.Apache.kafka.clients.admin.CreateTopicsResult;
import org.Apache.kafka.clients.admin.NewTopic;
Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)
List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);
adminClient.createTopics(newTopics);
adminClient.close();
Die Inhalte von kafka.properties
Datei sind wie folgt
bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.Apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.Apache.kafka.common.serialization.StringDeserializer
Beachten Sie, dass die Instanz des AdminClient geschlossen werden muss, um das neu erstellte Thema wiederzugeben.
Nur ein Hinweis auf jeden, der dies mit einer aktualisierten Version von Kafka (Zum Zeitpunkt des Schreibens verwendete ich Kafka v0. 10.0.0).
Du musst dich ändern;
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);
Zu dem Folgendem;
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);
Es ist auch eine gute Idee, die Verbindung nach Beendigung zu schließen.
zkClient.close();
AdminUtils API wird veraltet. Es gibt einen neuen API AdminZkClient, mit dem wir Themen auf Kafka server verwalten können.
String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();
adminZkClient.createTopic(topicName1,partitions,replication,
topicConfig,RackAwareMode.Disabled$.MODULE$);
Weitere Informationen finden Sie unter diesem Link: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-Java/
Für diejenigen, die dies in kafka= v0.10.2.1 versuchen und auf Probleme mit dem Serialisierungsfehler 'Java.io.StreamCorruptedException: invalid stream header: 3139322E
' Stoßen, folgt ein Beispiel für einen Arbeitscode mit den erforderlichen Importen.
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.Apache.kafka.clients.consumer.KafkaConsumer;
import org.Apache.kafka.common.PartitionInfo;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
public static void createTopic(String topicName, int numPartitions, int numReplication) {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
//Ref: https://Gist.github.com/jjkoshy/3842975
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return ZKStringSerializer.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
}
});
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
RackAwareMode.Enforced$.MODULE$);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}