From 10925659ef07166d02fd311eff47bea54d2f0b3c Mon Sep 17 00:00:00 2001 From: Paulo Martinez Date: Wed, 12 Apr 2017 09:39:36 +0200 Subject: [PATCH] Made Python files compliant with PEP8, except for E501 --- tests/testcases/mqtt_basic.py | 60 ++-- tests/testcases/mqtt_publish_in_callback.py | 95 +++--- tests/testsuite.py | 318 ++++++++++---------- 3 files changed, 233 insertions(+), 240 deletions(-) diff --git a/tests/testcases/mqtt_basic.py b/tests/testcases/mqtt_basic.py index 1b0cc65..f23ef71 100644 --- a/tests/testcases/mqtt_basic.py +++ b/tests/testcases/mqtt_basic.py @@ -1,43 +1,39 @@ import unittest import settings - import time import mosquitto -import serial def on_message(mosq, obj, msg): - obj.message_queue.append(msg) + obj.message_queue.append(msg) + class mqtt_basic(unittest.TestCase): - - message_queue = [] - - @classmethod - def setUpClass(self): - self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self) - self.client.connect(settings.server_ip) - self.client.on_message = on_message - self.client.subscribe("outTopic",0) - @classmethod - def tearDownClass(self): - self.client.disconnect() - - def test_one(self): - i=30 - while len(self.message_queue) == 0 and i > 0: - self.client.loop() - time.sleep(0.5) - i -= 1 - self.assertTrue(i>0, "message receive timed-out") - self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") - msg = self.message_queue[0] - self.assertEqual(msg.mid,0,"message id not 0") - self.assertEqual(msg.topic,"outTopic","message topic incorrect") - self.assertEqual(msg.payload,"hello world") - self.assertEqual(msg.qos,0,"message qos not 0") - self.assertEqual(msg.retain,False,"message retain flag incorrect") - - + message_queue = [] + @classmethod + def setUpClass(self): + self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self) + self.client.connect(settings.server_ip) + self.client.on_message = on_message + self.client.subscribe("outTopic", 0) + + @classmethod + def tearDownClass(self): + self.client.disconnect() + + def test_one(self): + i = 30 + while len(self.message_queue) == 0 and i > 0: + self.client.loop() + time.sleep(0.5) + i -= 1 + self.assertTrue(i > 0, "message receive timed-out") + self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") + msg = self.message_queue[0] + self.assertEqual(msg.mid, 0, "message id not 0") + self.assertEqual(msg.topic, "outTopic", "message topic incorrect") + self.assertEqual(msg.payload, "hello world") + self.assertEqual(msg.qos, 0, "message qos not 0") + self.assertEqual(msg.retain, False, "message retain flag incorrect") diff --git a/tests/testcases/mqtt_publish_in_callback.py b/tests/testcases/mqtt_publish_in_callback.py index 7989f7f..45b0a85 100644 --- a/tests/testcases/mqtt_publish_in_callback.py +++ b/tests/testcases/mqtt_publish_in_callback.py @@ -1,64 +1,59 @@ import unittest import settings - import time import mosquitto -import serial def on_message(mosq, obj, msg): - obj.message_queue.append(msg) + obj.message_queue.append(msg) + class mqtt_publish_in_callback(unittest.TestCase): - - message_queue = [] - - @classmethod - def setUpClass(self): - self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self) - self.client.connect(settings.server_ip) - self.client.on_message = on_message - self.client.subscribe("outTopic",0) - @classmethod - def tearDownClass(self): - self.client.disconnect() - - def test_connect(self): - i=30 - while len(self.message_queue) == 0 and i > 0: - self.client.loop() - time.sleep(0.5) - i -= 1 - self.assertTrue(i>0, "message receive timed-out") - self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") - msg = self.message_queue.pop(0) - self.assertEqual(msg.mid,0,"message id not 0") - self.assertEqual(msg.topic,"outTopic","message topic incorrect") - self.assertEqual(msg.payload,"hello world") - self.assertEqual(msg.qos,0,"message qos not 0") - self.assertEqual(msg.retain,False,"message retain flag incorrect") - + message_queue = [] - def test_publish(self): - self.assertEqual(len(self.message_queue), 0, "message queue not empty") - payload = "abcdefghij" - self.client.publish("inTopic",payload) - - i=30 - while len(self.message_queue) == 0 and i > 0: - self.client.loop() - time.sleep(0.5) - i -= 1 + @classmethod + def setUpClass(self): + self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self) + self.client.connect(settings.server_ip) + self.client.on_message = on_message + self.client.subscribe("outTopic", 0) - self.assertTrue(i>0, "message receive timed-out") - self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") - msg = self.message_queue.pop(0) - self.assertEqual(msg.mid,0,"message id not 0") - self.assertEqual(msg.topic,"outTopic","message topic incorrect") - self.assertEqual(msg.payload,payload) - self.assertEqual(msg.qos,0,"message qos not 0") - self.assertEqual(msg.retain,False,"message retain flag incorrect") - + @classmethod + def tearDownClass(self): + self.client.disconnect() + def test_connect(self): + i = 30 + while len(self.message_queue) == 0 and i > 0: + self.client.loop() + time.sleep(0.5) + i -= 1 + self.assertTrue(i > 0, "message receive timed-out") + self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") + msg = self.message_queue.pop(0) + self.assertEqual(msg.mid, 0, "message id not 0") + self.assertEqual(msg.topic, "outTopic", "message topic incorrect") + self.assertEqual(msg.payload, "hello world") + self.assertEqual(msg.qos, 0, "message qos not 0") + self.assertEqual(msg.retain, False, "message retain flag incorrect") + def test_publish(self): + self.assertEqual(len(self.message_queue), 0, "message queue not empty") + payload = "abcdefghij" + self.client.publish("inTopic", payload) + + i = 30 + while len(self.message_queue) == 0 and i > 0: + self.client.loop() + time.sleep(0.5) + i -= 1 + + self.assertTrue(i > 0, "message receive timed-out") + self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") + msg = self.message_queue.pop(0) + self.assertEqual(msg.mid, 0, "message id not 0") + self.assertEqual(msg.topic, "outTopic", "message topic incorrect") + self.assertEqual(msg.payload, payload) + self.assertEqual(msg.qos, 0, "message qos not 0") + self.assertEqual(msg.retain, False, "message retain flag incorrect") diff --git a/tests/testsuite.py b/tests/testsuite.py index 0a8e70d..788fc5d 100644 --- a/tests/testsuite.py +++ b/tests/testsuite.py @@ -10,170 +10,172 @@ import re from testcases import settings + class Workspace(object): - - def __init__(self): - self.root_dir = os.getcwd() - self.build_dir = os.path.join(self.root_dir,"tmpbin"); - self.log_dir = os.path.join(self.root_dir,"logs"); - self.tests_dir = os.path.join(self.root_dir,"testcases"); - self.examples_dir = os.path.join(self.root_dir,"../PubSubClient/examples") - self.examples = [] - self.tests = [] - if not os.path.isdir("../PubSubClient"): - raise Exception("Cannot find PubSubClient library") - try: - import ino - except: - raise Exception("ino tool not installed") - def init(self): - if os.path.isdir(self.build_dir): - shutil.rmtree(self.build_dir) - os.mkdir(self.build_dir) - if os.path.isdir(self.log_dir): - shutil.rmtree(self.log_dir) - os.mkdir(self.log_dir) - - os.chdir(self.build_dir) - call(["ino","init"]) - - shutil.copytree("../../PubSubClient","lib/PubSubClient") - - filenames = [] - for root, dirs, files in os.walk(self.examples_dir): - filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")] - filenames.sort() - for e in filenames: - self.examples.append(Sketch(self,e)) - - filenames = [] - for root, dirs, files in os.walk(self.tests_dir): - filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")] - filenames.sort() - for e in filenames: - self.tests.append(Sketch(self,e)) - - def clean(self): - shutil.rmtree(self.build_dir) - + def __init__(self): + self.root_dir = os.getcwd() + self.build_dir = os.path.join(self.root_dir, "tmpbin") + self.log_dir = os.path.join(self.root_dir, "logs") + self.tests_dir = os.path.join(self.root_dir, "testcases") + self.examples_dir = os.path.join(self.root_dir, "../PubSubClient/examples") + self.examples = [] + self.tests = [] + if not os.path.isdir("../PubSubClient"): + raise Exception("Cannot find PubSubClient library") + try: + return __import__('ino') + except ImportError: + raise Exception("ino tool not installed") + + def init(self): + if os.path.isdir(self.build_dir): + shutil.rmtree(self.build_dir) + os.mkdir(self.build_dir) + if os.path.isdir(self.log_dir): + shutil.rmtree(self.log_dir) + os.mkdir(self.log_dir) + + os.chdir(self.build_dir) + call(["ino", "init"]) + + shutil.copytree("../../PubSubClient", "lib/PubSubClient") + + filenames = [] + for root, dirs, files in os.walk(self.examples_dir): + filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")] + filenames.sort() + for e in filenames: + self.examples.append(Sketch(self, e)) + + filenames = [] + for root, dirs, files in os.walk(self.tests_dir): + filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")] + filenames.sort() + for e in filenames: + self.tests.append(Sketch(self, e)) + + def clean(self): + shutil.rmtree(self.build_dir) + + class Sketch(object): - def __init__(self,wksp,fn): - self.w = wksp - self.filename = fn - self.basename = os.path.basename(self.filename) - self.build_log = os.path.join(self.w.log_dir,"%s.log"%(os.path.basename(self.filename),)) - self.build_err_log = os.path.join(self.w.log_dir,"%s.err.log"%(os.path.basename(self.filename),)) - self.build_upload_log = os.path.join(self.w.log_dir,"%s.upload.log"%(os.path.basename(self.filename),)) + def __init__(self, wksp, fn): + self.w = wksp + self.filename = fn + self.basename = os.path.basename(self.filename) + self.build_log = os.path.join(self.w.log_dir, "%s.log" % (os.path.basename(self.filename),)) + self.build_err_log = os.path.join(self.w.log_dir, "%s.err.log" % (os.path.basename(self.filename),)) + self.build_upload_log = os.path.join(self.w.log_dir, "%s.upload.log" % (os.path.basename(self.filename),)) - def build(self): - sys.stdout.write(" Build: ") - sys.stdout.flush() - - # Copy sketch over, replacing IP addresses as necessary - fin = open(self.filename,"r") - lines = fin.readlines() - fin.close() - fout = open(os.path.join(self.w.build_dir,"src","sketch.ino"),"w") - for l in lines: - if re.match(r"^byte server\[\] = {",l): - fout.write("byte server[] = { %s };\n"%(settings.server_ip.replace(".",", "),)) - elif re.match(r"^byte ip\[\] = {",l): - fout.write("byte ip[] = { %s };\n"%(settings.arduino_ip.replace(".",", "),)) - else: - fout.write(l) - fout.flush() - fout.close() - - # Run build - fout = open(self.build_log, "w") - ferr = open(self.build_err_log, "w") - rc = call(["ino","build"],stdout=fout,stderr=ferr) - fout.close() - ferr.close() - if rc == 0: - sys.stdout.write("pass") - sys.stdout.write("\n") - return True - else: - sys.stdout.write("fail") - sys.stdout.write("\n") - with open(self.build_err_log) as f: - for line in f: - print " ",line, - return False - - def upload(self): - sys.stdout.write(" Upload: ") - sys.stdout.flush() - fout = open(self.build_upload_log, "w") - rc = call(["ino","upload"],stdout=fout,stderr=fout) - fout.close() - if rc == 0: - sys.stdout.write("pass") - sys.stdout.write("\n") - return True - else: - sys.stdout.write("fail") - sys.stdout.write("\n") - with open(self.build_upload_log) as f: - for line in f: - print " ",line, - return False + def build(self): + sys.stdout.write(" Build: ") + sys.stdout.flush() + # Copy sketch over, replacing IP addresses as necessary + fin = open(self.filename, "r") + lines = fin.readlines() + fin.close() + fout = open(os.path.join(self.w.build_dir, "src", "sketch.ino"), "w") + for l in lines: + if re.match(r"^byte server\[\] = {", l): + fout.write("byte server[] = { %s };\n" % (settings.server_ip.replace(".", ", "),)) + elif re.match(r"^byte ip\[\] = {", l): + fout.write("byte ip[] = { %s };\n" % (settings.arduino_ip.replace(".", ", "),)) + else: + fout.write(l) + fout.flush() + fout.close() + + # Run build + fout = open(self.build_log, "w") + ferr = open(self.build_err_log, "w") + rc = call(["ino", "build"], stdout=fout, stderr=ferr) + fout.close() + ferr.close() + if rc == 0: + sys.stdout.write("pass") + sys.stdout.write("\n") + return True + else: + sys.stdout.write("fail") + sys.stdout.write("\n") + with open(self.build_err_log) as f: + for line in f: + print(" " + line) + return False + + def upload(self): + sys.stdout.write(" Upload: ") + sys.stdout.flush() + fout = open(self.build_upload_log, "w") + rc = call(["ino", "upload"], stdout=fout, stderr=fout) + fout.close() + if rc == 0: + sys.stdout.write("pass") + sys.stdout.write("\n") + return True + else: + sys.stdout.write("fail") + sys.stdout.write("\n") + with open(self.build_upload_log) as f: + for line in f: + print(" " + line) + return False + + def test(self): + # import the matching test case, if it exists + try: + basename = os.path.basename(self.filename)[:-4] + i = importlib.import_module("testcases." + basename) + except: + sys.stdout.write(" Test: no tests found") + sys.stdout.write("\n") + return + c = getattr(i, basename) + + testmethods = [m for m in dir(c) if m.startswith("test_")] + testmethods.sort() + tests = [] + for m in testmethods: + tests.append(c(m)) + + result = unittest.TestResult() + c.setUpClass() + if self.upload(): + sys.stdout.write(" Test: ") + sys.stdout.flush() + for t in tests: + t.run(result) + print(str(result.testsRun - len(result.failures) - len(result.errors)) + "/" + str(result.testsRun)) + if not result.wasSuccessful(): + if len(result.failures) > 0: + for f in result.failures: + print("-- " + str(f[0])) + print(f[1]) + if len(result.errors) > 0: + print(" Errors:") + for f in result.errors: + print("-- " + str(f[0])) + print(f[1]) + c.tearDownClass() - def test(self): - # import the matching test case, if it exists - try: - basename = os.path.basename(self.filename)[:-4] - i = importlib.import_module("testcases."+basename) - except: - sys.stdout.write(" Test: no tests found") - sys.stdout.write("\n") - return - c = getattr(i,basename) - - testmethods = [m for m in dir(c) if m.startswith("test_")] - testmethods.sort() - tests = [] - for m in testmethods: - tests.append(c(m)) - - result = unittest.TestResult() - c.setUpClass() - if self.upload(): - sys.stdout.write(" Test: ") - sys.stdout.flush() - for t in tests: - t.run(result) - print "%d/%d"%(result.testsRun-len(result.failures)-len(result.errors),result.testsRun) - if not result.wasSuccessful(): - if len(result.failures) > 0: - for f in result.failures: - print "-- %s"%(str(f[0]),) - print f[1] - if len(result.errors) > 0: - print " Errors:" - for f in result.errors: - print "-- %s"%(str(f[0]),) - print f[1] - c.tearDownClass() if __name__ == '__main__': - run_tests = True + run_tests = True - w = Workspace() - w.init() - - for e in w.examples: - print "--------------------------------------" - print "[%s]"%(e.basename,) - if e.build() and run_tests: - e.test() - for e in w.tests: - print "--------------------------------------" - print "[%s]"%(e.basename,) - if e.build() and run_tests: - e.test() - - w.clean() + w = Workspace() + w.init() + + for e in w.examples: + print("--------------------------------------") + print("[" + e.basename + "]") + if e.build() and run_tests: + e.test() + for e in w.tests: + print("--------------------------------------") + print("[" + e.basename + "]") + if e.build() and run_tests: + e.test() + + w.clean()