import threading
import socket
import Queue
import re
import time
#import timeoutsocket

import DNS

from Logging import *
from Cache import Cache
from SendmailSocketMapHandler import smmapBaseHandlerWorker
from SendmailSocketMapHandler import smmapBaseHandlerContainer
from SendmailSocketMapHandler import MyPermanentVerifierException
from SendmailSocketMapHandler import MyTemporaryVerifierException


class VerifierHandlerContainer(smmapBaseHandlerContainer):
    def setup(self):
        DNS.ParseResolvConf()
        if self.config.get('EnableCaching').lower() in ('true', 'yes', '1'):
            debug("enabling cache")
            self.cache = Cache(int(self.config.get('CacheExpiration')))
        else:
            debug("disabling cache")
            self.cache = None


class VerifierHandlerWorker(smmapBaseHandlerWorker):
    OK = "OK"
    NOK = "NOK"
    TEMPNOK = "TEMPNOK"

    def setup(self):
        # Checker threads that were still running when a result was returned.
        self.zombies = []

    class checker(threading.Thread):
        """Probes a single MX host for a single address in its own thread."""

        def __init__(self, ready, config, host, address):
            threading.Thread.__init__(self)
            self.ready = ready
            self.config = config
            self.host = host
            self.address = address
            # Default in case the check dies with an unexpected exception.
            self.result = VerifierHandlerWorker.TEMPNOK

        def checkAddressAvailability(self):
            try:
                debug("Trying " + self.host)
                s = MySMTP(self.host, float(self.config.get('SMTPTimeOut')))
                s.helo(self.config.get('SMTPHeloParam'))
                s.mail(self.config.get('SMTPCheckSender'))
                s.rcpt(self.address.getAddress())
                s.quit()
                result = VerifierHandlerWorker.OK
            except MySMTPTemporaryException:
                result = VerifierHandlerWorker.TEMPNOK
            except MySMTPPermanentException:
                result = VerifierHandlerWorker.NOK
            except socket.timeout:
                result = VerifierHandlerWorker.TEMPNOK
            except socket.error:
                result = VerifierHandlerWorker.TEMPNOK
            return result

        def run(self):
            try:
                self.result = self.checkAddressAvailability()
            finally:
                # Always notify the waiting worker; otherwise an unexpected
                # exception here would block checkAvailability() forever.
                self.ready.put(self.getName())
            debug("NOTIFIED Host %s, Result %s" % (self.host, self.result))

        def getResult(self):
            return self.result

        def getHost(self):
            return self.host

        def getAddress(self):
            return self.address

    def checkAvailability(self, mxes, address):
        # Start one checker thread per MX host; each thread puts its name
        # on the queue when it is done.
        ready = Queue.Queue()
        checkerThreads = {}
        for m in mxes:
            checkerThread = VerifierHandlerWorker.checker(ready, self.container.config, m, address)
            checkerThread.start()
            checkerThreads[checkerThread.getName()] = checkerThread

        # Collect results until one host gives a definite answer (OK or NOK)
        # or all threads have reported.
        result = VerifierHandlerWorker.TEMPNOK
        while True:
            debug("%i threads left" % len(checkerThreads))
            if len(checkerThreads) == 0:
                debug("no threads left ...")
                break
            if result != VerifierHandlerWorker.TEMPNOK:
                debug("got a permanent result ...")
                break
            debug("Waiting for results ...")
            name = ready.get()
            checkerThread = checkerThreads[name]
            checkerThread.join()
            tempResult = checkerThread.getResult()
            debug("success, result is " + str(tempResult))
            if tempResult in (VerifierHandlerWorker.OK, VerifierHandlerWorker.NOK):
                result = tempResult
            del checkerThreads[name]

        # Threads that are still running are reaped later in finish().
        self.zombies.extend(checkerThreads.values())
        return result

    def finish(self):
        while True:
            debug("finish: %i zombies left" % len(self.zombies))
            # Iterate over a copy: removing from the list being iterated
            # would skip elements.
            for z in self.zombies[:]:
                if not z.isAlive():
                    debug("finish: thread %s for %s, %s terminated" % (z.getName(), z.getHost(), z.getAddress().getAddress()))
                    self.zombies.remove(z)
            for z in self.zombies:
                debug("finish: left over %s for %s, %s" % (z.getName(), z.getHost(), z.getAddress().getAddress()))
            if len(self.zombies) == 0:
                debug("finish: no zombie left ...")
                break
            debug("finish: WAITING")
            time.sleep(5)
            debug("finish: CONTINUE")
        debug("finish: all threads terminated")

    def execute(self, address):
        debug("address " + address)
        address = EMailAddress(address)
        bestmxes = address.getBestMX()
        if not bestmxes:
            return " "
        if self.container.cache is None:
            debug("no caching")
            result = self.checkAvailability(bestmxes, address)
        else:
            result = self.container.cache.get(address.getAddress())
            if result is None:
                debug("not found in cache")
                result = self.checkAvailability(bestmxes, address)
                # Only definite results are cached.
                if result != VerifierHandlerWorker.TEMPNOK:
                    self.container.cache.put(address.getAddress(), result)
            else:
                debug("found in cache")
        if result == VerifierHandlerWorker.OK:
            return ""
        elif result == VerifierHandlerWorker.NOK:
            return " "
        else:
            raise MyTemporaryVerifierException("no mx reachable")


