diff --git a/client/shell/yadync.sh b/client/shell/yadync.sh index 665fa47..3ec2b93 100755 --- a/client/shell/yadync.sh +++ b/client/shell/yadync.sh @@ -5,5 +5,5 @@ DYNID="testhost" DATE=`date +%s` CHECKSUM=`echo -n "$DYNID $SHAREDSECRET $DATE" | md5sum | awk '{print $1}'` OUT="$DYNID $DATE $CHECKSUM" -echo -n $OUT | nc -q 0 -u 127.0.0.1 9090 +echo -n $OUT | nc -q 0 -u 127.0.0.1 8053 diff --git a/server/DnsHandler.py b/server/DnsHandler.py new file mode 100644 index 0000000..c3d5bde --- /dev/null +++ b/server/DnsHandler.py @@ -0,0 +1,80 @@ +import dns + +class DnsHandler(threading.Thread): + def __init__(self, msgQueue, tsigKey, nsAddress, ttl, nullIp): + threading.Thread.__init__(self) + self.msgQueue = msgQueue + self.nullIp = nullIp + self.nsAddress = nsAddress + self.ttl = ttl + self.keyring = dns.tsigkeyring.from_text(tsigKey) + + def run(self): + while(True): + msg = self.msgQueue.get() + + if msg.successFlag: + # success + Logger.dbg("DnsManipulator: retrieved positive message") + if not self.statusMap.has_key(msg.ip) or not self.statusMap[msg.ip]: + self.statusMap[msg.ip] = True + self.insertARR(msg.ip) + + self.notifyQueue.put(NotificationMessage("Server returned", "Server %s is back" % msg.ip)) + else: + # failure + Logger.dbg("DnsManipulator: retrieved negative message") + if not self.statusMap.has_key(msg.ip) or self.statusMap[msg.ip]: + self.statusMap[msg.ip] = False + self.deleteARR(msg.ip) + + self.notifyQueue.put(NotificationMessage("Server unavailable", "Server %s has gone" % msg.ip)) + + failureCnt = 0 + for v in self.statusMap.values(): + if not v: + failureCnt += 1 + + if failureCnt > 0 and not self.exitServerFlag: + self.insertARR(self.exitServerIp) + self.exitServerFlag = True + + if failureCnt == 0 and self.exitServerFlag: + self.deleteARR(self.exitServerIp) + self.exitServerFlag = False + + self.notifyQueue.put(NotificationMessage("All clear", "All clear, exit-server removed from DNS")) + + def insertARR(self, ip): + # send A-RR insertion for ip to DNS server + Logger.log("insert ARR %s" % ip) + + rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip) + u = dns.update.Update(self.zone, keyring=self.keyring) + u.add(self.name, self.ttl, rr) + r = dns.query.tcp(u, self.nsAddress) + + if r.rcode() != 0: + # failure + Logger.log("failure when inserting A-RR for %s" % ip) + else: + # success + pass + + def deleteARR(self, ip): + # send A-RR deletion for ip to DNS server + Logger.log("delete ARR %s" % ip) + + rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip) + u = dns.update.Update(self.zone, keyring=self.keyring) + u.delete(self.name, rr) + r = dns.query.tcp(u, self.nsAddress) + + if r.rcode() != 0: + # failure + Logger.log("failure when deleting A-RR for %s" % ip) + else: + # success + pass + + diff --git a/server/DynHandler.py b/server/DynHandler.py new file mode 100644 index 0000000..8f7f8d4 --- /dev/null +++ b/server/DynHandler.py @@ -0,0 +1,19 @@ +import threading +import Event + +class DynHandler(threading.Thread): + def __init__(self, queue): + threading.Thread.__init__(self) + self.q = queue + self.setDaemon(True) + + def run(self): + while True: + event = self.q.get() + try: + event.prepare() + print "Processing event %s" % str(event) + event.process() + except Event.IllegalEventException, e: + print "Some failure: %s" % e.msg + diff --git a/server/DynReceiver.py b/server/DynReceiver.py new file mode 100644 index 0000000..b0c0858 --- /dev/null +++ b/server/DynReceiver.py @@ -0,0 +1,26 @@ +import threading +import socket +import Queue +import time +import Event + + + +class DynReceiver(threading.Thread): + def __init__(self, dynRecvAddr, queue): + threading.Thread.__init__(self) + self.dynRecvAddr = dynRecvAddr + self.queue = queue + self.setDaemon(True) + + def run(self): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind(self.dynRecvAddr) + + while True: + data, address = s.recvfrom(256) + try: + self.queue.put_nowait(Event.Event(address, data, int(time.time()))) + except Queue.Full: + print "Event %s from %s dropped" % (data, str(address)) + diff --git a/server/Entry.py b/server/Entry.py new file mode 100644 index 0000000..f1768c8 --- /dev/null +++ b/server/Entry.py @@ -0,0 +1,12 @@ + +class Entry(object): + def __init__(self, dynid, sharedSecret, name): + self.dynid = dynid + self.sharedSecret = sharedSecret + self.name = name + self.lastEventTime = 0 + self.address = '' + + def __str__(self): + return "dynid=%s, name=%s, sharedSecret=%s, address=%s, lastEventTime=%d" % (self.dynid, self.name, self.sharedSecret, self.address, self.lastEventTime) + diff --git a/server/Event.py b/server/Event.py new file mode 100644 index 0000000..c582d29 --- /dev/null +++ b/server/Event.py @@ -0,0 +1,67 @@ +import md5 + +class IllegalEventException(Exception): + def __init__(self, msg): + self.msg = msg + + +class Event(object): + @classmethod + def setParams(cls, entries, msgTimeCorridor): + cls.entries = entries + cls.msgTimeCorridor = msgTimeCorridor + + def __init__(self, address, data, receiveTime): + self.address = address + self.data = data + self.receiveTime = receiveTime + + def prepare(self): + self.port = self.address[1] + self.address = self.address[0] + parts = self.data.split(' ') + if len(parts) != 3: + raise IllegalEventException("data format error 1") + (self.dynid, self.msgTime, self.checksum) = self.data.split(' ') + try: + self.msgTime = int(self.msgTime) + except ValueError, e: + raise IllegalEventException("data format error 2") + self.prepared = True + + def process(self): + if not self.prepared: + self.prepare() + + if not Event.entries.has_key(self.dynid): + raise IllegalEventException("unknown dynid in event %s" % str(self)) + entry = Event.entries[self.dynid] + + if self.msgTime + Event.msgTimeCorridor < self.receiveTime: + raise IllegalEventException("event too old %s" % str(self)) + if self.msgTime - Event.msgTimeCorridor > self.receiveTime: + raise IllegalEventException("event too young %s" % str(self)) + + + if entry.lastEventTime >= self.msgTime: + raise IllegalEventException("timing sequence failure in event, possibly replay %s" % str(self)) + + di = "%s %s %d" % (self.dynid, entry.sharedSecret, self.msgTime) + d = md5.new(di).hexdigest() + print "%s, received: %s, calculated: %s" % (di, self.checksum, d) + if d != self.checksum: + raise IllegalEventException("wrong checksum for event %s" % str(self)) + + entry.lastEventTime = self.msgTime + + if entry.address == self.address: + print "Same address, nothing to do." + else: + entry.address = self.address + print "Set in DNS: %s -> %s" % (entry.name, entry.address) + + + def __str__(self): + if not self.prepared: + self.prepare() + return "%s from %s:%d" % (self.data, self.address, self.port) diff --git a/server/Expirer.py b/server/Expirer.py new file mode 100644 index 0000000..75e0773 --- /dev/null +++ b/server/Expirer.py @@ -0,0 +1,24 @@ +import threading +import time + +class Expirer(threading.Thread): + def __init__(self, entries, eventLifeTime, nullAddress): + threading.Thread.__init__(self) + self.entries = entries + self.eventLifeTime = eventLifeTime + self.nullAddress = nullAddress + self.setDaemon(True) + + def run(self): + while True: + print "Expiring ..." + currentTime = int(time.time()) + for entry in self.entries.values(): + print "Checking ", str(entry) + if entry.lastEventTime != 0 and entry.lastEventTime + self.eventLifeTime < currentTime: + print "Entry %s expired" % entry.dynid + entry.lastEventTime = 0 + entry.address = self.nullAddress + print "Set in DNS: %s -> %s" % (entry.name, entry.address) + time.sleep(10) + diff --git a/server/yadyn b/server/yadyn index c547621..9942188 100755 --- a/server/yadyn +++ b/server/yadyn @@ -1,197 +1,18 @@ #!/usr/bin/python -import socket import Queue -import threading -import md5 import time -import dns +import shelve + +import DynHandler +# import DnsHandler +import Expirer +import DynReceiver +import Entry +import Event -class Entry(object): - def __init__(self, dynid, sharedSecret, name): - self.dynid = dynid - self.sharedSecret = sharedSecret - self.name = name - self.lastEventTime = 0 - self.address = '' -class IllegalEventException(Exception): - def __init__(self, msg): - self.msg = msg - - -class Event(object): - def __init__(self, address, data, receiveTime): - self.address = address - self.data = data - self.receiveTime = receiveTime - - def prepare(self): - self.port = self.address[1] - self.address = self.address[0] - (self.dynid, self.msgTime, self.checksum) = self.data.split(' ') - self.msgTime = int(self.msgTime) - self.prepared = True - - def process(self): - if not self.prepared: - self.prepare() - - if not ENTRIES.has_key(self.dynid): - raise IllegalEventException("unknown dynid in event %s" % str(self)) - entry = ENTRIES[self.dynid] - - if self.msgTime + MSG_TIME_CORRIDOR < self.receiveTime: - raise IllegalEventException("event too old %s" % str(self)) - if self.msgTime - MSG_TIME_CORRIDOR > self.receiveTime: - raise IllegalEventException("event too young %s" % str(self)) - - - if entry.lastEventTime >= self.msgTime: - raise IllegalEventException("timing sequence failure in event, possibly replay %s" % str(self)) - - di = "%s %s %d" % (self.dynid, entry.sharedSecret, self.msgTime) - d = md5.new(di).hexdigest() - print "%s, received: %s, calculated: %s" % (di, self.checksum, d) - if d != self.checksum: - raise IllegalEventException("wrong checksum for event %s" % str(self)) - - entry.lastEventTime = self.msgTime - - if entry.address == self.address: - print "Same address, nothing to do." - else: - entry.address = self.address - print "Set in DNS: %s -> %s" % (entry.name, entry.address) - - - def __str__(self): - if not self.prepared: - self.prepare() - return "%s from %s:%d" % (self.data, self.address, self.port) - -class Handler(threading.Thread): - def __init__(self, queue): - threading.Thread.__init__(self) - self.q = queue - self.setDaemon(True) - - def run(self): - while True: - event = self.q.get() - try: - event.prepare() - print "Processing event %s" % str(event) - event.process() - except IllegalEventException, e: - print "Some failure: %s" % e.msg - -class Expirer(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - self.setDaemon(True) - - def run(self): - while True: - print "Expiring ..." - currentTime = int(time.time()) - for entry in ENTRIES.values(): - if entry.lastEventTime != 0 and entry.lastEventTime + EVENT_LIFE_TIME < currentTime: - print "Entry %s expired" % entry.dynid - entry.lastEventTime = 0 - entry.address = NULL_ADDRESS - print "Set in DNS: %s -> %s" % (entry.name, entry.address) - time.sleep(10) - - -class DnsManipulator(threading.Thread): - def __init__(self, msgQueue, notifyQueue, tsigKey, nsAddress, zone, name, ttl, exitServerIp): - threading.Thread.__init__(self) - self.msgQueue = msgQueue - self.notifyQueue = notifyQueue - self.exitServerIp = exitServerIp - self.statusMap = {} - self.exitServerFlag = True - self.nsAddress = nsAddress - self.zone = zone - self.name = name - self.ttl = ttl - self.keyring = dns.tsigkeyring.from_text(tsigKey) - - def run(self): - while(True): - msg = self.msgQueue.get() - - if msg.successFlag: - # success - Logger.dbg("DnsManipulator: retrieved positive message") - if not self.statusMap.has_key(msg.ip) or not self.statusMap[msg.ip]: - self.statusMap[msg.ip] = True - self.insertARR(msg.ip) - - self.notifyQueue.put(NotificationMessage("Server returned", "Server %s is back" % msg.ip)) - else: - # failure - Logger.dbg("DnsManipulator: retrieved negative message") - if not self.statusMap.has_key(msg.ip) or self.statusMap[msg.ip]: - self.statusMap[msg.ip] = False - self.deleteARR(msg.ip) - - self.notifyQueue.put(NotificationMessage("Server unavailable", "Server %s has gone" % msg.ip)) - - failureCnt = 0 - for v in self.statusMap.values(): - if not v: - failureCnt += 1 - - if failureCnt > 0 and not self.exitServerFlag: - self.insertARR(self.exitServerIp) - self.exitServerFlag = True - - if failureCnt == 0 and self.exitServerFlag: - self.deleteARR(self.exitServerIp) - self.exitServerFlag = False - - self.notifyQueue.put(NotificationMessage("All clear", "All clear, exit-server removed from DNS")) - - def insertARR(self, ip): - # send A-RR insertion for ip to DNS server - Logger.log("insert ARR %s" % ip) - - rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip) - u = dns.update.Update(self.zone, keyring=self.keyring) - u.add(self.name, self.ttl, rr) - r = dns.query.tcp(u, self.nsAddress) - - if r.rcode() != 0: - # failure - Logger.log("failure when inserting A-RR for %s" % ip) - else: - # success - pass - - def deleteARR(self, ip): - # send A-RR deletion for ip to DNS server - Logger.log("delete ARR %s" % ip) - - rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip) - u = dns.update.Update(self.zone, keyring=self.keyring) - u.delete(self.name, rr) - r = dns.query.tcp(u, self.nsAddress) - - if r.rcode() != 0: - # failure - Logger.log("failure when deleting A-RR for %s" % ip) - else: - # success - pass - - - -ENTRIES = { - 'testhost': Entry('testhost,', 'test123', 'test.test.de'), - } MSG_TIME_CORRIDOR = 5 EVENT_LIFE_TIME = 10 NULL_ADDRESS = '0.0.0.0' @@ -200,36 +21,31 @@ NAME = 'serve' TTL = 120 NAMESERVER = '127.0.0.1' TSIGKEY = { "monitoring." : "HYHN8l/dg1+q6QLOURp2qw==" } -EXITSERVER = '87.230.59.51' -FAILURE_THRESHOLD = 3 -PERIOD = 10 -FROM = "root@hottis.de" -TO = "dns-alarm@adinside.de" -SMTPHOST = "submission.hottis.de" -SMTPPORT = 25 -SMTPLOGIN = "admon-adinside" -SMTPPASSWD = "test123" -DEBUG = False -PIDFILE = "/tmp/monitor.pid" +PIDFILE = "/tmp/yadyn.pid" +entries = shelve.open('entries', flag='c', writeback=True) +if len(entries) == 0: + entries['testhost'] = Entry.Entry('testhost', 'test123', 'test.test.de') -q = Queue.Queue() +try: + Event.Event.setParams(entries, MSG_TIME_CORRIDOR) -handler = Handler(q) -handler.start() + q = Queue.Queue() -expirer = Expirer() -expirer.start() + dynHandler = DynHandler.DynHandler(q) + dynHandler.start() -s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -s.bind(("", 9090)) + dynReceiver = DynReceiver.DynReceiver(("", 8053), q) + dynReceiver.start() -while True: - data, address = s.recvfrom(256) - try: - q.put_nowait(Event(address, data, int(time.time()))) - except Queue.Full: - print "Event %s from %s dropped" % (data, str(address)) + expirer = Expirer.Expirer(entries, EVENT_LIFE_TIME, NULL_ADDRESS) + expirer.start() + while True: + entries.sync() + time.sleep(10) +finally: + print "Closing shelf" + entries.close()