# Source code for pyipcmini.usb_ion_pump

##########################################################################################
#
# Class to communicate with an Agilent IPCMini ion pump controller via RS232 serial.
#
# Tested on standard USB port of raspberry pi 4 with FTDI USB/serial converter.
#
##########################################################################################
"""Contain the class for Agilent IPCMini controller."""

from __future__ import annotations

import dataclasses
import logging
import threading
import time
from datetime import datetime as dt
from queue import Queue

import serial
from dateutil import tz

from .functions import IonPump, IonPumpDics


@dataclasses.dataclass
class MeasureTimes:
    """Times used to timestamp measurements.

    Attributes
    ----------
    time : float
        Most recent ``time.time()`` reading taken by the polling loop.
    time_stamp : datetime
        Wall-clock time stamp attached to the latest measurement.
    old_time : float
        ``time.time()`` reading at which the previous measurement was taken;
        compared with ``time`` to decide when the next one is due.
    """

    time: float
    time_stamp: dt
    old_time: float
class CustomError(Exception):
    """Custom error class.

    Raised by ``UsbPumpHandler.__init__`` when the label read back from the
    controller differs from the expected one.
    """
##########################################################################################
# Class for addressing the Agilent IPCMini controller.
##########################################################################################
class UsbPumpHandler:
    """Class for addressing the Agilent IPCMini controller.

    Opens the serial connection, optionally pushes a set of initial
    parameters to the controller, and provides a polling loop meant to be
    run in a thread.

    Attributes
    ----------
    TIME_ZONE : tzinfo
        Time zone used for the time stamps attached to measurements.
    thread_killed : bool
        Set to ``True`` (see ``kill_thread``) to make ``thread_loop`` return.
    timings : MeasureTimes
        Book-keeping of the polling loop times.
    serial_connection : serial.Serial
        The open serial port.
    ion_pump : IonPump
        Wrapper exposing the controller's read and set functions.
    """

    # IANA zone keys contain no spaces; "Europe / Paris" is not a valid key,
    # so the lookup would not resolve to the intended zone.
    TIME_ZONE = tz.gettz("Europe/Paris")

    ##########################################################################################
    # Init / close functions
    ##########################################################################################
    def __init__(
        self,
        serial_port: str = "/dev/ttyUSB0",
        baudrate: int = 9600,
        init_param: dict | None = None,
        label: str = "Label",
    ) -> None:
        """Initialise the UsbPumpHandler.

        Parameters
        ----------
        serial_port : str
            On which COM port your pump controller is connected to.
        baudrate : int
            Value of the baudrate for COM port. By default 9600 on IPCMini.
        init_param : dict
            All parameters of the pump controller to be initialized. When
            ``None``, neither the label check nor the initialisation is done.
        label : str
            Pump driver label, to check whether this is the correct pump.

        Raises
        ------
        CustomError
            Wrong pump name.
        """
        self.logger = logging.getLogger("USB PUMP serial - " + serial_port)
        self.logger.debug("__init__")
        self.thread_killed = False  # Set to true to kill a running thread for this object
        self.timings = MeasureTimes(
            time=time.time(),
            time_stamp=dt.now(tz=UsbPumpHandler.TIME_ZONE),
            old_time=time.time(),
        )
        self.serial_connection = serial.Serial(
            port=serial_port,
            baudrate=baudrate,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
        )
        try:
            self.serial_connection.timeout = 1
            self.serial_connection.flushInput()
            self.ion_pump = IonPump(self.serial_connection)
            if init_param is not None:
                if self.ion_pump.read_functions.read_label() != label:
                    msg = "Wrong pump (inverted USB ports?)"
                    raise CustomError(msg)
                self.initialize_parameters(init_param)
        except Exception:
            # The caller never receives the object when construction fails, so
            # it could not call close() itself: free the port before re-raising.
            self.serial_connection.close()
            raise

    def initialize_parameters(self, init_param: dict) -> None:
        """Initialize standard parameters.

        Each parameter is first read back from the controller and only
        written when the current value differs from the requested one.

        Parameters
        ----------
        init_param : dict
            All parameters of the pump controller to be initialized.
        """
        self.logger.debug("initialize_parameters")
        for param, wanted in init_param.items():
            answer = self.ion_pump.read_functions.dic_read_functions[param]()
            if param in IonPumpDics.dics:
                # The read-back is compared against the entry that the
                # requested value maps to; a value with no entry can never
                # count as "already set".
                choices = IonPumpDics.dics[param]
                already_set = wanted in choices and answer == choices[wanted]
            else:
                already_set = answer == wanted

            if already_set:
                self.logger.debug("INITIALIZING: %s already set", param)
                continue

            self.logger.debug("INITIALIZING: %s", param)
            acknowledgement = self.ion_pump.set_functions.dic_set_functions[param](wanted)
            # Lazy %-formatting: also works when the acknowledgement is not a str.
            self.logger.debug("INITIALIZING: %s gives %s", param, acknowledgement)

    def close(self) -> None:
        """Close the serial connection, free up the COM port."""
        self.logger.debug("close")
        self.serial_connection.close()

    ##########################################################################################
    # Loop function to be called by a thread
    ##########################################################################################
    def thread_loop(
        self,
        values_queue: Queue,
        commands_queue: Queue,
        update_values_time: float = 30.0,
        update_commands_time: float = 2.0,
    ) -> None:
        """Loop thread for pressure and current.

        Loops reading the pressure and current every update_values_time, then
        sends them via the thread's value Queue, along with a time stamp.
        Pending commands are executed at the start of every iteration. The
        loop returns once ``thread_killed`` is found set, and resets it.

        Parameters
        ----------
        values_queue : Queue
            To pass data collected within a thread to the app that started
            the thread. Each item is the list
            ``[pressure, current, time_stamp]``.
        commands_queue : Queue
            To send commands while thread is running. Each item is a
            ``(command_type, command_value)`` pair, where ``command_type``
            is a key of the set-function dictionary.
        update_values_time : float
            Time interval between measurements => in principle, should be
            longer than update_commands_time.
        update_commands_time : float
            Time interval between checks for commands.
        """
        self.logger.debug("thread_loop")
        self.timings.old_time = time.time()
        while True:
            # Drain every command that arrived since the last iteration.
            while commands_queue.qsize() > 0:
                command_type, command_value = commands_queue.get()
                try:
                    setter = self.ion_pump.set_functions.dic_set_functions[command_type]
                except KeyError:
                    # An unknown command must not take the measurement loop down.
                    self.logger.warning("Unknown command %r ignored", command_type)
                    continue
                setter(command_value)

            time.sleep(update_commands_time)

            self.timings.time = time.time()
            if (self.timings.time - self.timings.old_time) > update_values_time:
                self.timings.old_time = self.timings.time
                self.timings.time_stamp = dt.now(tz=UsbPumpHandler.TIME_ZONE)
                measured_pressure = self.ion_pump.read_functions.read_pressure()
                measured_current = self.ion_pump.read_functions.read_current_measured()
                values_queue.put([measured_pressure, measured_current, self.timings.time_stamp])

            if self.thread_killed:
                # Reset the flag so that the loop can be started again later.
                self.thread_killed = False
                break

    def kill_thread(self) -> None:
        """Kill the thread.

        Only raises the flag; ``thread_loop`` notices it at the end of its
        current iteration.
        """
        self.logger.debug("kill_thread")
        self.thread_killed = True
