''' clamav module (c) 2003-2024 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ''' from __future__ import absolute_import from __future__ import print_function from avlib import * import socket, re, struct __all__ = ['clamscan', 'clamd0', 'clamd', 'libclam'] class clamscan(ascanner): ''' ClamAV command line realscanner. This scanner is a realscanner, which can be used to scan for viruses. ClamAV is a free program under GPL and may be freely used. Its database is very well. It can be easily used in chroot. mkchroot.sh from sagator package automatically integrates it into chroot environment. Requires: clamscan binary from clamav Usage: clamscan(['/usr/bin/clamscan','--stdout','--infected', '--no-summary','-r']) Where: in [] are command line parameters for clamscan binary ''' name='clamscan()' def __init__(self, arg): self.scanner = arg def scanfile(self, files, dirname='', args={}): level, rc, ret = 0.0, 0, [] pf = popen(self.scanner+[safe.fn(dirname)]) pf.tocmd.close() rc = pf.wait() for line in pf.readlines(): if re.search(b" FOUND", line): ret.append(line) level += 1.0 pf.fromcmd.close() debug.echo(4, "CLAMSCANF: RET: ", rc, ret) if rc==0: level, vir = 0.0, b'' elif rc==1: l = re.search(b": (.*) FOUND", ret[0].rstrip()) if l: vir = l.group(1) else: debug.echo(0, 'CLAMSCANF: ERROR: ', ret) raise ScannerError('clamscandir: '+str(ret)) else: raise ScannerError('clamscan: returned error code: '+str(rc)+str(ret)) return level, vir, ret class clamd0(ascanner): ''' ClamAV daemon realscanner for clamav-0.103 or older. This scanners is a realscanner, which can be used to scan for viruses. ClamAV is a free program under GPL and may be freely used. Its database is very well. If you need a chrooted clamd, copy it into chroot and start it from here. If you are not familiar with it, use clamscan scanner from this module. It easily does chroot support, but it is not powerfull. Requires: clamd<1.0 (clamav daemon) Usage: clamd0(['localhost', 3310]) or: clamd0('/var/spool/vscan/clamd') Where: 1st definition defines a clam daemon on host localhost, port 3310 (it is the default) 2nd definition defines access to clamd via UNIX socket ''' name='clamd0()' def __init__(self, arg): self.conn = arg def scanbuffer(self, buffer, args={}): if type(self.conn)==str: # local socket s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) socket_settimeout(s, 120) s.connect(self.conn) else: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_settimeout(s, 120) s.connect((self.conn[0], self.conn[1])) f = s.makefile('rw', BUFSIZE) s.sendall(b"nSTREAM\n") rl = f.readline() l = re.search('^PORT ([0-9]*)', rl) if l: # Send buffer ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_settimeout(ss, 120) if type(self.conn)==type('string'): # local socket ss.connect(('127.0.0.1', int(l.group(1)))) else: ss.connect((self.conn[0], int(l.group(1)))) ss.sendall(fromhdr()) ss.sendall(buffer) ss.shutdown(socket.SHUT_RDWR) ss.close() # Read reply level = 0.0 ret = [] for line in f.readlines(): if not re.search(": OK$", line.rstrip()): ret.append(line) f.close() s.shutdown(socket.SHUT_RDWR) s.close() if ret==[]: level, vir = 0.0, b'' else: l = re.search(": (.*) FOUND$", ret[0].rstrip()) if l: vir = l.group(1).encode() level = 1.0 else: debug.echo(0,'CLAMD: ERROR: ', ret) raise ScannerError('clamd: '+str(ret)) return level, vir, ret else: raise ScannerError('Protocol error, status: '+rl) class clamd(ascanner): ''' ClamAV daemon realscanner for clamav-1.0 or higher. This scanners is a realscanner, which can be used to scan for viruses. ClamAV is a free program under GPL and may be freely used. Its database is very well. If you need a chrooted clamd, copy it into chroot and start it from here. If you are not familiar with it, use clamscan scanner from this module. It easily does chroot support, but it is not powerfull. Requires: clamd-1.0+ (clamav daemon) Usage: clamd(['localhost', 3310]) or: clamd('/var/spool/vscan/clamd') Where: 1st definition defines a clam daemon on host localhost, port 3310 (it is the default) 2nd definition defines access to clamd via UNIX socket ''' name='clamd()' def __init__(self, arg): self.conn = arg def scanbuffer(self, buffer, args={}): if type(self.conn)==str: # local socket s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) socket_settimeout(s, 120) s.connect(self.conn) else: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_settimeout(s, 120) s.connect((self.conn[0], self.conn[1])) s.sendall(b"zINSTREAM\0") # Send buffer hdr = fromhdr() s.sendall(struct.pack(">I", len(hdr)+len(buffer))) s.sendall(hdr) s.sendall(buffer) s.sendall(struct.pack(">I", 0)) # Read reply ret = s.recv(1024).strip(b"\0") s.shutdown(socket.SHUT_RDWR) s.close() if ret.endswith(b": OK"): return 0.0, b"", [ret] l = re.search(b": (.*) FOUND$", ret) if l: return 1.0, l.group(1), [ret] debug.echo(0, 'CLAMD: ERROR: ', ret) raise ScannerError('clamd: '+ret.decode("utf8")) from . import libclamav class libclam(ascanner): ''' ClamAV realscanner - uses libclamavmodule python library. This scanners is a realscanner, which can be used to scan for viruses. ClamAV is a free program under GPL and may be freely used. Its database is very well. Requires: libclamav module for python (sagator-libclamav package) Usage: libclam(options=libclam.CL_SCAN_STDOPT, limits={}, db_options=None, solib=None) Where: options is an number or structure, which defines libclam options. See clamav documentation for more info. limits is an tuple, which defines limits for clamav decompressor, see clamav doc. for more info. db_options is an number, which defines libclam loaddb options. See clamav documentation for more info. This option is available only with sagator-libclamav-1.2.2 or higher. datadir is an string, defining path to clamav virus database files. By default value compiled into libclamav. This parameter is new in 1.2.0. solib is a string, defining clamav shared library path and filename. For example for clamav-0.94.2 it's "/usr/lib/libclamav.so.5". By default it's autodetected. This parameter is new in 1.2.0. ''' name='libclam()' # some constants def __init__(self, options=None, limits={}, db_options=None, datadir=None, solib=None): self.av = None # do not load database before chroot! self.options = options self.limits = limits self.db_options = db_options self.datadir = datadir self.solib = solib def reinit(self): if not self.av: if self.db_options is not None: # allow db_options for new sagator-libclamav self.av = libclamav.clamav(solib=self.solib, db_options=self.db_options, datadir=self.datadir) else: self.av = libclamav.clamav(solib=self.solib, datadir=self.datadir) try: self.av.setlimits(self.limits) except NameError as es: debug.echo(2, "WARNING: %s [limits=%s]: " % (es, self.limits) + "Please update your sagator.conf, remove clamav options, " "which are not supported by you clamav installation." ) else: if self.av.reload()==0: return debug.echo(2, "%s: Version: %s, loaded virpatterns: %d" % (self.name, self.av.retver(), self.av.virnum)) def scanfile(self, files, dir='', args={}): for fname in files: fd = safe.osopen(fname, os.O_RDONLY) try: v = self.av.scandesc(fd, fname, self.options) os.close(fd) fd = None if v: mail.addheader('X-Sagator-LibClamAV', v) return 1.0, v.encode(), [fname+": "+v] except: if fd is not None: os.close(fd) raise return 0.0, b'', []