-较新版本的Kafka不需要Zookeeper。请参阅@Neeleshkumar Srinivasan Mannur的API版本1.01.0以上的答案
API 0.11.0+中的过程似乎已大大简化。使用它,可以完成以下操作
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)
List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);
adminClient.createTopics(newTopics);
adminClient.close();
bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer