''' dspam module for python, version 0.5 (c) 2004-2016 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ''' from __future__ import absolute_import from ctypes import * from ctypes.util import find_library from .const import * class DSpamError(Exception): '''DSpamError''' pass class ds_spam_signature(Structure): _fields_ = [ ('data', c_void_p), ('length', c_ulong) ] class ds_spam_totals(Structure): _fields_ = [ ('spam_learned', c_long), ('innocent_learned', c_long), ('spam_misclassified', c_long), ('innocent_misclassified', c_long), ('spam_corpusfed', c_long), ('innocent_corpusfed', c_long), ('spam_classified', c_long), ('innocent_classified', c_long) ] class DSPAM_CTX(Structure): _fields_ = [ ('totals', ds_spam_totals), ('signature', POINTER(ds_spam_signature)), ('message', c_void_p), ('config', c_void_p), ('username', c_char_p), ('group', c_char_p), ('home', c_char_p), ('operating_mode', c_int), # DSM_ ('training_mode', c_int), # DST_ ('training_buffer', c_int), # 0-10 ('wh_threshold', c_int), ('classification', c_int), # DSR_ ('source', c_int), # DSS_ ('learned', c_int), ('tokenizer', c_int), # DSZ_ ('flags', c_uint32), ('algorithms', c_uint32), ('result', c_int), ('class', c_char*32), ('probability', c_float), ('confidence', c_float), ('locked', c_int), ('storage', c_void_p), ('_process_start', c_uint32), # ??? time_t ??? ('_sig_provided', c_int), ('factors', c_void_p) ] _lib = None class dspam: def __init__(self, home, op=DSM_PROCESS, flags=DSF_SIGNATURE|DSF_NOISE|DSF_BIAS, algorithms=DSA_GRAHAM | DSA_BURTON | DSP_GRAHAM, tokenizer=DSZ_CHAIN, user='root', group=None, lib=None): global _lib if _lib is None: if lib is None: lib = find_library('dspam') # load lib and set return types _lib = cdll.LoadLibrary(lib) _lib.dspam_init.restype = POINTER(DSPAM_CTX) _lib.dspam_create.restype = POINTER(DSPAM_CTX) #_lib.dspam_process.argtypes = [DSPAM_CTX, c_char_p] _lib.dspam_destroy.restype = None # init dspam self.CTX = _lib.dspam_init(user, group, home, op, flags); if self.CTX is None: raise DSpamError(0, "dspam_init failed!") self.CTX.contents.algorithms = algorithms self.CTX.contents.tokenizer = tokenizer def __del__(self): if _lib: _lib.dspam_destroy(self.CTX) def check_error(self, es): if self.ec!=0: raise DSpamError(self.ec, es+" failed ("+str(self.ec)+")!") def training_mode(self, mode): self.CTX.contents.training_mode = mode def classification(self, classification): self.CTX.contents.classification = classification def source(self, source): self.CTX.contents.source = source def init_driver(self): self.ec = _lib.dspam_init_driver(byref(self.DTX)) self.check_error("dspam_init_driver") def shutdown_driver(self): self.ec = _lib.dspam_shutdown_driver(byref(self.DTX)) self.check_error("dspam_shutdown_driver") def attach(self): self.ec = _lib.dspam_attach(self.CTX, byref(self.DTX)) self.check_error("dspam_attach") def detach(self): self.ec = _lib.dspam_detach(self.CTX) self.check_error("dspam_detach") def process(self, message): self.ec = _lib.dspam_process(self.CTX, message) self.check_error("dspam_process") return self.CTX.contents.result, \ self.CTX.contents.probability, \ self.CTX.contents.confidence def getsource(self, buf, size): self.ec = _lib.dspam_getsource(self.CTX, buf, size) self.check_error("dspam_getsource") def addattribute(self, name, value): self.ec = _lib.dspam_addattribute(self.CTX, name, value) self.check_error("dspam_addattribute") def clearattributes(self): self.ec = _lib.dspam_clearattributes(self.CTX) self.check_error("dspam_clearattributes") def create_signature_id(self): #cdef char buf[128] buf = (c_char * 128)() while True: _lib._ds_create_signature_id(self.CTX, buf, sizeof(buf)) if _lib._ds_verify_signature(self.CTX, buf): break return string_at(buf, 128).rstrip('\0') def delete_signature(self, signature): self.ec = _lib._ds_delete_signature(self.CTX, signature) self.check_error("delete_signature('%s')" % signature) def get_signature(self, signature): #cdef _ds_spam_signature_t *SIG #SIG=<_ds_spam_signature_t *>malloc(sizeof(SIG)) SIG = (ds_spam_signature)() self.ec = _lib._ds_get_signature(self.CTX, SIG, signature) self.check_error("get_signature('%s')" % signature) self.CTX.contents.signature = SIG def set_signature(self, signature): self.ec = _lib._ds_set_signature(self.CTX, self.CTX.contents.signature, signature) self.check_error("set_signature('%s')" % signature) def save_signature(self): if self.CTX.contents.signature!=None: sig = self.create_signature_id() self.set_signature(sig) return sig else: return ''