Coverage for modbus_connect/core.py: 74%

65 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2023-01-12 07:46 +0000

from dataclasses import dataclass, field
import logging
from typing import List, Optional

import pymodbus
import pymodbus.client.tcp
import pymodbus.constants
import pymodbus.payload
import pymodbus.register_read_message

import modbus_connect.processors as processors
import modbus_connect.utils as utils

14 

15 

# Module-scoped logger: records carry this module's name and still propagate
# to whatever handlers the application installs on the root logger.
logger = logging.getLogger(__name__)

17 

18 

class ModbusGateway:
    """Read configured tags from a Modbus TCP server in batches.

    The gateway owns a ``pymodbus.client.tcp.ModbusTcpClient`` and a list of
    tags. The tags are grouped into batches by
    ``utils.make_batch_consecutive_bank_and_size`` and each batch is read with
    a single request in :meth:`read_tags`.
    """

    def __init__(
        self,
        host: str,
        port: int,
        slave: int = 1,
        timeout: int = 3,
        tags_list: Optional[List[utils.ModbusRegister]] = None,
        batch_size: int = 60,
        byteorder: pymodbus.constants.Endian = pymodbus.constants.Endian.Big,
        wordorder: pymodbus.constants.Endian = pymodbus.constants.Endian.Little,
    ):
        """Create the client and, if tags were given, batch them.

        Args:
            host: Host name or address passed to ``ModbusTcpClient``.
            port: TCP port passed to ``ModbusTcpClient``.
            slave: Value forwarded as ``slave=`` on every read request.
            timeout: Value forwarded as ``timeout=`` to ``ModbusTcpClient``.
            tags_list: Tags to read. ``None`` (the default) means no tags yet;
                use :meth:`set_tags_list` to supply them later.
            batch_size: Maximum batch size handed to the batching helper.
            byteorder: Byte order forwarded to ``processors.process_batch``.
            wordorder: Word order forwarded to ``processors.process_batch``.
        """
        self.host = host
        self.port = port
        self.slave = slave
        self.timeout = timeout
        self.batch_size = batch_size
        # A fresh list per instance: a literal ``[]`` default would be shared
        # by every gateway created without an explicit ``tags_list``.
        self.tags_list = [] if tags_list is None else tags_list
        self.byte_order = byteorder
        self.word_order = wordorder

        # Create the Modbus TCP client for the configured host and port.
        self.client = pymodbus.client.tcp.ModbusTcpClient(
            self.host, self.port, timeout=self.timeout
        )
        self.tags_requests: utils.ModbusRegistersBatched = []

        if self.tags_list:
            self.configure_tags()

    def __del__(self):
        """Close the client, if one was created."""
        # ``__init__`` may have raised before ``self.client`` was assigned;
        # the finalizer still runs in that case and must not fail.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def connect(self):
        """Call ``connect()`` on the underlying client."""
        self.client.connect()

    def set_tags_list(self, tags_list: List[utils.ModbusRegister]):
        """Replace the tag list and rebuild the read batches."""
        self.tags_list = tags_list
        self.tags_requests = []
        self.configure_tags()

    def configure_tags(self):
        """Rebuild ``self.tags_requests`` from ``self.tags_list``.

        Batching is delegated to ``utils.make_batch_consecutive_bank_and_size``
        with ``self.batch_size`` as the size limit.
        """
        self.tags_requests = utils.make_batch_consecutive_bank_and_size(
            self.tags_list, self.batch_size
        )

    def read_tags(self) -> utils.ModbusResults:
        """Read every configured batch and return the processed values.

        Returns:
            A ``utils.ModbusResults`` accumulating the output of
            ``processors.process_batch`` for each non-empty batch.

        Raises:
            NotImplementedError: If a batch targets input registers, discrete
                inputs or coils, which are not implemented yet.
            Exception: If a holding-register read fails or returns an error
                response (the original error is attached as ``__cause__``), or
                if the batch's memory bank is not a known one.
        """
        values: utils.ModbusResults = utils.ModbusResults([])

        for batch in self.tags_requests:
            # Nothing to request for an empty batch.
            if len(batch) == 0:
                continue

            batch_results = None
            # The first tag's memory bank decides how the whole batch is read.
            memory_bank = batch[0].memorybank

            if memory_bank == utils.MemoryBanks.HOLDING_REGISTERS:
                try:
                    batch_results = self.client.read_holding_registers(
                        batch[0].address,
                        count=utils.get_batch_memory_length(batch),
                        slave=self.slave,
                    )
                    if batch_results.isError():
                        raise Exception("Modbus error: " + str(batch_results))
                except Exception as e:
                    # Same message as before, but chained so the underlying
                    # Modbus/connection error is not lost.
                    raise Exception(
                        "Error reading from modbus server from address "
                        f"{batch[0].address} to address {batch[-1].address}"
                    ) from e
            elif memory_bank == utils.MemoryBanks.INPUT_REGISTERS:
                # TODO: Implement input registers read
                raise NotImplementedError("Input registers not implemented")
            elif memory_bank == utils.MemoryBanks.DISCRETE_INPUTS:
                # TODO: Implement discrete inputs read
                raise NotImplementedError("Discrete inputs not implemented")
            elif memory_bank == utils.MemoryBanks.COILS:
                # TODO: Implement coils read
                raise NotImplementedError("Coils not implemented")
            else:
                raise Exception("Non supported memory bank")

            # Decode the raw response into tag values.
            if batch_results is not None:
                values += processors.process_batch(
                    batch, batch_results, self.byte_order, self.word_order
                )

        return values

    def tags_ready(self) -> bool:
        """Return True when at least one tag has been configured."""
        # The original read ``self.tags``, which is never assigned anywhere in
        # this class; ``self.tags_list`` is the attribute that holds the tags.
        return bool(self.tags_list)

124 

125 

if __name__ == "__main__":
    # Demo: describe two float32 holding-register tags as plain dictionaries.
    # NOTE(review): ModbusGateway.read_tags accesses `.memorybank` and
    # `.address` as attributes on each tag, while these are dicts keyed by
    # "memory_bank" - confirm the batching helper converts them.
    demo_tags = [
        {
            "name": tag_name,
            "address": tag_address,
            "memory_bank": utils.MemoryBanks.HOLDING_REGISTERS,
            "datatype": "float32",
        }
        for tag_name, tag_address in (("var1", 0), ("var2", 1))
    ]

    # Build the gateway against the demo server with those tags.
    gateway = ModbusGateway(
        host="docencia.i4techlab.upc.edu",
        port=20000,
        tags_list=demo_tags,
    )

    # Read the values from the modbus server, then print and log them.
    values = gateway.read_tags()
    print(values)
    logger.info(values)