Coverage for modbus_connect/utils.py: 86%

81 statements  

« prev     ^ index     » next       coverage.py v7.0.0, created at 2023-01-12 07:46 +0000

1import typing 

2from typing import List, NewType 

3 

4from dataclasses import dataclass, field 

5from typing import List, TypedDict 

6from enum import Enum 

7 

8# --------------------- TYPES --------------------- 

9 

10 

class MemoryBanks(Enum):
    """Memory banks a Modbus register can belong to.

    Members are compared and ordered by their string ``value`` so that
    collections can be sorted by memory bank.
    """

    HOLDING_REGISTERS = "holding_registers"
    INPUT_REGISTERS = "input_registers"
    DISCRETE_INPUTS = "discrete_inputs"
    COILS = "coils"

    def __lt__(self, other):
        # Returning NotImplemented for foreign operands lets Python raise a
        # TypeError instead of failing with AttributeError on ``other.value``.
        if not isinstance(other, MemoryBanks):
            return NotImplemented
        return self.value < other.value

    def __eq__(self, other):
        # Foreign operands fall back to Python's default handling, so
        # ``MemoryBanks.COILS == "coils"`` is simply False.
        if not isinstance(other, MemoryBanks):
            return NotImplemented
        return self.value == other.value

    def __gt__(self, other):
        if not isinstance(other, MemoryBanks):
            return NotImplemented
        return self.value > other.value

    def __hash__(self):
        # A class that defines __eq__ without __hash__ becomes unhashable;
        # restore hashing so members can be used as dict keys and in sets.
        return hash(self.value)

20 

21 

class DataTypes(Enum):
    """Identifiers of the data types a register value can have.

    Each member's value is the lowercase string form of its name.
    """

    # Boolean
    BOOL = "bool"

    # 16-bit
    INT16 = "int16"
    UINT16 = "uint16"

    # 32-bit
    INT32 = "int32"
    UINT32 = "uint32"
    FLOAT32 = "float32"

    # 64-bit
    INT64 = "int64"
    UINT64 = "uint64"
    FLOAT64 = "float64"

32 

33 

def datatype_lenght(datatype: DataTypes) -> int:
    """Return the length associated with ``datatype``.

    BOOL, INT16 and UINT16 have length 1; INT32, UINT32 and FLOAT32 have
    length 2; INT64, UINT64 and FLOAT64 have length 4.

    Raises:
        ValueError: If ``datatype`` matches none of the ``DataTypes`` members.
    """
    # Ordered (member, length) pairs, checked with ``==`` from top to bottom.
    length_table = (
        (DataTypes.BOOL, 1),
        (DataTypes.INT16, 1),
        (DataTypes.UINT16, 1),
        (DataTypes.INT32, 2),
        (DataTypes.UINT32, 2),
        (DataTypes.FLOAT32, 2),
        (DataTypes.INT64, 4),
        (DataTypes.UINT64, 4),
        (DataTypes.FLOAT64, 4),
    )
    for member, member_length in length_table:
        if datatype == member:
            return member_length
    raise ValueError("Datatype not supported")

55 

56 

@dataclass
class ModbusRegister:
    """A named register described by its address, memory bank and data type."""

    name: str
    address: int
    memorybank: MemoryBanks
    datatype: DataTypes

    @property
    def length(self) -> int:
        """Length of this register, derived from its data type."""
        register_length = datatype_lenght(self.datatype)
        return register_length

68 

69 

# Distinct static types: a flat list of registers, and registers grouped
# into batches (a list of lists).
ModbusRegisters = NewType("ModbusRegisters", List[ModbusRegister])
ModbusRegistersBatched = NewType("ModbusRegistersBatched", List[List[ModbusRegister]])

75 

76 

@dataclass
class ModbusResult:
    """Pairs a register definition with the value associated with it.

    Attributes:
        tag: The register this result belongs to.
        value: The value for ``tag``; defaults to ``None``.
    """

    tag: ModbusRegister
    # ``typing.Any`` is the annotation for an unconstrained value; the
    # lowercase builtin ``any`` is a function, not a type.
    value: typing.Any = None

81 

82 

# Distinct static type for a list of ModbusResult objects.
ModbusResults = NewType(
    "ModbusResults",
    List[ModbusResult],
)

84 

85 

86# --------------------- FUNCTIONS --------------------- 

87 

88# Interface to tag classes 

89 

90 

@dataclass
class ObjectWithAddressAndBank:
    """Minimal shape the batching helpers in this file rely on.

    Any object exposing these three attributes can be measured and batched.
    """

    # Address at which the object starts.
    address: int
    # Memory bank the object belongs to.
    memorybank: str
    # Number of consecutive addresses the object occupies.
    length: int

96 

97 

# From a list of ObjectWithAddressAndBank objects, return the total length in memory addresses, considering the length of each object

99 

# TODO: Allow calculating the length of batches which are not consecutive

101 

102 

def get_batch_memory_length(list: List[ObjectWithAddressAndBank]) -> int:
    """Return the sum of the ``length`` attribute of every object in ``list``.

    An empty ``list`` yields 0.
    """
    lengths = (item.length for item in list)
    return sum(lengths)

105 

106 

# From a list of objects that each have an address, a memory bank and a length, return a list of batches (lists).
# Each batch holds objects from the original list that share a memory bank and occupy consecutive addresses, with at most `size` objects per batch.

109 

# TODO: Allow the creation of batches which are not strictly consecutive, tolerating an allowed gap of untagged memory space

111 

112 

def make_batch_consecutive_bank_and_size(
    list: List[ObjectWithAddressAndBank],
    size: int,
) -> List[List]:
    """Group objects into batches of consecutive addresses within one bank.

    The objects are ordered by memory bank and then by address. An object
    joins the current batch only when it is in the same memory bank as the
    previous object, starts exactly where the previous one ends
    (``previous.address + previous.length``) and the batch holds fewer than
    ``size`` objects; otherwise it opens a new batch.

    Args:
        list: Objects exposing ``address``, ``memorybank`` and ``length``.
            The caller's list is left untouched.
        size: Maximum number of objects per batch; must be at least 2.

    Returns:
        The batches, in (memorybank, address) order. An empty ``list``
        produces an empty result.

    Raises:
        ValueError: If ``size`` is less than 2.
    """
    if size < 2:
        raise ValueError(
            "Is not possible to make batches with a size less than 2"
        )

    # Sort a copy so the caller's list is not reordered as a side effect.
    # One composite key gives the same order as sorting by address and then
    # (stably) by memorybank.
    ordered = sorted(list, key=lambda item: (item.memorybank, item.address))

    parts: List[List] = []
    previous = None
    for current in ordered:
        # NOTE(review): the limit counts objects per batch, not the summed
        # ``length`` of the batch - confirm this is the intended meaning.
        starts_new_batch = (
            previous is None
            or current.address != previous.address + previous.length
            or current.memorybank != previous.memorybank
            or len(parts[-1]) >= size
        )
        if starts_new_batch:
            parts.append([current])
        else:
            parts[-1].append(current)
        previous = current

    return parts