您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出

logstash 5.0.1:为多个kafka输入主题设置elasticsearch多个索引输出

首先,您需要添加decorate_eventskafka输入中才能知道消息来自哪个主题

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
    decorate_events => true
  }
}

然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索引名称。为此,您需要添加

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@Metadata][index]" => "index1"}
      }
   } else {
      mutate {
         add_field => {"[@Metadata][index]" => "index2"}
      }
   }
   # remove the field containing the decorations, unless you want them to land into ES
   mutate {
      remove_field => ["kafka"]
   }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@Metadata][index]}"
    codec => "json"
    document_id => "%{id}"
  }
}

然后第二个选择是直接在输出部分执行if / else,就像这样(但是其他kafka字段将落入ES中):

output {
   if [@Metadata][kafka][topic] == "topic1" {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index1"
       codec => "json"
       document_id => "%{id}"
     }
   } else {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index2"
       codec => "json"
       document_id => "%{id}"
     }
   }
}
其他 2022/1/1 18:20:49 有577人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