xiaoxiong's blog Thinking and Action

数据统计分析笔记

2017-12-26

事件背景

上周需要对一个巨大量文本进行统计分析,开始使用linux的shell命令进行操作,内存消耗大,速度慢;进而采用python脚本自写算法,在可预料的时间内完成(资源消耗略有好转);事件完成后将数据上传至elastic search便于以后操作和查询,再次对比统计分析时间,让我不得不佩服luceneessolr这些开发者,以前学了很多基础数学、算法和各种编程、哲学思想,被这些开发者使用到了极致,效率提高推动世界发展。

20210712注: 从态势感知使用流式复杂事件处理引擎 esper ,flink,存储数据到 es 集群;离线分析引擎 spark 的使用,存储到 hadoop、clickhouse 集群;以及各种各样的图数据库neo4j; splunk 事件分析工具等大数据计算工具,越来越意识到大量复杂的计算其实都脱离不开底层数据库。站在巨人的肩膀上,使用合适的工具,事半功倍。

shell编程统计数据

shell 中经常使用的文本工具, cat、awk、gawk、grep、sed、sort等,8亿条数据、两千万不同排序约20分钟

查看文本重复次数前十数据

cat txt | awk '{CMD[$2]++;count++;}END { for (a in CMD) print CMD[a] " " CMD[a]/count*100 "% " a;}' | grep -v "./"  | column -c3 -s " " -t | sort -nr | nl | head -n10  

head -n 10 #输出前十条
tail -n 10 #输出后十条
nl #输出行号
sort -nr #-n:依照数值大小排序 -r:相反顺序排序 -nr 从大到小排序
column -c3 -s “ “ -t #按照表格输出 -s 指定分割符 -t 判断输入行的列数创建表 awk ‘{CMD[$2]++;count++;}END { for (a in CMD) print CMD[a] “ “ CMD[a]/count*100 “% “ a;}’ #根据 每行中的第二项CMD[$2] ,每出现一次加一, 输出格式 项 占比% 项名称

优化后

history | awk '{CMD[$2]++;count++;}END { for (a in CMD) print CMD[a] " " CMD[a]/count*100 "% " a;}' |   sort -nr  | head -n30  | nl | column  -s " " -t

省略 grep -v "./",排序并减少数据后 在输出行号和表格输出(数据大表格输出报错)

去重,并按照重复次数排序

sort passwd | uniq -c | sort -rn | cut -c 9- 

对passwd 进行排序,
uniq -c 进行去重 -c 带重复次数
sort -rn 排序
cut -c 9- 将前面9个字符cut 掉,就是把重复次数砍掉

查看(递归)目录下所有文件行数

find ./ -name "*" | xargs wc -l  

将大文件分割成小文件

split -b 10m server.log superxx

server.log分割成10m的小文件 名称为 superxxaasuperxxab ……

python 库进行数据统计

大错误
在使用python进行数据读取时使用b,二进制进行读取可以防止字段中有特殊字符,比如数据中很可能留有ctrl Z之类的字符,让文本读取结束

递归读取文件操作

import os
#handle_file 对单个文件进行操作 
def gci(filepath):
    files = os.listdir(filepath)
    for fi in files:
        fi_d = os.path.join(filepath,fi)
        if os.path.isdir(fi_d):
            gci(fi_d)
        else:
            handle_file(os.path.join(filepath, fi_d))

注意文本内容采用迭代器处理,节约内存

统计库

python 统计中两个模块 heapq, collections都可以用来进行数据统计,排序等工作
使用collections.Counter.most_common(N)对大约两G的字典进行统计排序,占用十G内存

hepq.nsmallest模板,取列表中dict最大或者最小的api

import heapq
portfolio = [
    {'name': 'IBM', 'shares': 100, 'price': 91.1},
    {'name': 'AAPL', 'shares': 50, 'price': 543.22},
    {'name': 'FB', 'shares': 200, 'price': 21.09},
    {'name': 'HPQ', 'shares': 35, 'price': 31.75},
    {'name': 'YHOO', 'shares': 45, 'price': 16.35},
    {'name': 'ACME', 'shares': 75, 'price': 115.65}
]
cheap = heapq.nsmallest(3, portfolio, key=lambda s: s['price'])
expensive = heapq.nlargest(3, portfolio, key=lambda s: s['price'])
print cheap
print expensive