##########################################################################################
# Direct case
##########################################################################################
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format="%(name)s: %(message)s")
    logger_main = logging.getLogger("__main__")

    # Settings pushed to the controller at start-up (only those that differ
    # from the values read back are actually written).
    dic_init_param = {
        "Protect": True,
        "Fixed/step": False,
        "Autostart": True,
        "Max power": 40,
        "I protect": 5000,
        "Set point": 1e-08,
        "Label": "Sr Oven",
        "Unit pressure": 1,
        "Device number": 5,
        "V target": 5000,
        "HV ON/OFF": True,
    }
    PORT = "/dev/ttyUSB0"
    BAUD = 9600
    serial_conn = UsbPumpHandler(serial_port=PORT, baudrate=BAUD, init_param=dic_init_param, label="Sr Oven")

    UPDATE_TIME = 15.0  # in seconds
    pump_readings_values_queue = Queue()
    pump_commands_values_queue = Queue()
    thread = threading.Thread(
        target=serial_conn.thread_loop,
        args=(pump_readings_values_queue, pump_commands_values_queue, UPDATE_TIME),
        daemon=True,
    )
    thread.start()

    try:
        # Poll the readings queue until the user interrupts the program.
        while True:
            if pump_readings_values_queue.qsize() > 0:
                pump_measured_pressure, pump_measured_current, pump_measurement_time = pump_readings_values_queue.get()
                logger_main.debug("Measurement at time:")
                # The time stamp is a datetime, which "%f" cannot format.
                logger_main.debug("%s", pump_measurement_time)
                logger_main.debug(
                    "Pressure is %.2E mBar and current %.2E A.",
                    pump_measured_pressure,
                    pump_measured_current,
                )
            time.sleep(0.01)
    except KeyboardInterrupt:
        logger_main.debug(" ")
        logger_main.debug("Program interrupted by user (ctrl+C)")
    finally:
        # Always stop the polling thread before releasing the serial port.
        serial_conn.kill_thread()
        thread.join()
        if not thread.is_alive():
            logger_main.debug("Thread killed")
        serial_conn.close()