Alex Guo
文章36
标签33
分类10
大数据日志分析系统-hdfs日志存储

大数据日志分析系统-hdfs日志存储

hdfs简介:

Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。

项目需求:

使用hdfs存储客户需要的指定域名时间打包日志 以及原始日志存储进行离线计算

遇到的问题:

在这一步遇到的一个重要的问题:

问题:从kafka中日志直接按域名时间分类存入hdfs时速度不够,主要时数据量太大,当数据量减少到1/10的时候满足要求。

试过:

  • spark:从kafka取出数据日志解析存入hdfs

  • logstash: 从kfaka中取出数据,然后自定义conf配置文件,按域名按小时直接存入hdfs

  • flume: flume自定义filter插件(java写的),将原始日志按照时间域名分类存入hdfs

发现这些东西都是存入hdfs速度不够,当然同时也看hdfs日志,hdfs本来就是适合大文件存储,同时每条日志存储有自己的路径有namenode datanode,现在这样一条日志或者百千条日志就进行一次日志存储的效率明显很低。

进行速度测试:

  • spark - kafka -logstash:从spark从kafka中取出原始日志然后将结果写入kafka的另一个topic这样的速度是OK的, 然后尝试结果数据再次通过logstash从kafka取出写入hdfs速度是跟不上的。

  • flume: 直接从kafka中取出然后按域名时间分类,写入本地或者直接屏幕上打印速度都是可以的。

最后的解决是:

flume自定义fliter插件(java),outPutSink插件(java),写入本地(这样已经测试速度是OK的,时间域名分割存储还未OK),本地形成大文件后写入hdfs(这里可以直接通过hdfs的api实现,linux定时脚本调用即可)

当然也可以直接用hbase进行原始日志的存储

git地址示例:

https://github.com/penghaoyou5/Flume-plug-in-log

直接上配置:

ubuntu@sp26:~/apps/hadoop-2.6.4/etc/hadoop$ cat core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://sp26:9000</value>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>/home/ubuntu/hdpdata</value>
</property>
</configuration>
ubuntu@sp26:~/apps/hadoop-2.6.4/etc/hadoop$ cat hadoop-env.sh
export JAVA_HOME=/home/ubuntu/apps/jdk1.7.0_45
ubuntu@sp26:~/apps/hadoop-2.6.4/etc/hadoop$ cat hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/mnt/data2/wlkhadname,/mnt/data3/wlkhadname,/mnt/data4/wlkhadname,/mnt/data5/wlkhadname,/mnt/data6/wlkhadname,/mnt/data7/wlkhadname,/mnt/data8/wlkhadname,/mnt/data9/wlkhadname,/mnt/data10/wlkhadname</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/mnt/data2/wlkhaddata,/mnt/data3/wlkhaddata,/mnt/data4/wlkhaddata,/mnt/data5/wlkhaddata,/mnt/data6/wlkhaddata,/mnt/data7/wlkhaddata,/mnt/data8/wlkhaddata,/mnt/data9/wlkhaddata,/mnt/data10/wlkhaddata</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.secondary.http.address</name>
<value>sp26:50090</value>
</property>
<property>
        <name>dfs.datanode.max.xcievers</name>
        <value>8192</value>
</property>
</configuration>

ubuntu@sp26:~/apps/hadoop-2.6.4/etc/hadoop$ cat slaves  sp27 sp28 sp29 sp30 

大数据日志分析系统-spark进行日志计算

大数据日志分析系统-spark进行日志计算

#spark简介:
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。

#需要满足的项目需求:
用spark进行实时统计,从kafka中获取数据,流式计算每分钟一次将计算结果存入es,供客户进行查询。

