changes
This commit is contained in:
parent
b79efd1c98
commit
8bbbb07263
BIN
client/c/md5.o
Normal file
BIN
client/c/md5.o
Normal file
Binary file not shown.
BIN
client/c/yadync
Executable file
BIN
client/c/yadync
Executable file
Binary file not shown.
BIN
client/c/yadync.o
Normal file
BIN
client/c/yadync.o
Normal file
Binary file not shown.
@ -44,15 +44,15 @@ class MyCmd(cmd.Cmd):
|
||||
def do_add(self, l):
|
||||
try:
|
||||
parts = l.split(' ')
|
||||
if len(parts) != 3:
|
||||
if len(parts) != 4:
|
||||
raise LocalMyCmdException("illegal number of arguments")
|
||||
(dynid, name, sharedSecret) = parts
|
||||
(dynid, name, zone, sharedSecret) = parts
|
||||
if MyCmd.entries.has_key(dynid):
|
||||
raise LocalMyCmdException("duplicate dynid")
|
||||
for entry in MyCmd.entries.values():
|
||||
if entry.name == name:
|
||||
raise LocalMyCmdException("duplicate name")
|
||||
newEntry = Entry.Entry(dynid, sharedSecret, name)
|
||||
if entry.name == name and entry.zone == zone:
|
||||
raise LocalMyCmdException("duplicate full name")
|
||||
newEntry = Entry.Entry(dynid, sharedSecret, name, zone)
|
||||
MyCmd.entries[dynid] = newEntry
|
||||
self.stdout.write("Done\n")
|
||||
except LocalMyCmdException, e:
|
||||
|
@ -1,80 +1,73 @@
|
||||
import dns
|
||||
import dns.query
|
||||
import dns.update
|
||||
import dns.rdataclass
|
||||
import dns.rdatatype
|
||||
import dns.rdtypes.IN.A
|
||||
import dns.tsigkeyring
|
||||
import threading
|
||||
from logger import Logger
|
||||
|
||||
|
||||
class DnsHandler(threading.Thread):
|
||||
def __init__(self, msgQueue, tsigKey, nsAddress, ttl, nullIp):
|
||||
def __init__(self, msgQueue, tsigKey, nsAddress, ttl, statusMap):
|
||||
threading.Thread.__init__(self)
|
||||
self.msgQueue = msgQueue
|
||||
self.nullIp = nullIp
|
||||
self.nsAddress = nsAddress
|
||||
self.ttl = ttl
|
||||
self.keyring = dns.tsigkeyring.from_text(tsigKey)
|
||||
self.statusMap = statusMap
|
||||
self.setDaemon(True)
|
||||
|
||||
|
||||
def run(self):
|
||||
while(True):
|
||||
msg = self.msgQueue.get()
|
||||
|
||||
name = msg.name
|
||||
zone = msg.zone
|
||||
address = msg.address
|
||||
|
||||
if msg.successFlag:
|
||||
# success
|
||||
Logger.dbg("DnsManipulator: retrieved positive message")
|
||||
if not self.statusMap.has_key(msg.ip) or not self.statusMap[msg.ip]:
|
||||
self.statusMap[msg.ip] = True
|
||||
self.insertARR(msg.ip)
|
||||
|
||||
self.notifyQueue.put(NotificationMessage("Server returned", "Server %s is back" % msg.ip))
|
||||
fullName = name + '.' + zone
|
||||
|
||||
if self.statusMap.has_key(fullName):
|
||||
if self.statusMap[fullName] != address:
|
||||
self.deleteARR(name, zone, self.statusMap[fullName])
|
||||
self.insertARR(name, zone, address)
|
||||
self.statusMap[fullName] = address
|
||||
else:
|
||||
# failure
|
||||
Logger.dbg("DnsManipulator: retrieved negative message")
|
||||
if not self.statusMap.has_key(msg.ip) or self.statusMap[msg.ip]:
|
||||
self.statusMap[msg.ip] = False
|
||||
self.deleteARR(msg.ip)
|
||||
|
||||
self.notifyQueue.put(NotificationMessage("Server unavailable", "Server %s has gone" % msg.ip))
|
||||
|
||||
failureCnt = 0
|
||||
for v in self.statusMap.values():
|
||||
if not v:
|
||||
failureCnt += 1
|
||||
|
||||
if failureCnt > 0 and not self.exitServerFlag:
|
||||
self.insertARR(self.exitServerIp)
|
||||
self.exitServerFlag = True
|
||||
self.insertARR(name, zone, address)
|
||||
self.statusMap[fullName] = address
|
||||
|
||||
if failureCnt == 0 and self.exitServerFlag:
|
||||
self.deleteARR(self.exitServerIp)
|
||||
self.exitServerFlag = False
|
||||
|
||||
self.notifyQueue.put(NotificationMessage("All clear", "All clear, exit-server removed from DNS"))
|
||||
|
||||
def insertARR(self, ip):
|
||||
def insertARR(self, name, zone, address):
|
||||
# send A-RR insertion for ip to DNS server
|
||||
Logger.log("insert ARR %s" % ip)
|
||||
|
||||
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
|
||||
u = dns.update.Update(self.zone, keyring=self.keyring)
|
||||
u.add(self.name, self.ttl, rr)
|
||||
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, address)
|
||||
u = dns.update.Update(zone, keyring=self.keyring)
|
||||
u.add(name, self.ttl, rr)
|
||||
r = dns.query.tcp(u, self.nsAddress)
|
||||
|
||||
if r.rcode() != 0:
|
||||
# failure
|
||||
Logger.log("failure when inserting A-RR for %s" % ip)
|
||||
Logger.log("failure when inserting A-RR %s -> %s into %s" % (name, address, zone))
|
||||
else:
|
||||
# success
|
||||
pass
|
||||
Logger.log("insert ARR %s -> %s into %s" % (name, address, zone))
|
||||
|
||||
def deleteARR(self, ip):
|
||||
|
||||
def deleteARR(self, name, zone, address):
|
||||
# send A-RR deletion for ip to DNS server
|
||||
Logger.log("delete ARR %s" % ip)
|
||||
|
||||
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, ip)
|
||||
u = dns.update.Update(self.zone, keyring=self.keyring)
|
||||
u.delete(self.name, rr)
|
||||
rr = dns.rdtypes.IN.A.A(dns.rdataclass.IN, dns.rdatatype.A, address)
|
||||
u = dns.update.Update(zone, keyring=self.keyring)
|
||||
u.delete(name, rr)
|
||||
r = dns.query.tcp(u, self.nsAddress)
|
||||
|
||||
if r.rcode() != 0:
|
||||
# failure
|
||||
Logger.log("failure when deleting A-RR for %s" % ip)
|
||||
Logger.log("failure when deleting A-RR %s -> %s from %s" % (name, address, zone))
|
||||
else:
|
||||
# success
|
||||
pass
|
||||
Logger.log("delete ARR %s -> %s from %s" % (name, address, zone))
|
||||
|
||||
|
||||
|
||||
|
4
server/DnsMessage.py
Normal file
4
server/DnsMessage.py
Normal file
@ -0,0 +1,4 @@
|
||||
class DnsMessage(object):
|
||||
def __init__(self, name, address):
|
||||
self.name = name
|
||||
self.address = address
|
@ -1,19 +1,20 @@
|
||||
import threading
|
||||
import Event
|
||||
import Queue
|
||||
from logger import Logger
|
||||
|
||||
class DynHandler(threading.Thread):
|
||||
def __init__(self, queue):
|
||||
def __init__(self, eventq):
|
||||
threading.Thread.__init__(self)
|
||||
self.q = queue
|
||||
self.eventq = eventq
|
||||
self.setDaemon(True)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
event = self.q.get()
|
||||
event = self.eventq.get()
|
||||
try:
|
||||
event.prepare()
|
||||
print "Processing event %s" % str(event)
|
||||
Logger.log("Processing event %s" % str(event))
|
||||
event.process()
|
||||
except Event.IllegalEventException, e:
|
||||
print "Some failure: %s" % e.msg
|
||||
|
||||
Logger.log("Some failure: %s" % e.msg)
|
||||
|
@ -3,7 +3,7 @@ import socket
|
||||
import Queue
|
||||
import time
|
||||
import Event
|
||||
|
||||
from logger import Logger
|
||||
|
||||
|
||||
class DynReceiver(threading.Thread):
|
||||
@ -22,5 +22,5 @@ class DynReceiver(threading.Thread):
|
||||
try:
|
||||
self.queue.put_nowait(Event.Event(address, data, int(time.time())))
|
||||
except Queue.Full:
|
||||
print "Event %s from %s dropped" % (data, str(address))
|
||||
Logger.log("Event %s from %s dropped" % (data, str(address)))
|
||||
|
||||
|
@ -1,12 +1,13 @@
|
||||
|
||||
class Entry(object):
|
||||
def __init__(self, dynid, sharedSecret, name):
|
||||
def __init__(self, dynid, sharedSecret, name, zone):
|
||||
self.dynid = dynid
|
||||
self.sharedSecret = sharedSecret
|
||||
self.name = name
|
||||
self.zone = zone
|
||||
self.lastEventTime = 0
|
||||
self.address = ''
|
||||
|
||||
def __str__(self):
|
||||
return "dynid=%s, name=%s, sharedSecret=%s, address=%s, lastEventTime=%d" % (self.dynid, self.name, self.sharedSecret, self.address, self.lastEventTime)
|
||||
return "dynid=%s, name=%s, zone=%s, sharedSecret=%s, address=%s, lastEventTime=%d" % (self.dynid, self.name, self.zone, self.sharedSecret, self.address, self.lastEventTime)
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
import md5
|
||||
from logger import Logger
|
||||
|
||||
class IllegalEventException(Exception):
|
||||
def __init__(self, msg):
|
||||
@ -7,9 +8,10 @@ class IllegalEventException(Exception):
|
||||
|
||||
class Event(object):
|
||||
@classmethod
|
||||
def setParams(cls, entries, msgTimeCorridor):
|
||||
def setParams(cls, entries, msgTimeCorridor, dnsq):
|
||||
cls.entries = entries
|
||||
cls.msgTimeCorridor = msgTimeCorridor
|
||||
cls.dnsq = dnsq
|
||||
|
||||
def __init__(self, address, data, receiveTime):
|
||||
self.address = address
|
||||
@ -48,18 +50,24 @@ class Event(object):
|
||||
|
||||
di = "%s %s %d" % (self.dynid, entry.sharedSecret, self.msgTime)
|
||||
d = md5.new(di).hexdigest()
|
||||
print "%s, received: %s, calculated: %s" % (di, self.checksum, d)
|
||||
Logger.log("%s, received: %s, calculated: %s" % (di, self.checksum, d))
|
||||
if d != self.checksum:
|
||||
raise IllegalEventException("wrong checksum for event %s" % str(self))
|
||||
|
||||
entry.lastEventTime = self.msgTime
|
||||
|
||||
if entry.address == self.address:
|
||||
print "Same address, nothing to do."
|
||||
entry.lastEventTime = self.msgTime
|
||||
Logger.log("Same address, nothing to do.")
|
||||
else:
|
||||
entry.lastEventTime = self.msgTime
|
||||
entry.address = self.address
|
||||
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
|
||||
|
||||
Logger.log("Set in DNS: %s -> %s" % (entry.name, entry.address))
|
||||
try:
|
||||
Event.dnsq.put_nowait(entry)
|
||||
except Queue.Full, e:
|
||||
Logger.log("Dns Queue overrun, drop event for %s" % str(entry))
|
||||
|
||||
|
||||
|
||||
def __str__(self):
|
||||
if not self.prepared:
|
||||
|
@ -1,24 +1,30 @@
|
||||
import threading
|
||||
import time
|
||||
from logger import Logger
|
||||
|
||||
class Expirer(threading.Thread):
|
||||
def __init__(self, entries, eventLifeTime, nullAddress):
|
||||
def __init__(self, entries, eventLifeTime, nullAddress, dnsq):
|
||||
threading.Thread.__init__(self)
|
||||
self.entries = entries
|
||||
self.eventLifeTime = eventLifeTime
|
||||
self.nullAddress = nullAddress
|
||||
self.dnsq = dnsq
|
||||
self.setDaemon(True)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
print "Expiring ..."
|
||||
Logger.log("Expiring ...")
|
||||
currentTime = int(time.time())
|
||||
for entry in self.entries.values():
|
||||
print "Checking ", str(entry)
|
||||
Logger.log("Checking %s" % str(entry))
|
||||
if entry.lastEventTime != 0 and entry.lastEventTime + self.eventLifeTime < currentTime:
|
||||
print "Entry %s expired" % entry.dynid
|
||||
Logger.log("Entry %s expired" % entry.dynid)
|
||||
entry.lastEventTime = 0
|
||||
entry.address = self.nullAddress
|
||||
print "Set in DNS: %s -> %s" % (entry.name, entry.address)
|
||||
Logger.log("Set in DNS: %s -> %s" % (entry.name, entry.address))
|
||||
try:
|
||||
self.dnsq.put_nowait(entry)
|
||||
except Queue.Full, e:
|
||||
Logger.log("Dns Queue overrun, drop expiry of %s" % str(entry))
|
||||
time.sleep(10)
|
||||
|
||||
|
26
server/logger.py
Normal file
26
server/logger.py
Normal file
@ -0,0 +1,26 @@
|
||||
import syslog
|
||||
|
||||
|
||||
class Logger(object):
|
||||
def log(data):
|
||||
syslog.syslog(syslog.LOG_INFO, data)
|
||||
if Logger.debugFlag:
|
||||
print data
|
||||
|
||||
def debug(data):
|
||||
syslog.syslog(syslog.LOG_DEBUG, data)
|
||||
if Logger.debugFlag:
|
||||
print data
|
||||
|
||||
def openlog():
|
||||
syslog.openlog('hsc', syslog.LOG_PID, syslog.LOG_LOCAL0)
|
||||
|
||||
def debugEnable():
|
||||
Logger.debugFlag = True
|
||||
|
||||
log = staticmethod(log)
|
||||
debug = staticmethod(debug)
|
||||
openlog = staticmethod(openlog)
|
||||
debugEnable = staticmethod(debugEnable)
|
||||
debugFlag = False
|
||||
|
34
server/yadyn
34
server/yadyn
@ -4,8 +4,9 @@ import Queue
|
||||
import time
|
||||
import shelve
|
||||
|
||||
from logger import Logger
|
||||
import DynHandler
|
||||
# import DnsHandler
|
||||
import DnsHandler
|
||||
import Expirer
|
||||
import DynReceiver
|
||||
import CmdReceiver
|
||||
@ -18,38 +19,47 @@ MSG_TIME_CORRIDOR = 5
|
||||
EVENT_LIFE_TIME = 10
|
||||
NULL_ADDRESS = '0.0.0.0'
|
||||
TTL = 120
|
||||
NAMESERVER = '127.0.0.1'
|
||||
TSIGKEY = { "monitoring." : "HYHN8l/dg1+q6QLOURp2qw==" }
|
||||
NAMESERVER = '88.198.170.2'
|
||||
TSIGKEY = { "monitoring." : "+xLH8GuZnEgBljuIEM/iDA==" }
|
||||
PIDFILE = "/tmp/yadyn.pid"
|
||||
ADMIN_PWD = 'test123'
|
||||
|
||||
|
||||
Logger.openlog()
|
||||
Logger.debugEnable()
|
||||
|
||||
entries = shelve.open('entries', flag='c', writeback=True)
|
||||
if len(entries) == 0:
|
||||
entries['testhost'] = Entry.Entry('testhost', 'test123', 'test.test.de')
|
||||
entries['testhost'] = Entry.Entry('testhost', 'test123', 'test', 'test.de')
|
||||
statusMap = shelve.open('statusMap', flag='c', writeback=True)
|
||||
|
||||
|
||||
try:
|
||||
Event.Event.setParams(entries, MSG_TIME_CORRIDOR)
|
||||
eventq = Queue.Queue()
|
||||
dnsq = Queue.Queue()
|
||||
|
||||
q = Queue.Queue()
|
||||
Event.Event.setParams(entries, MSG_TIME_CORRIDOR, dnsq)
|
||||
|
||||
dynHandler = DynHandler.DynHandler(q)
|
||||
dynHandler = DynHandler.DynHandler(eventq)
|
||||
dynHandler.start()
|
||||
|
||||
dynReceiver = DynReceiver.DynReceiver(("", 8053), q)
|
||||
dnsHandler = DnsHandler.DnsHandler(dnsq, TSIGKEY, NAMESERVER, TTL, statusMap)
|
||||
dnsHandler.start()
|
||||
|
||||
dynReceiver = DynReceiver.DynReceiver(("", 8053), eventq)
|
||||
dynReceiver.start()
|
||||
|
||||
expirer = Expirer.Expirer(entries, EVENT_LIFE_TIME, NULL_ADDRESS)
|
||||
expirer = Expirer.Expirer(entries, EVENT_LIFE_TIME, NULL_ADDRESS, dnsq)
|
||||
expirer.start()
|
||||
|
||||
#webReceiver = WebReceiver.WebReceiver(("", 8080), entries, ADMIN_PWD)
|
||||
#webReceiver.start()
|
||||
cmdReceiver = CmdReceiver.CmdReceiver(("", 8023), entries, ADMIN_PWD)
|
||||
cmdReceiver.start()
|
||||
|
||||
while True:
|
||||
entries.sync()
|
||||
statusMap.sync()
|
||||
time.sleep(10)
|
||||
finally:
|
||||
print "Closing shelf"
|
||||
Logger.log("Closing shelves")
|
||||
entries.close()
|
||||
statusMap.close()
|
||||
|
Loading…
x
Reference in New Issue
Block a user