Source code for pyipcmini.functions.ion_pump_base_functions
##########################################################################################
#
# Class for general send/receive data functions of Agilent IPCMini ion pump controller.
#
##########################################################################################
"""Class for general send/receive data functions of Agilent IPCMini controller."""
from __future__ import annotations
import logging
from time import sleep
from typing import TYPE_CHECKING
from .ion_pump_dic import IonPumpDics
if TYPE_CHECKING:
import serial
[docs]class PumpBaseFunctions:
"""Class for general send/receive data functions of Agilent IPCMini controller."""
WAIT_TIME_READ_SERIAL = 0.5 # give the device time to answer via serial (in s)
def __init__(self, serial_connection: serial.Serial) -> None:
"""Initialize PumpReadFunctions.
Parameters
----------
serial_connection : serial.Serial
The USB serial connection for the IPCMini.
"""
self.logger = logging.getLogger("PUMP base")
self.logger.debug("__init__")
self.serial_connection = serial_connection
[docs] def send_read_request(self, request: str) -> bytes:
"""Send read request from IonPumpDics.dics["Win"].
Parameters
----------
request : str
The request to send to the controller.
Returns
-------
bytes
The controller's answer.
"""
self.logger.debug("send_read_request")
stx = "02"
addr = "80" # Not handled in RS232, 0x80 as placeholder
if request in IonPumpDics.dics["Win"]:
win_formatted = IonPumpDics.dics["Win"][request]
win_formatted = win_formatted.encode("unicode-escape")
win_formatted = [hex(x).split("x")[-1] for x in win_formatted]
win = " "
for win_str in win_formatted:
win = win + win_str + " "
else:
self.logger.debug("Bad request")
return None
com = "30" # 0x30 for read, 0x31 for write
etx = "03" # end of transmission
crc = self.make_crc(base_msg=(addr, win_formatted, com, etx))
message = bytes.fromhex(stx + " " + addr + win + com + " " + etx + " " + crc[0] + " " + crc[1])
return self.send_data(message)
[docs] def send_set_request(self, request: str, data_to_set: str) -> bytes:
"""Send set request from IonPumpDics.dics["Win"].
Parameters
----------
request : str
The request to send to the controller.
data_to_set : str
The data to send to the controller for this request.
Returns
-------
bytes
The controller's answer.
"""
self.logger.debug("send_set_request")
stx = "02"
addr = "80" # Not handled in RS232, 0x80 as placeholder
if request in IonPumpDics.dics["Win"]:
win_formatted = IonPumpDics.dics["Win"][request]
win_formatted = win_formatted.encode("unicode-escape")
win_formatted = [hex(x).split("x")[-1] for x in win_formatted]
win = " "
for win_str in win_formatted:
win = win + win_str + " "
else:
self.logger.debug("Bad request")
return None
com = "31" # 0x30 for read, 0x31 for write
data_formatted = data_to_set.encode("unicode-escape")
data_formatted = [hex(x).split("x")[-1] for x in data_formatted]
data = " "
for byte_str in data_formatted:
data = data + byte_str + " "
etx = "03" # end of transmission
crc = self.make_crc(base_msg=(addr, win_formatted, com, etx), data=data_formatted)
message = bytes.fromhex(stx + " " + addr + win + com + data + etx + " " + crc[0] + " " + crc[1])
return self.send_data(message)
[docs] def make_crc(self, base_msg: tuple[str, str, str, str], *, data: str | None = None) -> list[str]:
"""Make formatted crc message.
Parameters
----------
base_msg : tuple[str, str, str, str]
Base part of the message to send to the controller.
data : str | None
Data send for a set command, None for a read command.
Returns
-------
list[str]
The controller's answer.
"""
self.logger.debug("make_crc")
(addr, win, com, etx) = base_msg
win_aux = int(win[0], 16)
for win_ele in win[1:]:
win_aux = win_aux ^ int(win_ele, 16)
if data is None:
crc = int(addr, 16) ^ win_aux ^ int(com, 16) ^ int(etx, 16)
else:
data_aux = int(data[0], 16)
if len(data) > 1:
for data_ele in data[1:]:
data_aux = data_aux ^ int(data_ele, 16)
crc = int(addr, 16) ^ win_aux ^ int(com, 16) ^ data_aux ^ int(etx, 16)
crc = hex(crc)
crc = crc[2:]
crc = crc.encode("unicode-escape")
return [hex(x).split("x")[-1] for x in crc]
[docs] def send_data(self, message: bytes) -> bytes:
"""Send data on serial port and read answer.
Parameters
----------
message : bytes
Message to send to the controller.
Returns
-------
bytes
The controller's answer.
"""
self.logger.debug("send_data")
if self.serial_connection.isOpen():
self.serial_connection.reset_output_buffer()
self.serial_connection.reset_input_buffer()
if message:
msg = "sending message: " + " ".join(hex(n) for n in message)
self.logger.debug(msg)
self.serial_connection.write(message)
self.serial_connection.flush() # it is buffering. required to get the data out *now*
return self.read_data()
return None
[docs] def read_data(self) -> bytes:
"""Read data on serial port.
Returns
-------
bytes
The controller's answer.
"""
self.logger.debug("read_data")
answer = None
sleep(self.WAIT_TIME_READ_SERIAL) # give the device time to answer
if self.serial_connection.inWaiting() > 0:
self.logger.debug("data present")
answer = self.serial_connection.read(self.serial_connection.inWaiting())
msg = "received message: " + " ".join(hex(n) for n in answer)
self.logger.debug(msg)
return answer
[docs] def get_reply_after_set_cmd(self, answer: bytes) -> str:
"""Check the status of the answer.
Parameters
----------
answer : bytes
The answer from the controller to be assessed.
Returns
-------
str
Success/fail answer.
"""
self.logger.debug("get_reply_after_set_cmd")
result_str = ""
if answer is not None:
result_int = hex(answer[2:-3][0])
result_int = int(result_int[2:])
result_str = IonPumpDics.dics["Reply"][result_int] if result_int in IonPumpDics.dics["Reply"] else "Unknown reply: " + str(result_int)
return result_str