Merge pull request #270 from pacm93/master

Make Python files compliant with PEP8, except for E501
This commit is contained in:
Nick O'Leary 2017-04-12 08:58:43 +01:00 committed by GitHub
commit f46d0011ee
3 changed files with 233 additions and 240 deletions

View File

@ -1,43 +1,39 @@
import unittest
import settings
import time
import mosquitto
import serial
def on_message(mosq, obj, msg):
obj.message_queue.append(msg)
obj.message_queue.append(msg)
class mqtt_basic(unittest.TestCase):
message_queue = []
@classmethod
def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self)
self.client.connect(settings.server_ip)
self.client.on_message = on_message
self.client.subscribe("outTopic",0)
@classmethod
def tearDownClass(self):
self.client.disconnect()
def test_one(self):
i=30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
self.assertTrue(i>0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue[0]
self.assertEqual(msg.mid,0,"message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect")
self.assertEqual(msg.payload,"hello world")
self.assertEqual(msg.qos,0,"message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect")
message_queue = []
@classmethod
def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self)
self.client.connect(settings.server_ip)
self.client.on_message = on_message
self.client.subscribe("outTopic", 0)
@classmethod
def tearDownClass(self):
self.client.disconnect()
def test_one(self):
i = 30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue[0]
self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload, "hello world")
self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain, False, "message retain flag incorrect")

View File

@ -1,64 +1,59 @@
import unittest
import settings
import time
import mosquitto
import serial
def on_message(mosq, obj, msg):
obj.message_queue.append(msg)
obj.message_queue.append(msg)
class mqtt_publish_in_callback(unittest.TestCase):
message_queue = []
@classmethod
def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self)
self.client.connect(settings.server_ip)
self.client.on_message = on_message
self.client.subscribe("outTopic",0)
@classmethod
def tearDownClass(self):
self.client.disconnect()
def test_connect(self):
i=30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
self.assertTrue(i>0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0)
self.assertEqual(msg.mid,0,"message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect")
self.assertEqual(msg.payload,"hello world")
self.assertEqual(msg.qos,0,"message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect")
message_queue = []
def test_publish(self):
self.assertEqual(len(self.message_queue), 0, "message queue not empty")
payload = "abcdefghij"
self.client.publish("inTopic",payload)
i=30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
@classmethod
def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self)
self.client.connect(settings.server_ip)
self.client.on_message = on_message
self.client.subscribe("outTopic", 0)
self.assertTrue(i>0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0)
self.assertEqual(msg.mid,0,"message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect")
self.assertEqual(msg.payload,payload)
self.assertEqual(msg.qos,0,"message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect")
@classmethod
def tearDownClass(self):
self.client.disconnect()
def test_connect(self):
i = 30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0)
self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload, "hello world")
self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain, False, "message retain flag incorrect")
def test_publish(self):
self.assertEqual(len(self.message_queue), 0, "message queue not empty")
payload = "abcdefghij"
self.client.publish("inTopic", payload)
i = 30
while len(self.message_queue) == 0 and i > 0:
self.client.loop()
time.sleep(0.5)
i -= 1
self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0)
self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload, payload)
self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain, False, "message retain flag incorrect")

View File

