# -*- coding: utf-8 -*-
'''
webq.py - A service for sagator's quarantine over HTTP.
(c) 2005-2024 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
'''

import sys, re, db, gettext, avlib
import binascii
from aglib import *
from avir.basic import const
from avlib import safe, trans_class, _

if sys.version_info[0]>2:
    from http.server import HTTPServer
    from http.server import SimpleHTTPRequestHandler
    from urllib.parse import parse_qs
    unicode = str
else:
    from BaseHTTPServer import HTTPServer
    from SimpleHTTPServer import SimpleHTTPRequestHandler
    from cgi import parse_qs

__all__ = ['webq_jinja']

try:
    from passlib.hosts import linux_context as passcrypt
except ImportError:
    from crypt import crypt

    class passcrypt:
        ''' Minimal stand-in offering the hash()/verify() calls used below. '''

        def hash(self, p):
            ''' Return a crypt() hash of p with a random '$1$' salt. '''
            return crypt(p, '$1$'+randomchars(8))

        def verify(self, p, hash):
            ''' Return True if p hashes to the stored value `hash`. '''
            # Was misspelled "verity"; every caller uses passcrypt.verify().
            return crypt(p, hash)==hash

    passcrypt = passcrypt()

# load necessary libraries for chroot
passcrypt.hash("sagator")


class pwq:
    '''
    Password quality checker backed by pwquality or, failing that,
    cracklib (also tried under its Debian name "crack").

    self.check is the bound check method of whichever library loaded,
    or None when no library is available.
    '''

    def __init__(self):
        self.check = None
        # Defined up front so set_lang()/translate() are safe to call
        # even when no quality library could be imported.
        self.lib = None
        self.trans = None
        try:
            import pwquality
            self.lib = pwquality
            self.pwq = pwquality.PWQSettings()
            self.trans = trans_class("libpwquality")
            self.check = self.pwq_check
            return
        except ImportError:
            pass
        try:
            import cracklib
        except ImportError as err:
            try:
                # debian hack to load renamed cracklib
                import crack as cracklib
            except ImportError:
                cracklib = None
                debug.echo(1, "webq(): PWQuality/Cracklib import failed (%s), "
                              "disabling password quality functionality."
                              % err)
        self.lib = cracklib
        if cracklib:
            self.trans = trans_class('cracklib')
            self.check = self.cracklib_check

    def pwq_check(self, p):
        ''' Return p if pwquality accepts it, else the error message. '''
        try:
            self.pwq.check(p)
        except self.lib.PWQError as err:
            return err.args[1]
        return p

    def cracklib_check(self, p):
        ''' Return FascistCheck's result for p, or the ValueError message. '''
        try:
            p = self.lib.FascistCheck(p)
        except ValueError as err:
            return err.args[0]
        return p

    def set_lang(self, lang):
        ''' Select the translation language; no-op without a library. '''
        if self.trans is not None:
            self.trans.set_lang(lang)

    def translate(self, s):
        ''' Translate s; returned unchanged when no library is loaded. '''
        if self.trans is None:
            return s
        return self.trans.gettext(s)


class condition:
    '''
    Builder of parameterized SQL WHERE fragments.

    Each method returns an SQL fragment containing a %s placeholder and
    appends the matching value to the WA list given to the constructor,
    or returns None (appending nothing) when the value is empty.
    '''

    def __init__(self, WA):
        self.WA = WA

    def like(self, key, value):
        ''' "key LIKE %s", or "key NOT LIKE %s" if value starts with '!'. '''
        if not value:
            return None
        if value[:1]=='!':
            self.WA.append(value[1:])
            return key+" NOT LIKE %s"
        self.WA.append(value)
        return key+" LIKE %s"

    def date(self, key, value, dir):
        ''' "key <dir> %s" where dir is a comparison operator string. '''
        if not value:
            return None
        self.WA.append(value)
        return "%s %s %%s" % (key, dir)

    def eq(self, key, user):
        ''' "key=%s", or "key!=%s" if user starts with '!'. '''
        # NOTE(review): get_index() uses this to restrict non-admin users
        # to their own mail; a REMOTE_USER starting with '!' would invert
        # that restriction - confirm such logins cannot exist.
        if not user:
            return None
        if user[:1]=='!':
            oper = "!="
            user = user[1:]
        else:
            oper = "="
        self.WA.append(user)
        return key+oper+'%s'


