#!/usr/bin/python3 ''' Sagators statistics collector, distributor and graph maker (c) 2003-2019 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Examples: python stats.py / python stats.py mrtg Total-count Filtered-count python stats.py rrdtool /var/www/html/sagator ''' from __future__ import absolute_import from __future__ import print_function import socket, re, os, sys import time, signal, select, struct, resource import interscan.match from avlib import * __all__ = ['collector', 'statistics', 'status'] COLLECTOR_SERVER = () COLLECTOR_STATFILE = None MYCOLLECTOR = None def uptime(starttime): up = time.time()-starttime mins = int(up/60)%60 hours = int(up/3600)%24 days = int(up/86400) if days>0: return "%d days, %02d:%02d" % (days, hours, mins) else: return "%02d:%02d" % (hours, mins) class collector: pid = 0 data = { b'Total-count': 0.0, b'Total-bytes': 0.0, b'Filtered-count': 0.0, b'Filtered-bytes': 0.0, b'Clean-count': 0.0, b'Clean-bytes': 0.0, b'Total-time': 0.0 } def __init__(self, bindaddr, statusfile): global MYCOLLECTOR, COLLECTOR_SERVER MYCOLLECTOR = self COLLECTOR_SERVER = bindaddr # startup initialization self.bindaddr = bindaddr self.statusfile = statusfile # create a server if self.bindaddr: self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.s.bind(self.bindaddr) self.s.listen(10) except socket.error as err: (ec, es) = err.args debug.echo(0, "collector(): ERROR: BIND %s (%s)" % (es, self.bindaddr)) sys.exit(1) else: self.s = None def start(self): self.lastsave = time.time() self.pipe_r, self.pipe_w = os.pipe() # pipe for updates self.pid = os.fork() if self.pid==0: # main process # chroot from aglib import dochroot, sigchld dochroot() signal.signal(signal.SIGTERM, self.sigterm) signal.signal(signal.SIGHUP, self.sighup) # reopen log #signal.signal(signal.SIGUSR1, self.read_pipe) # do update signal.signal(signal.SIGCHLD, sigchld) # remove zombie os.close(self.pipe_w) self.data[b'Start-at'] = self.lastsave self.loadstats() self.data[b'Restart-at'] = self.lastsave self.data[b'Hostname'] = socket.gethostname() self.avg1m = average(2) self.mypid = os.getpid() debug.echo(1, "collector(): service started," " waiting for connections ... [%d]" % self.mypid) while 1: try: self.read_pipe() if not self.s: time.sleep(0.1) # no socket defined, continue to read pipe again continue try: socket_settimeout(self.s, 0.1) self.conn, addr = self.s.accept() except socket.timeout: continue except socket.error as eces: if eces[0]==4: # Interrupted system call continue else: raise self.handle_request() try: self.conn.shutdown(2) self.conn.close() except socket.error: pass except (SystemExit, KeyboardInterrupt): raise except Exception as e: debug.echo(1, "collector(): ERROR: ", e) debug.traceback(4, "collector(): ") try: self.conn.shutdown(2) self.conn.close() except: pass # while 1 ... end else: if self.s: self.s.close() os.close(self.pipe_r) return self.pid def handle_request(self): socket_settimeout(self.conn, 8) try: f = self.conn.makefile("rwb", 0) req = f.readline() debug.echo(5, "collector(): QUERY: ", req.strip()) proto = 0.0 reg1 = re.search(b"^GET /([^ ]*) HTTP/(1.[01])", req) if reg1: dirs = reg1.group(1).split(b"/") proto = float(reg1.group(2)) else: reg1 = re.search(b"^GET /([^ ]*)$", req.rstrip()) if reg1: proto = 1.0 dirs = reg1.group(1).split(b"/") if proto>=1.0: lines = {} while proto>=1.1: line = f.readline() if (line==b"\r\n") or (line==b"\n"): break a = line.split(b": ") if len(a)>1: lines[a[0]] = a[1].strip() else: lines[a[0]] = a[0].strip() self.conn.sendall(b"HTTP/1.1 200 OK\r\n") self.conn.sendall(b"Content-Type: text/plain;" b" charset=iso-8859-1\r\n") self.conn.sendall(b"\r\n"); self.data[b"Uptime"] = uptime(self.data[b'Start-at']) self.data[b"Restart-age"] = uptime(self.data[b'Restart-at']) self.data[b'Avg-count-1m'], self.data[b'Avg-bytes-1m']\ = self.avg1m.get() sstr = b"" if dirs[0]==b"": for key, value in list(self.data.items()): sstr += key+b":\t"+tobytes(str(value))+b"\r\n" self.conn.sendall(sstr) elif dirs[0]==b"mrtg": for key in dirs[1:3]+[b'Uptime', b'Hostname']: if key in list(self.data.keys()): try: o = b"%-20.0f" % self.data[key] sstr += o.strip()+b"\r\n" except TypeError: sstr += tobytes(str(self.data[key]))+b"\r\n" else: # return 0 for unknown key sstr += b"0\r\n" self.conn.sendall(sstr) else: self.message(b"404 Not Found") else: self.message(b"501 Method Not Implemented") self.conn.sendall(req) except socket.error as err: (ec, es) = err.args if not ec in [32, 104, 107]: debug.echo(3, "collector(): socket.error: ", ec, es) debug.traceback(4, "collector(): ") except: debug.traceback(3, "collector(): ") def sighup(self, sn, stack): debug.reopen() def sigterm(self, sn, stack): try: os.close(self.pipe_r) self.savestats() except: pass debug.echo(1, "collector(): Exiting - SIGTERM ...") os._exit(0) def update(self, dirs): d = b'/'.join(dirs)+b"\n"+b' '*1024 try: if select.select([], [self.pipe_w], [], 1)==([], [self.pipe_w], []): os.write(self.pipe_w, d[:1024]) else: debug.echo(1, "collector(): WARNING: Pipe full! Ignoring data.") except OSError as err: (ec, es) = err.args signal.alarm(0) if ec==4: # Interrupted system call debug.echo(3, "collector(): writing to pipe: ", es) except: debug.echo(1, "collector(): unknown error on update") debug.traceback(3, "collector()") def sigalrm(self, sn, stack): # ignore this signal, only do an "interrupted system call" debug.echo(1, "collector(): WARNING: Interrupting read from pipe!" " (pipe empty?)") def read_pipe(self, sn=None, stack=None): while True: try: if select.select([self.pipe_r], [], [], 0.1)==([], [], []): break signal.signal(signal.SIGALRM, self.sigalrm) signal.alarm(1) sstr = os.read(self.pipe_r, 1024) signal.alarm(0) except select.error as err: (ec, es) = err.args if ec==4: # Interrupted system call continue else: raise except OSError as err: (ec, es) = err.args signal.alarm(0) args = self.parseargs(sstr.rstrip(b' ').split(b'/')) for k, v in list(args.items()): try: self.data[k] += float(v) except KeyError: self.data[k] = float(v) except ValueError: self.data[k] = v except TypeError: debug.echo(0, "collector(): TypeError: ", k, v, type(v)) # update averages try: self.avg1m.update(float(args[b"Total-count"]), float(args[b"Total-bytes"])) except KeyError: # do not update totals for policy data pass # save stats minutely ctime = time.time() if (ctime-self.lastsave)>60: self.lastsave = ctime self.savestats() def savestats(self): self.data[b"Uptime"] = uptime(self.data[b"Start-at"]) self.data[b"Restart-age"] = uptime(self.data[b"Restart-at"]) self.data[b"Avg-count-1m"], self.data[b"Avg-bytes-1m"]\ = self.avg1m.get() out = b"" for k, v in sorted(self.data.items()): if type(v)==float: out += b"%s:\t%f\n" % (k, v) elif type(v)==int: out += b"%s:\t%d\n" % (k, v) else: out += b"%s:\t%s\n" % (k, tobytes(v)) debug.echo(4, "collector(): Saving stats ...") f = open(safe.fn(self.statusfile), "wb") f.write(out) f.close() def loadstats(self): try: f = open(safe.fn(self.statusfile), "rb") for line in f.readlines(): a = line.split(b":\t") try: self.data[a[0]] = float(a[1]) except ValueError: self.data[a[0]] = a[1] except IndexError: # ignore wrong lines from status file pass f.close() except IOError as err: (ec, es) = err.args debug.echo(1, "collector(): loadstat error: ", es) except: debug.echo(1, "collector(): Can't load status file.") debug.traceback(1, "collector()") def message(self, sstr): self.conn.sendall(b"HTTP/1.1 "+sstr+b"\r\n\r\n") def parseargs(self, dirs): args = {} for arg in dirs: a = arg.split(b"=") k = tobytes(unquote_plus(tostr(a[0]))).rstrip(b": \t\r\n") if re.search(b"^[^: \t\r\n]*$", k): if len(a)>1: args[k] = tobytes(unquote_plus(tostr(a[1]))) else: args[k] = b"1" else: debug.echo(1, "collector(): Unknown key: ", tostr(k)) return args class statistics(object): total_time = None def __init__(self): self.t0 = time.time() self.user_time0, self.sys_time0 = \ resource.getrusage(resource.RUSAGE_SELF)[0:2] def end(self): user_time1, sys_time1 = resource.getrusage(resource.RUSAGE_SELF)[0:2] self.total_time = time.time() - self.t0 self.user_time = user_time1 - self.user_time0 self.sys_time = sys_time1 - self.sys_time0 return self.total_time def update(self, bs=0, infected=0, tempfail=0): ''' Update statistics by 'bs' bytes, if infected, then update also filtered bytes, files. ''' if self.total_time==None: self.end() args = [b'Total-count', b'Total-bytes='+tobytes(str(bs)), b'Total-time='+tobytes(str(self.total_time)), b'User-time='+tobytes(str(self.user_time)), b'System-time='+tobytes(str(self.sys_time)), b'Temp-fail='+tobytes(str(tempfail))] if infected: args.extend([b'Filtered-count', b'Filtered-bytes='+tobytes(str(bs))]) else: args.extend([b'Clean-count', b'Clean-bytes='+tobytes(str(bs))]) try: MYCOLLECTOR.update(args+status.collect) status.collect = [] except: pass def policy_update(self): ''' Update policy statistics. ''' args = [b'Policy-count'] try: MYCOLLECTOR.update(args+status.collect) status.collect = [] except: pass # TrafGrapher class grouper(dict): one_day = 24*3600 compress_intervals = { one_day: 1, 7*one_day: 300, # 5m 30*one_day: 1800, # 30m 62*one_day: 7200, # 2h int(4*365.25*one_day): one_day/4 # 6 hours } def __getitem__(self, key): if not key in self: self[key] = [] return dict.__getitem__(self, key) def items(self, counter=False): ret = [] for key, values in dict.items(self): lv = len(values) val = sum([x for x in values])/lv # avg ret.append((key, val)) return ret def load(self, deltas, start=None): if start is None: start = max(deltas.keys()) intervals = self.compress_intervals.items() limit = None for t in sorted(deltas, reverse=True): if start-t>=limit: if intervals: limit, interval_range = intervals.pop(0) else: break st = int(t/interval_range)*interval_range self[st].append(deltas[t]) class trafgrapher: ignored_data = [ b'', b'Avg-bytes-1m', b'Avg-count-1m', b'Hostname', b'Restart-age', b'Restart-at', b'Start-at', b'Uptime' ] def __init__(self, path="/"): self.datadir = path def compress(self, filename): # read current file in_f = open(filename, "rt") header = in_f.readline() data = {} for row in in_f.readlines(): rowa = row.strip().split() data[int(rowa[0])] = float(rowa[1]) in_f.close() # compress data grp = grouper() grp.load(data) ret = dict(grp.items(header.split("\t")[3]=="c")) # save new file out_f = open(filename+".tmp", "wt") out_f.write(header) for key in sorted(ret.keys()): val = ret[key] if int(val)==val: val = int(val) out_f.write("%d %s\n" % (key, val)) out_f.close() # rename new file to old file os.rename(filename+".tmp", filename) def update_file(self, filename, service, utime, value): try: mtime = os.stat(filename).st_mtime except OSError: mtime = time.time() self.f = open(filename, "at") if self.f.tell()==0: self.f.write("%s\tsagator\t%s\tc\n" % (socket.gethostname(), service)) self.f.write("%d %s\n" % (utime, value)) self.f.close() if mtime//grouper.one_day < time.time()//grouper.one_day: self.compress(filename) def update(self): for row in self.stat_query("/", []).split(b"\n"): if not row.strip(): continue key, value = row.strip().split(b"\t", 1) key = key.strip(b": ") if key in self.ignored_data: continue self.update_file( os.path.join(self.datadir, tostr(key).lower()+".log"), tostr(key), time.time(), tostr(value) ) def stat_query(self, *args): if not COLLECTOR_SERVER: data = open(COLLECTOR_STATFILE, 'rb').read() if len(args)>0 and args[0]=='mrtg': out = [] for key in [b'Uptime', b'Hostname'] + args[1]: out.append( re.compile(br'^('+key+':.*)$', re.M).search(data).group(1) ) return b'\n'.join(out) else: return data for counter in range(3): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_settimeout(s, 30) s.connect(COLLECTOR_SERVER) urlargv = '' for arg in args[1]: urlargv += "/"+quote_plus(arg, "/=") s.sendall(b"GET /"+tobytes(args[0]+urlargv)+b" HTTP/1.1\r\n\r\n") f = s.makefile("rb", 0) # read header lines = [] for _ in range(512): line = f.readline() if (line==b"\r\n") or (line==b"\n") or (line==b""): break; lines.append(line) lines = [] s.shutdown(2) # read data ret = b'' for i in range(1024): r = s.recv(2048) if not r: break ret += r return ret.rstrip() except socket.error as err: debug.echo(3, "collector(): stat_query(%s, %s): socket.error: %s" % (args[0], args[1], err)) time.sleep(0.7) return '' # RRD try: import rrdtool except ImportError: class rrdtool_class: '''rrdtool module simulator''' def cmd(self, cmd, args): os.system("rrdtool %s %s" \ % (cmd, ' '.join(['"'+x+'"' for x in args if x])) ) def create(self, *args): self.cmd("create", args) def update(self, *args): self.cmd("update", args) def graph(self, *args): self.cmd("graph", args) rrdtool = rrdtool_class() class rrd(trafgrapher): DATA = { b'Total-bytes': 'totalbs', b'Clean-bytes': 'cleanbs', b'Virus-bytes': 'virusbs', b'Spam-bytes': 'spambs', b'Total-count': 'total', b'Clean-count': 'clean', b'Virus-count': 'virus', b'Spam-count': 'spam', b'Temp-fail': 'fails', b'Total-time': 'time' } DATES = [ [3600*24, 'Day', 1, 600], [3600*24*7, 'Week', 6, 700], [3600*24*31, 'Month', 24, 775], [3600*24*365, 'Year', 288, 797] ] PARAMS = ['--imgformat', 'PNG', '--interlaced', '--width', '576'] def __init__(self, path='./'): self.RRDPATH = path self.RRDFILE = os.path.join(path, 'sagator.rrd') def create(self): # create an RRD cmd = [self.RRDFILE] for ds in list(self.DATA.values()): cmd.append('DS:%s:DERIVE:600:0:%d' % (ds, 2**30)) for p1 in ['AVERAGE', 'MAX']: for p2 in self.DATES: cmd.append('RRA:%s:0.5:%d:%d' % (p1, p2[2], p2[3])) rrdtool.create(*cmd) return cmd def update(self): try: open(self.RRDFILE) except IOError: self.create() q = self.stat_query('/', []) cmd = [self.RRDFILE, 'N'] for key, value in list(self.DATA.items()): reg1 = re.compile(br'^%s:[ \t]*([0-9]+)' % key, re.M).search(q) if reg1: cmd[-1] += ':'+str(reg1.group(1).decode()) else: cmd[-1] += ':U' #print("WARNING: Key %s not found!" % key) print(cmd) rrdtool.update(*cmd) return self def time(self): return time.strftime("%c") def range(self, delta, r, xparams): rrdtool.graph(*[ '%s/%s-count.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'msgs/min', '--units-exponent', '0', '--start', '-%s' % r, 'DEF:total=%s:total:AVERAGE' % self.RRDFILE, 'DEF:clean=%s:clean:AVERAGE' % self.RRDFILE, 'DEF:virus=%s:virus:AVERAGE' % self.RRDFILE, 'DEF:spam=%s:spam:AVERAGE' % self.RRDFILE, 'CDEF:mtotal=total,60,*', 'CDEF:mclean=clean,60,*', 'CDEF:mvirus=virus,60,*', 'CDEF:mspam=spam,60,*', 'CDEF:dtotal=total,UN,0,total,IF,%s,*' % self.STEP, 'CDEF:stotal=PREV,UN,dtotal,PREV,IF,dtotal,+', 'CDEF:dclean=clean,UN,0,clean,IF,%s,*' % self.STEP, 'CDEF:sclean=PREV,UN,dclean,PREV,IF,dclean,+', 'CDEF:dvirus=virus,UN,0,virus,IF,%s,*' % self.STEP, 'CDEF:svirus=PREV,UN,dvirus,PREV,IF,dvirus,+', 'CDEF:dspam=spam,UN,0,spam,IF,%s,*' % self.STEP, 'CDEF:sspam=PREV,UN,dspam,PREV,IF,dspam,+', 'CDEF:rclean=sclean,100,*,stotal,/', 'CDEF:rvirus=svirus,100,*,stotal,/', 'CDEF:rspam=sspam,100,*,stotal,/', 'AREA:mtotal#00a000:Total ', 'GPRINT:stotal:MAX:%8.0lf msgs ', 'GPRINT:mtotal:AVERAGE:avg\\: %5.2lf%s/min ', 'GPRINT:mtotal:MAX:max\\: %5.2lf%s/min\\l', 'LINE2:mclean#c0c000:Clean ', 'GPRINT:sclean:MAX:%8.0lf msgs ', 'GPRINT:rclean:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mvirus#a00000:Virus ', 'GPRINT:svirus:MAX:%8.0lf msgs ', 'GPRINT:rvirus:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mspam#0000a0:Spam ', 'GPRINT:sspam:MAX:%8.0lf msgs ', 'GPRINT:rspam:AVERAGE:(%6.2lf %%)\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) rrdtool.graph(*[ '%s/%s-bytes.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'bytes/min', '--start', '-%s' % r, 'DEF:total=%s:totalbs:AVERAGE' % self.RRDFILE, 'DEF:clean=%s:cleanbs:AVERAGE' % self.RRDFILE, 'DEF:virus=%s:virusbs:AVERAGE' % self.RRDFILE, 'DEF:spam=%s:spambs:AVERAGE' % self.RRDFILE, 'CDEF:mtotal=total,60,*', 'CDEF:mclean=clean,60,*', 'CDEF:mvirus=virus,60,*', 'CDEF:mspam=spam,60,*', 'CDEF:dtotal=total,UN,0,total,IF,%s,*' % self.STEP, 'CDEF:stotal=PREV,UN,dtotal,PREV,IF,dtotal,+', 'CDEF:dclean=clean,UN,0,clean,IF,%s,*' % self.STEP, 'CDEF:sclean=PREV,UN,dclean,PREV,IF,dclean,+', 'CDEF:dvirus=virus,UN,0,virus,IF,%s,*' % self.STEP, 'CDEF:svirus=PREV,UN,dvirus,PREV,IF,dvirus,+', 'CDEF:dspam=spam,UN,0,spam,IF,%s,*' % self.STEP, 'CDEF:sspam=PREV,UN,dspam,PREV,IF,dspam,+', 'CDEF:rclean=sclean,100,*,stotal,/', 'CDEF:rvirus=svirus,100,*,stotal,/', 'CDEF:rspam=sspam,100,*,stotal,/', 'AREA:mtotal#00a000:Total ', 'GPRINT:stotal:MAX:%8.2lf %sB ', 'GPRINT:mtotal:AVERAGE:avg\\: %5.2lf %sB/min ', 'GPRINT:mtotal:MAX:max\\: %5.2lf %sB/min\\l', 'LINE2:mclean#c0c000:Clean ', 'GPRINT:sclean:MAX:%8.2lf %sB ', 'GPRINT:rclean:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mvirus#a00000:Virus ', 'GPRINT:svirus:MAX:%8.2lf %sB ', 'GPRINT:rvirus:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mspam#0000a0:Spam ', 'GPRINT:sspam:MAX:%8.2lf %sB ', 'GPRINT:rspam:AVERAGE:(%6.2lf %%)\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) rrdtool.graph(*[ '%s/%s-time.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'Sagator Load', '--units-exponent', '0', '--start', '-%s' % r, 'DEF:time=%s:time:MAX' % self.RRDFILE, 'AREA:time#00a000:Sagator load', 'GPRINT:time:AVERAGE:avg\\: %5.2lf ', 'GPRINT:time:MAX:max\\: %5.2lf\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) def graph(self, lazy=['--lazy']): xparams=[] for d in self.DATES: self.STEP = str(300*d[2]) self.range(d[1].lower(), d[0], xparams) xparams = lazy return self class rrd12(rrd): def time(self): return time.strftime("%c").replace(':', '\:') class status(interscan.match.match_any): ''' This interscanner can be used to collect some other statistics. Usage: status("String", scanner1(), scanner2(), ...) Where: "String" is a string, which defines a prefix for status update This scring can't contain spaces. These string will be replaced: %(VIRNAME)s virus name %(LEVEL)s detected level as float %(STARS)s detected level as stars scanner1(), ... are sagator's scanners Preffered usage: SCANNERS = [ status("Virus", virus_scanner1(), ...), status("Spam", spam_scanner1(), ...) ] This collects Virus-count, Virus-bytes, Spam-count, Spam-bytes in collector. ''' name = 'status()' collect = [] def __init__(self, s, *scanners): self.status_string = s interscan.match.match_any.__init__(self, scanners) def scanbuffer(self, buffer, args={}): level, detected, ret \ = interscan.match.match_any.scanbuffer(self, buffer, args) if is_infected(level, detected): repl_vars = { 'VNAME': tostr(detected).replace(' ', '_').replace(':', '_'), 'VIRNAME': tostr(detected).replace(' ', '_').replace(':', '_'), 'LEVEL': str(level), 'STARS': '*'*int(level) } status_string = replace_tmpl(self.status_string, repl_vars) debug.echo(7, 'stats(): %s: %d' % (status_string, len(buffer))) status.collect.extend([ tobytes(status_string)+b"-count", tobytes(status_string)+b"-bytes="+tobytes(str(len(buffer))) ]) return level, detected, ret if __name__ == '__main__': if len(sys.argv)>1: # get host and port from collector service from etc import SRV, CHROOT safe.ROOT_PATH = CHROOT for service in SRV: if service.name=='collector()': COLLECTOR_SERVER = service.BINDTO COLLECTOR_STATFILE = safe.fn(service.STATFILE) if sys.argv[1]=='trafgrapher': try: trafgrapher(sys.argv[2]).update() except Exception: debug.traceback(4) elif sys.argv[1]=='rrdtool10': r = rrd(sys.argv[2]) try: r.update() except Exception: debug.traceback(4) r.graph() elif sys.argv[1] in ('rrdtool', 'rrdtool12'): r = rrd12(sys.argv[2]) try: r.update() except Exception: debug.traceback(4) r.graph() elif sys.argv[1]=='rrdupdate': rrd(sys.argv[2]).update() elif sys.argv[1]=='rrdgraph': rrd(sys.argv[2]).graph([]) else: print(tostr(rrd12().stat_query(sys.argv[1], sys.argv[2:])))