Made Python files compliant with PEP8, except for E501

This commit is contained in:
Paulo Martinez 2017-04-12 09:39:36 +02:00
parent 4c8ce14dad
commit 10925659ef
3 changed files with 233 additions and 240 deletions

View File

@ -1,43 +1,39 @@
import unittest import unittest
import settings import settings
import time import time
import mosquitto import mosquitto
import serial
def on_message(mosq, obj, msg): def on_message(mosq, obj, msg):
obj.message_queue.append(msg) obj.message_queue.append(msg)
class mqtt_basic(unittest.TestCase): class mqtt_basic(unittest.TestCase):
message_queue = [] message_queue = []
@classmethod @classmethod
def setUpClass(self): def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self) self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self)
self.client.connect(settings.server_ip) self.client.connect(settings.server_ip)
self.client.on_message = on_message self.client.on_message = on_message
self.client.subscribe("outTopic",0) self.client.subscribe("outTopic", 0)
@classmethod @classmethod
def tearDownClass(self): def tearDownClass(self):
self.client.disconnect() self.client.disconnect()
def test_one(self): def test_one(self):
i=30 i = 30
while len(self.message_queue) == 0 and i > 0: while len(self.message_queue) == 0 and i > 0:
self.client.loop() self.client.loop()
time.sleep(0.5) time.sleep(0.5)
i -= 1 i -= 1
self.assertTrue(i>0, "message receive timed-out") self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue[0] msg = self.message_queue[0]
self.assertEqual(msg.mid,0,"message id not 0") self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect") self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload,"hello world") self.assertEqual(msg.payload, "hello world")
self.assertEqual(msg.qos,0,"message qos not 0") self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect") self.assertEqual(msg.retain, False, "message retain flag incorrect")

View File

@ -1,64 +1,59 @@
import unittest import unittest
import settings import settings
import time import time
import mosquitto import mosquitto
import serial
def on_message(mosq, obj, msg): def on_message(mosq, obj, msg):
obj.message_queue.append(msg) obj.message_queue.append(msg)
class mqtt_publish_in_callback(unittest.TestCase): class mqtt_publish_in_callback(unittest.TestCase):
message_queue = [] message_queue = []
@classmethod @classmethod
def setUpClass(self): def setUpClass(self):
self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self) self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True, obj=self)
self.client.connect(settings.server_ip) self.client.connect(settings.server_ip)
self.client.on_message = on_message self.client.on_message = on_message
self.client.subscribe("outTopic",0) self.client.subscribe("outTopic", 0)
@classmethod @classmethod
def tearDownClass(self): def tearDownClass(self):
self.client.disconnect() self.client.disconnect()
def test_connect(self): def test_connect(self):
i=30 i = 30
while len(self.message_queue) == 0 and i > 0: while len(self.message_queue) == 0 and i > 0:
self.client.loop() self.client.loop()
time.sleep(0.5) time.sleep(0.5)
i -= 1 i -= 1
self.assertTrue(i>0, "message receive timed-out") self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0) msg = self.message_queue.pop(0)
self.assertEqual(msg.mid,0,"message id not 0") self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect") self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload,"hello world") self.assertEqual(msg.payload, "hello world")
self.assertEqual(msg.qos,0,"message qos not 0") self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect") self.assertEqual(msg.retain, False, "message retain flag incorrect")
def test_publish(self): def test_publish(self):
self.assertEqual(len(self.message_queue), 0, "message queue not empty") self.assertEqual(len(self.message_queue), 0, "message queue not empty")
payload = "abcdefghij" payload = "abcdefghij"
self.client.publish("inTopic",payload) self.client.publish("inTopic", payload)
i=30 i = 30
while len(self.message_queue) == 0 and i > 0: while len(self.message_queue) == 0 and i > 0:
self.client.loop() self.client.loop()
time.sleep(0.5) time.sleep(0.5)
i -= 1 i -= 1
self.assertTrue(i>0, "message receive timed-out") self.assertTrue(i > 0, "message receive timed-out")
self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received")
msg = self.message_queue.pop(0) msg = self.message_queue.pop(0)
self.assertEqual(msg.mid,0,"message id not 0") self.assertEqual(msg.mid, 0, "message id not 0")
self.assertEqual(msg.topic,"outTopic","message topic incorrect") self.assertEqual(msg.topic, "outTopic", "message topic incorrect")
self.assertEqual(msg.payload,payload) self.assertEqual(msg.payload, payload)
self.assertEqual(msg.qos,0,"message qos not 0") self.assertEqual(msg.qos, 0, "message qos not 0")
self.assertEqual(msg.retain,False,"message retain flag incorrect") self.assertEqual(msg.retain, False, "message retain flag incorrect")