#这里不用直接存入es的方式进行聚合或者存入es之后再进行计算的原因:

  • 1.直接存入es进行聚合的话es中会随着时间的推移保存大量的原始日志,es存入数据量太大的数据会产生性能问题,而且大量用户同时查询也会产生聚合过多的性能问题。

  • 2.先将原始日志存入es,计算结果数据后再次删除原始日志   会产生问题: 

    - 1)虽然现有数据量es能够满足要求,但是当数据量再次大增时会产生kafka堆积 - es速度跟不上,而spark的处理速度可以跟得上  

    - 2)实时性在逻辑上和技术上得不到很好的保证。例如每次计算当前时间前5分钟的日志(线上python脚本运行时计算当前时间前1小时日志),一旦数据量过大产生kafka堆积,日志不能实时收集到es就会产生计算数据少的问题。

#做的过程中遇到的问题:
遇到过内存问题,算是由于缓存处理不当引起的 后来改用了LruCache      现象是不断的进行消费没有产生kafka堆积,但是没有结果数据,运行正常

#spark配置:

ubuntu@sp26:~/apps/spark-1.6.1-bin-hadoop2.6/conf$ cat spark-env.sh | grep -v '#'
export JAVA_HOME=/home/ubuntu/apps/jdk1.8.0_144

export SPARK_MASTER_IP=sp26

export SPARK_MASTER_PORT=7077
ubuntu@sp26:~/apps/spark-1.6.1-bin-hadoop2.6/conf$ cat slaves

sp27

sp28

sp29

sp30

#当然要有 ssh免密配置,linux环境变量配置等等,以后的文章会进行补充

启动ubuntu@sp26:~/apps/spark-1.6.1-bin-hadoop2.6$ sbin/start-all.sh

http://sp26:8080

这先是单节点

#spark代码:
git地址:https://github.com/penghaoyou5/SparkLogAnalysis.git

大数据日志分析系统-elasticsearch

大数据日志分析系统-elasticsearch

elasticsearch简介

Elasticsearch 是一个分布式、可扩展、实时的搜索与数据分析引擎。

两种架构的es配置差不多

选用es存储结果数据的理由:

1.曾经考虑过hbase选用,也进行过真正的测试,用hbse的问题是这种键值对的数据库,不一定能够保证唯一的键(虽然能把时间戳加入key中),而且es本身只存储结果数据完全符合线上需求,并且es自身带有聚合功能,可以多个条件查询而不只是键值对。

es原先配置:

es总共11个节点  进行了角色分配  其中master节点3个  data节点5个  cient节点3个 (角色分配与 node.master    node.data 有关)

ubuntu@sp1:~/elasticsearch-5.5.2/config$ cat elasticsearch.yml | grep -v '#'

 cluster.name: webluker-logstash

 cluster.routing.allocation.balance.shard: 0.10 

 node.name: node-c-sp1

 node.master: false

 node.data: false

 path.data: /mnt/data2,/mnt/data3,/mnt/data4,/mnt/data5,/mnt/data6,/mnt/data7,/mnt/data8,/mnt/data9,/mnt/data10

 network.host: 0.0.0.0

 discovery.zen.ping.unicast.hosts: ["master-ip1","master-ip2","master-ip3"]

 discovery.zen.minimum_master_nodes: 2

 discovery.zen.ping_timeout: 100s

 discovery.zen.fd.ping_timeout: 100s

 discovery.zen.fd.ping_interval: 30s

 gateway.expected_nodes: 8

 gateway.recover_after_nodes: 5

 thread_pool.bulk.queue_size: 2000

 thread_pool.search.queue_size: 3000
ubuntu@sp34:~/elasticsearch-5.5.2/bin$ cat elasticsearch | grep -v '#'

JAVA_HOME="/home/ubuntu/jdk1.8.0_144"

ES_JAVA_OPTS="-Xms31g -Xmx31g"

//==================

es spark计算配置

ubuntu@sp26:~/apps/elasticsearch-5.5.2/bin$ cat elasticsearch | grep -v '#'

JAVA_HOME="/home/ubuntu/apps/jdk1.8.0_144"

ES_JAVA_OPTS="-Xms31g -Xmx31g"

ES_HOME=/home/ubuntu/apps/elasticsearch-5.5.2
ubuntu@sp26:~/apps/elasticsearch-5.5.2/config$ cat elasticsearch.yml | grep -v '#'