def utf_header(headers, hdr, i18n):
    '''
    Return the lines of header hdr prefixed by the label i18n.

    Returns an empty list if the header is missing. Byte values are
    decoded with replacement characters; any other non-str value is
    converted with unicode() instead of raising TypeError.
    '''
    value = headers[hdr]
    if value is None:
        return []
    if not isinstance(value, str):
        if isinstance(value, bytes):
            value = value.decode('utf-8', 'replace')
        else:
            value = unicode(value)
    return (i18n+' '+value).split('\n')


## Jinja2 ###########

class namespace(dict):
    '''
    Template namespace: a dict whose keys are also readable as
    attributes (missing keys read as None) and which provides the
    helper callables used by the templates.
    '''

    def __init__(self, **kw):
        from etc import SMTP_SERVER
        self.smtp_server = SMTP_SERVER
        dict.__init__(self, **kw)
        self['int'] = int
        self['unicode'] = unicode

    def reinit(self):
        ''' Publish the bound helper methods as namespace entries. '''
        self['get_index'] = self.get_index
        self['action_view'] = self.action_view
        self['action_recheck'] = self.action_recheck
        self['action_deliver'] = self.action_deliver
        self['set_rows_and_lang'] = self.set_rows_and_lang
        self['set_password'] = self.set_password
        self['check_permission'] = self.check_permission

    def __getattr__(self, name):
        return self.get(name)

    def get_index(self):
        '''
        Query the log table using the filters found in REQ.

        Users without the 'A' permission only see rows whose recipient
        equals REMOTE_USER. Returns dict(rows, data, error); on any
        failure rows is 0, data is empty and error holds the exception.
        '''
        try:
            QS = "SELECT qname,datetime,virname,level,sender,size,status,recipient"\
                 " FROM log"
            # conditions: each call appends its value to WA in order
            WA = []
            cond = condition(WA)
            WS = [
                cond.like('virname', self.REQ.get('virname')),
                cond.like('sender', self.REQ.get('sender')),
                cond.like('status', self.REQ.get('status', '!SENT')),
                cond.date('datetime', self.REQ.get('begin'), '>='),
                cond.date('datetime', self.REQ.get('end'), '<='),
            ]
            if 'A' in self.PERMS:
                WS.append(cond.like('recipient', self.REQ.get('recipient')))
            else:
                WS.append(cond.eq('recipient', self.REMOTE_USER))
            WS = [x for x in WS if x]
            if WS:
                QS += " WHERE "+" AND ".join(WS)
            QS += " ORDER BY datetime DESC LIMIT %s OFFSET %s"
            QA = WA + [self.SHOW_ROWS, int(self.REQ.get('offset', 0))]
            data = self.DB.query(QS, QA)
            return dict(rows=self.DB.rowcount, data=data, error='')
        except Exception as e:
            # Was "error=r" (undefined name), which raised NameError here.
            return dict(rows=0, data=[], error=e)

    def set_rows_and_lang(self, rows, lng):
        '''
        Store rows/lng for REMOTE_LOGIN if either differs from the
        current settings. Returns the (rows, lang) pair now in effect.
        '''
        if (rows!=self.SHOW_ROWS) or (lng!=self.LANG):
            # store rows (a space before WHERE was missing originally)
            self.DB.execute("UPDATE webaccess SET showrows=%s, lang=%s "
                            "WHERE email=%s", [rows, lng, self.REMOTE_LOGIN])
            return rows, lng
        return self.SHOW_ROWS, self.LANG

    def set_password(self, REMOTE_LOGIN, REQ):
        '''
        Change the password using REQ's 'old', 'new' and 'retyped'
        values. Returns change_passwd()'s message, or the exception
        object if it raised.
        '''
        try:
            return self.change_passwd(
                self.DB, REMOTE_LOGIN,
                REQ.get('old'), REQ.get('new'), REQ.get('retyped'),
                self.LANG
            )
        except Exception as e:
            return e

    def check_permission(self, qn):
        '''
        Check that the current user may access quarantine file qn.

        Returns dict(error, qname); error is '' on success. qname is
        safe.fn(qn), or '' if even that call failed.
        '''
        error = ""
        # Pre-set so the return below cannot hit an unbound local when
        # safe.fn() itself raises.
        qname = ''
        try:
            qname = safe.fn(qn)
            if 'A' in self.PERMS:
                self.DB.query("SELECT qname FROM log WHERE qname=%s", [qn])
            else:
                self.DB.query(
                    "SELECT qname FROM log WHERE qname=%s AND recipient=%s",
                    [qn, self.REMOTE_USER]
                )
            if self.DB.rowcount==0:
                error = "%s: Permission denied or no quarantined file found!" \
                        % qname
            if not os.path.isfile(qname):
                error = "File %s does not exist!" % qname
        except Exception as e:
            error = "Internal error occurred [%s]!" % e
        return dict(
            error=error,
            qname=qname
        )

    def action_view(self, qname):
        '''
        Parse quarantine file qname for display.

        Returns dict(show_headers, email). show_headers lists the From,
        To, Date and Subject lines followed by all X-Spam-* and
        X-Sagator-* headers. On IOError returns show_headers='' and
        email=None.
        '''
        try:
            email = qfile()
            # NOTE(review): the file object is never closed here. It is
            # left that way because `email` is handed to the template and
            # qfile may still read from it - confirm before adding `with`.
            email.parse(open(qname, 'rb'))
            # parse headers
            headers = message_from_string(email.message())
            show_headers = \
                utf_header(headers, 'From', _('From:')) + \
                utf_header(headers, 'To', _('To:')) + \
                utf_header(headers, 'Date', _('Date:')) + \
                utf_header(headers, 'Subject', _('Subject:'))
            for key in list(headers.keys()):
                if key.startswith(('X-Spam-', 'X-Sagator-')):
                    show_headers.extend(utf_header(headers, key, key+':'))
            return dict(
                show_headers=show_headers,
                email=email
            )
        except IOError:
            # best effort: an unreadable file yields the empty result
            pass
        return dict(show_headers='', email=None)

    def action_recheck(self, qname):
        '''
        Rescan quarantine file qname with the configured SCANNER.

        Returns dict(level, virname, div_class, error); on any failure
        error holds the exception and the other fields are neutral.
        '''
        try:
            # The message is fully fetched inside the with-block, so the
            # file handle is no longer leaked.
            with open(qname, 'rb') as f:
                email = qfile()
                email.parse(f)
                message = email.message()
            level, virname, status = self['SCANNER'].scanbuffer(message)
            self['SCANNER'].destroy()
            if not virname:
                return dict(
                    level=level,
                    virname=_("CLEAN"),
                    div_class="clean",
                    error=''
                )
            return dict(
                level=level,
                virname=tostr(virname),
                div_class="infected",
                error=''
            )
        except Exception as error:
            return dict(
                level=0,
                virname='',
                div_class="clean",
                error=error
            )

    def action_deliver(self, qname):
        '''
        Send quarantine file qname through the configured SMTP server.
        Returns dict(status, error).
        '''
        try:
            return dict(
                status=tostr(send_qfile(qname, self.smtp_server)),
                error=''
            )
        except Exception as error:
            return dict(status='', error=error)


