Examples#
For a simple, single use connection#
[ ]:
import serial
from pyipcmini.functions import IonPump
# Initialize the serial connection.
PORT = "/dev/ttyUSB0" # COM1 in windows
BAUD = 9600
serial_conn = serial.Serial(port=PORT, baudrate=BAUD, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS)
serial_conn.timeout = 1
serial_conn.flushInput()
# Initialize the pump object.
pump = IonPump(serial_connection=serial_conn)
# Measure pressure and current.
measured_pressure = pump.read_functions.read_pressure()
measured_current = pump.read_functions.read_current_measured()
print(f"Pressure is {measured_pressure:.2E} mBar and current {measured_current:.2E} A.")
# Close serial connection.
serial_conn.close()
Same, using labels#
Instead of using the functions directly, you can also use the function labels as provided by the output of the functions IonPump.list_read_function_labels and IonPump.list_set_function_labels.
The list of available read functions keys:
[ ]:
[
"Mode",
"HV ON/OFF",
"Baud rate",
"Status",
"Error code",
"Model",
"Serial number",
"RS485 address",
"Serial type",
"Unit pressure",
"Autostart",
"Protect",
"Fixed/step",
"Device number",
"Max power",
"V target",
"I protect",
"Set point",
"Temperature power section",
"Temperature internal controller",
"Status set point",
"V measured",
"I measured",
"Pressure",
"Label",
]
The list of available set functions keys:
[ ]:
[
"Mode",
"HV ON/OFF",
"Baud rate",
"RS485 address",
"Serial type",
"Unit pressure",
"Autostart",
"Protect",
"Fixed/step",
"Device number",
"Max power",
"V target",
"I protect",
"Set point",
"Label",
]
Example for using those lists:
[ ]:
import serial
from pyipcmini.functions import IonPump
# Initialize the serial connection.
PORT = "/dev/ttyUSB0" # COM1 in windows
BAUD = 9600
serial_conn = serial.Serial(port=PORT, baudrate=BAUD, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS)
serial_conn.timeout = 1
serial_conn.flushInput()
# Initialize the pump object.
pump = IonPump(serial_connection=serial_conn)
# Read functions.
read_label_list = ["Pressure", "Label"]
for read_lbl in read_label_list:
answer = pump.read_functions.dic_read_functions[read_lbl]()
print("Pump answer: " + str(answer))
# Set functions.
set_lbl_dic = {"V target": 3000, "Max power": 10}
for set_lbl in set_lbl_dic:
acknowledgement = pump.set_functions.dic_set_functions[set_lbl](set_lbl_dic[set_lbl])
print("Pump answer: " + str(acknowledgement))
# Close serial connection.
serial_conn.close()
For a repeated interrogation of the pump using the handler and threading#
[ ]:
import threading
import time
from queue import Queue
from pyipcmini import UsbPumpHandler
# Dictionary used to initialize some parameters of the pump.
# Optional and dangerous if you don't know how the pump works!!!
dic_init_param = {
"Protect": True,
"Fixed/step": False,
"Autostart": False,
"Max power": 10,
"I protect": 1000,
"Set point": 1e-08,
"Label": "My pump",
"Unit pressure": 1,
"Device number": 5,
"V target": 3000,
}
# Initialize the pump connection handler.
PORT = "/dev/ttyUSB0"
BAUD = 9600
serial_conn = UsbPumpHandler(
serial_port=PORT,
baudrate=BAUD,
# init_param = dic_init_param, # Uncomment this line if you want to apply initializations parameters to the pupm.
label="My pump",
)
# Initialize the thread.
UPDATE_TIME = 15.0 # in seconds.
pump_readings_values_queue = Queue()
pump_commands_values_queue = Queue()
thread = threading.Thread(target=serial_conn.thread_loop, args=(pump_readings_values_queue, pump_commands_values_queue, UPDATE_TIME), daemon=True)
thread.start()
# Measuring loop.
try:
LOOP_BOOL = True
while LOOP_BOOL:
if pump_readings_values_queue.qsize() > 0:
pump_measured_pressure, pump_measured_current, pump_measurement_time = pump_readings_values_queue.get()
print(f"Measurement at time: {pump_measurement_time:f}")
print(f"Pressure is {measured_pressure:.2E} mBar and current {measured_current:.2E} A.")
time.sleep(0.01)
except KeyboardInterrupt:
pass
finally:
print("Program interrupted by user (ctrl+C)")
# Close thread.
serial_conn.kill_thread()
thread.join()
if not thread.is_alive():
print("Thread killed")
# Close serial connection.
serial_conn.close()