cluster.name: log_big_data_wlk

discovery.zen.ping.unicast.hosts: ["es-master-ip1","es-master-ip2","es-master-ip3"]

discovery.zen.minimum_master_nodes: 2

node.name: node-m-d-sp26

path.data: /mnt/data2,/mnt/data3,/mnt/data4,/mnt/data5,/mnt/data6,/mnt/data7,/mnt/data8,/mnt/data9,/mnt/data10,/mnt/data11

network.host: 0.0.0.0

http.cors.enabled: true

http.cors.allow-origin: "*"

//================================

当然还有es header插件安装:

kibana安装:

//======

es问题

1.es索引过多问题    linux定时脚本删除

2.es索引创建异常,同一时间创建太多 ,那就是前边计算上传有问题了

3.es的shell监控脚本 (这里是配合zabbix使用发送邮件)

#!/usr/bin/env bash
#字符串截取参考 https://www.cnblogs.com/zwgblog/p/6031256.html
#判断字符串包含参考 https://www.cnblogs.com/AndyStudy/p/6064834.html
es_url='http://sp26:9200'
var='0'

#======判断集群健康值是否是green
health_result=`curl -s $es_url/_cluster/health`
#echo ddd$health_result
if [[ $health_result == *'"status":"green"'* ]]
then
#     echo "包含"
     pass='yes'
else
     echo "不包含green"
fi
#====获取上个小时的日志数量 参考是否达标
last_hour=`date +%Y.%m.%d.%H  -d  '-1 hours'`
#last_hour='2017.12.13.13'
last_hour_index='logstash-'$last_hour
result_query=`curl -s ${es_url}/${last_hour_index}/_search -d '{"query":{"bool":{"must":[{"match_all":{}}]}},"size":0}'`
#echo $result_query
right_total=${result_query#*'"hits":{"total":'}
hits_total=${right_total%',"max_score"'*}
#echo $hits_total
if [ $hits_total  -lt 1000000 ]
then
    echo '数据量<1000000'
fi
echo $var
大数据日志分析系统-logstash

大数据日志分析系统-logstash

logstash简介

Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。

logstash-2.2.2的配置:

从logstash-forward 到kafka的配置

ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf


input {
      lumberjack {
                    port => "5044"
                    ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"
                    ssl_key =>  "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"
                    type => "fc_access"
                  }
      }

output {
       if "_grokparsefailure" not in [tags] {
    #       stdout { codec => rubydebug }
       kafka {
                topic_id => "kafka_es"
                bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"
                compression_type => "snappy" 
                acks => ["1"]
                value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
                timeout_ms => 10000
                retries => 5
                retry_backoff_ms => 100
                send_buffer_bytes => 102400   
                workers => 2
             }
      }
}

从kafka到es配置

其中包括了对日志各个字段的解析,以及对异常日志过滤(同时注意其中过滤了 不属于当前时间前后5天的时间的日志,为了防止异常日志创建索引过多导致es报红)

ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

input {
      kafka {
        topic_id => "kafka_es"
        group_id => "kafka_es"
        zk_connect => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181" 
        consumer_threads => 1
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 5000
        decorate_events => true
        consumer_timeout_ms => 1000
        queue_size => 100
        auto_offset_reset => "smallest"
        rebalance_max_retries => 50
      }
}

filter {
       mutate {
             add_field => [ "messageClone", "%{message}" ]
       }

       mutate {
             split => { "messageClone" => '"' }
            add_field => {"agent" => "%{[messageClone][3]}"}

       }
       useragent { 
              source => "agent" 
       }

       mutate {
            split => { "message" => " " }
            add_field => {"timestamp" => "%{[message][0]}"}
            add_field => {"reqtime" => "%{[message][1]}"}
            add_field => {"clientIP" => "%{[message][2]}"}
            add_field => {"squidCache" => "%{[message][3]}"}
            add_field => {"repsize" => "%{[message][4]}"}
            add_field => {"reqMethod" => "%{[message][5]}"}
            add_field => {"requestURL" => "%{[message][6]}"}
            add_field => {"username" => "%{[message][7]}"}
            add_field => {"requestOriginSite" => "%{[message][8]}"}
            add_field => {"mime" => "%{[message][9]}"}
            add_field => {"referer" => "%{[message][10]}"}
            add_field => {"agentCheck" => "%{[message][11]}"}
            add_field => {"dnsGroup" => "%{[message][-1]}"}
            remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]

       }



       if [agentCheck] =~ "ChinaCache" {

         grok { match => { "agentCheck" => "OOPS" } }

       }



       mutate {

          convert => {

                "timestamp" => "float"       

            "reqtime" => "integer"

          "repsize" => "integer"

          }

      remove_field => ["agentCheck"]

       }





       ruby {

       code => "event['timestamp_str'] = Time.at(event['timestamp']).strftime('%Y-%m-%dT%H:%M:%S.%LZ')"

       }



       date { match => [ "timestamp_str", "ISO8601" ] 

       }



       mutate {

             split => { "requestURL" => '/' }

      add_field => {"uriHost" => "%{[requestURL][2]}"}

      remove_field => ["timestamp_str"]

       }



       mutate {

             join => { "requestURL" => '/' }

       }



       ruby {

              code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"

       }





}



