initial upload
This commit is contained in:
240
smmapd_prototype/VerifierHandler.py
Normal file
240
smmapd_prototype/VerifierHandler.py
Normal file
@ -0,0 +1,240 @@
|
||||
import threading
|
||||
import socket
|
||||
import Queue
|
||||
import re
|
||||
import time
|
||||
|
||||
#import timeoutsocket
|
||||
import DNS
|
||||
|
||||
from Logging import *
|
||||
|
||||
from Cache import Cache
|
||||
|
||||
from SendmailSocketMapHandler import smmapBaseHandlerWorker
|
||||
from SendmailSocketMapHandler import smmapBaseHandlerContainer
|
||||
from SendmailSocketMapHandler import MyPermanentVerifierException
|
||||
from SendmailSocketMapHandler import MyTemporaryVerifierException
|
||||
|
||||
|
||||
|
||||
class VerifierHandlerContainer(smmapBaseHandlerContainer):
|
||||
def setup(self):
|
||||
DNS.ParseResolvConf()
|
||||
if self.config.get('EnableCaching').lower() in ('true', 'yes', '1'):
|
||||
debug("enabling cache")
|
||||
self.cache = Cache(int(self.config.get('CacheExpiration')))
|
||||
else:
|
||||
debug("disabling cache")
|
||||
self.cache = None
|
||||
|
||||
|
||||
class VerifierHandlerWorker(smmapBaseHandlerWorker):
|
||||
OK = "OK"
|
||||
NOK = "NOK"
|
||||
TEMPNOK = "TEMPNOK"
|
||||
|
||||
def setup(self):
|
||||
self.zombies = []
|
||||
|
||||
class checker(threading.Thread):
|
||||
def __init__(self, ready, config, host, address):
|
||||
threading.Thread.__init__(self)
|
||||
self.ready = ready
|
||||
self.config = config
|
||||
self.host = host
|
||||
self.address = address
|
||||
|
||||
def checkAddressAvailability(self):
|
||||
try:
|
||||
debug("Trying " + self.host)
|
||||
s = MySMTP(self.host, float(self.config.get('SMTPTimeOut')))
|
||||
s.helo(self.config.get('SMTPHeloParam'))
|
||||
s.mail(self.config.get('SMTPCheckSender'))
|
||||
s.rcpt(self.address.getAddress())
|
||||
s.quit()
|
||||
result = VerifierHandlerWorker.OK
|
||||
except MySMTPTemporaryException:
|
||||
result = VerifierHandlerWorker.TEMPNOK
|
||||
except MySMTPPermanentException:
|
||||
result = VerifierHandlerWorker.NOK
|
||||
except socket.timeout:
|
||||
result = VerifierHandlerWorker.TEMPNOK
|
||||
except socket.error:
|
||||
result = VerifierHandlerWorker.TEMPNOK
|
||||
return result
|
||||
|
||||
def run(self):
|
||||
self.result = self.checkAddressAvailability()
|
||||
self.ready.put(self.getName())
|
||||
debug("NOTIFIED Host %s, Result %s" % (self.host, self.result))
|
||||
|
||||
def getResult(self):
|
||||
return self.result
|
||||
|
||||
def getHost(self):
|
||||
return self.host
|
||||
|
||||
def getAddress(self):
|
||||
return self.address
|
||||
|
||||
def checkAvailability(self, mxes, address):
|
||||
ready = Queue.Queue()
|
||||
checkerThreads = {}
|
||||
for m in mxes:
|
||||
checkerThread = VerifierHandlerWorker.checker(ready, self.container.config, m, address)
|
||||
checkerThread.start()
|
||||
checkerThreads[checkerThread.getName()] = checkerThread
|
||||
|
||||
result = VerifierHandlerWorker.TEMPNOK
|
||||
while 1:
|
||||
debug("%i threads left" % len(checkerThreads))
|
||||
if len(checkerThreads) == 0:
|
||||
debug("no threads left ...")
|
||||
break
|
||||
if result != VerifierHandlerWorker.TEMPNOK:
|
||||
debug("got a permanent result ...")
|
||||
break
|
||||
debug("Waiting for results ...")
|
||||
name = ready.get()
|
||||
checkerThread = checkerThreads[name]
|
||||
checkerThread.join()
|
||||
tempResult = checkerThread.getResult()
|
||||
debug("success, result is " + str(tempResult))
|
||||
if [VerifierHandlerWorker.OK, VerifierHandlerWorker.NOK].count(tempResult) != 0:
|
||||
result = tempResult
|
||||
del checkerThreads[name]
|
||||
self.zombies.extend(checkerThreads.values())
|
||||
return result
|
||||
|
||||
def finish(self):
|
||||
while 1:
|
||||
debug("finish: %i zombies left" % len(self.zombies))
|
||||
for z in self.zombies:
|
||||
if not z.isAlive():
|
||||
debug("finish: thread %s for %s, %s terminated" % (z.getName(), z.getHost(), z.getAddress().getAddress()))
|
||||
self.zombies.remove(z)
|
||||
for z in self.zombies:
|
||||
debug("finish: left over %s for %s, %s" % (z.getName(), z.getHost(), z.getAddress().getAddress()))
|
||||
if len(self.zombies) == 0:
|
||||
debug("finish: no zombie left ...")
|
||||
break
|
||||
debug("finish: WAITING")
|
||||
time.sleep(5)
|
||||
debug("finish: CONTINUE")
|
||||
debug("finish: all threads terminated")
|
||||
|
||||
|
||||
def execute(self, address):
|
||||
debug("address " + address)
|
||||
|
||||
address = EMailAddress(address)
|
||||
|
||||
bestmxes = address.getBestMX()
|
||||
if not bestmxes:
|
||||
return "<NOK> <no bestmx found>"
|
||||
|
||||
if self.container.cache == None:
|
||||
debug("no caching")
|
||||
result = self.checkAvailability(bestmxes, address)
|
||||
else:
|
||||
result = self.container.cache.get(address.getAddress())
|
||||
if result == None:
|
||||
debug("not found in cache")
|
||||
result = self.checkAvailability(bestmxes, address)
|
||||
if result != VerifierHandlerWorker.TEMPNOK:
|
||||
self.container.cache.put(address.getAddress(), result)
|
||||
else:
|
||||
debug("found in cache")
|
||||
|
||||
if result == VerifierHandlerWorker.OK:
|
||||
return "<OK>"
|
||||
elif result == VerifierHandlerWorker.NOK:
|
||||
return "<NOK> <home server sent a permanent negative answer>"
|
||||
else:
|
||||
raise MyTemporaryVerifierException, "no mx reachable"
|
||||
|
||||
|
||||
class MySMTPPermanentException(ValueError): pass
|
||||
|
||||
class MySMTPTemporaryException(ValueError): pass
|
||||
|
||||
class MySMTP(object):
|
||||
def __init__(self, host, timeout, port=25):
|
||||
self.host = host
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.socket.settimeout(timeout)
|
||||
self.socket.connect((host, port))
|
||||
self.socket.recv(8192)
|
||||
self.resPattern = re.compile(r'[\w\W]*?^(\d{3,3}) (.*?)[\r\n]{1,2}$', re.MULTILINE)
|
||||
|
||||
def checkResult(self, r):
|
||||
code, text = r
|
||||
code = code / 100
|
||||
|
||||
if code == 2:
|
||||
return;
|
||||
elif code == 4:
|
||||
raise MySMTPTemporaryException, text
|
||||
elif code == 5:
|
||||
raise MySMTPPermanentException, text
|
||||
else:
|
||||
raise MySMTPPermanentException, "unknown code: " + str(code) + ", text: " + str(text)
|
||||
|
||||
def docmd(self, cmd):
|
||||
debug("docmd: %s, cmd: %s " % (self.host, cmd))
|
||||
self.socket.sendall(cmd + "\r\n")
|
||||
res = self.socket.recv(8192)
|
||||
debug("docmd: result: (%s)" % res)
|
||||
m = self.resPattern.match(res)
|
||||
return self.checkResult((int(m.group(1)), m.group(2)))
|
||||
|
||||
def helo(self, param):
|
||||
return self.docmd("helo " + param)
|
||||
|
||||
def mail(self, sender):
|
||||
if sender[0] != '<' and sender[-1] != '>': sender = '<' + sender + '>'
|
||||
return self.docmd("mail from:" + sender)
|
||||
|
||||
def rcpt(self, recipient):
|
||||
return self.docmd("rcpt to:<%s>" % recipient)
|
||||
|
||||
def quit(self):
|
||||
self.docmd("quit")
|
||||
self.socket.close()
|
||||
|
||||
|
||||
class EMailAddress(object):
|
||||
def __init__(self, address):
|
||||
self.address = address
|
||||
if self.address[0] == '<' and self.address[-1] == '>': self.address = self.address[1:-1]
|
||||
try:
|
||||
self.userpart, self.domain = self.address.split('@')
|
||||
except ValueError:
|
||||
raise MyPermanentVerifierException, "excepted email address, found not at-sign"
|
||||
|
||||
def getUserPart(self):
|
||||
return self.userpart
|
||||
|
||||
def getDomain(self):
|
||||
return self.domain
|
||||
|
||||
def getAddress(self):
|
||||
return self.address
|
||||
|
||||
def getBestMX(self):
|
||||
if self.domain[0] == '[' and self.domain[-1] == ']':
|
||||
bestmx2 = [self.domain[1:-1]]
|
||||
else:
|
||||
bestmx = DNS.mxlookup(self.domain)
|
||||
pref = None
|
||||
bestmx2 = []
|
||||
for mx in bestmx:
|
||||
if pref == None: pref = mx[0]
|
||||
if pref == mx[0]:
|
||||
bestmx2.append(mx[1])
|
||||
else:
|
||||
break
|
||||
debug("bestmx " + str(bestmx2))
|
||||
return bestmx2
|
||||
|
Reference in New Issue
Block a user