class MySMTPPermanentException(ValueError):
    pass


class MySMTPTemporaryException(ValueError):
    pass


class MySMTP(object):
    def __init__(self, host, timeout, port=25):
        self.host = host
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.settimeout(timeout)
        self.socket.connect((host, port))
        # Read and discard the server greeting.
        self.socket.recv(8192)
        self.resPattern = re.compile(r'[\w\W]*?^(\d{3,3}) (.*?)[\r\n]{1,2}$', re.MULTILINE)

    def checkResult(self, r):
        code, text = r
        # Only the first digit of the reply code decides the outcome.
        code = code // 100
        if code == 2:
            return
        elif code == 4:
            raise MySMTPTemporaryException(text)
        elif code == 5:
            raise MySMTPPermanentException(text)
        else:
            raise MySMTPPermanentException("unknown code: " + str(code) + ", text: " + str(text))

    def docmd(self, cmd):
        debug("docmd: %s, cmd: %s " % (self.host, cmd))
        self.socket.sendall(cmd + "\r\n")
        res = self.socket.recv(8192)
        debug("docmd: result: (%s)" % res)
        m = self.resPattern.match(res)
        if m is None:
            # An empty or unparsable reply is treated as a temporary failure.
            raise MySMTPTemporaryException("unparsable response: " + repr(res))
        return self.checkResult((int(m.group(1)), m.group(2)))

    def helo(self, param):
        return self.docmd("helo " + param)

    def mail(self, sender):
        # Add angle brackets unless the sender already has both.
        if not (sender.startswith('<') and sender.endswith('>')):
            sender = '<' + sender + '>'
        return self.docmd("mail from:" + sender)

    def rcpt(self, recipient):
        return self.docmd("rcpt to:<%s>" % recipient)

    def quit(self):
        self.docmd("quit")
        self.socket.close()


class EMailAddress(object):
    def __init__(self, address):
        self.address = address
        if self.address.startswith('<') and self.address.endswith('>'):
            self.address = self.address[1:-1]
        try:
            self.userpart, self.domain = self.address.split('@')
        except ValueError:
            raise MyPermanentVerifierException("expected email address, found no at-sign")

    def getUserPart(self):
        return self.userpart

    def getDomain(self):
        return self.domain

    def getAddress(self):
        return self.address

    def getBestMX(self):
        if self.domain.startswith('[') and self.domain.endswith(']'):
            # Domain literal such as [192.0.2.1]: use the host directly.
            bestmx2 = [self.domain[1:-1]]
        else:
            # Keep only the MX hosts that share the preference of the first entry.
            bestmx = DNS.mxlookup(self.domain)
            pref = None
            bestmx2 = []
            for mx in bestmx:
                if pref is None:
                    pref = mx[0]
                if pref == mx[0]:
                    bestmx2.append(mx[1])
                else:
                    break
        debug("bestmx " + str(bestmx2))
        return bestmx2