@ -10,170 +10,172 @@ import re
from testcases import settings
class Workspace(object):
def __init__(self):
self.root_dir = os.getcwd()
self.build_dir = os.path.join(self.root_dir,"tmpbin");
self.log_dir = os.path.join(self.root_dir,"logs");
self.tests_dir = os.path.join(self.root_dir,"testcases");
self.examples_dir = os.path.join(self.root_dir,"../PubSubClient/examples")
self.examples = []
self.tests = []
if not os.path.isdir("../PubSubClient"):
raise Exception("Cannot find PubSubClient library")
try:
import ino
except:
raise Exception("ino tool not installed")
def init(self):
if os.path.isdir(self.build_dir):
shutil.rmtree(self.build_dir)
os.mkdir(self.build_dir)
if os.path.isdir(self.log_dir):
shutil.rmtree(self.log_dir)
os.mkdir(self.log_dir)
os.chdir(self.build_dir)
call(["ino","init"])
shutil.copytree("../../PubSubClient","lib/PubSubClient")
filenames = []
for root, dirs, files in os.walk(self.examples_dir):
filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")]
filenames.sort()
for e in filenames:
self.examples.append(Sketch(self,e))
filenames = []
for root, dirs, files in os.walk(self.tests_dir):
filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")]
filenames.sort()
for e in filenames:
self.tests.append(Sketch(self,e))
def clean(self):
shutil.rmtree(self.build_dir)
def __init__(self):
self.root_dir = os.getcwd()
self.build_dir = os.path.join(self.root_dir, "tmpbin")
self.log_dir = os.path.join(self.root_dir, "logs")
self.tests_dir = os.path.join(self.root_dir, "testcases")
self.examples_dir = os.path.join(self.root_dir, "../PubSubClient/examples")
self.examples = []
self.tests = []
if not os.path.isdir("../PubSubClient"):
raise Exception("Cannot find PubSubClient library")
try:
return __import__('ino')
except ImportError:
raise Exception("ino tool not installed")
def init(self):
if os.path.isdir(self.build_dir):
shutil.rmtree(self.build_dir)
os.mkdir(self.build_dir)
if os.path.isdir(self.log_dir):
shutil.rmtree(self.log_dir)
os.mkdir(self.log_dir)
os.chdir(self.build_dir)
call(["ino", "init"])
shutil.copytree("../../PubSubClient", "lib/PubSubClient")
filenames = []
for root, dirs, files in os.walk(self.examples_dir):
filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")]
filenames.sort()
for e in filenames:
self.examples.append(Sketch(self, e))
filenames = []
for root, dirs, files in os.walk(self.tests_dir):
filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")]
filenames.sort()
for e in filenames:
self.tests.append(Sketch(self, e))
def clean(self):
shutil.rmtree(self.build_dir)
class Sketch(object):
def __init__(self,wksp,fn):
self.w = wksp
self.filename = fn
self.basename = os.path.basename(self.filename)
self.build_log = os.path.join(self.w.log_dir,"%s.log"%(os.path.basename(self.filename),))
self.build_err_log = os.path.join(self.w.log_dir,"%s.err.log"%(os.path.basename(self.filename),))
self.build_upload_log = os.path.join(self.w.log_dir,"%s.upload.log"%(os.path.basename(self.filename),))
def __init__(self, wksp, fn):
self.w = wksp
self.filename = fn
self.basename = os.path.basename(self.filename)
self.build_log = os.path.join(self.w.log_dir, "%s.log" % (os.path.basename(self.filename),))
self.build_err_log = os.path.join(self.w.log_dir, "%s.err.log" % (os.path.basename(self.filename),))
self.build_upload_log = os.path.join(self.w.log_dir, "%s.upload.log" % (os.path.basename(self.filename),))
def build(self):
sys.stdout.write(" Build: ")
sys.stdout.flush()
# Copy sketch over, replacing IP addresses as necessary
fin = open(self.filename,"r")
lines = fin.readlines()
fin.close()
fout = open(os.path.join(self.w.build_dir,"src","sketch.ino"),"w")
for l in lines:
if re.match(r"^byte server\[\] = {",l):
fout.write("byte server[] = { %s };\n"%(settings.server_ip.replace(".",", "),))
elif re.match(r"^byte ip\[\] = {",l):
fout.write("byte ip[] = { %s };\n"%(settings.arduino_ip.replace(".",", "),))
else:
fout.write(l)
fout.flush()
fout.close()
# Run build
fout = open(self.build_log, "w")
ferr = open(self.build_err_log, "w")
rc = call(["ino","build"],stdout=fout,stderr=ferr)
fout.close()
ferr.close()
if rc == 0:
sys.stdout.write("pass")
sys.stdout.write("\n")
return True
else:
sys.stdout.write("fail")
sys.stdout.write("\n")
with open(self.build_err_log) as f:
for line in f:
print " ",line,
return False
def upload(self):
sys.stdout.write(" Upload: ")
sys.stdout.flush()
fout = open(self.build_upload_log, "w")
rc = call(["ino","upload"],stdout=fout,stderr=fout)
fout.close()
if rc == 0:
sys.stdout.write("pass")
sys.stdout.write("\n")
return True
else:
sys.stdout.write("fail")
sys.stdout.write("\n")
with open(self.build_upload_log) as f:
for line in f:
print " ",line,
return False
def build(self):
sys.stdout.write(" Build: ")
sys.stdout.flush()
# Copy sketch over, replacing IP addresses as necessary
fin = open(self.filename, "r")
lines = fin.readlines()
fin.close()
fout = open(os.path.join(self.w.build_dir, "src", "sketch.ino"), "w")
for l in lines:
if re.match(r"^byte server\[\] = {", l):
fout.write("byte server[] = { %s };\n" % (settings.server_ip.replace(".", ", "),))
elif re.match(r"^byte ip\[\] = {", l):
fout.write("byte ip[] = { %s };\n" % (settings.arduino_ip.replace(".", ", "),))
else:
fout.write(l)
fout.flush()
fout.close()
# Run build
fout = open(self.build_log, "w")
ferr = open(self.build_err_log, "w")
rc = call(["ino", "build"], stdout=fout, stderr=ferr)
fout.close()
ferr.close()
if rc == 0:
sys.stdout.write("pass")
sys.stdout.write("\n")
return True
else:
sys.stdout.write("fail")
sys.stdout.write("\n")
with open(self.build_err_log) as f:
for line in f:
print(" " + line)
return False
def upload(self):
sys.stdout.write(" Upload: ")
sys.stdout.flush()
fout = open(self.build_upload_log, "w")
rc = call(["ino", "upload"], stdout=fout, stderr=fout)
fout.close()
if rc == 0:
sys.stdout.write("pass")
sys.stdout.write("\n")
return True
else:
sys.stdout.write("fail")
sys.stdout.write("\n")
with open(self.build_upload_log) as f:
for line in f:
print(" " + line)
return False
def test(self):
# import the matching test case, if it exists
try:
basename = os.path.basename(self.filename)[:-4]
i = importlib.import_module("testcases." + basename)
except:
sys.stdout.write(" Test: no tests found")
sys.stdout.write("\n")
return
c = getattr(i, basename)
testmethods = [m for m in dir(c) if m.startswith("test_")]
testmethods.sort()
tests = []
for m in testmethods:
tests.append(c(m))
result = unittest.TestResult()
c.setUpClass()
if self.upload():
sys.stdout.write(" Test: ")
sys.stdout.flush()
for t in tests:
t.run(result)
print(str(result.testsRun - len(result.failures) - len(result.errors)) + "/" + str(result.testsRun))
if not result.wasSuccessful():
if len(result.failures) > 0:
for f in result.failures:
print("-- " + str(f[0]))
print(f[1])
if len(result.errors) > 0:
print(" Errors:")
for f in result.errors:
print("-- " + str(f[0]))
print(f[1])
c.tearDownClass()
def test(self):
# import the matching test case, if it exists
try:
basename = os.path.basename(self.filename)[:-4]
i = importlib.import_module("testcases."+basename)
except:
sys.stdout.write(" Test: no tests found")
sys.stdout.write("\n")
return
c = getattr(i,basename)
testmethods = [m for m in dir(c) if m.startswith("test_")]
testmethods.sort()
tests = []
for m in testmethods:
tests.append(c(m))
result = unittest.TestResult()
c.setUpClass()
if self.upload():
sys.stdout.write(" Test: ")
sys.stdout.flush()
for t in tests:
t.run(result)
print "%d/%d"%(result.testsRun-len(result.failures)-len(result.errors),result.testsRun)
if not result.wasSuccessful():
if len(result.failures) > 0:
for f in result.failures:
print "-- %s"%(str(f[0]),)
print f[1]
if len(result.errors) > 0:
print " Errors:"
for f in result.errors:
print "-- %s"%(str(f[0]),)
print f[1]
c.tearDownClass()
if __name__ == '__main__':
run_tests = True
run_tests = True
w = Workspace()
w.init()
for e in w.examples:
print "--------------------------------------"
print "[%s]"%(e.basename,)
if e.build() and run_tests:
e.test()
for e in w.tests:
print "--------------------------------------"
print "[%s]"%(e.basename,)
if e.build() and run_tests:
e.test()
w.clean()
w = Workspace()
w.init()
for e in w.examples:
print("--------------------------------------")
print("[" + e.basename + "]")
if e.build() and run_tests:
e.test()
for e in w.tests:
print("--------------------------------------")
print("[" + e.basename + "]")
if e.build() and run_tests:
e.test()
w.clean()