collections.Counter most_common模块,返回列表中Top([N]) 列表

passwd_name_fin = 'C:\Users\xxx\Desktop\fin'
passwd_analy_fout = 'C:\Users\xxx\Desktop\fin.out'
passwd_fin = open(passwd_name_fin, 'rb')
passwd_iter = iter(passwd_fin)
passwd_lst_counter = Counter(passwd_iter)
for key, value in passwd_lst_counter.most_common(200):
    print key.strip('\n'), value
    passwd_analy_fout.write(key.strip('\n') +'\t' +  str(value) + '\n')
passwd_fin.close()
passwd_analy_fout.close()

写入excel 表格

听说微软要出官方库了,在没出之前使用xlwt进行数据写

数据写

import xlwt
data_in = open('file','rb')
iter_data = iter(data_in)
style0 = xlwt.easyxf('font:name Times New Roman, color-index black, bold on', num_format_str='#,##0')  
#定义excel表格输出格式

excel_out = xlwt.Workbook(encoding='utf-8')
#excel文件

excel_w = excel_out.add_sheet('domain_analysis')
#工作表sheet

excel_w.write(i, j, 111,  style0)
#写入表格第i行,第j列,数据111,格式style0

excel_out.save('xxx/aasdf.xls')
#excel文件保存

es使用

es 特点:高可扩展,开源,全文搜索,分析引擎
作用:存储、搜索和分析大量数据(近实时)

关键术语介绍

集群,节点。

es英文术语 es中文术语 mysql中文术语
index 索引 数据库
type 类型
documents 文件
Fields 字段

聚合使用

对需要进行排序统计字段 设置聚合 "fielddata": true

PUT combo_list/_mapping/user_data
{
   "user_data": {
      "properties": {
        "domain": {
          "type": "text",
          "fielddata": true
        }
      }
   }
}

聚合查询,size=0为不显示搜索命中结果,只看到在响应中聚集的结果

然后以计数下降(默认)返回前100个(size)

data_dict = {
    "query":{
        "query_string":{
            "query":"*"
        }
    },
    "size":0,
    "aggs": {
        "group_by_password": {
            "terms": {
                "field": "password.keyword",
                "size": 100,
                "order": {
                        "_count": "desc"
                }
            }
        }
    }
}

去重后的独立数量

SELECT COUNT(DISTINCT color) FROM cars

GET /cars/transactions/_search
{
    "size" : 0,
    "aggs" : {
        "distinct_colors" : {
            "cardinality" : {
              "field" : "color"
            }
        }
    }
}

es的文档

  1. 中文文档

  2. 英文文档

  3. github翻译地址

上传数据

最开始使用python脚本上传,数据大概在8000条左右,方法:多线程,线程池,多进程,协程,bulk api。

时间比对 一共 1134536条数据,37个文件,对每一个文件单独进行上传
4核双cpu

使用gevent pool 8核心 、1个 bulk 4000条 117.04s
使用gevent pool 8核心 、1个 bulk 2000条 89.38s
使用gevent pool 8核心 、1个 bulk 2500条 95.197s
使用gevent pool 8核心 、1个 bulk 1000条 71.112s
使用gevent pool 8核心 、1个 bulk 500条 73.807s
使用gevent pool 8核心 、1个 bulk 800条 68.807s
使用multiprocessing.Pool(8)、1个bulk 800条 25.458秒
使用multiprocessing.Pool(8)、1个bulk 3000条 39.458秒
使用multiprocessing.Pool(8)、1个bulk 1500条 28.498秒
初步结论: 在不影响需求的情况下,使用进程池上传数据可以避免python多线程处理不完美的弊端,bulk块大小 根据数据量大小而定目前800条/bulk上传速度较快,可达到44565条/s

数据处理目标

在对一个社工库分析时(纸面理论):泄露事件,域名来源,账号密码新鲜度(与原有库重复量)(待做)


Comments