Coverage for modbus_connect/processors.py: 84%

32 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2023-01-12 07:46 +0000

1from typing import List 

2 

3import pymodbus.payload 

4import pymodbus.register_read_message 

5import pymodbus.bit_read_message 

6import pymodbus.constants 

7 

8import modbus_connect.utils as utils 

9from modbus_connect.utils import MemoryBanks, DataTypes 

10 

11 

12def process_batch( 

13 batch: utils.ModbusRegisters, 

14 batch_results: pymodbus.register_read_message.ReadHoldingRegistersResponse 

15 or pymodbus.bit_read_message.ReadCoilsResponse, 

16 byte_order: pymodbus.constants.Endian, 

17 word_order: pymodbus.constants.Endian, 

18) -> utils.ModbusResults: 

19 # Initialize the list of results 

20 values: utils.ModbusResults = [] 

21 

22 # Check the memory bank of the first register in the batch to determine the processing function between word like or bit like, for each register in the batch, process the register and append the result to the list of results 

23 if batch[0].memorybank == utils.MemoryBanks.HOLDING_REGISTERS: 

24 for tag in batch: 

25 if ( 

26 tag.memorybank == utils.MemoryBanks.HOLDING_REGISTERS 

27 or tag.memorybank == utils.MemoryBanks.INPUT_REGISTERS 

28 ): 

29 values.append( 

30 utils.ModbusResult( 

31 tag, 

32 process_wordlike_register( 

33 tag, 

34 batch_results.registers[ 

35 tag.address 

36 - batch[0].address : ( 

37 tag.address - batch[0].address 

38 ) 

39 + tag.length 

40 ], 

41 byte_order, 

42 word_order, 

43 ), 

44 ) 

45 ) 

46 elif ( 

47 batch[0].memorybank == utils.MemoryBanks.COILS 

48 or batch[0].memorybank == utils.MemoryBanks.DISCRETE_INPUTS 

49 ): 

50 for register in batch: 

51 if ( 

52 register.memorybank == utils.MemoryBanks.COILS 

53 or register.memorybank == utils.MemoryBanks.DISCRETE_INPUTS 

54 ): 

55 values.append( 

56 process_bitlike_register( 

57 register, 

58 batch_results.getBit( 

59 register.address - batch[0].address 

60 ), 

61 ) 

62 ) 

63 

64 return values 

65 

66 

67def process_wordlike_register( 

68 tag: utils.ModbusRegister, 

69 registers_list: list[int], 

70 byte_order: pymodbus.constants.Endian, 

71 word_order: pymodbus.constants.Endian, 

72) -> any: 

73 if tag.datatype == DataTypes.INT16: 

74 return registers_list[0] 

75 

76 elif tag.datatype == DataTypes.INT32: 

77 return pymodbus.payload.BinaryPayloadDecoder.fromRegisters( 

78 registers_list, 

79 byte_order, 

80 wordorder=word_order, 

81 ).decode_32bit_int() 

82 

83 elif tag.datatype == DataTypes.FLOAT32: 

84 return pymodbus.payload.BinaryPayloadDecoder.fromRegisters( 

85 registers_list, 

86 byte_order, 

87 wordorder=word_order, 

88 ).decode_32bit_float() 

89 

90 else: 

91 raise ValueError("Invalid datatype: " + tag.datatype) 

92 

93 

94def process_bitlike_register( 

95 tag: utils.ModbusRegister, 

96 register: any, 

97): 

98 value = None 

99 

100 if tag.datatype == DataTypes.BOOL: 

101 value = utils.ModbusResult(tag, bool(register)) 

102 

103 else: 

104 raise ValueError("Invalid datatype: " + tag["datatype"]) 

105 

106 return value