python实时监控logstash日志代码
实时读取logstash日志,一旦出现异常或错误关键字(keyword)即触发报警。
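#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = caozhi
# create_time 2018-11-12, update_time 2018-11-15
# version = 1.0
# Recording high-availability alert.
# 1. The log is read incrementally by saving and restoring a file offset.
# 2. The production log file is rotated; after a rotation the previous
#    (rotated) file is read.
import os
import sys
import json
import time
import re

# State file holding the last read offset, the inode of the live log and the
# base name used to derive the rotated file name.
cini = 'conf.ini'
log_file = 'logstash.log'


def readconf():
    """Return the saved state dict from ``conf.ini``.

    When the file is missing, unreadable or not valid JSON, a default state
    is written back and returned instead.
    """
    try:
        # Read-only is enough here; the original 'r+' also demanded write
        # permission for a pure read.
        with open(cini, 'r') as f:
            CONF = json.load(f)
    except (OSError, ValueError):
        # 922817 is the hard-coded inode from the original script; unless it
        # happens to match the live file, the next run takes the "rotated"
        # path in main().
        CONF = {"seek": 0, "inode": 922817, "last_file": "logstash.log"}
        writeconf(CONF=CONF)
        print('conf.ini配置文件缺失,自动创建一个新的配置文件')
    return CONF


def writeconf(CONF):
    """Serialise the state dict ``CONF`` to ``conf.ini`` as JSON."""
    with open(cini, 'w+') as e:
        json.dump(CONF, e)


def read_log(log_file, seek):
    """Scan ``log_file`` from offset ``seek`` and report matching events.

    Every line is parsed as JSON. A line with ``rtype == 6``,
    ``uri == '/publish'`` and ``event == 0`` is pushed through ``record()``.

    Returns:
        ``(value, stream, seek)``: ``value``/``stream`` describe the LAST line
        read (1 and the stream name on a match, otherwise 0 and 0) and
        ``seek`` is the offset after that line.

    Exits the process (``sys.exit``) when the file cannot be opened, when no
    data was appended since ``seek``, or when a line is not valid JSON.
    """
    try:
        f = open(log_file, 'r')
    except FileNotFoundError:
        # The rotated file is not there: fall back to the live log from 0.
        f = open('logstash.log', 'r')
        seek = 0
        print('上一个文件读取失败了,请检查切割的日志文件')
    except OSError:
        print('日志文件打开错误,退出程序')
        sys.exit()

    # The original wrote `f.close` without calling it, so the handle was
    # never closed; the context manager also covers the sys.exit() paths.
    with f:
        f.seek(seek)
        # readline()/tell() are used on purpose: iterating the file object
        # directly disables tell() in text mode.
        line = f.readline()
        if f.tell() == seek:
            print('没有追加日志,退出程序')
            sys.exit()
        while line:
            try:
                entry = json.loads(line)
            except ValueError:
                # NOTE(review): a half-written last line lands here too and
                # resets the offset to 0, so the next run re-reads the whole
                # file and may raise duplicate alerts.
                CONF = {"seek": 0, "inode": 922817,
                        "last_file": "/data/logs/lmrs/logstash.log"}
                writeconf(CONF=CONF)
                print('json数据加载错误,重新创建一个新的配置文件')
                sys.exit()
            if not isinstance(entry, dict):
                # Valid JSON that is not an object (list, number, ...) has no
                # fields to match; treat it as a non-matching line instead of
                # crashing on .get().
                entry = {}
            if (entry.get('rtype') == 6 and entry.get('uri') == '/publish'
                    and entry.get('event') == 0):
                value = 1
                stream = entry.get('name')
                print('{}{}'.format(value, stream))
                record(value=value, stream=stream)
            else:
                value = 0
                stream = 0
            line = f.readline()
            seek = f.tell()
    return value, stream, seek


def record(value, stream):
    """Push one gauge sample to the local agent at 127.0.0.1:1988.

    ``stream`` comes straight from the log line and is placed in the tag
    string unmodified.
    """
    # Imported here so the parsing helpers above stay importable on hosts
    # where `requests` is not installed.
    import requests

    now = int(time.time())
    data = [{
        'metric': 'recording_high_availability_monitor',
        'endpoint': os.uname()[1],
        'timestamp': now,
        'step': 60,
        'value': value,
        'counterType': 'GAUGE',
        'Tags': '{}={}'.format(now, stream),
    }]
    print('这是data的json数据')
    print(data)
    # Without a timeout a stalled agent would block the monitor forever.
    falcon_request = requests.post("http://127.0.0.1:1988/v1/push",
                                   data=json.dumps(data), timeout=10)
    print('json参数请求返回状态码为:' + str(falcon_request.status_code))
    print('json参数请求返回为:' + str(falcon_request.text))


def main():
    """Run one monitoring pass and save the new read position."""
    now = time.localtime()
    print()
    print('***************************************')
    print('本次执行脚本时间:{}'.format(time.strftime("%Y%m%d_%H%M", now)))
    CONF = readconf()
    print('first_CONF:{}'.format(CONF))
    print('NO1.log_file', log_file)
    last_inode = CONF['inode']
    inode = os.stat(log_file).st_ino
    print('last_inode:{}inode:{}'.format(last_inode, inode))
    seek = CONF['seek']
    if inode == last_inode:
        target = log_file
        next_file = 0
    else:
        # Inode changed, so the log was rotated: finish the rotated file. Its
        # suffix is the current time with the minute rounded down to a
        # multiple of ten. One `now` is used for both parts so the name
        # cannot straddle a minute boundary.
        # NOTE(review): the path prefix comes from conf.ini; keep that file
        # writable only by the account running this script.
        target = (CONF['last_file'] + time.strftime("-%Y%m%d_", now)
                  + time.strftime("%H%M", now)[:-1] + '0')
        next_file = 1
    print('NO2.log_file', target)
    value, stream, seek = read_log(log_file=target, seek=seek)
    # After finishing a rotated file the live log is read from its start.
    CONF['seek'] = 0 if next_file else seek
    CONF['inode'] = os.stat('logstash.log').st_ino
    writeconf(CONF=CONF)
    print('last_CONF:{}'.format(CONF))


if __name__ == '__main__':
    main()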
补充知识:logstash调用exec
我就废话不多说了,还是直接看代码吧!
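[elk@Vsftp logstash]$ cat t3.conf
input {
  stdin { }
}
filter {
  grok {
    # The named capture <Level> was stripped when this page was extracted;
    # it is restored here because the output condition and the sample event
    # below both use the [Level] field.
    match => [ "message", "(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?<Level>(\S+)).*" ]
  }
  date {
    match => [ "time", "yyyy-MM-dd HH:mm:ss,SSS" ]
  }
  mutate {
    add_field => [ "type", "tailong" ]
    add_field => [ "messager", "%{type}-%{message}" ]
    remove_field => [ "message" ]
  }
}
output {
  if ([Level] == "ERROR" or [messager] =~ "Exception") and [messager] !~ "温金服务未连接" and [messager] !~ "调用温金代理系统接口错误" and [messager] !~ "BusinessException" {
    # SECURITY: the exec output renders this string and runs it as a command
    # line. %{messager} is raw log text, so shell metacharacters in a log
    # line are interpreted on the Logstash host with the Logstash account's
    # privileges. Prefer not to interpolate event fields into a command:
    # send the mail with the email output plugin, or hand the event to the
    # script on stdin (pipe output). If exec must stay, first reduce the
    # field to a safe character set (e.g. with mutate/gsub).
    exec {
      command => "/bin/smail.pl \"%{messager}\" \"%{type}\""
    }
  }
  stdout {
    codec => rubydebug
  }
}

Vsftp:/root# cat /bin/smail.pl
#!/usr/bin/perl
use Net::SMTP;
use HTTP::Date qw(time2iso str2time time2iso time2isoz);
use Data::Dumper;
use Getopt::Std;
use vars qw($opt_d);
getopts('d:');
# mail_user should be your_mail@163.com
$message = "@ARGV";
$env = "$opt_d";
sub send_mail {
    my $CurrTime = time2iso(time());
    my $to_address = shift;
    my $mail_user = 'zhao.yangjian@163.com';
    # SECURITY: do not keep the mailbox password in the script; read it from
    # the environment or from a file only the Logstash account can read.
    my $mail_pwd = 'xx';
    my $mail_server = 'smtp.163.com';
    my $from = "From:$mail_user\n";
    my $subject = "Subject:zjcapinfo\n";
    my $info = "$CurrTime--$message";
    # NOTE(review): the here-document and the start of the Net::SMTP
    # constructor call were lost when the page was extracted. The three lines
    # below are a minimal reconstruction; "$info" as the body is presumed.
    # Confirm against the original script.
    my $message = <<CONTENT;
$info
CONTENT
    # SECURITY: a plain Net::SMTP->new() connection sends the AUTH exchange
    # unencrypted; use SSL/STARTTLS if the installed Net::SMTP supports it.
    my $smtp = Net::SMTP->new($mail_server);
    $smtp->auth($mail_user, $mail_pwd) || die "AuthError!$!";
    $smtp->mail($mail_user);
    $smtp->to($to_address);
    $smtp->data();                    # begin the data
    $smtp->datasend($from);           # set user
    $smtp->datasend($subject);        # set subject
    $smtp->datasend("\n\n");
    $smtp->datasend("$message\n");    # set content
    $smtp->dataend();
    $smtp->quit();
};
send_mail('zhao.yangjian@163.com');

2017-01-12 10:19:19,888 jjjjj Exception
{
      "@version" => "1",
    "@timestamp" => "2017-01-12T02:19:19.888Z",
          "host" => "Vsftp",
          "time" => "2017-01-12 10:19:19,888",
         "Level" => "jjjjj",
          "type" => "tailong",
      "messager" => "tailong-2017-01-12 10:19:19,888 jjjjj Exception"
}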