Coverage for modbus_connect/utils.py: 86%
81 statements
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
« prev ^ index » next coverage.py v7.0.0, created at 2023-01-12 07:46 +0000
1import typing
2from typing import List, NewType
4from dataclasses import dataclass, field
5from typing import List, TypedDict
6from enum import Enum
8# --------------------- TYPES ---------------------
11class MemoryBanks(Enum):
12 HOLDING_REGISTERS = "holding_registers"
13 INPUT_REGISTERS = "input_registers"
14 DISCRETE_INPUTS = "discrete_inputs"
15 COILS = "coils"
17 __lt__ = lambda self, other: self.value < other.value
18 __eq__ = lambda self, other: self.value == other.value
19 __gt__ = lambda self, other: self.value > other.value
22class DataTypes(Enum):
23 BOOL = "bool"
24 INT16 = "int16"
25 UINT16 = "uint16"
26 INT32 = "int32"
27 UINT32 = "uint32"
28 FLOAT32 = "float32"
29 INT64 = "int64"
30 UINT64 = "uint64"
31 FLOAT64 = "float64"
34def datatype_lenght(datatype: DataTypes) -> int:
35 if datatype == DataTypes.BOOL:
36 return 1
37 elif datatype == DataTypes.INT16:
38 return 1
39 elif datatype == DataTypes.UINT16:
40 return 1
41 elif datatype == DataTypes.INT32:
42 return 2
43 elif datatype == DataTypes.UINT32:
44 return 2
45 elif datatype == DataTypes.FLOAT32:
46 return 2
47 elif datatype == DataTypes.INT64:
48 return 4
49 elif datatype == DataTypes.UINT64:
50 return 4
51 elif datatype == DataTypes.FLOAT64:
52 return 4
53 else:
54 raise ValueError("Datatype not supported")
57@dataclass
58class ModbusRegister:
59 name: str
60 address: int
61 memorybank: MemoryBanks
62 datatype: DataTypes
64 # Create a lenght property to be able to know the lenght of the register by its datatype
65 @property
66 def length(self) -> int:
67 return datatype_lenght(self.datatype)
70ModbusRegisters = NewType("ModbusRegisters", List[ModbusRegister])
72ModbusRegistersBatched = NewType(
73 "ModbusRegistersBatched", List[List[ModbusRegister]]
74)
77@dataclass
78class ModbusResult:
79 tag: ModbusRegister
80 value: any = None
83ModbusResults = NewType("ModbusResults", List[ModbusResult])
86# --------------------- FUNCTIONS ---------------------
88# Interface to tag classes
91@dataclass
92class ObjectWithAddressAndBank:
93 address: int
94 memorybank: str
95 length: int
98# From a list of ObjectWithAddressAndBank classes, return the total lenght in memory addresses cosidering the length of each object
100# TODO: Allow the use calculate batches which are not consecutive
103def get_batch_memory_length(list: List[ObjectWithAddressAndBank]) -> int:
104 return sum([tag.length for tag in list])
107# From a list of {'name': 'variable_name', 'address': number} classes, return a list which contains lists of dictionaries which contain the dictionaries
108# from the original list which have consecutive address numbers, with a maximum size of size.
110# TODO: Allow the creation of batches which are not consecutive by an allowed non taged memory space
113def make_batch_consecutive_bank_and_size(
114 list: List[ObjectWithAddressAndBank],
115 size: int,
116) -> List[List]:
117 # Check if size is greater than 2 to be able to make batches
118 if size < 2:
119 raise ValueError(
120 "Is not possible to make batches with a size less than 2"
121 )
123 # Sort the list by address and memorybank
124 list.sort(key=lambda x: x.address)
125 list.sort(key=lambda x: x.memorybank)
127 parts: List[List] = []
129 if len(list) == 1:
130 parts.append([list[0]])
131 return parts
133 # For loop that will iterate over list, creating batches of consecutive address and same memorybank value
135 parts.append([list[0]])
136 for i in range(1, len(list)):
137 # Check that the address is not consecutive or the memorybank is not the same or if the batch is full, if so, create a new batch
138 if (
139 list[i].address != list[i - 1].address + list[i - 1].length
140 or list[i].memorybank != list[i - 1].memorybank
141 or len(parts[-1]) >= size
142 ):
143 parts.append([list[i]])
145 # Otherwise, add the element to the last batch
146 else:
147 parts[-1].append(list[i])
149 return parts