Coverage for modbus_connect/processors.py: 84%
32 statements
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
from typing import Any, List, Union

import pymodbus.bit_read_message
import pymodbus.constants
import pymodbus.payload
import pymodbus.register_read_message

import modbus_connect.utils as utils
from modbus_connect.utils import DataTypes, MemoryBanks
def process_batch(
    batch: utils.ModbusRegisters,
    batch_results: Union[
        pymodbus.register_read_message.ReadHoldingRegistersResponse,
        pymodbus.bit_read_message.ReadCoilsResponse,
    ],
    byte_order: pymodbus.constants.Endian,
    word_order: pymodbus.constants.Endian,
) -> utils.ModbusResults:
    """Convert the raw response of one batched read into per-tag results.

    The memory bank of the first tag in ``batch`` selects the processing
    mode: word-like (holding/input registers) or bit-like (coils/discrete
    inputs). Every tag's position inside the response is computed relative
    to the address of the first tag.

    Args:
        batch: Tags that were read together; indexed and iterated here.
        batch_results: Response of the read. ``.registers`` is used for
            word-like banks and ``.getBit()`` for bit-like banks.
        byte_order: Byte order forwarded to the word-like decoder.
        word_order: Word order forwarded to the word-like decoder.

    Returns:
        A list with one ``utils.ModbusResult`` per processed tag. Tags whose
        memory bank does not belong to the selected mode are skipped. An
        empty batch, or a first tag with an unrecognised memory bank, yields
        an empty list.

    Raises:
        ValueError: Propagated from the per-tag processors when a tag has an
            unsupported datatype.
    """
    values: utils.ModbusResults = []

    # Nothing to process; also avoids an IndexError on batch[0] below.
    if not batch:
        return values

    word_banks = (
        utils.MemoryBanks.HOLDING_REGISTERS,
        utils.MemoryBanks.INPUT_REGISTERS,
    )
    bit_banks = (
        utils.MemoryBanks.COILS,
        utils.MemoryBanks.DISCRETE_INPUTS,
    )

    first = batch[0]
    # Offsets into the response are relative to the first tag of the batch.
    base_address = first.address

    # Input registers are handled like holding registers. The original outer
    # check only matched HOLDING_REGISTERS, so batches starting with an input
    # register produced no results even though the per-tag filter below
    # accepts them.
    if first.memorybank in word_banks:
        for tag in batch:
            if tag.memorybank not in word_banks:
                continue
            offset = tag.address - base_address
            raw_words = batch_results.registers[offset : offset + tag.length]
            values.append(
                utils.ModbusResult(
                    tag,
                    process_wordlike_register(
                        tag, raw_words, byte_order, word_order
                    ),
                )
            )
    elif first.memorybank in bit_banks:
        for register in batch:
            if register.memorybank not in bit_banks:
                continue
            # process_bitlike_register already returns a ModbusResult.
            values.append(
                process_bitlike_register(
                    register,
                    batch_results.getBit(register.address - base_address),
                )
            )

    return values
def process_wordlike_register(
    tag: utils.ModbusRegister,
    registers_list: list[int],
    byte_order: pymodbus.constants.Endian,
    word_order: pymodbus.constants.Endian,
) -> Any:
    """Decode the raw register words belonging to a single tag.

    Args:
        tag: Tag description; only ``tag.datatype`` is read here.
        registers_list: The register words that belong to this tag.
        byte_order: Byte order passed to ``BinaryPayloadDecoder``.
        word_order: Word order passed to ``BinaryPayloadDecoder``.

    Returns:
        The first register word unchanged for ``INT16``, a decoded 32-bit
        integer for ``INT32``, or a decoded 32-bit float for ``FLOAT32``.

    Raises:
        ValueError: If ``tag.datatype`` is none of the supported datatypes.
    """
    if tag.datatype == DataTypes.INT16:
        # NOTE(review): the word is returned as-is, without going through the
        # decoder, so no sign conversion or byte-order handling is applied.
        # Confirm this is the intended behaviour for INT16.
        return registers_list[0]

    is_int32 = tag.datatype == DataTypes.INT32
    if is_int32 or tag.datatype == DataTypes.FLOAT32:
        decoder = pymodbus.payload.BinaryPayloadDecoder.fromRegisters(
            registers_list,
            byte_order,
            wordorder=word_order,
        )
        if is_int32:
            return decoder.decode_32bit_int()
        return decoder.decode_32bit_float()

    # Format instead of concatenating: "str + <non-str datatype>" would raise
    # TypeError and hide the intended ValueError.
    raise ValueError(f"Invalid datatype: {tag.datatype}")
def process_bitlike_register(
    tag: utils.ModbusRegister,
    register: Any,
) -> utils.ModbusResult:
    """Wrap a single bit value read for ``tag`` into a ``ModbusResult``.

    Args:
        tag: Tag description; only ``tag.datatype`` is read here.
        register: Raw bit value for the tag; converted with ``bool()``.

    Returns:
        ``utils.ModbusResult(tag, bool(register))`` when the tag's datatype
        is ``BOOL``.

    Raises:
        ValueError: If ``tag.datatype`` is not ``DataTypes.BOOL``.
    """
    if tag.datatype == DataTypes.BOOL:
        return utils.ModbusResult(tag, bool(register))

    # The tag is accessed by attribute everywhere else; the original
    # tag["datatype"] subscript plus string concatenation could fail before
    # the ValueError was ever raised.
    raise ValueError(f"Invalid datatype: {tag.datatype}")