output {

if "ChinaCache" not in [agent] {

#                   stdout { codec => "rubydebug" }

                   elasticsearch {

                         index => "logstash-%{+YYYY.MM.dd.HH}"

                         workers => 1

                         flush_size => 5000

                         idle_flush_time => 1

                         hosts => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]



                         }

        }

}

启动命令:

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log &
nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log &

logstash-6.1.1配置

##从filbeat到kafka的配置

ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf



input {

    beats {

        port => "5055"

type => "log"

    }

}

output {

#   stdout { codec => rubydebug }

  kafka {

    codec => "json"

    bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"

    topic_id => "test"

compression_type => "snappy"

value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

  }

}

#检测

/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf  --config.test_and_exit

#启动

nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic   2>&1 >  /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log  & 

大数据日志分析系统-缓存组件kafka

大数据日志分析系统-缓存组件kafka

kafka简介

是一种高吞吐量的分布式发布订阅消息系统,当数据量不稳定,数据量大的时候想到它就对了。

zookeeper简介

是一个分布式的,开放源码的分布式应用程序协调服务,很多地方用到, 最常见的是为集群提供基础的、高可用HA(High Availability)服务
是kafka集群的基础依赖,同时也是hadoop系列中实现HA的基础组件。
实现HDFS的NamaNode和YARN的ResourceManager的HA,Spark实现HA,
HBase主要用ZooKeeper来实现HMaster选举与主备切换、系统容错、RootRegion管理、Region状态管理和分布式SplitWAL任务管理等。

//===========================================================

两种集群的配置相差不大

zookeeper集群配置:

ubuntu@sp1:~/kafka/config$ cat zookeeper.properties | grep -v '#'
dataDir=/mnt/data3/zk
clientPort=2181
tickTime=2000
initLimit=7
syncLimit=4
server.1=sp1:2888:3888
server.2=sp2:2888:3888
server.3=sp3:2888:3888
server.4=sp4:2888:3888
server.5=sp5:2888:3888
server.6=sp6:2888:3888
server.7=sp7:2888:3888

强制杀死命令

ps aux |grep kafka |grep -v grep|cut -c 9-15 | xargs kill -9
ps aux |grep zookeeper |grep -v grep|cut -c 9-15 | xargs kill -9

kafka 集群配置:

ubuntu@sp1:~/kafka/config$ cat consumer.properties | grep -v '#'
zookeeper.connect=es1:2181,es2:2181,es3:2181,es4:2181,es5:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
ubuntu@sp1:~/kafka/config$ cat server.properties | grep -v '#'
broker.id=1
port=9092
advertised.host.name=sp1
listeners=PLAINTEXT://sp1-host-ip:9092  
num.network.threads=10
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/mnt/data3/kafka-logs
num.partitions=20
num.recovery.threads.per.data.dir=5
log.retention.hours=8
log.retention.bytes=53687091200
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
auto.leader.rebalance.enable=true
num.replica.fetchers=5