View File

@ -10,21 +10,22 @@ import re
from testcases import settings from testcases import settings
class Workspace(object): class Workspace(object):
def __init__(self): def __init__(self):
self.root_dir = os.getcwd() self.root_dir = os.getcwd()
self.build_dir = os.path.join(self.root_dir,"tmpbin"); self.build_dir = os.path.join(self.root_dir, "tmpbin")
self.log_dir = os.path.join(self.root_dir,"logs"); self.log_dir = os.path.join(self.root_dir, "logs")
self.tests_dir = os.path.join(self.root_dir,"testcases"); self.tests_dir = os.path.join(self.root_dir, "testcases")
self.examples_dir = os.path.join(self.root_dir,"../PubSubClient/examples") self.examples_dir = os.path.join(self.root_dir, "../PubSubClient/examples")
self.examples = [] self.examples = []
self.tests = [] self.tests = []
if not os.path.isdir("../PubSubClient"): if not os.path.isdir("../PubSubClient"):
raise Exception("Cannot find PubSubClient library") raise Exception("Cannot find PubSubClient library")
try: try:
import ino return __import__('ino')
except: except ImportError:
raise Exception("ino tool not installed") raise Exception("ino tool not installed")
def init(self): def init(self):
@ -36,50 +37,51 @@ class Workspace(object):
os.mkdir(self.log_dir) os.mkdir(self.log_dir)
os.chdir(self.build_dir) os.chdir(self.build_dir)
call(["ino","init"]) call(["ino", "init"])
shutil.copytree("../../PubSubClient","lib/PubSubClient") shutil.copytree("../../PubSubClient", "lib/PubSubClient")
filenames = [] filenames = []
for root, dirs, files in os.walk(self.examples_dir): for root, dirs, files in os.walk(self.examples_dir):
filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")] filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")]
filenames.sort() filenames.sort()
for e in filenames: for e in filenames:
self.examples.append(Sketch(self,e)) self.examples.append(Sketch(self, e))
filenames = [] filenames = []
for root, dirs, files in os.walk(self.tests_dir): for root, dirs, files in os.walk(self.tests_dir):
filenames += [os.path.join(root,f) for f in files if f.endswith(".ino")] filenames += [os.path.join(root, f) for f in files if f.endswith(".ino")]
filenames.sort() filenames.sort()
for e in filenames: for e in filenames:
self.tests.append(Sketch(self,e)) self.tests.append(Sketch(self, e))
def clean(self): def clean(self):
shutil.rmtree(self.build_dir) shutil.rmtree(self.build_dir)
class Sketch(object): class Sketch(object):
def __init__(self,wksp,fn): def __init__(self, wksp, fn):
self.w = wksp self.w = wksp
self.filename = fn self.filename = fn
self.basename = os.path.basename(self.filename) self.basename = os.path.basename(self.filename)
self.build_log = os.path.join(self.w.log_dir,"%s.log"%(os.path.basename(self.filename),)) self.build_log = os.path.join(self.w.log_dir, "%s.log" % (os.path.basename(self.filename),))
self.build_err_log = os.path.join(self.w.log_dir,"%s.err.log"%(os.path.basename(self.filename),)) self.build_err_log = os.path.join(self.w.log_dir, "%s.err.log" % (os.path.basename(self.filename),))
self.build_upload_log = os.path.join(self.w.log_dir,"%s.upload.log"%(os.path.basename(self.filename),)) self.build_upload_log = os.path.join(self.w.log_dir, "%s.upload.log" % (os.path.basename(self.filename),))
def build(self): def build(self):
sys.stdout.write(" Build: ") sys.stdout.write(" Build: ")
sys.stdout.flush() sys.stdout.flush()
# Copy sketch over, replacing IP addresses as necessary # Copy sketch over, replacing IP addresses as necessary
fin = open(self.filename,"r") fin = open(self.filename, "r")
lines = fin.readlines() lines = fin.readlines()
fin.close() fin.close()
fout = open(os.path.join(self.w.build_dir,"src","sketch.ino"),"w") fout = open(os.path.join(self.w.build_dir, "src", "sketch.ino"), "w")
for l in lines: for l in lines:
if re.match(r"^byte server\[\] = {",l): if re.match(r"^byte server\[\] = {", l):
fout.write("byte server[] = { %s };\n"%(settings.server_ip.replace(".",", "),)) fout.write("byte server[] = { %s };\n" % (settings.server_ip.replace(".", ", "),))
elif re.match(r"^byte ip\[\] = {",l): elif re.match(r"^byte ip\[\] = {", l):
fout.write("byte ip[] = { %s };\n"%(settings.arduino_ip.replace(".",", "),)) fout.write("byte ip[] = { %s };\n" % (settings.arduino_ip.replace(".", ", "),))
else: else:
fout.write(l) fout.write(l)
fout.flush() fout.flush()
@ -88,7 +90,7 @@ class Sketch(object):
# Run build # Run build
fout = open(self.build_log, "w") fout = open(self.build_log, "w")
ferr = open(self.build_err_log, "w") ferr = open(self.build_err_log, "w")
rc = call(["ino","build"],stdout=fout,stderr=ferr) rc = call(["ino", "build"], stdout=fout, stderr=ferr)
fout.close() fout.close()
ferr.close() ferr.close()
if rc == 0: if rc == 0:
@ -100,14 +102,14 @@ class Sketch(object):
sys.stdout.write("\n") sys.stdout.write("\n")
with open(self.build_err_log) as f: with open(self.build_err_log) as f:
for line in f: for line in f:
print " ",line, print(" " + line)
return False return False
def upload(self): def upload(self):
sys.stdout.write(" Upload: ") sys.stdout.write(" Upload: ")
sys.stdout.flush() sys.stdout.flush()
fout = open(self.build_upload_log, "w") fout = open(self.build_upload_log, "w")
rc = call(["ino","upload"],stdout=fout,stderr=fout) rc = call(["ino", "upload"], stdout=fout, stderr=fout)
fout.close() fout.close()
if rc == 0: if rc == 0:
sys.stdout.write("pass") sys.stdout.write("pass")
@ -118,20 +120,19 @@ class Sketch(object):
sys.stdout.write("\n") sys.stdout.write("\n")
with open(self.build_upload_log) as f: with open(self.build_upload_log) as f:
for line in f: for line in f:
print " ",line, print(" " + line)
return False return False
def test(self): def test(self):
# import the matching test case, if it exists # import the matching test case, if it exists
try: try:
basename = os.path.basename(self.filename)[:-4] basename = os.path.basename(self.filename)[:-4]
i = importlib.import_module("testcases."+basename) i = importlib.import_module("testcases." + basename)
except: except:
sys.stdout.write(" Test: no tests found") sys.stdout.write(" Test: no tests found")
sys.stdout.write("\n") sys.stdout.write("\n")
return return
c = getattr(i,basename) c = getattr(i, basename)
testmethods = [m for m in dir(c) if m.startswith("test_")] testmethods = [m for m in dir(c) if m.startswith("test_")]
testmethods.sort() testmethods.sort()
@ -146,19 +147,20 @@ class Sketch(object):
sys.stdout.flush() sys.stdout.flush()
for t in tests: for t in tests:
t.run(result) t.run(result)
print "%d/%d"%(result.testsRun-len(result.failures)-len(result.errors),result.testsRun) print(str(result.testsRun - len(result.failures) - len(result.errors)) + "/" + str(result.testsRun))
if not result.wasSuccessful(): if not result.wasSuccessful():
if len(result.failures) > 0: if len(result.failures) > 0:
for f in result.failures: for f in result.failures:
print "-- %s"%(str(f[0]),) print("-- " + str(f[0]))
print f[1] print(f[1])
if len(result.errors) > 0: if len(result.errors) > 0:
print " Errors:" print(" Errors:")
for f in result.errors: for f in result.errors:
print "-- %s"%(str(f[0]),) print("-- " + str(f[0]))
print f[1] print(f[1])
c.tearDownClass() c.tearDownClass()
if __name__ == '__main__': if __name__ == '__main__':
run_tests = True run_tests = True
@ -166,13 +168,13 @@ if __name__ == '__main__':
w.init() w.init()
for e in w.examples: for e in w.examples:
print "--------------------------------------" print("--------------------------------------")
print "[%s]"%(e.basename,) print("[" + e.basename + "]")
if e.build() and run_tests: if e.build() and run_tests:
e.test() e.test()
for e in w.tests: for e in w.tests:
print "--------------------------------------" print("--------------------------------------")
print "[%s]"%(e.basename,) print("[" + e.basename + "]")
if e.build() and run_tests: if e.build() and run_tests:
e.test() e.test()