#!/usr/bin/env python
"""Exercise the ``fcall`` library: scripted exchange, doctests and unit tests.

Importing or running this module, in order:

1. prints a banner with the file name,
2. extends ``sys.path`` with the ``test`` directories and imports ``startup``,
3. runs a scripted exchange of fcall and probe messages with stations
   "A", "B" and "C" (section LIB PROTO TEST),
4. runs the doctests embedded in the ``fcall`` module (section DOC TEST),
5. builds a unittest suite (section UNIT TEST) which is executed with
   verbosity 2 when the file is run as a script.

The ``print(...)`` calls always take a single pre-formatted string so that
the output is the same under the Python 2 print statement semantics and the
Python 3 print function.
"""

print("\n*** " + __file__ + " ***\n")

import sys
sys.path.append('./test')
sys.path.append('../test')
# Imported for its side effects only (presumably prepares the test
# environment -- verify against test/startup.py).
import startup
#sys.argv += ['-r', '/dev/ttyS0']

# Keep this order: the struct import must come after the star import so that
# ``pack``/``unpack`` always refer to the struct functions.
from fcall import *
from struct import pack, unpack


# LIB PROTO TEST

def my_cb(msg):
    """Callback for the asynchronous "function_1" call.

    Prints the callback name, prints the signed 64-bit value carried by the
    "result_2" parameter when the message has one, and records that the
    response was received in the module-level ``receive_fc1_rsp`` flag.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level flag would never change.
    # NOTE(review): placement of the flag assignment inside the callback was
    # reconstructed from a whitespace-collapsed copy of the file -- confirm.
    global receive_fc1_rsp
    print("=> %s" % my_cb.__name__)
    if msg.is_param("result_2"):
        result2 = unpack('q', msg.bind_param("result_2"))
        print("result 2 = %s" % (result2[0],))
    receive_fc1_rsp = True


stationA = "A"
stationB = "B"
stationC = "C"

# Asynchronous call to station A carrying four packed parameters.
fc1 = create_fcall("function_1")
param1 = pack('iiB', 1, 2, True)
fc1.add_param("param_1", param1)
param2 = pack('iiiiiiiiii', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
fc1.add_param("param_2", param2)
param3 = "hello"
fc1.add_param("param_3", param3)
param4 = pack('q', 123)
fc1.add_param("param_4", param4)
fc1.set_cb(my_cb)
fc1.set_sta(stationA)
receive_fc1_rsp = False
fc1.send_async()

# Synchronous call to station B; the result is read back from the message.
fc2 = create_fcall("function_2")
fc2.add_param_bool("param_5", True)
fc2.send(stationB)
result1 = fc2.bind_param_string("result_1")
print("result1 = %s" % (result1,))

probe1 = create_probe()
param6 = 789
probe1.add_param_ulong("param_6", param6)
probe1.send(stationB)

probe2 = create_probe().add_param("param_7")
probe2.send(stationB)
param7 = None  #probe2.bind_param_ulong("param_7")
print("param 7 = %s" % (param7,))
probe2.send(stationC)

# NOTE(review): the extent of this conditional was reconstructed from a
# whitespace-collapsed copy of the file; only the "function_3" call is
# assumed to depend on param_7 -- confirm against version control.
if 456 == param7:
    fc3 = create_fcall("function_3")
    fc3.send_async(stationC)

fc4 = create_fcall("function_4")
fc4.send_async(stationC)

fc5 = create_fcall("function_5")
fc5.send_async(stationC)

probe3 = create_probe()
probe3.send(stationC)
if probe3.is_param("param_8"):
    probe4 = create_probe()
    probe4.add_param("param_8")
    probe4.add_param_bool("param_9", False)
    probe4.send(stationC)
    param8 = probe4.bind_param_bool("param_8")
    print("param 8 = %s" % (param8,))


# DOC TEST

import doctest
import fcall
doctest.testmod(fcall)


# UNIT TEST

import unittest

# Value both unit-test callbacks require in the "result_2" parameter.
EXPECTED_RESULT_2 = 0x0000007B


def _check_result_2(msg, cb_name):
    """Shared body of ``cb1`` and ``cb2``.

    Prints ``cb_name`` and raises ``Exception(cb_name)`` when ``msg`` has no
    "result_2" parameter or when that parameter, unpacked as a signed 64-bit
    integer, differs from ``EXPECTED_RESULT_2``.
    """
    print("=> %s" % cb_name)
    if not msg.is_param("result_2"):
        raise Exception(cb_name)
    result2 = unpack('q', msg.bind_param("result_2"))
    if result2[0] != EXPECTED_RESULT_2:
        raise Exception(cb_name)


def cb1(msg):
    """First response callback; validates the "result_2" parameter."""
    _check_result_2(msg, cb1.__name__)


def cb2(msg):
    """Second response callback; validates the "result_2" parameter."""
    _check_result_2(msg, cb2.__name__)


class TestFcallFunctions(unittest.TestCase):
    """Unit tests for the message API exported by the ``fcall`` module.

    The per-test descriptions are kept as ``#`` comments rather than
    docstrings on purpose: a method docstring would replace the test name in
    the verbose runner output.
    """

    def setUp(self):
        # Create a station
        self.station = 0

    def tearDown(self):
        # Remove station
        pass

    def test_create_fcall(self):
        # Create a fcall message
        fcall1 = create_fcall("function_1")
        # Copy the message
        fcall2 = fcall1

    def test_create_probe(self):
        # Create a probe message
        probe = create_probe()

    def test_add_param(self):
        # Add an empty parameter to the probe message,
        # and retrieve the added parameter
        #
        probe = create_probe()
        probe.add_param("empty_param")
        self.assertTrue(probe.is_param("empty_param"))
        # Add a string parameter to the fcall message,
        # and retrieve the added parameter
        #
        call = create_fcall("fcall").add_param("string_param", 'ABCD')
        self.assertEqual(call.bind_param_string("string_param"), 'ABCD')

    def test_add_param_bool(self):
        # Add a boolean parameter to the probe message,
        # and retrieve the added parameter
        #
        probe = create_probe()
        probe.add_param_bool("boolean_param", True)
        self.assertEqual(probe.bind_param_bool("boolean_param"), True)

    def test_add_param_ushort(self):
        # Add an unsigned short parameter to the fcall message,
        # and retrieve the added parameter
        #
        call = create_fcall("fcall")
        call.add_param_ushort("ushort_param", 0xFEDC)
        self.assertEqual(call.bind_param_ushort("ushort_param"), 0xFEDC)

    def test_add_param_ulong(self):
        # Add an unsigned long parameter to the fcall message,
        # and retrieve the added parameter
        #
        call = create_fcall("fcall")
        call.add_param_ulong("ulong_param", 0xFEDCBA98)
        self.assertEqual(call.bind_param_ulong("ulong_param"), 0xFEDCBA98)

    def test_add_param_n_u8(self):
        # Add a Python tuple parameter to the fcall message,
        # and retrieve the added parameter
        #
        tuple_u8 = (0x12, 0x34, 0x56)
        call = create_fcall("test_u8").add_param_n_u8("tuple_u8", tuple_u8)
        param = call.bind_param("tuple_u8")
        param = unpack('!BBB', param)
        self.assertEqual(param, tuple_u8)

    def test_add_param_n_u16(self):
        # Add a Python tuple parameter to the fcall message,
        # and retrieve the added parameter
        #
        tuple_u16 = (0x12AB, 0x34AB, 0x56AB, 0x78AB)
        call = create_fcall("test_u16").add_param_n_u16("tuple_u16", tuple_u16)
        param = call.bind_param("tuple_u16")
        param = unpack('!HHHH', param)
        self.assertEqual(param, tuple_u16)

    def test_add_param_n_u32(self):
        # Add a Python tuple parameter to the fcall message,
        # and retrieve the added parameter
        #
        tuple_u32 = (0x1234ABCD, 0x3456ABCD, 0x5678ABCD, 0x9ABCABCD, 0xEF01ABCD)
        call = create_fcall("test_u32").add_param_n_u32("tuple_u32", tuple_u32)
        param = call.bind_param("tuple_u32")
        param = unpack('!IIIII', param)
        self.assertEqual(param, tuple_u32)

    def test_remove_param(self):
        # Add a parameter to the probe message,
        # remove the added parameter,
        # and try to retrieve the removed parameter
        #
        probe = create_probe()
        probe.add_param("param")
        probe.remove_param("param")
        self.assertFalse(probe.is_param("param"))

    def test_set_cb(self):
        # Set a callback to the fcall message,
        # and set another callback
        #
        call = create_fcall("fcall")
        call.set_cb(cb1)
        call.set_cb(cb2)

    def test_remove_cb(self):
        # Set a callback to the fcall message,
        # remove the set callback,
        # and set another callback
        #
        call = create_fcall("fcall")
        call.set_cb(cb1)
        call.remove_cb()
        call.set_cb(cb2)

    def test_set_sta(self):
        # Set the destination station to the probe message,
        # then set station 1 as the new destination of the same message
        #
        probe = create_probe()
        probe.set_sta(self.station)
        station1 = 1
        probe.set_sta(station1)

    def test_send_async(self):
        # Create a fcall message
        fcall1 = create_fcall("function_1")
        # Add a first parameter to the created message
        param1 = pack('iiB', 1, 2, True)
        fcall1.add_param("param_1", param1)
        # Add a second parameter to the created message
        param2 = pack('iiiiiiiiii', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
        fcall1.add_param("param_2", param2)
        # Add a third parameter to the created message
        param3 = "hello"
        fcall1.add_param("param_3", param3)
        # Add a fourth parameter to the created message
        param4 = pack('q', 123)
        fcall1.add_param("param_4", param4)
        # Register a callback function which will be called at message
        # response reception
        fcall1.set_cb(cb1)
        # Set destination station of message
        fcall1.set_sta(self.station)
        # Send the configured message in an asynchronous mode
        fcall1.send_async()

        # Send another message in an asynchronous mode
        station2 = 2
        fcall2 = create_fcall("function_1")
        fcall2.add_param("param_1", param1)
        fcall2.add_param("param_2", param2)
        fcall2.add_param("param_3", param3)
        fcall2.add_param("param_4", param4)
        fcall2.set_cb(cb2)
        fcall2.set_sta(station2)
        fcall2.send_async()

    def test_send(self):
        # Create a fcall message,
        # add a boolean parameter,
        # send the message to station,
        # and get the result
        #
        fcall2 = create_fcall("function_2")
        fcall2.add_param_bool("param_5", True)
        fcall2 = fcall2.send(self.station)
        result1 = fcall2.bind_param_string("result_1")
        self.assertEqual(result1, "this is result 1")
        self.assertEqual(len(result1), 16)
        # Create a probe message,
        # set the destination station,
        # add a parameter,
        # send the message,
        # and get the result
        #
        probe = create_probe().set_sta(self.station).add_param("param_6").send()
        self.assertEqual(probe.bind_param_ulong("param_6"), 0x7B000000)

    def test_is_param(self):
        # Add a parameter to the probe message,
        # send the message to the destination station,
        # and get the result
        #
        probe = create_probe()
        probe.add_param("param_6").send(self.station)
        self.assertTrue(probe.is_param("param_6"))

    def test_bind_param_string(self):
        # Add string parameters to the fcall message,
        # and retrieve added parameters
        #
        call = create_fcall("fcall")
        call.add_param("string_param_1", "string param 1")
        call.add_param("string_param_2", "string param 2")
        call.add_param("string_param_3", "string param 3")
        self.assertEqual(call.bind_param_string("string_param_1"), "string param 1")
        self.assertEqual(call.bind_param_string("string_param_2"), "string param 2")
        self.assertEqual(call.bind_param_string("string_param_3"), "string param 3")

    def test_bind_param_bool(self):
        # Add a boolean parameter to the fcall message,
        # and retrieve the added parameter
        #
        call = create_fcall("fcall")
        call.add_param_bool("boolean_param", False)
        self.assertEqual(call.bind_param_bool("boolean_param"), False)

    def test_bind_param_ushort(self):
        # Add a parameter to the probe message,
        # send the message to the destination station,
        # and get the result
        #
        probe = create_probe()
        probe = probe.add_param("param_10").send(self.station)
        self.assertEqual(probe.bind_param_ushort("param_10"), 0xEEFF)

    def test_bind_param_ulong(self):
        # Add a parameter to the probe message,
        # send the message to the destination station,
        # and get the result
        #
        probe = create_probe()
        probe = probe.add_param("param_11").send(self.station)
        self.assertEqual(probe.bind_param_ulong("param_11"), 0xCCDDEEFF)


suite = unittest.TestLoader().loadTestsFromTestCase(TestFcallFunctions)
try:
    # DocTestSuite raises ValueError when the module contains no doctests.
    suite.addTest(doctest.DocTestSuite(fcall))
except ValueError:
    print("has no tests")

if __name__ == '__main__':
    testResult = unittest.TextTestRunner(verbosity=2).run(suite)