This commit is contained in:
who
2007-11-12 11:17:25 +01:00
parent fbb070c940
commit 197d60b102
8 changed files with 256 additions and 212 deletions

View File

@ -5,5 +5,5 @@ DYNID="testhost"
DATE=`date +%s`
CHECKSUM=`echo -n "$DYNID $SHAREDSECRET $DATE" | md5sum | awk '{print $1}'`
OUT="$DYNID $DATE $CHECKSUM"
echo -n $OUT | nc -q 0 -u 127.0.0.1 9090
echo -n $OUT | nc -q 0 -u 127.0.0.1 8053

80
server/DnsHandler.py Normal file
View File

@ -0,0 +1,80 @@
import dns
class DnsHandler(threading.Thread):
def __init__(self, msgQueue, tsigKey, nsAddress, ttl, nullIp):
threading.Thread.__init__(self)
self.msgQueue = msgQueue
self.nullIp = nullIp
self.nsAddress = nsAddress
self.ttl = ttl
self.keyring = dns.tsigkeyring.from_text(tsigKey)
def run(self):
while(True):
msg = self.msgQueue.get()
if msg.successFlag:
# success
Logger.dbg("DnsManipulator: retrieved positive message")
if not self.statusMap.has_key(msg.ip) or not self.statusMap[msg.ip]:
self.statusMap[msg.ip] = True
self.insertARR(msg.ip)
self.notifyQueue.put(NotificationMessage("Server returned", "Server %s is back" % msg.ip))
else:
# failure
Logger.dbg("DnsManipulator: retrieved negative message")
if not self.statusMap.has_key(msg.ip) or self.statusMap[msg.ip]:
self.statusMap[msg.ip] = False
self.deleteARR(msg.ip)
self.notifyQueue.put(NotificationMessage("Server unavailable", "Server %s has gone" % msg.ip))
failureCnt = 0
for v in self.statusMap.values():
if not v:
failureCnt += 1
if failureCnt > 0 and not self.exitServerFlag:
self.insertARR(self.exitServerIp)
self.exitServerFlag = True
if failureCnt == 0 and self.exitServerFlag:
self.deleteARR(self.exitServerIp)
self.exitServerFlag = False
self.notifyQueue.put(NotificationMessage("All clear", "All clear, exit-server removed from DNS"))
def insertARR(self, ip):
# send A-RR insertion for ip to DNS server
Logger.log("insert ARR %s" % ip)
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
u = dns.update.Update(self.zone, keyring=self.keyring)
u.add(self.name, self.ttl, rr)
r = dns.query.tcp(u, self.nsAddress)
if r.rcode() != 0:
# failure
Logger.log("failure when inserting A-RR for %s" % ip)
else:
# success
pass
def deleteARR(self, ip):
# send A-RR deletion for ip to DNS server
Logger.log("delete ARR %s" % ip)
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
u = dns.update.Update(self.zone, keyring=self.keyring)
u.delete(self.name, rr)
r = dns.query.tcp(u, self.nsAddress)
if r.rcode() != 0:
# failure
Logger.log("failure when deleting A-RR for %s" % ip)
else:
# success
pass

19
server/DynHandler.py Normal file
View File

