
Big Data Log Analysis System: Logstash

Introduction to Logstash

Logstash is an open-source data collection engine with real-time data transport capability. It can filter data from different sources in a unified way and output it to a destination according to rules defined by the developer.
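
A Logstash pipeline always has the shape input → (optional) filter → output. As a minimal sketch (not part of the original setup), the configuration below reads lines from stdin and prints each parsed event to stdout, which is handy for trying out filters before wiring in Kafka or Elasticsearch:

input {
      stdin { }
}

output {
      stdout { codec => rubydebug }
}

It can be run with bin/logstash -f <file>, the same way the production configurations below are started, or passed inline with -e.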

Configuration for logstash-2.2.2:

Configuration from logstash-forwarder to Kafka

ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf


input {
      lumberjack {
            port => "5044"
            ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"
            ssl_key => "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"
            type => "fc_access"
      }
}

output {
      if "_grokparsefailure" not in [tags] {
            # stdout { codec => rubydebug }
            kafka {
                  topic_id => "kafka_es"
                  bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"
                  compression_type => "snappy"
                  acks => ["1"]
                  value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
                  timeout_ms => 10000
                  retries => 5
                  retry_backoff_ms => 100
                  send_buffer_bytes => 102400
                  workers => 2
            }
      }
}
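
The lumberjack input requires the certificate and key referenced above, and logstash-forwarder on the sending side has to trust the same certificate (typically via its "ssl ca" setting). If you need to generate a self-signed pair, something like the following works; the subject name and validity period are placeholders rather than values from the original environment, and the CN should match the host name the forwarder connects to:

openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
    -subj "/CN=sp1" \
    -keyout /home/ubuntu/logstash-2.2.2/config/lumberjack.key \
    -out /home/ubuntu/logstash-2.2.2/config/lumberjack.crt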

Configuration from Kafka to ES

This configuration parses the individual fields of each log line and filters out abnormal entries. Note in particular that it drops any log whose timestamp falls outside a window of 5 days before or after the current time, to prevent abnormal logs from creating too many indices and turning the ES cluster red.
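
"Turning red" refers to the Elasticsearch cluster health status. Whether the cluster is healthy, and which hourly logstash-* indices have actually been created, can be checked with the standard ES APIs (es-ip-1 is the placeholder host name used in the output section of this config):

curl -s 'http://es-ip-1:9200/_cluster/health?pretty'
curl -s 'http://es-ip-1:9200/_cat/indices/logstash-*?v'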

ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

input {
      kafka {
        topic_id => "kafka_es"
        group_id => "kafka_es"
        zk_connect => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181" 
        consumer_threads => 1
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 5000
        decorate_events => true
        consumer_timeout_ms => 1000
        queue_size => 100
        auto_offset_reset => "smallest"
        rebalance_max_retries => 50
      }
}

filter {
       # keep a copy of the raw message so it can be split separately on double quotes
       mutate {
             add_field => [ "messageClone", "%{message}" ]
       }

       # the second double-quoted field of the log line is the user agent
       mutate {
             split => { "messageClone" => '"' }
             add_field => {"agent" => "%{[messageClone][3]}"}
       }

       # parse browser/OS details out of the user agent
       useragent {
             source => "agent"
       }

       # split the raw message on spaces and map the fields by position; dnsGroup is always the last field
       mutate {
             split => { "message" => " " }
             add_field => {"timestamp" => "%{[message][0]}"}
             add_field => {"reqtime" => "%{[message][1]}"}
             add_field => {"clientIP" => "%{[message][2]}"}
             add_field => {"squidCache" => "%{[message][3]}"}
             add_field => {"repsize" => "%{[message][4]}"}
             add_field => {"reqMethod" => "%{[message][5]}"}
             add_field => {"requestURL" => "%{[message][6]}"}
             add_field => {"username" => "%{[message][7]}"}
             add_field => {"requestOriginSite" => "%{[message][8]}"}
             add_field => {"mime" => "%{[message][9]}"}
             add_field => {"referer" => "%{[message][10]}"}
             add_field => {"agentCheck" => "%{[message][11]}"}
             add_field => {"dnsGroup" => "%{[message][-1]}"}
             remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]
       }

       # if the user agent contains "ChinaCache", run a grok against a pattern it will not match,
       # so the event gets tagged with _grokparsefailure
       if [agentCheck] =~ "ChinaCache" {
             grok { match => { "agentCheck" => "OOPS" } }
       }

       # convert numeric fields, then drop the helper field
       mutate {
             convert => {
                   "timestamp" => "float"
                   "reqtime" => "integer"
                   "repsize" => "integer"
             }
             remove_field => ["agentCheck"]
       }

       # turn the epoch-seconds timestamp into an ISO8601 string and use it as the event's @timestamp
       ruby {
             code => "event['timestamp_str'] = Time.at(event['timestamp']).strftime('%Y-%m-%dT%H:%M:%S.%LZ')"
       }

       date {
             match => [ "timestamp_str", "ISO8601" ]
       }

       # extract the host part of the request URL, then put the URL back together
       mutate {
             split => { "requestURL" => '/' }
             add_field => {"uriHost" => "%{[requestURL][2]}"}
             remove_field => ["timestamp_str"]
       }

       mutate {
             join => { "requestURL" => '/' }
       }

       # drop any event whose timestamp is more than 5 days away from the current time
       ruby {
             code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"
       }
}

output {
       if "ChinaCache" not in [agent] {
             # stdout { codec => "rubydebug" }
             elasticsearch {
                   index => "logstash-%{+YYYY.MM.dd.HH}"
                   workers => 1
                   flush_size => 5000
                   idle_flush_time => 1
                   hosts => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]
             }
       }
}

Startup commands:

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log 2>&1 &
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log 2>&1 &
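
To confirm that events really flow through the kafka_es topic between the two pipelines, the Kafka console consumer can be attached to the same ZooKeeper ensemble used in the input above. The Kafka installation path here is an assumption, not taken from the original environment:

/home/ubuntu/kafka/bin/kafka-console-consumer.sh --zookeeper sp1:2181,sp2:2181,sp3:2181 --topic kafka_es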

Configuration for logstash-6.1.1

Configuration from Filebeat to Kafka

ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf



input {
      beats {
            port => "5055"
            type => "log"
      }
}

output {
      # stdout { codec => rubydebug }
      kafka {
            codec => "json"
            bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"
            topic_id => "test"
            compression_type => "snappy"
            value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
      }
}
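
On the shipping side, Filebeat only needs to point its Logstash output at port 5055 opened by the beats input above. A minimal filebeat.yml sketch for Filebeat 6.x follows; the log path and host name are placeholders, not taken from the original setup:

filebeat.prospectors:
- type: log
  paths:
    - /var/log/nginx/access.log

output.logstash:
  hosts: ["sp26:5055"]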

# Test the configuration

/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf  --config.test_and_exit

# Start

nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic > /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log 2>&1 &
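
Once started, Logstash 6.x also exposes a monitoring API (port 9600 by default), which is a quick way to confirm that the pipeline is running and to watch its event counters:

curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'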