class webq_jinja_request_handler(SimpleHTTPRequestHandler):
    '''
    HTTP handler which renders a Jinja template for each request after
    HTTP Basic authentication against the webaccess table.

    ENV, load_template, USERCONV_REG and USERCONV_REPL are class
    attributes installed by webq_jinja.__init__().
    '''

    def do_POST(self):
        return self.send_head()

    def send_head(self):
        '''
        Render the requested template and write the whole response.
        Always returns None, so the inherited do_GET() copies no file.
        '''
        path = self.path
        i = path.rfind('?')
        if i>=0:
            path, query = path[:i], path[i+1:]
        else:
            query = ''
        if not path.strip('/'):
            path = '/index.html'
        scriptfile = self.translate_path("srv/templates/"+path)
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such template (%r)" % path)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "Template is not a plain file (%r)" % path)
            return
        ns = namespace()
        # copy defaults
        ns.update(self.ENV)
        # update() copies only references: give each request its own
        # REQUESTS/REQ dicts, otherwise parameters written below would
        # accumulate in the shared class-level ENV across requests.
        ns['REQUESTS'] = dict(ns['REQUESTS'])
        ns['REQ'] = dict(ns['REQ'])
        if query:
            ns['QUERY_STRING'] = query
            ns['REQUESTS'].update(parse_qs(query, True))
        length = int(self.headers.get('content-length') or '0')
        if length > 0:
            data = self.rfile.read(length)
            ns['REQUESTS'].update(parse_qs(data, True))
        for key, value in list(ns['REQUESTS'].items()):
            ns['REQ'][tostr(key)] = tostr(value[0], "utf8")
        ns['CONTENT_LENGTH'] = length
        ns['REMOTE_ADDR'] = self.client_address[0]
        ns.update(self.check_auth())
        ns['_'].set_lang(ns['LANG'])
        # other
        ns.reinit()
        show_rows = int(ns.get('SHOW_ROWS', 0))
        try:
            offset = int(ns['REQ'].get('offset', 0))
        except ValueError:
            # client-supplied value; a malformed offset pages from 0
            offset = 0
        ns['prev_page'] = max(0, offset - show_rows)
        ns['next_page'] = offset + show_rows
        ns['query'] = '&'.join([
            "%s=%s" % (x,y)
            for x,y in list(ns['REQ'].items())
            if x!="offset"
        ])
        if not ns.get('REMOTE_USER'):
            self.send_response(401, "Authorization Required")
            self.send_header("WWW-Authenticate",
                             'Basic realm="Restricted access"')
            self.end_headers()
            self.wfile.write(b"Authorization Required")
            return
        try:
            template = self.load_template(scriptfile.split('/')[-1])
            s = template.render(**ns)
            self.send_response(200, "Script output follows")
            self.send_header("Content-type", "text/html; charset=UTF-8")
        except Exception:
            import traceback
            s = traceback.format_exc()
            self.send_response(500, "Internal server error!")
            self.send_header("Content-type", "text/plain; charset=UTF-8")
        s = s.encode('UTF-8')
        self.send_header("Content-Length", str(len(s)))
        self.end_headers()
        self.wfile.write(s)

    def check_auth(self):
        '''
        Check authorization request.
        Return value is an empty dictionary (or one holding only
        AUTH_TYPE), if authorization fails or a namespace update
        on success.
        '''
        env = {}
        authorization = self.headers.get("authorization", "").encode()
        if not authorization:
            return env
        authorization = authorization.split()
        if len(authorization) != 2:
            return env
        env['AUTH_TYPE'] = authorization[0]
        if authorization[0].lower() != b"basic":
            return env
        try:
            credentials = b64decode(authorization[1])
        except binascii.Error:
            return env
        # Split on the first colon only, so that passwords which contain
        # ':' are not rejected.
        credentials = credentials.split(b':', 1)
        if len(credentials) != 2:
            return env
        try:
            auth = [x.decode() for x in credentials]
        except UnicodeDecodeError:
            # undecodable credentials are an authentication failure
            return env
        env.update(self.check_pass(auth))
        return env

    def check_pass(self, auth):
        '''
        Check user password.
        Parameter auth is an array of: ['login', 'plain text password']
        Return value is an empty dictionary, if authorization fails,
        or a namespace update on success.
        '''
        try:
            passwd, perms, lang, showrows = self.ENV['DB'].query(
                "SELECT pass,perms,lang,showrows FROM webaccess WHERE email=%s",
                auth[:1]
            )[0]
        except IndexError:
            return {} # no record found for this login
        if passcrypt.verify(auth[1], passwd):
            return {
                'REMOTE_USER':
                    self.USERCONV_REG.sub(self.USERCONV_REPL, auth[0]),
                'REMOTE_LOGIN': auth[0],
                'PERMS': perms,
                'LANG': lang or "en_US",
                'SHOW_ROWS': showrows or 50
            }
        return {}


