Alex Guo
文章36
标签33
分类10
大数据日志分析系统-python脚本利用es聚合计算

大数据日志分析系统-python脚本利用es聚合计算

之所以不进行es聚合实时查询一个是查询数量过大,另一方面是实时查询要保存大量的原始日志,现在只有5台es data节点,不能承受这么大的原始日志量。原始日志保留一定的天数要进行删除。

    当然也有的数据只是查询几天内的数据就直接用es的自身聚合能力了

#python部分脚本示例:


def main_statistic(domain,userId):

    body = {

        "query": {

            "bool": {

                "must": [

                    {

                        "term": {

                            "uriHost.raw": domain

                        }

                    }

                ]

            }

        },

        "size": 0,

        "aggs": {

            "fileCount": {

                "terms": {

                    "field": "mime.raw"

                },

                "aggs": {

                    "totalFileSize": {

                        "sum": {

                            "field": "repsize"

                        }

                    }

                }

            }

        }

    }



    result = in_es.search(index=common_index.logstash_index,doc_type="fc_access",body=body)



    name = result["aggregations"]["fileCount"]

    buckets = name["buckets"]

    for name_item in buckets:

        name_key = name_item["key"]

        doc_count = name_item["doc_count"]

        totalFileSize = name_item["totalFileSize"]["value"]



        if doc_count > 0:

            browser_count_item = {

                "_index": common_index.spark_portal_index,

                "_type": "logstashIndexDF_filetype_totalsize",

                "_source": {

                    "@timestamp": common_index.timestamp_attr,

                    "add_time": common_index.add_time_attr,

                    "uriHost": domain,

                    "userId": userId,

                    "mime": name_key,

                    "fileCount": doc_count,

                    "totalFileSize": totalFileSize

                }

            }

            print browser_count_item

            out_count_arr.append(browser_count_item)



            # 这是按照用户分类进行数据填充的

            browser_count_item_use = {

                "_index": common_index.spark_portal_index,

                "_type": "logstashIndexDF_filetype_totalsize_sum",

                "_source": {

                    "@timestamp": common_index.timestamp_attr,

                    "add_time": common_index.add_time_attr,

                    "userId": userId,

                    "mime": name_key,

                    "fileCountSum": doc_count,

                    "totalFileSizeSum": totalFileSize

                }

            }

            print browser_count_item_use

            out_count_arr.append(browser_count_item_use)







def cacl_main(common_index_obj,domain_users):

    global common_index

    common_index = common_index_obj



    global out_count_arr

    out_count_arr = []



    for domain_user_item in domain_users:

        domain = domain_user_item["key"]

        userId = domain_user_item["user_id"]

        main_statistic(domain=domain, userId=userId)



        if len(out_count_arr) > 300:

            helpers.bulk(out_es, out_count_arr)

            out_count_arr = []
本文作者:Alex Guo
本文链接:https://alexguo.net/2018/12/18/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%97%A5%E5%BF%97%E5%88%86%E6%9E%90%E7%B3%BB%E7%BB%9F-python%E8%84%9A%E6%9C%AC%E5%88%A9%E7%94%A8es%E8%81%9A%E5%90%88%E8%AE%A1%E7%AE%97/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可