@ -0,0 +1,19 @@
import threading
import Event
class DynHandler(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.q = queue
self.setDaemon(True)
def run(self):
while True:
event = self.q.get()
try:
event.prepare()
print "Processing event %s" % str(event)
event.process()
except Event.IllegalEventException, e:
print "Some failure: %s" % e.msg

26
server/DynReceiver.py Normal file
View File

@ -0,0 +1,26 @@
import threading
import socket
import Queue
import time
import Event
class DynReceiver(threading.Thread):
def __init__(self, dynRecvAddr, queue):
threading.Thread.__init__(self)
self.dynRecvAddr = dynRecvAddr
self.queue = queue
self.setDaemon(True)
def run(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(self.dynRecvAddr)
while True:
data, address = s.recvfrom(256)
try:
self.queue.put_nowait(Event.Event(address, data, int(time.time())))
except Queue.Full:
print "Event %s from %s dropped" % (data, str(address))

12
server/Entry.py Normal file
View File

@ -0,0 +1,12 @@
class Entry(object):
def __init__(self, dynid, sharedSecret, name):
self.dynid = dynid
self.sharedSecret = sharedSecret
self.name = name
self.lastEventTime = 0
self.address = ''
def __str__(self):
return "dynid=%s, name=%s, sharedSecret=%s, address=%s, lastEventTime=%d" % (self.dynid, self.name, self.sharedSecret, self.address, self.lastEventTime)

67
server/Event.py Normal file
View File

@ -0,0 +1,67 @@
import md5
class IllegalEventException(Exception):
def __init__(self, msg):
self.msg = msg
class Event(object):
@classmethod
def setParams(cls, entries, msgTimeCorridor):
cls.entries = entries
cls.msgTimeCorridor = msgTimeCorridor
def __init__(self, address, data, receiveTime):
self.address = address
self.data = data
self.receiveTime = receiveTime
def prepare(self):
self.port = self.address[1]
self.address = self.address[0]
parts = self.data.split(' ')
if len(parts) != 3:
raise IllegalEventException("data format error 1")
(self.dynid, self.msgTime, self.checksum) = self.data.split(' ')
try:
self.msgTime = int(self.msgTime)
except ValueError, e:
raise IllegalEventException("data format error 2")
self.prepared = True
def process(self):
if not self.prepared:
self.prepare()
if not Event.entries.has_key(self.dynid):
raise IllegalEventException("unknown dynid in event %s" % str(self))
entry = Event.entries[self.dynid]
if self.msgTime + Event.msgTimeCorridor < self.receiveTime:
raise IllegalEventException("event too old %s" % str(self))
if self.msgTime - Event.msgTimeCorridor > self.receiveTime:
raise IllegalEventException("event too young %s" % str(self))
if entry.lastEventTime >= self.msgTime:
raise IllegalEventException("timing sequence failure in event, possibly replay %s" % str(self))
di = "%s %s %d" % (self.dynid, entry.sharedSecret, self.msgTime)
d = md5.new(di).hexdigest()
print "%s, received: %s, calculated: %s" % (di, self.checksum, d)
if d != self.checksum:
raise IllegalEventException("wrong checksum for event %s" % str(self))
entry.lastEventTime = self.msgTime
if entry.address == self.address:
print "Same address, nothing to do."
else:
entry.address = self.address
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
def __str__(self):
if not self.prepared:
self.prepare()
return "%s from %s:%d" % (self.data, self.address, self.port)

24
server/Expirer.py Normal file
View File

@ -0,0 +1,24 @@
import threading
import time
class Expirer(threading.Thread):
def __init__(self, entries, eventLifeTime, nullAddress):
threading.Thread.__init__(self)
self.entries = entries
self.eventLifeTime = eventLifeTime
self.nullAddress = nullAddress
self.setDaemon(True)
def run(self):
while True:
print "Expiring ..."
currentTime = int(time.time())
for entry in self.entries.values():
print "Checking ", str(entry)
if entry.lastEventTime != 0 and entry.lastEventTime + self.eventLifeTime < currentTime:
print "Entry %s expired" % entry.dynid
entry.lastEventTime = 0
entry.address = self.nullAddress
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
time.sleep(10)

View File

@ -1,197 +1,18 @@
#!/usr/bin/python
import socket
import Queue
import threading
import md5
import time
import dns
import shelve
import DynHandler
# import DnsHandler
import Expirer
import DynReceiver
import Entry
import Event
class Entry(object):
def __init__(self, dynid, sharedSecret, name):
self.dynid = dynid
self.sharedSecret = sharedSecret
self.name = name
self.lastEventTime = 0
self.address = ''
class IllegalEventException(Exception):
def __init__(self, msg):
self.msg = msg
class Event(object):
def __init__(self, address, data, receiveTime):
self.address = address
self.data = data
self.receiveTime = receiveTime
def prepare(self):
self.port = self.address[1]
self.address = self.address[0]
(self.dynid, self.msgTime, self.checksum) = self.data.split(' ')
self.msgTime = int(self.msgTime)
self.prepared = True
def process(self):
if not self.prepared:
self.prepare()
if not ENTRIES.has_key(self.dynid):
raise IllegalEventException("unknown dynid in event %s" % str(self))
entry = ENTRIES[self.dynid]
if self.msgTime + MSG_TIME_CORRIDOR < self.receiveTime:
raise IllegalEventException("event too old %s" % str(self))
if self.msgTime - MSG_TIME_CORRIDOR > self.receiveTime:
raise IllegalEventException("event too young %s" % str(self))
if entry.lastEventTime >= self.msgTime:
raise IllegalEventException("timing sequence failure in event, possibly replay %s" % str(self))
di = "%s %s %d" % (self.dynid, entry.sharedSecret, self.msgTime)
d = md5.new(di).hexdigest()
print "%s, received: %s, calculated: %s" % (di, self.checksum, d)
if d != self.checksum:
raise IllegalEventException("wrong checksum for event %s" % str(self))
entry.lastEventTime = self.msgTime
if entry.address == self.address:
print "Same address, nothing to do."
else:
entry.address = self.address
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
def __str__(self):
if not self.prepared:
self.prepare()
return "%s from %s:%d" % (self.data, self.address, self.port)
class Handler(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.q = queue
self.setDaemon(True)
def run(self):
while True:
event = self.q.get()
try:
event.prepare()
print "Processing event %s" % str(event)
event.process()
except IllegalEventException, e:
print "Some failure: %s" % e.msg
class Expirer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
def run(self):
while True:
print "Expiring ..."
currentTime = int(time.time())
for entry in ENTRIES.values():
if entry.lastEventTime != 0 and entry.lastEventTime + EVENT_LIFE_TIME < currentTime:
print "Entry %s expired" % entry.dynid
entry.lastEventTime = 0
entry.address = NULL_ADDRESS
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
time.sleep(10)
class DnsManipulator(threading.Thread):
def __init__(self, msgQueue, notifyQueue, tsigKey, nsAddress, zone, name, ttl, exitServerIp):
threading.Thread.__init__(self)
self.msgQueue = msgQueue
self.notifyQueue = notifyQueue
self.exitServerIp = exitServerIp
self.statusMap = {}
self.exitServerFlag = True
self.nsAddress = nsAddress
self.zone = zone
self.name = name
self.ttl = ttl
self.keyring = dns.tsigkeyring.from_text(tsigKey)
def run(self):
while(True):
msg = self.msgQueue.get()
if msg.successFlag:
# success
Logger.dbg("DnsManipulator: retrieved positive message")
if not self.statusMap.has_key(msg.ip) or not self.statusMap[msg.ip]:
self.statusMap[msg.ip] = True
self.insertARR(msg.ip)
self.notifyQueue.put(NotificationMessage("Server returned", "Server %s is back" % msg.ip))
else:
# failure
Logger.dbg("DnsManipulator: retrieved negative message")
if not self.statusMap.has_key(msg.ip) or self.statusMap[msg.ip]:
self.statusMap[msg.ip] = False
self.deleteARR(msg.ip)
self.notifyQueue.put(NotificationMessage("Server unavailable", "Server %s has gone" % msg.ip))
failureCnt = 0
for v in self.statusMap.values():
if not v:
failureCnt += 1
if failureCnt > 0 and not self.exitServerFlag:
self.insertARR(self.exitServerIp)
self.exitServerFlag = True
if failureCnt == 0 and self.exitServerFlag:
self.deleteARR(self.exitServerIp)
self.exitServerFlag = False
self.notifyQueue.put(NotificationMessage("All clear", "All clear, exit-server removed from DNS"))
def insertARR(self, ip):
# send A-RR insertion for ip to DNS server
Logger.log("insert ARR %s" % ip)
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
u = dns.update.Update(self.zone, keyring=self.keyring)
u.add(self.name, self.ttl, rr)
r = dns.query.tcp(u, self.nsAddress)
if r.rcode() != 0:
# failure
Logger.log("failure when inserting A-RR for %s" % ip)
else:
# success
pass
def deleteARR(self, ip):
# send A-RR deletion for ip to DNS server
Logger.log("delete ARR %s" % ip)
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
u = dns.update.Update(self.zone, keyring=self.keyring)
u.delete(self.name, rr)
r = dns.query.tcp(u, self.nsAddress)
if r.rcode() != 0:
# failure
Logger.log("failure when deleting A-RR for %s" % ip)
else:
# success
pass
ENTRIES = {
'testhost': Entry('testhost,', 'test123', 'test.test.de'),
}
MSG_TIME_CORRIDOR = 5
EVENT_LIFE_TIME = 10
NULL_ADDRESS = '0.0.0.0'
@ -200,36 +21,31 @@ NAME = 'serve'
TTL = 120
NAMESERVER = '127.0.0.1'
TSIGKEY = { "monitoring." : "HYHN8l/dg1+q6QLOURp2qw==" }
EXITSERVER = '87.230.59.51'
FAILURE_THRESHOLD = 3
PERIOD = 10
FROM = "root@hottis.de"
TO = "dns-alarm@adinside.de"
SMTPHOST = "submission.hottis.de"
SMTPPORT = 25
SMTPLOGIN = "admon-adinside"
SMTPPASSWD = "test123"
DEBUG = False
PIDFILE = "/tmp/monitor.pid"
PIDFILE = "/tmp/yadyn.pid"
entries = shelve.open('entries', flag='c', writeback=True)
if len(entries) == 0:
entries['testhost'] = Entry.Entry('testhost', 'test123', 'test.test.de')
q = Queue.Queue()
try:
Event.Event.setParams(entries, MSG_TIME_CORRIDOR)
handler = Handler(q)
handler.start()
q = Queue.Queue()
expirer = Expirer()
expirer.start()
dynHandler = DynHandler.DynHandler(q)
dynHandler.start()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", 9090))
dynReceiver = DynReceiver.DynReceiver(("", 8053), q)
dynReceiver.start()
while True:
data, address = s.recvfrom(256)
try:
q.put_nowait(Event(address, data, int(time.time())))
except Queue.Full:
print "Event %s from %s dropped" % (data, str(address))
expirer = Expirer.Expirer(entries, EVENT_LIFE_TIME, NULL_ADDRESS)
expirer.start()
while True:
entries.sync()
time.sleep(10)
finally:
print "Closing shelf"
entries.close()