class webq_jinja(ServiceTCPServer, HTTPServer):
    '''
    Web service for sagator's quarantine access.

    This service can be used to access email collected by sagator
    via web interface.

    Requirements: python-jinja2 or python-jinja

    Usage: webq_jinja(host='0.0.0.0', port=8008, db,
                      log='/var/log/sagator/webq.log', scanner, userconv)

    Where: host is a string, which defines IP address to bind,
             default: 0.0.0.0
           port is an integer, which defines tcp port to listen,
             default: 8008
           db is a database connection. For description see Databases.txt.
           log is defining a log file name,
             by default /var/log/sagator/webq.log
           scanner is a scanner to use for checking (only one scanner
             can be used here and it must be a buffer scanner!)
           userconv is an array, which defines regular expression and
             substitution strings. Usernames from login prompt are matched
             against this regular expression and substituted by
             substitution string.
           request_handler is a SimpleHTTPRequestHandler class.
             By default webq_jinja_request_handler is used.
             Use this class as parent if you need to override some
             functions. This parameter was introduced in sagator 1.3.

    It is recommended to use apache mod_proxy module to redirect
    standard web traffic from port 80 to webq()'s 8008.
    For example:
      ProxyPass /webq http://localhost:8008
      ProxyPassReverse /webq http://localhost:8008

    Example: See default config file for example.

    New in version 1.3.0.
    '''
    name = 'webq_jinja()'

    def __init__(self, host='0.0.0.0', port=8008, db=None,
                 log='/var/log/sagator/webq.log', scanner=const(0),
                 userconv=['^(.*)$','\\1'],
                 request_handler=webq_jinja_request_handler):
        self.BINDTO = (host, port)
        self.LOGFILE = log
        self.SCANNERS = [scanner]
        self.WORK_DIR = os.path.dirname(os.path.abspath(avlib.__file__))
        self.WEB_ROOT = os.path.join(self.WORK_DIR, 'srv/templates')
        self.REQUEST_HANDLER = request_handler
        # Defaults copied into every request's template namespace.
        self.REQUEST_HANDLER.ENV = namespace(
            _ = _,
            DB = db,
            SCANNER = scanner,
            REQUESTS = {},
            REQ = {},
            LANGS = ['en_US', 'sk_SK'],
            LANG = 'C',
            REMOTE_ADDR = '',
            REMOTE_USER = '',
            REMOTE_LOGIN = '',
            PERMS = '',
            condition = condition,
            check_passwd = self.check_passwd,
            change_passwd = self.change_passwd
        )
        TemplateLoader = self.get_template_loader()
        self.REQUEST_HANDLER.load_template = TemplateLoader.get_template
        self.REQUEST_HANDLER.USERCONV_REG = re.compile(userconv[0])
        self.REQUEST_HANDLER.USERCONV_REPL = userconv[1]
        self.cracklib = pwq()

    def get_template_loader(self):
        ''' Return a jinja2 (or jinja) Environment for srv/templates. '''
        try:
            from jinja2 import Environment, PackageLoader
        except ImportError:
            from jinja import Environment, PackageLoader
        return Environment(loader=PackageLoader('srv', 'templates'))

    def test_scanners(self, scanner):
        '''
        Test the configured scanners, then copy the template files to
        safe.fn(WEB_ROOT) if that differs from their source directory.
        Re-raises ImportError if srv.templates is not installed.
        '''
        ServiceTCPServer.test_scanners(self, self.SCANNERS)
        # copy all templates
        import shutil
        try:
            import srv.templates
        except ImportError:
            debug.echo(1, 'webq(): ERROR: webq not installed, '
                          'please install sagator-webq package')
            raise
        webqsource = os.path.abspath(os.path.dirname(srv.templates.__file__))
        webqtarget = safe.fn(self.WEB_ROOT)
        if webqsource!=webqtarget:
            try:
                if not os.path.isdir(webqtarget):
                    debug.echo(9, "Making webq chroot directory: ", webqtarget)
                    os.makedirs(webqtarget)
                for f in os.listdir(webqsource):
                    source = os.path.join(webqsource, f)
                    if os.path.isfile(source):
                        debug.echo(9, "Copying webq file: ", f)
                        shutil.copy(source, os.path.join(webqtarget, f))
            except OSError as e:
                debug.echo(9, "OSError: %s" % e)

    def serve_forever(self, poll_interval=0.5):
        ''' Prepare paths and logs, then serve HTTP requests. '''
        if safe.fn('/usr')=='/usr':
            os.chdir(self.WORK_DIR)
        sys.path.insert(0, 'srv/templates')
        import srv.templates # reimport for chroot
        self.sighup(0,0) # redirect logs
        # poll_interval was accepted but silently ignored before.
        HTTPServer.serve_forever(self, poll_interval)

    def sighup(self,sn,stack):
        ''' Reopen LOGFILE on fd 1 and duplicate it onto fd 2. '''
        if debug.logfile=="-":
            return
        # reopen logs; os.open() is expected to reuse the just-closed fd 1
        os.close(1)
        os.open(self.LOGFILE, os.O_CREAT|os.O_WRONLY|os.O_APPEND, 0o640)
        os.dup2(1,2) # copy to stderr fd

    def check_passwd(self, DB, login, old):
        '''
        Verify password old against the hash stored for login.

        Returns a dict with REMOTE_LOGIN, PERMS, LANG and SHOW_ROWS on
        success, an empty dict otherwise. Raises IndexError if login
        has no row in webaccess.
        '''
        passwd, perms, lang, showrows = DB.query(
            "SELECT pass,perms,lang,showrows "
            "FROM webaccess WHERE email=%s", [login]
        )[0]
        if passcrypt.verify(old, passwd):
            return dict(
                REMOTE_LOGIN = login,
                PERMS = perms,
                LANG = lang or "en_US",
                SHOW_ROWS = showrows or 50
            )
        return dict()

    def change_passwd(self, DB, login, old, new, retyped, lang='C'):
        '''
        Change the password of login after verifying old, the password
        quality (if a checker is available) and that new==retyped.

        Returns a message for the user; "" when new is empty.
        '''
        if not new:
            return ""
        if not self.check_passwd(DB, login, old):
            return "Wrong password!"
        if self.cracklib.check:
            e = self.cracklib.check(new)
            # the checkers return the password itself when it is accepted
            if e and e!=new:
                self.cracklib.set_lang(lang)
                return _('Password %s.') % self.cracklib.translate(str(e))
        if new != retyped:
            return "New and retyped passwords does not match!"
        DB.execute("UPDATE webaccess SET pass=%s WHERE email=%s",
                   [passcrypt.hash(new), login])
        return "Password changed successfully."