首先,您需要添加decorate_events
到kafka
输入中才能知道消息来自哪个主题
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
decorate_events => true
}
}
然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索引名称。为此,您需要添加
filter {
if [kafka][topic] == "topic1" {
mutate {
add_field => {"[@Metadata][index]" => "index1"}
}
} else {
mutate {
add_field => {"[@Metadata][index]" => "index2"}
}
}
# remove the field containing the decorations, unless you want them to land into ES
mutate {
remove_field => ["kafka"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@Metadata][index]}"
codec => "json"
document_id => "%{id}"
}
}
然后第二个选择是直接在输出部分执行if / else,就像这样(但是其他kafka
字段将落入ES中):
output {
if [@Metadata][kafka][topic] == "topic1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index1"
codec => "json"
document_id => "%{id}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "index2"
codec => "json"
document_id => "%{id}"
}
}
}