启动命令:

1.zookeeper启动

sp1 - sp7在Ubuntu用户下 执行

/home/ubuntu/zookeeper-3.4.7/bin/zkServer.sh start

jps检查存在

QuorumPeerMain

状态查看

/home/ubuntu/zookeeper-3.4.7/bin/zkServer.sh status

查看为一主动多从启动正常

文件查看

/home/ubuntu/zookeeper-3.4.7/bin/zkCli.sh -server localhost:2181

进入客户端 ls / 等命令查看

2.kafka启动

sp1到sp7执行

nohup /home/ubuntu/kafka/bin/kafka-server-start.sh /home/ubuntu/kafka/config/server.properties 2>&1 > /home/ubuntu/kafka/logs/kafka.log &

当然也可以这样远程执行

ssh sp1 nohup /home/ubuntu/kafka/bin/kafka-server-start.sh /home/ubuntu/kafka/config/server.properties 2>&1 > /home/ubuntu/kafka/logs/kafka.log &

检查

~/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –broker-info –group kafka_es –topic kafka_es –zookeeper localhost:2181

就可以看机器是否正常了

注意:

很可能出现kafka个别分片堆积问题:例如现在kafka有35个分片,只有2个分片产生堆积,通过命令

~/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group kafka_es --topic kafka_es --zookeeper localhost:2181

查看,我遇见过是消费线程小于分片数量(现象是用命令查看是不同的分片Pid 拥有相同的 Owner),这个时候增加消费者进程即可(我个人认为数据量大时消费者要多于分片数量  这样不容易出现挂掉一两个消费者出现分片被堆积的情况)  

大数据日志分析系统边缘节点日志上传-flume,filbeat,logstash-forward

大数据日志分析系统边缘节点日志上传-flume,filbeat,logstash-forward

上传组件简介:

它们都是很好的资源上传工具,直接指定目录、文件就可以上传,通用功能不多说,区别除了与本公司产品兼容性好以外:

  • filebeat elastic(ELK)官网推荐:占用资源少
  • flume    apache官网产品:可定制性强
  • logstash-forward  已经过期的产品不多说。

因为需求简单,只是边缘节点日志上传最终选用了filebeat 

#正确格式原始日志示例:

1512231002.276     89 117.169.22.89 TCP_REFRESH_HIT/304 199 GET http://www.baidu.com/download/EF_patch_1.0.3.2-1.0.3.3.exe  - DIRECT/122.228.246.78 - "-" "Mozilla/5.0 Gecko/20100115 Firefox/3.6" "-"

#测试时追加日志的shell

echo '1512231002.276     89 117.169.22.89 TCP_REFRESH_HIT/304 199 GET http://www.baidu.com/download/EF_patch_1.0.3.2-1.0.3.3.exe  - DIRECT/122.228.246.78 - "-" "Mozilla/5.0 Gecko/20100115 Firefox/3.6" "-"'  >>  /data/cache1/filbeat_conf/logsdir/test.log

#filbeat配置示例:

[root@filbeathost filbeat_conf]# cat /data/cache1/filbeat_conf/filebeat-file-sp265055.yml
filebeat.prospectors:
- type: log
 paths:
   - /data/cache1/filbeat_conf/logsdir/* 
output.logstash:
 hosts: ["logstash-host1:5055","logstash-host2:5055","logstash-host3:5055","logstash-host4:5055","logstash-host5:5055"]

#启动

nohup filebeat -e -c /data/cache1/filbeat_conf/filebeat-file-sp265055.yml  -d "publish" &
大数据日志分析系统

大数据日志分析系统

原始日志量

每小时高的是否达到了 45303452条日志(四千五百多万条原始日志) ,某天日志量(这个随便选的)422110779 条(4亿两千多万)

需求

  • 1)对原始日志按域名进行分析包括: 请求数分析、独立IP分析、PV分析、地区分布运营商分布分析(根据ip计算)、浏览器操作系统分布分析(根据原始日志的agent进行分析)、热点页面分析、文件类型分析
  • 2)原始日志按域名、按天、按小时进行打包。

两种方案

方案1

  • →logstash-forward(边缘设备)  
  • → logstash (用logstast-before配置文件)
  • → Kafka (同时依赖zookeeper)
  • → logstash (用logstash-after配置文件)
  • → elaticsearch 
  • → python脚本
  • → 统计日志本地然后上传到hadoop,各种统计结果到elasticsearch(nginx负载均衡)   
  • →  界面展示

边缘节点服务器会产生很多用户请求日志,要对日志进行各种分析和原始日志打包,最终分析结果进行收费、让客户可以获取请求日志各种分析结果、为客户进行原始日志按域名按天按小时分割打包。

方案2

  • –> filebeat(或flume)
  • –> logstash
  • –> kafka(kafka依赖zookeeper)
  • –> spark统计计算
  • –> 统计各种结果到elasticsearch(nginx负载均衡)
  • –> 界面展示

–> flume(自定义sink插件、验证可行待完成)
–> 原始日志本地打包
–> 原始日志hadoop上传 (当然这里也可以用hbase进行日志存储)

大数据实时计算需要的几个基本组件(一定要注意版本问题,java大数据机器间通信用的是RPC 而不是restful_api,如果版本不对应很可能出现版本间的兼容问题):

  • 1.日志收集 -从CDN边缘节点服务器进行日志收集

  • 2.日志缓存 -收集上来的日志存储到一个缓存设备

  • 3.数据计算 -对收集的日志进行计算,域名请求数分析、地区统计等等

  • 4.计算结果存储 -对各种分析结果进行存储,要方便查找

  • 5.日志打包结果存储

    公司刚开始的日志系统分布:

  • logstash-forward (边缘节点日志收集,上传到有logstash的机器)       -》

  • kafka  强大的数据缓存组建          (由logstash收集日志到kafka)    -》

  • elasticsearch (分布式、可扩展、实时的搜索与数据分析引擎)  (用logstash从kafka取出数据存到es)  ->

  • spark(大规模数据计算引擎,从es取出原始日志然后通过spark计算结果)    -》

  • elasticsearch (进行计算结果数据存储),hadoop(原始日志打包存储) -》

  • nginx  + django服务  进行反向代理、负载均衡、地址隐藏            django进行界面展示-》

  • 可客户直接访问观看

    版本

    zookeeper 3.4.7
    kafka_2.11-0.8.2.1
    logstash-2.2.2
    elasticsearch-2.4.6
    hadoop-2.6.4
    spark-1.6.1

    问题

    现在这样的系统由于经常出现问题,es报红,或者计算有问题等,这是由于刚开始一个人做完还没完全稳定就直接撤人了,转了几次到了我手里(因为我懂java,android但是部门基本上是以python为主)。到我手里了是个挑战也是一个机遇嘛,然后就开始了填坑之旅。。。。。。。。。。。。
    还有一点是这样不能够很好的保持数据的实时性。 

    改进:

    其中遇到也解决了各种问题,就举例最主要的两个。
        1.线上的客户投诉获取不到原始日志,没那么多时间弄懂了再改进所以解决是 –》 写python脚本(到了这家公司开始用与部门统一的python)—》功能是从elasticsearch获取到某个域名原始日志然后写入本地文件(虽然不会写,但是这么多年编程scala的大概对日志的处理逻辑还是能看懂的),本地压缩,然后python调用hadoop本地命令行将日志上传到hadoop,先临时解决了问题
        2.elasticsearch报红、kafka堆积
        不知道怎样解决,只能看日志了,尝试用elasticsearch升级到了5.5.2,发现不会出现这样的问题。  但是又出现了新的问题:spark不能兼容这样的es版本,  给老大反映情况(提了 我倾向的用python脚本计算这一条路和对es降级尝试的路),听命于领导就先对原来的elasticsearch-2.4.6进行了配置改动-但是发现还是偶尔会出现es报红的情况。    只能用另一条路用python脚本代替spark,使用es自身的聚合功能进行计算,这样就解决了问题。
        总结:所以最后的解决办法是 去掉spark        先用python脚本直接聚合统计方式请求es获取结果,这样也满足线上要求,就先这样了。

    结果:

  • logstash-forward (边缘节点日志收集,上传到有logstash的机器)       -》

  • kafka  强大的数据缓存组建          (由logstash收集日志到kafka)    -》

  • elasticsearch (分布式、可扩展、实时的搜索与数据分析引擎)  (用logstash从kafka取出数据存到es)  ->

  • python脚本(调用elasticsearch的resultful_api获取统计结果,同时打包原始日志到本地上传到hadoop)    -》

  • elasticsearch (进行计算结果数据存储),hadoop(原始日志打包存储) -》

  • nginx  + django服务  进行反向代理、负载均衡、地址隐藏            django进行界面展示-》

  • 客户直接访问观看

    版本

    zookeeper 3.4.7       
    kafka_2.11-0.8.2.1   
    logstash-2.2.2    
    elasticsearch-5.5.2      
    hadoop-2.6.4   
    python以及python elasticsearch库

    现在的架构:

    公司在我刚开始接手的同时已经公司招聘了一个大数据人员(但不是本部门的),本部门也要有新项目有更大的数据量要计算需要跟他对接,但是等了好几个月仍然没有啥成果,老大忍不住了让我去看看他的大数据代码、提改进建议催进度(他主要用spark最终结果存到hbase,java写的代码),然后发现代码写的比较烂(因为java基本的静态变量 方法封装 继承多态等一看就是新手),业务逻辑也有点问题-跟老大反映,但是不同部门管不着也不能说啊,细节不说了,最终又开始了新架构的尝试之旅。

  • 1.filbeat 边缘节点日志收集,上传到有logstash的机器(或者可以用flume)    -》

  • 2.kafka  强大的数据缓存组件         (由logstash收集日志到kafka)    -》

  • 3.spark(大规模数据计算引擎,从kafka取出日志,通过scala编写的spark代码把计算结果存入es)    -》

  • 4.elasticsearch (进行计算结果数据存储) -》 

  • 5.nginx + django   界面展示或接口让客户获取服务

同时并行的原始日志打包   (刚开始日志打包考虑到了用spark或者flume直接上传到hadoop,但是后来发现现有机器这样的速度赶不上实际需要,)

  • 3.flume(自定义sink插件,filter插件 从kafka的另一个topic中获取日志数据,把日志按域名,按小时打包到本地)     -》
  • 4.python 调用hadoop命令行上传本地打包好的日志到hadoop ->
  • 5.nginx + django   界面展示或接口让客户获取服务

版本信息

apache-flume-1.6.0-bin  hadoop-2.6.4   kafka_2.11-1.0.0  spark-1.6.1-bin-hadoop2.6
elasticsearch-5.5.2     hbase         jdk1.8.0_144  logstash-6.1.1    zookeeper-3.4.5
jdk1.7.0_45(刚开始hadoop使用 后来spark要求更高就用了1.8版本) 

监控

当然每个阶段都需要增加监控,这里用的是zabbix监控配合脚本监控

rocketMQ启动

rocketMQ启动

下载构建

  > unzip rocketmq-all-4.3.2-source-release.zip
  > cd rocketmq-all-4.3.2/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

Start Name Server

  > nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

Start Broker

> nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log 
The broker[%s, 172.30.30.233:10911] boot success...

Send & Receive Messages

Before sending/receiving messages, we need to tell clients the location of name servers. RocketMQ provides multiple ways to achieve this. For simplicity, we use environment variable NAMESRV_ADDR

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Shutdown Servers

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
跨站请求伪造(CSRF/XSRF)

跨站请求伪造(CSRF/XSRF)

#CSRF

linux基础之文件目录权限

linux基础之文件目录权限

权限概述