您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

Logstash,来自多个文档中xml文件的split事件,保留来自根标签的信息

Logstash,来自多个文档中xml文件的split事件,保留来自根标签的信息

如果您的结构很简单,则可以使用memorize我编写的插件

您的配置如下所示:

filter {
  if ([message] =~ /<ROOT/) {
    grok {
      match => [ "message", 
        'number="(?<number>\d+)" number2="(?<number1>\d+)"'
      ] 
    }
  } else if ([message] =~ /<EVENT /) {
    grok { 
      match => [ "message", 'name="(?<name>[^"]+)"']
    }
  }
  memorize {
    fields => ["number","number1"]
  }
  if ([message] !~ /<EVENT /) {
    drop {}
  } else {
    mutate { remove_field => ["message"] }
  }
}

我的示例显示ROOT根据下面的注释在元素中查找多个内容。这是支持记忆多个字段的插件版本:

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "set"
#
# This filter will look for fields from an event and record the last value
# of them.  If any are not present, their last value will be added to the
# event
#
# The config looks like this:
#
#     filter {
#       memorize {
#         fields => ["time"]
#         default => { "time" => "00:00:00.000" }
#       }
#     }
#
# The `fields` is an array of the field NAMES that you want to memorize
# The `default` is a map of field names to field values that you want
# to use if the field isn't present and has no memorized value (optional)

class LogStash::Filters::Memorize < LogStash::Filters::Base

  config_name "memorize"
  milestone 2

  # An array of the field names to to memorize
  config :fields, :validate => :array, :required => true
  # a map for default values to use if its not seen before we need it
  config :default, :validate => :hash, :required => false

  # The stream identity is how the filter determines which stream an
  # event belongs to. See the multiline plugin if you want more details on how
  # this might work
  config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}"

  public
  def initialize(config = {})
    super

    @threadsafe = false

    # This filter needs to keep state.
    @memorized = Hash.new
  end # def initialize

  public
  def register
    # nothing needed
  end # def register

  public
  def filter(event)
    return unless filter?(event)

    any = false
    @fields.each do |field|
      if event[field].nil?
    map = @memorized[@stream_identity]
        val = map.nil? ? nil : map[field]
        if val.nil?
          val = @default.nil? ? nil : @default[field]
        end
    if !val.nil?
          event[field] = val
          any = true
    end
      else
        map = @memorized[@stream_identity]
    if map.nil?
          map = @memorized[@stream_identity] = Hash.new
    end
    val = event[field]
    map[field] = event[field]
      end #if
      if any
        filter_matched(event)
      end
    end #field.each
  end
end

对于logstash 1.5和更高版本,可以通过以下方式安装此插件

bin/plugin install logstash-filter-memorize
其他 2022/1/1 18:18:09 有591人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