Coverage for modbus_connect/core.py: 74%
65 statements
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
1from typing import List
2from dataclasses import dataclass, field
4import logging
6import pymodbus
7import pymodbus.register_read_message
8import pymodbus.client.tcp
9import pymodbus.payload
10import pymodbus.constants
12import modbus_connect.utils as utils
13import modbus_connect.processors as processors
16logger = logging.getLogger()
19class ModbusGateway:
20 def __init__(
21 self,
22 host: str,
23 port: int,
24 slave: int = 1,
25 timeout: int = 3,
26 tags_list: List[utils.ModbusRegister] = [],
27 batch_size: int = 60,
28 byteorder: pymodbus.constants.Endian = pymodbus.constants.Endian.Big,
29 wordorder: pymodbus.constants.Endian = pymodbus.constants.Endian.Little,
30 ):
31 self.host = host
32 self.port = port
33 self.slave = slave
34 self.timeout = timeout
35 self.batch_size = batch_size
36 self.tags_list = tags_list
37 self.byte_order = byteorder
38 self.word_order = wordorder
40 # Create modbus server connections using pymodbus library, using the host and port parameters, using the ModbusTcpClient class¡
41 self.client = pymodbus.client.tcp.ModbusTcpClient(
42 self.host, self.port, timeout=self.timeout
43 )
44 self.tags_requests: utils.ModbusRegistersBatched = []
46 if self.tags_list != []:
47 self.configure_tags()
49 def __del__(self):
50 self.client.close()
52 def connect(self):
53 self.client.connect()
55 def set_tags_list(self, tags_list: List[utils.ModbusRegister]):
56 self.tags_list = tags_list
57 self.tags_requests: utils.ModbusRegistersBatched = []
58 self.configure_tags()
60 def configure_tags(self):
61 # For each tags_request, sort the list of dictionaries by address and the make batches of consecutive addresses with a maximum size of self.batch_size
63 self.tags_requests = utils.make_batch_consecutive_bank_and_size(
64 self.tags_list, self.batch_size
65 )
67 # Get variables values from modbus server
68 # It gets the values in batches of 60 variables, using the address from the tags dictionary, and returns a dictionary with tha varibles names as keys and the server values
69 def read_tags(self) -> utils.ModbusResults:
70 # List to store the values of the results
71 values: utils.ModbusResults = utils.ModbusResults([])
73 # For each batch of tags, read the values from the modbus server and process them
74 for batch in self.tags_requests:
75 # Check if the batch is not empty
76 if len(batch) == 0:
77 continue
79 batch_results = None
81 # Check the memory bank of the batch and process the values accordingly
82 # Holding registers
83 if batch[0].memorybank == utils.MemoryBanks.HOLDING_REGISTERS:
84 try:
85 batch_results = self.client.read_holding_registers(
86 batch[0].address,
87 count=utils.get_batch_memory_length(batch),
88 slave=self.slave,
89 )
90 if batch_results.isError():
91 raise Exception("Modbus error: " + str(batch_results))
93 except Exception as e:
94 raise Exception(
95 "Error reading from modbus server from address "
96 + str(batch[0].address)
97 + " to address "
98 + str(batch[-1].address)
99 )
100 elif batch[0].memorybank == utils.MemoryBanks.INPUT_REGISTERS:
101 # TODO: Implement input registers read
102 raise NotImplementedError("Input registers not implemented")
104 elif batch[0].memorybank == utils.MemoryBanks.DISCRETE_INPUTS:
105 # TODO: Implement discrete inputs read
106 raise NotImplementedError("Discrete inputs not implemented")
108 elif batch[0].memorybank == utils.MemoryBanks.COILS:
109 # TODO: Implement coils read
110 raise NotImplementedError("Coils not implemented")
111 else:
112 raise Exception("Non supported memory bank")
114 # Process the values
115 if batch_results is not None:
116 values += processors.process_batch(
117 batch, batch_results, self.byte_order, self.word_order
118 )
120 return values
122 def tags_ready(self) -> bool:
123 return len(self.tags) > 0
126if __name__ == "__main__":
127 # Create a ModbusGateway object, using the host and port parameters, and the tags_list parameter, which is a list of dictionaries with the variable names and memory addresses
128 gateway = ModbusGateway(
129 host="docencia.i4techlab.upc.edu",
130 port=20000,
131 tags_list=[
132 {
133 "name": "var1",
134 "address": 0,
135 "memory_bank": utils.MemoryBanks.HOLDING_REGISTERS,
136 "datatype": "float32",
137 },
138 {
139 "name": "var2",
140 "address": 1,
141 "memory_bank": utils.MemoryBanks.HOLDING_REGISTERS,
142 "datatype": "float32",
143 },
144 ],
145 )
147 # Read the values from the modbus server
148 values = gateway.read_tags()
149 print(values)
150 logger.info(values)