emcor-python-package/ 0000775 0005333 0001756 00000000000 12664113030 013703 5 ustar luchini cd emcor-python-package/EMCOR-Controller/ 0000775 0005333 0001756 00000000000 12664112635 016703 5 ustar luchini cd emcor-python-package/EMCOR-Controller/.project 0000664 0005333 0001756 00000000562 12511465231 020347 0 ustar luchini cd
EMCOR-Controller
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/EMCOR-Controller/.pydevproject 0000664 0005333 0001756 00000000647 12511465232 021424 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/EMCOR-Controller/README 0000664 0005333 0001756 00000000202 12515410301 017537 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
- Python-RemotePCI
emcor-python-package/EMCOR-Controller/RELEASE_NOTES 0000664 0005333 0001756 00000000547 12540076744 020667 0 ustar luchini cd #==============================================================
#
# Name: RELEASE_NOTES
#
# Rem: This file contains the release notes for
# the EMCOR-Controller Python Scripts
#
# Subsystem: Magnet Controls
#
#==============================================================
#
EMCOR-Controller-R1-0-0 Tomo Cesnik
21-Apr-2015 Initial Version
emcor-python-package/EMCOR-Controller/emcor_controller.py 0000664 0005333 0001756 00000043711 12511465232 022626 0 ustar luchini cd class ECDataControl(object):
"""Base class for all the utility classes.
Just enforces that they accept remote PCI client in the constructor.
"""
DATA_LEN = 32
TIME_CONDITION_S = 0.00000255
def __init__(self, rpci):
self._rpci = rpci
class SystemInfo(ECDataControl):
    """Accessors for the system information register area.

    Every reader fetches one fixed-length text field through the remote PCI
    client and returns it with leading/trailing whitespace stripped.
    """

    BASE_ADDRESS = 0x05C0

    # Field offsets, relative to BASE_ADDRESS.
    OFF_FIRMWARE_VER = 0x00
    OFF_SYS_ID = 0x08
    OFF_SUB_TYPE = 0x0C
    OFF_FIRMWARE_DATE = 0x16

    # Field lengths, passed as the length argument of read_text().
    LEN_FIRMWARE_VER = 8
    LEN_SYS_ID = 4
    LEN_SUB_TYPE = 10
    LEN_FIRMWARE_DATE = 10

    def _read_field(self, offset, length):
        """Read the text field at BASE_ADDRESS + offset and strip it."""
        raw_text = self._rpci.read_text(SystemInfo.BASE_ADDRESS + offset, length)
        return raw_text.strip()

    def read_firmware_version(self):
        """Return the stripped firmware version field."""
        return self._read_field(SystemInfo.OFF_FIRMWARE_VER,
                                SystemInfo.LEN_FIRMWARE_VER)

    def read_system_id(self):
        """Return the stripped system id field."""
        return self._read_field(SystemInfo.OFF_SYS_ID,
                                SystemInfo.LEN_SYS_ID)

    def read_sub_type(self):
        """Return the stripped sub type field."""
        return self._read_field(SystemInfo.OFF_SUB_TYPE,
                                SystemInfo.LEN_SUB_TYPE)

    def read_firmware_date(self):
        """Return the stripped firmware date field."""
        return self._read_field(SystemInfo.OFF_FIRMWARE_DATE,
                                SystemInfo.LEN_FIRMWARE_DATE)
class XilinxSystemMonitor(ECDataControl):
    """Accessors for the Xilinx system monitor register area.

    Raw register counts are converted before being returned: temperatures
    with a multiply-and-add (result in celsius, per the original
    documentation), voltages with a single multiplication.
    """

    BASE_ADDRESS = 0x0580

    # Register offsets, relative to BASE_ADDRESS (current / max / min).
    OFF_TEMP_CUR = 0x00
    OFF_VINT_CUR = 0x04
    OFF_VAUX_CUR = 0x08
    OFF_TEMP_MAX = 0x0C
    OFF_VINT_MAX = 0x10
    OFF_VAUX_MAX = 0x14
    OFF_TEMP_MIN = 0x18
    OFF_VINT_MIN = 0x1C
    OFF_VAUX_MIN = 0x20

    # Conversion coefficients applied to the raw counts.
    _C_TEMP_MULT = float(1) / 64 * 503.975 / 1024
    _C_TEMP_ADD = -273.15
    _C_VOLT_MULT = float(1) / 64 / 1024 * 3

    def _read_counts(self, offset):
        """Return the raw register value found at BASE_ADDRESS + offset."""
        return self._rpci.read(XilinxSystemMonitor.BASE_ADDRESS + offset,
                               XilinxSystemMonitor.DATA_LEN)

    def _read_temp(self, offset):
        """Read the register at the offset and convert it to a temperature."""
        counts = self._read_counts(offset)
        return (counts * XilinxSystemMonitor._C_TEMP_MULT) + XilinxSystemMonitor._C_TEMP_ADD

    def _read_volts(self, offset):
        """Read the register at the offset and convert it to a voltage."""
        counts = self._read_counts(offset)
        return counts * XilinxSystemMonitor._C_VOLT_MULT

    # --- temperature ---------------------------------------------------

    def read_temp_cur(self):
        """Return the converted value of the current temperature register."""
        return self._read_temp(XilinxSystemMonitor.OFF_TEMP_CUR)

    def read_temp_max(self):
        """Return the converted value of the maximum temperature register."""
        return self._read_temp(XilinxSystemMonitor.OFF_TEMP_MAX)

    def read_temp_min(self):
        """Return the converted value of the minimum temperature register."""
        return self._read_temp(XilinxSystemMonitor.OFF_TEMP_MIN)

    # --- VINT ----------------------------------------------------------

    def read_vint_cur(self):
        """Return the converted value of the current VINT register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VINT_CUR)

    def read_vint_max(self):
        """Return the converted value of the maximum VINT register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VINT_MAX)

    def read_vint_min(self):
        """Return the converted value of the minimum VINT register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VINT_MIN)

    # --- VAUX ----------------------------------------------------------

    def read_vaux_cur(self):
        """Return the converted value of the current VAUX register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VAUX_CUR)

    def read_vaux_max(self):
        """Return the converted value of the maximum VAUX register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VAUX_MAX)

    def read_vaux_min(self):
        """Return the converted value of the minimum VAUX register."""
        return self._read_volts(XilinxSystemMonitor.OFF_VAUX_MIN)
class VoltageMonitor(ECDataControl):
    """Voltage monitor register area, containing board voltages and temperature.

    Every reader fetches one raw register value through the remote PCI client
    and scales it before returning it to the caller. The scaling expressions
    are evaluated in the same order as before, so results are bit-identical.
    """

    BASE_ADDRESS = 0x0540

    # Register offsets, relative to BASE_ADDRESS.
    OFF_V15 = 0x00
    OFF_V12 = 0x04
    OFF_V5 = 0x08
    OFF_V33 = 0x0C
    OFF_V33IO = 0x10
    OFF_VM15 = 0x14
    OFF_C15 = 0x18
    OFF_C12 = 0x1C
    OFF_C5 = 0x20
    OFF_C33 = 0x24
    OFF_C33IO = 0x28
    OFF_C25 = 0x2C
    OFF_C1 = 0x30
    OFF_CM15 = 0x34
    OFF_TEMP_BOARD = 0x38

    # Scaling coefficients: _C305 is used by the voltage readers, the _C19_*
    # values (the _C19 step divided by 150e-3 / 60e-3 / 30e-3) by the current
    # readers.
    _C305 = 305.18 * 1e-6
    _C19 = 19.0735 * 1e-6
    _C19_150 = _C19 / (150 * 1e-3)
    _C19_60 = _C19 / (60 * 1e-3)
    _C19_30 = _C19 / (30 * 1e-3)

    def _read_counts(self, offset):
        """Return the raw register value found at BASE_ADDRESS + offset."""
        return self._rpci.read(VoltageMonitor.BASE_ADDRESS + offset,
                               VoltageMonitor.DATA_LEN)

    def read_voltage_15(self):
        """Reads +15.0V."""
        return self._read_counts(VoltageMonitor.OFF_V15) * 6.0 * VoltageMonitor._C305

    def read_voltage_12(self):
        """Reads +12.0V."""
        return self._read_counts(VoltageMonitor.OFF_V12) * 4.83 * VoltageMonitor._C305

    def read_voltage_5(self):
        """Reads +5.0V."""
        return self._read_counts(VoltageMonitor.OFF_V5) * 2.0 * VoltageMonitor._C305

    def read_voltage_33(self):
        """Reads +3.3V."""
        return self._read_counts(VoltageMonitor.OFF_V33) * VoltageMonitor._C305

    def read_voltage_33io(self):
        """Reads +3.3V IO."""
        return self._read_counts(VoltageMonitor.OFF_V33IO) * VoltageMonitor._C305

    def read_voltage_m15(self):
        """Reads -15.0V."""
        return self._read_counts(VoltageMonitor.OFF_VM15) * -16.0 * VoltageMonitor._C305

    def read_current_15(self):
        """Reads current at +15.0V."""
        return self._read_counts(VoltageMonitor.OFF_C15) * VoltageMonitor._C19_60

    def read_current_12(self):
        """Reads current at +12.0V."""
        return self._read_counts(VoltageMonitor.OFF_C12) * VoltageMonitor._C19_150

    def read_current_5(self):
        """Reads current at +5.0V."""
        return self._read_counts(VoltageMonitor.OFF_C5) * VoltageMonitor._C19_30

    def read_current_33(self):
        """Reads current at +3.3V."""
        return self._read_counts(VoltageMonitor.OFF_C33) * VoltageMonitor._C19_150

    def read_current_33io(self):
        """Reads current at +3.3V IO."""
        return self._read_counts(VoltageMonitor.OFF_C33IO) * VoltageMonitor._C19_150

    def read_current_25(self):
        """Reads current at +2.5V. See Xilinx AUX voltage."""
        return self._read_counts(VoltageMonitor.OFF_C25) * VoltageMonitor._C19_150

    def read_current_1(self):
        """Reads current at +1.0V. See Xilinx INT voltage."""
        return self._read_counts(VoltageMonitor.OFF_C1) * VoltageMonitor._C19_150

    def read_current_m15(self):
        """Reads current at -15.0V."""
        # Reads the OFF_CM15 register, i.e. the -15.0V rail counterpart of
        # read_current_15(); the previous docstring said "+15.0V".
        return self._read_counts(VoltageMonitor.OFF_CM15) * VoltageMonitor._C19_60

    def read_temp(self):
        """Reads board temperature."""
        return self._read_counts(VoltageMonitor.OFF_TEMP_BOARD) * 0.0625
class ADCControl(ECDataControl):
    """Accessors for the ADC control register area (status readout only)."""

    BASE_ADDRESS = 0x0440

    # Register offsets, relative to BASE_ADDRESS.
    OFF_STATUS = 0x00
    OFF_SET = 0x04
    OFF_RESET = 0x08

    # Bits of the status register.
    MASK_BULK_ADC_TIMEOUT = 1 << 3
    MASK_ADC_TIMEOUT = 1 << 2

    def read_status(self):
        """Return the raw value of the status register."""
        status_address = ADCControl.BASE_ADDRESS + ADCControl.OFF_STATUS
        return self._rpci.read(status_address, ADCControl.DATA_LEN)

    def read_bulk_adc_timeout(self):
        """Return True when the bulk ADC timeout bit is set in the status."""
        flagged = self.read_status() & ADCControl.MASK_BULK_ADC_TIMEOUT
        return flagged > 0

    def read_adc_timeout(self):
        """Return True when the ADC timeout bit is set in the status."""
        flagged = self.read_status() & ADCControl.MASK_ADC_TIMEOUT
        return flagged > 0
class FaultControl(ECDataControl):
    """Accessors for the faults register area.

    Provides the board fault status readout, the latch reset and writers for
    the inhibit and reset control bits.
    """

    BASE_ADDRESS = 0x0480

    # Register offsets, relative to BASE_ADDRESS.
    OFF_FAULTS_STATUS = 0x00
    OFF_FAULTS_LATCH_STATUS = 0x04
    OFF_RESET_LATCH = 0x08
    OFF_CTRL_STATUS = 0x0C
    OFF_CTRL_SET = 0x10
    OFF_CTRL_RESET = 0x14

    # Bits of the control registers.
    MASK_INHIBIT_BIT = 1 << 1
    MASK_RESET_BIT = 1 << 0

    def _get_offset(self, value):
        """Return the control SET offset when value equals True, else RESET."""
        # The explicit comparison is kept on purpose: only values equal to
        # True select the SET register, any other value selects RESET.
        if value == True:
            return FaultControl.OFF_CTRL_SET
        return FaultControl.OFF_CTRL_RESET

    def _write_ctrl_mask(self, value, mask):
        """Write mask to the control SET or RESET register chosen by value."""
        target = FaultControl.BASE_ADDRESS + self._get_offset(value)
        self._rpci.write(target, FaultControl.DATA_LEN, mask)

    def read_status(self):
        """Returns faults status as a 16 bit integer, where each bit is one fault."""
        status_address = FaultControl.BASE_ADDRESS + FaultControl.OFF_FAULTS_STATUS
        raw_status = self._rpci.read(status_address, FaultControl.DATA_LEN)
        return raw_status & 0xFFFF

    def reset_faults_all(self):
        """Resets faults latch status by writing 0xFFFF to the reset latch register."""
        latch_address = FaultControl.BASE_ADDRESS + FaultControl.OFF_RESET_LATCH
        self._rpci.write(latch_address, FaultControl.DATA_LEN, 0xFFFF)

    def write_inhibit_bit(self, value):
        """Write the inhibit bit mask to the control SET/RESET register."""
        self._write_ctrl_mask(value, FaultControl.MASK_INHIBIT_BIT)

    def write_reset_bit(self, value):
        """Write the reset bit mask to the control SET/RESET register."""
        self._write_ctrl_mask(value, FaultControl.MASK_RESET_BIT)
class MagnetFaultControl(ECDataControl):
    """Magnet faults register area: status readout and latch reset."""

    BASE_ADDRESS = 0x0500

    # Register offsets, relative to BASE_ADDRESS.
    OFF_MAG_FAULTS_STATUS = 0x0C
    OFF_MAG_FAULTS_LATCH_STATUS = 0x10
    OFF_RESET_LATCH = 0x14

    def read_status(self):
        """Returns magnet faults status as a 8 bit integer, where each bit is one fault."""
        address = MagnetFaultControl.BASE_ADDRESS + MagnetFaultControl.OFF_MAG_FAULTS_STATUS
        return self._rpci.read(address, MagnetFaultControl.DATA_LEN) & 0xFF

    def reset_mag_faults_all(self):
        """Resets magnet faults latch status by writing 0xFF to the reset latch register."""
        address = MagnetFaultControl.BASE_ADDRESS + MagnetFaultControl.OFF_RESET_LATCH
        # Use this class's own DATA_LEN; the previous code borrowed the value
        # from the unrelated FaultControl class, which only matched because
        # both inherit it from the common base.
        self._rpci.write(address, MagnetFaultControl.DATA_LEN, 0xFF)
class InterlockControl(ECDataControl):
    """Accessors for the interlocks register area (external interlock writers)."""

    BASE_ADDRESS = 0x0500

    # Register offsets, relative to BASE_ADDRESS.
    OFF_INT_STATUS = 0x00
    OFF_INT_SET = 0x04
    OFF_INT_RESET = 0x08

    # Valid interlock bit indices are 0 .. NUM_INTERLOCKS - 1.
    NUM_INTERLOCKS = 4

    def _get_offset(self, value):
        """Return the SET offset when value equals True, else the RESET offset."""
        # The explicit comparison is kept on purpose: only values equal to
        # True select the SET register, any other value selects RESET.
        if value == True:
            return InterlockControl.OFF_INT_SET
        return InterlockControl.OFF_INT_RESET

    def write_interlock(self, value, bit):
        """Set (value equals True) or reset a single interlock bit.

        Raises ValueError when bit is negative or not below NUM_INTERLOCKS.
        """
        limit = InterlockControl.NUM_INTERLOCKS
        if bit < 0:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be greater or equal to 0!")
        if bit >= limit:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be less than " + str(limit) + "!")
        target = InterlockControl.BASE_ADDRESS + self._get_offset(value)
        single_bit_mask = (1 << bit) & 0x0F
        self._rpci.write(target, InterlockControl.DATA_LEN, single_bit_mask)

    def write_all_interlocks(self, value):
        """Set (value equals True) or reset all interlock bits by writing 0x0F."""
        target = InterlockControl.BASE_ADDRESS + self._get_offset(value)
        self._rpci.write(target, InterlockControl.DATA_LEN, 0x0F)
class ChannelsControl(ECDataControl):
    """Accessors for the AIO channels register area.

    Each channel owns a CHANNEL_OFFSET sized register window starting at
    BASE_ADDRESS. The readers return monitor/feedback values and status
    bits; the writers set the setpoint, ramping and fullscale registers.
    """

    BASE_ADDRESS = 0x0000
    CHANNEL_OFFSET = 0x40
    NUM_CHANNELS = 16

    # Register offsets inside one channel window.
    OFF_SP_REQ = 0x00
    OFF_CUR_SP = 0x04
    OFF_MON_READ = 0x08
    OFF_MON_AVG_READ = 0x0C
    OFF_MON_RIPPLE = 0x10
    OFF_FDBCK_READ = 0x14
    OFF_FDBCK_AVG_READ = 0x18
    OFF_FDBCK_RIPPLE = 0x1C
    OFF_FULLSCALE_DAC = 0x20
    OFF_FULLSCALE_ADC_MON = 0x24
    OFF_RAMP_RATE = 0x28
    OFF_SAMPLES_AVG = 0x2C
    OFF_CONF_STATUS = 0x30
    OFF_CONF_SET = 0x34
    OFF_CONF_RESET = 0x38
    OFF_FULLSCALE_ADC_FDBCK = 0x3C

    # Bits of the configuration registers.
    MASK_RAMPING_STATUS = 1 << 5
    MASK_RAMP_MODE = 1 << 3
    MASK_CONF = 1 << 0

    # ADC / DAC voltage limits.
    ADC_V_MIN = -10.0
    ADC_V_MAX = +10.0
    DAC_V_MIN = -10.0
    DAC_V_MAX = +10.0

    def _get_channel_address(self, channel):
        """Return the start address of the channel's register window.

        Raises ValueError when channel is negative or not below NUM_CHANNELS.
        """
        limit = ChannelsControl.NUM_CHANNELS
        if channel < 0:
            raise ValueError("Invalid channel '" + str(channel) + "' specified! Value should be greater or equal to 0!")
        if channel >= limit:
            raise ValueError("Invalid channel '" + str(channel) + "' specified! Value should be less than " + str(limit) + "!")
        return ChannelsControl.BASE_ADDRESS + channel * ChannelsControl.CHANNEL_OFFSET

    def _get_conf_offset(self, value):
        """Return the configuration SET offset when value equals True, else RESET."""
        # The explicit comparison is kept on purpose: only values equal to
        # True select the SET register, any other value selects RESET.
        if value == True:
            return ChannelsControl.OFF_CONF_SET
        return ChannelsControl.OFF_CONF_RESET

    def _read_register(self, channel, offset):
        """Read the register at the given offset inside the channel window."""
        target = self._get_channel_address(channel) + offset
        return self._rpci.read(target, ChannelsControl.DATA_LEN)

    def _write_register(self, channel, offset, value):
        """Write value to the register at the given offset inside the channel window."""
        target = self._get_channel_address(channel) + offset
        self._rpci.write(target, ChannelsControl.DATA_LEN, value)

    # --- monitor / feedback readback -----------------------------------

    def read_monitor(self, channel):
        """Reads monitor value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_MON_READ)

    def read_monitor_avg(self, channel):
        """Reads monitor averaged value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_MON_AVG_READ)

    def read_monitor_ripple(self, channel):
        """Reads monitor ripple value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_MON_RIPPLE)

    def read_feedback(self, channel):
        """Reads feedback value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_FDBCK_READ)

    def read_feedback_avg(self, channel):
        """Reads feedback averaged value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_FDBCK_AVG_READ)

    def read_feedback_ripple(self, channel):
        """Reads feedback ripple value in uA."""
        return self._read_register(channel, ChannelsControl.OFF_FDBCK_RIPPLE)

    # --- setpoint and ramping ------------------------------------------

    def write_set_point(self, channel, value):
        """Writes set point value in uA."""
        self._write_register(channel, ChannelsControl.OFF_SP_REQ, value)

    def write_ramp_rate(self, channel, ramp_rate):
        """Writes ramp rate in uA/sec."""
        self._write_register(channel, ChannelsControl.OFF_RAMP_RATE, ramp_rate)

    def write_ramp_mode(self, channel, value):
        """Write the ramp mode bit.

        Value:
        True -> Immediate, no ramping
        False -> Ramping
        """
        self._write_register(channel, self._get_conf_offset(value),
                             ChannelsControl.MASK_RAMP_MODE)

    def read_ramping_status(self, channel):
        """Read the ramping status bit.

        Returns:
        True -> Ramping in progress
        False -> Ramping done
        """
        conf_status = self._read_register(channel, ChannelsControl.OFF_CONF_STATUS)
        return (conf_status & ChannelsControl.MASK_RAMPING_STATUS) != 0

    # --- configuration -------------------------------------------------

    def write_configure_bit(self, channel, value):
        """Write the configure bit.

        Value:
        True -> The channel will respond to setpoint commands.
        False -> The channel will not respond to setpoint commands.
        """
        self._write_register(channel, self._get_conf_offset(value),
                             ChannelsControl.MASK_CONF)

    def read_configure_bit(self, channel):
        """Read the configure bit.

        Returns:
        True -> The channel is configured.
        False -> The channel is not configured.
        """
        conf_status = self._read_register(channel, ChannelsControl.OFF_CONF_STATUS)
        return (conf_status & ChannelsControl.MASK_CONF) != 0

    # --- fullscale settings --------------------------------------------

    def write_fullscale_dac(self, channel, value):
        """Write value to the channel's DAC fullscale register."""
        self._write_register(channel, ChannelsControl.OFF_FULLSCALE_DAC, value)

    def write_fullscale_adc_monitor(self, channel, value):
        """Write value to the channel's monitor ADC fullscale register."""
        self._write_register(channel, ChannelsControl.OFF_FULLSCALE_ADC_MON, value)

    def write_fullscale_adc_feedback(self, channel, value):
        """Write value to the channel's feedback ADC fullscale register."""
        self._write_register(channel, ChannelsControl.OFF_FULLSCALE_ADC_FDBCK, value)

    # --- averaging -----------------------------------------------------

    def read_samples_avg(self, channel):
        """Returns samples per average for monitor and feedback channels."""
        # The register holds the exponent of a power of two:
        # 0x0 -> 1, 0x1 -> 2, 0x2 -> 4, 0x3 -> 8, 0x4 -> 16, 0x5 -> 32
        exponent = self._read_register(channel, ChannelsControl.OFF_SAMPLES_AVG)
        return 1 << exponent
def remap_value(a, min_a, max_a, min_b, max_b):
    """Linearly map a from the interval min_a..max_a onto min_b..max_b.

    The relative position of a inside the source interval is computed as a
    float and applied to the target interval; the mapped value is returned.
    """
    span_a = max_a - min_a
    span_b = max_b - min_b
    relative_position = (float(a) - min_a) / span_a
    return relative_position * span_b + min_b
emcor-python-package/EMCOR-Controller/setup.py 0000664 0005333 0001756 00000000607 12515410301 020402 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
config = {
'name': 'EMCOR-Controller',
'version': '2.0.0',
'description': 'EMCOR Controller API',
'url': 'slac.stanford.edu',
'author': 'Tomo Cesnik',
'author_email': 'tomo.cesnik@cosylab.com',
'py_modules': ['emcor_controller'],
'scripts': []
}
setup(**config)
emcor-python-package/EMCOR-Controller/.cram/ 0000775 0005333 0001756 00000000000 12664112106 017674 5 ustar luchini cd emcor-python-package/EMCOR-Controller/.cram/CVS/ 0000775 0005333 0001756 00000000000 12664112107 020330 5 ustar luchini cd emcor-python-package/EMCOR-Controller/.cram/CVS/Root 0000664 0005333 0001756 00000000025 12664112105 021171 0 ustar luchini cd /afs/slac/g/lcls/cvs
emcor-python-package/EMCOR-Controller/.cram/CVS/Repository 0000664 0005333 0001756 00000000051 12664112105 022424 0 ustar luchini cd tools/emcorTester/EMCOR-Controller/.cram
emcor-python-package/EMCOR-Controller/.cram/CVS/Entries 0000664 0005333 0001756 00000000056 12664112106 021664 0 ustar luchini cd /packageinfo/1.1/Tue Jun 16 19:47:42 2015//
D
emcor-python-package/EMCOR-Controller/.cram/packageinfo 0000664 0005333 0001756 00000000055 12540076536 022077 0 ustar luchini cd {"type": "Tools", "name": "EMCOR-Controller"} emcor-python-package/EMCOR-Tester/ 0000775 0005333 0001756 00000000000 12664112655 016030 5 ustar luchini cd emcor-python-package/EMCOR-Tester/.project 0000664 0005333 0001756 00000000556 12464137637 017512 0 ustar luchini cd
EMCOR-Tester
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/EMCOR-Tester/.pydevproject 0000664 0005333 0001756 00000000647 12464137637 020563 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/EMCOR-Tester/README 0000664 0005333 0001756 00000000157 12515410301 016673 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
emcor-python-package/EMCOR-Tester/RELEASE_NOTES 0000664 0005333 0001756 00000000547 12540077531 020005 0 ustar luchini cd #==============================================================
#
# Name: RELEASE_NOTES
#
# Rem: This file contains the release notes for
# the EMCOR-Tester Python Scripts
#
# Subsystem: Magnet Controls
#
#==============================================================
#
EMCOR-Tester-R1-0-0 Tomo Cesnik
21-Apr-2015 Initial Version
emcor-python-package/EMCOR-Tester/emcor_tester.py 0000664 0005333 0001756 00000017771 12453067610 021106 0 ustar luchini cd import socket
import struct
import threading
class EmcorTester(object):
    """Core class for communicating with the EMCOR tester board.

    Abstracts socket usage away from the user: every read()/write() call
    encodes one request datagram, sends it over UDP, waits for one response
    datagram and decodes it. A lock serializes the whole exchange.
    """

    _DEFAULT_PORT = 56789
    _RECV_BUFFER_SIZE = 128

    _PROTOCOL_VERSION = 0x01
    _PROTOCOL_VERSION_MASK = 0x3F

    _ENDIAN_BIG = 0
    _ENDIAN_LITTLE = 1

    _METHOD_READ = 0
    _METHOD_WRITE = 1

    # Header layout (network byte order): 1 byte method/endian/version,
    # 1 byte message id, 2 bytes padding, 4 bytes address, 2 bytes word
    # count, 2 bytes padding.
    _HEADER_SIZE = 12

    def __init__(self, host, port=_DEFAULT_PORT, timeout=2):
        """Create the UDP socket used to talk to host:port.

        timeout is handed to socket.settimeout().
        """
        self._message_id = 0
        self._host = host
        self._port = port
        self._socklock = threading.Lock()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.settimeout(timeout)

    def __del__(self):
        # The socket attribute is missing when __init__ failed before
        # creating it (or when the instance was built without __init__);
        # closing must not raise in that case.
        sock = getattr(self, "_socket", None)
        if sock is not None:
            sock.close()

    def read(self, address, count):
        """Returns data if it was read successfully, else throws an exception."""
        return self._process(address, count, None)

    def write(self, address, data):
        """Writes data or throws an exception."""
        self._process(address, len(data), data)

    def _process(self, address, count, data):
        """Encodes protocol message and sends it to the server.

        Waits for the response, decodes and returns it. The complete
        exchange, including decoding, happens while holding the lock because
        decoding compares the response against the shared message id.
        """
        with self._socklock:
            msg_send = self._encode_msg(address, count, data)
            msg_recv = self._communicate(msg_send)
            return self._decode_msg(msg_recv, address)

    def _encode_msg(self, address, count, data):
        """Checks input parameters and throws exception if they are invalid.

        Encodes and returns the protocol message as a bytearray. Raises
        ValueError for a negative address or count.
        """
        if address < 0:
            raise ValueError("Invalid address '" + str(address) + "' specified! Value should be greater or equal to 0!")
        if count < 0:
            raise ValueError("Invalid count '" + str(count) + "' specified! Value should be greater or equal to 0!")

        endian = EmcorTester._ENDIAN_BIG
        # NOTE: any falsy data (None, but also an empty sequence) is encoded
        # as a read request; this matches the previous behavior.
        method = EmcorTester._METHOD_READ
        if data:
            method = EmcorTester._METHOD_WRITE

        # message_id can only be 1 byte
        self._message_id = self._message_id + 1
        if self._message_id > 0xFF:
            self._message_id = 0

        first_byte = ((method << 7) | (endian << 6)
                      | (EmcorTester._PROTOCOL_VERSION & EmcorTester._PROTOCOL_VERSION_MASK))
        msg = bytearray(struct.pack("!BBHIHH", first_byte, self._message_id & 0xFF,
                                    0, address, count, 0))
        if method == EmcorTester._METHOD_WRITE:
            # payload: one 16 bit word per data item
            for word in data:
                msg.extend(struct.pack("!H", word))
        return msg

    def _decode_msg(self, message, address):
        """Decodes message and returns the data as a list of words.

        Throws ETProtocolException in case that decoding fails: short header,
        unexpected protocol version, message id or address, or a payload
        length that does not match the word count in the header.
        """
        header_size = EmcorTester._HEADER_SIZE
        if len(message) < header_size:
            raise ETProtocolException("header length", len(message), header_size)

        # unpack_from reads directly from the buffer, so the header is parsed
        # the same way for Python 2 str and for bytes/bytearray (indexing
        # bytes yields ints on Python 3, which struct.unpack rejects).
        # "xx" skips the 2 padding bytes; the trailing padding is not read.
        msg_protocol, msg_id, msg_address, msg_count = struct.unpack_from("!BBxxIH", message, 0)

        # no checks for other bits, because they are always the same in the examples
        protocol_version = msg_protocol & EmcorTester._PROTOCOL_VERSION_MASK
        if protocol_version != EmcorTester._PROTOCOL_VERSION:
            raise ETProtocolException("protocol version", protocol_version, EmcorTester._PROTOCOL_VERSION)
        if msg_id != self._message_id:
            raise ETProtocolException("message id", msg_id, self._message_id)
        if msg_address != address:
            raise ETProtocolException("status", msg_address, address)

        payload_len = len(message) - header_size
        if payload_len == 0:
            # header-only response: no values to decode
            return []
        if payload_len != msg_count * 2:
            raise ETProtocolException("message values length", payload_len, msg_count * 2)
        # decoding by words - 2 bytes
        return list(struct.unpack_from("!%dH" % msg_count, message, header_size))

    def _communicate(self, data):
        """Sends data over the socket and receives and returns response."""
        self._socket.sendto(data, (self._host, self._port))
        recv_data, _ = self._socket.recvfrom(EmcorTester._RECV_BUFFER_SIZE)
        return recv_data
class ETProtocolException(Exception):
    """Raised when a value in a protocol message differs from the expected one.

    The offending field name, the received value and the expected value are
    kept as attributes and are also rendered into the exception message.
    """

    def __init__(self, var_name, var_value, var_expected):
        self.var_name = var_name
        self.var_value = var_value
        self.var_expected = var_expected
        pieces = (
            "Value for '", var_name,
            "' is '", str(var_value),
            "', but should be '", str(var_expected),
            "'!",
        )
        super(ETProtocolException, self).__init__("".join(pieces))
class ETDataControl(object):
    """Common base of the tester utility classes.

    Its only job is to make every utility class take the tester object in
    the constructor and keep it in the _tester attribute.
    """

    def __init__(self, tester):
        # communication object used by the subclasses for read()/write()
        self._tester = tester
class FaultControl(ETDataControl):
    """Accessors for the faults control register area (fault triggering)."""

    ADDR_STATUS = 0x0000
    ADDR_SET = 0x0001
    ADDR_RESET = 0x0002

    # Valid fault bit indices are 0 .. NUM_FAULTS - 1.
    NUM_FAULTS = 16

    def _get_address(self, value):
        """Return the SET address when value equals True, else the RESET address."""
        # The explicit comparison is kept on purpose: only values equal to
        # True select the SET register, any other value selects RESET.
        if value == True:
            return FaultControl.ADDR_SET
        return FaultControl.ADDR_RESET

    def write_fault(self, value, bit):
        """Set (value equals True) or reset a single fault bit.

        Raises ValueError when bit is negative or not below NUM_FAULTS.
        """
        limit = FaultControl.NUM_FAULTS
        if bit < 0:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be greater or equal to 0!")
        if bit >= limit:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be less than " + str(limit) + "!")
        target = self._get_address(value)
        single_bit_word = (1 << bit) & 0xFFFF
        self._tester.write(target, [single_bit_word])

    def write_all_faults(self, value):
        """Set (value equals True) or reset all fault bits by writing 0xFFFF."""
        self._tester.write(self._get_address(value), [0xFFFF])
class MagnetFaultControl(ETDataControl):
    """Accessors for the magnet faults control register area (fault triggering)."""

    ADDR_STATUS = 0x0003
    ADDR_SET = 0x0004
    ADDR_RESET = 0x0005

    # Valid magnet fault bit indices are 0 .. NUM_MAG_FAULTS - 1.
    NUM_MAG_FAULTS = 8

    def _get_address(self, value):
        """Return the SET address when value equals True, else the RESET address."""
        # The explicit comparison is kept on purpose: only values equal to
        # True select the SET register, any other value selects RESET.
        if value == True:
            return MagnetFaultControl.ADDR_SET
        return MagnetFaultControl.ADDR_RESET

    def write_mag_fault(self, value, bit):
        """Set (value equals True) or reset a single magnet fault bit.

        Raises ValueError when bit is negative or not below NUM_MAG_FAULTS.
        """
        limit = MagnetFaultControl.NUM_MAG_FAULTS
        if bit < 0:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be greater or equal to 0!")
        if bit >= limit:
            raise ValueError("Invalid bit '" + str(bit) + "' specified! Value should be less than " + str(limit) + "!")
        target = self._get_address(value)
        single_bit_word = (1 << bit) & 0xFFFF
        self._tester.write(target, [single_bit_word])

    def write_all_mag_faults(self, value):
        """Set (value equals True) or reset all magnet fault bits by writing 0x00FF."""
        self._tester.write(self._get_address(value), [0x00FF])
class StatusRegControl(ETDataControl):
    """Accessors for the status register (external interlocks, inhibit and reset bits)."""

    ADDRESS = 0x0006

    # Bits of the status word; the interlocks occupy the low 4 bits.
    MASK_INHIBIT_BIT = 1 << 4
    MASK_RESET_BIT = 1 << 5

    def _read_status_word(self):
        """Read one word from the status register and return it."""
        words = self._tester.read(StatusRegControl.ADDRESS, 1)
        return words[0]

    def read_interlocks(self):
        """Returns interlocks status as a 4 bit integer, where each bit is one interlock."""
        return self._read_status_word() & 0x0F

    def read_inhibit_bit(self):
        """Return the status word masked with MASK_INHIBIT_BIT (0 when the bit is clear)."""
        return self._read_status_word() & StatusRegControl.MASK_INHIBIT_BIT

    def read_reset_bit(self):
        """Return the status word masked with MASK_RESET_BIT (0 when the bit is clear)."""
        return self._read_status_word() & StatusRegControl.MASK_RESET_BIT
emcor-python-package/EMCOR-Tester/et_example.py 0000664 0005333 0001756 00000001424 12440516676 020531 0 ustar luchini cd #!/usr/bin/env python
from emcor_tester import EmcorTester
IP = "127.0.0.1"
#IP = "134.79.218.58"
def print_bytes(words):
    """Print every 16 bit word as two lines: high byte first, then low byte.

    Each line shows the byte as hex and as the character with that code.
    """
    for word in words:
        for byte in ((word >> 8) & 0xFF, word & 0xFF):
            # a single parenthesized expression prints identically with the
            # Python 2 print statement
            print(" " + hex(byte) + " - " + chr(byte))
# Example session: read the identification registers, then set and clear all
# faults while reading the fault status back after each write.
et = EmcorTester(IP)

# (title, address, word count) of the identification reads, in request order
for title, address, count in (
        ("Firmware version:", 0x0010, 1),
        ("System ID:", 0x0011, 4),
        ("Sub Type:", 0x0015, 4),
        ("Firmware date:", 0x0019, 4),
):
    words = et.read(address, count)
    print(title)
    print_bytes(words)

# (title, address written with 0xFFFF); the status at 0x0000 is read back
# after each write
for title, address in (
        ("Faults set:", 0x0001),
        ("Faults cleared:", 0x0002),
):
    et.write(address, [0xFFFF])
    words = et.read(0x0000, 1)
    print(title)
    print_bytes(words)
emcor-python-package/EMCOR-Tester/et_simulator.py 0000664 0005333 0001756 00000004070 12446334506 021111 0 ustar luchini cd #!/usr/bin/env python
import socket
import struct
UDP_IP = "127.0.0.1"
UDP_PORT = 56789
BUFFER_SIZE = 128
METHOD_READ = 0
METHOD_WRITE = 1
PROTOCOL_VERSION_MASK = 0x3F
HEADER_SIZE = 12
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(BUFFER_SIZE)
print "Connection address:", addr
if not data:
break
print "Received data:", ' '.join(x.encode('hex') for x in data)
if len(data) < HEADER_SIZE:
print "MALFORMED DATA"
break
protocol = int(data[0].encode('hex'), 16)
protocol_version = protocol & PROTOCOL_VERSION_MASK
method = (protocol >> 7) & 0x01
endian = (protocol >> 6) & 0x01
message_id = int(data[1].encode('hex'), 16)
address = struct.unpack("!I", data[4:8])[0]
count = struct.unpack("!H", data[8:10])[0]
print "Protocol version:", str(protocol_version)
print "Method:", str(method)
print "Endian:", str(endian)
print "Message ID:", str(message_id)
print "Address:", str(address)
print "Count:", str(count)
rem_data = []
if len(data) > HEADER_SIZE:
rem_data = data[HEADER_SIZE:]
if len(rem_data) != count * 2:
print "rem_data != count"
break
# decoding by words - 2 bytes
words = []
for i in range(count):
byte_id = i * 2
words.extend(struct.unpack("!H", rem_data[byte_id:byte_id + 2]))
print "Values data:", ' '.join(hex(x) for x in words)
if method == METHOD_WRITE:
count = 0
msg = bytearray()
msg.append(protocol)
msg.append(message_id)
msg.extend(struct.pack("!H", 0)) # 2 bytes padding
msg.extend(struct.pack("!I", address))
msg.extend(struct.pack("!H", count))
msg.extend(struct.pack("!H", 0)) # 2 bytes padding
if method == METHOD_READ:
for i in range(count):
msg.extend(struct.pack("!H", i))
print "msg:", ' '.join(hex(x) for x in msg)
sock.sendto(msg, addr)
print "--------------------------------------------------"
emcor-python-package/EMCOR-Tester/setup.py 0000664 0005333 0001756 00000000614 12515410301 017523 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
config = {
'name': 'EMCOR-Tester',
'version': '1.2.0',
'description': 'EMCOR Tester API',
'url': 'slac.stanford.edu',
'author': 'Tomo Cesnik',
'author_email': 'tomo.cesnik@cosylab.com',
'py_modules': ['emcor_tester'],
'scripts': ['et_simulator.py']
}
setup(**config)
emcor-python-package/EMCOR-Tester/.cram/ 0000775 0005333 0001756 00000000000 12664112116 017020 5 ustar luchini cd emcor-python-package/EMCOR-Tester/.cram/CVS/ 0000775 0005333 0001756 00000000000 12664112117 017454 5 ustar luchini cd emcor-python-package/EMCOR-Tester/.cram/CVS/Root 0000664 0005333 0001756 00000000025 12664112115 020315 0 ustar luchini cd /afs/slac/g/lcls/cvs
emcor-python-package/EMCOR-Tester/.cram/CVS/Repository 0000664 0005333 0001756 00000000045 12664112116 021554 0 ustar luchini cd tools/emcorTester/EMCOR-Tester/.cram
emcor-python-package/EMCOR-Tester/.cram/CVS/Entries 0000664 0005333 0001756 00000000056 12664112117 021011 0 ustar luchini cd /packageinfo/1.1/Tue Jun 16 19:56:29 2015//
D
emcor-python-package/EMCOR-Tester/.cram/packageinfo 0000664 0005333 0001756 00000000051 12540077555 021220 0 ustar luchini cd {"type": "Tools", "name": "EMCOR-Tester"} emcor-python-package/EMCOR-Tests/ 0000775 0005333 0001756 00000000000 12664112667 015667 5 ustar luchini cd emcor-python-package/EMCOR-Tests/.project 0000664 0005333 0001756 00000000735 12464137637 017345 0 ustar luchini cd
EMCOR-Tests
EMCOR-Controller
EMCOR-Tester
Python-UnitTestGUI
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/EMCOR-Tests/.pydevproject 0000664 0005333 0001756 00000000647 12464137637 020417 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/EMCOR-Tests/README 0000664 0005333 0001756 00000000254 12515410301 016525 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
- PyQt4
- Python-RemotePCI
- EMCOR-Controller
- EMCOR-Tester
emcor-python-package/EMCOR-Tests/RELEASE_NOTES 0000664 0005333 0001756 00000000544 12540101746 017632 0 ustar luchini cd #==============================================================
#
# Name: RELEASE_NOTES
#
# Rem: This file contains the release notes for
# the EMCOR-Tests Python Scripts
#
# Subsystem: Magnet Controls
#
#==============================================================
#
EMCOR-Tests-R1-0-0 Tomo Cesnik
21-Apr-2015 Initial Version
emcor-python-package/EMCOR-Tests/emcor_channels_gui.py 0000664 0005333 0001756 00000065121 12511465330 022057 0 ustar luchini cd #!/usr/bin/env python
import sys
import time
from collections import OrderedDict
from PyQt4 import QtGui, QtCore
import emcor_controller as ec
from pyrpci import RemotePCI
# Scale factor between amperes and microamperes; MCORType multiplies its
# ampere inputs by this value to store them in uA.
MULT_MICRO = 1000000.0
# str.format templates used when rendering readbacks into the labels.
F_VOLTAGE = "{0:.3f}"
F_CURRENT = "{0:.3f}"
F_TEMP = "{0:.2f}"
F_CHANNEL = "{0:.6f}"
class MCORType(object):
    """Full-scale settings of one MCOR variant, stored in microamperes."""

    def __init__(self, fs_mon, fs_fdbck, fs_dac):
        """Input values should be in A, which are here transformed to uA."""
        scaled_mon, scaled_fdbck, scaled_dac = [
            amps * MULT_MICRO for amps in (fs_mon, fs_fdbck, fs_dac)
        ]
        self.fullscale_mon = scaled_mon
        self.fullscale_fdbck = scaled_fdbck
        self.fullscale_dac = scaled_dac
# Known MCOR variants in display order.  The keys are the entries offered in
# the "MCOR Type" combo box; the MCORType arguments are the monitor, feedback
# and DAC full-scale values in A.
MCOR_TYPES = OrderedDict([
    ("1", MCORType(2, 1, 1.02564)),
    ("2", MCORType(2, 2, 2.05128)),
    ("6", MCORType(12, 6, 6.15384)),
    ("12", MCORType(12, 12, 12.30768)),
    ("30", MCORType(30, 30, 30.7692)),
])
class VoltageSpinBox(QtGui.QDoubleSpinBox):
    """Spin box for entering a voltage within the DAC limits."""

    def __init__(self):
        super(VoltageSpinBox, self).__init__()
        lowest = ec.ChannelsControl.DAC_V_MIN
        highest = ec.ChannelsControl.DAC_V_MAX
        self.setDecimals(6)
        self.setSingleStep(0.1)
        self.setMinimum(lowest)
        self.setMaximum(highest)
class StyledLabel(QtGui.QLabel):
    """Label drawn with a styled-panel frame; used for all readback fields."""

    def __init__(self):
        super(StyledLabel, self).__init__()
        frame_shape = QtGui.QFrame.StyledPanel
        self.setFrameShape(frame_shape)
class ChannelsControlWidget(QtGui.QGroupBox):
    """Group box with set-point controls and readbacks for all channels.

    The top row writes one voltage to every channel; the grid below holds one
    row per channel with its own set-point widget and four readback labels
    (instantaneous monitor/feedback and ripple monitor/feedback).
    """

    def __init__(self):
        super(ChannelsControlWidget, self).__init__()
        self.setTitle("Channels control")
        self.setMinimumWidth(700)

        # Per-channel widgets, indexed by channel number.
        self._ch_sp_widgets = []
        self._ch_mon_widgets = []
        self._ch_fdbck_widgets = []
        self._ch_mon_r_widgets = []
        self._ch_fdbck_r_widgets = []

        all_channels_row = self._build_all_channels_row()
        channel_grid = self._build_channel_grid()

        outer = QtGui.QVBoxLayout()
        outer.addWidget(all_channels_row)
        outer.addWidget(channel_grid)
        self.setLayout(outer)

    @staticmethod
    def _make_button(caption, handler):
        """Return a disabled push button whose click is wired to handler."""
        button = QtGui.QPushButton(caption)
        button.setEnabled(False)
        button.clicked.connect(handler)
        return button

    def _build_all_channels_row(self):
        """Create the row of controls acting on every channel at once."""
        caption = QtGui.QLabel("All channels (V):")
        caption_policy = caption.sizePolicy()
        caption_policy.setHorizontalPolicy(QtGui.QSizePolicy.Fixed)
        caption.setSizePolicy(caption_policy)

        self._spin_box = VoltageSpinBox()
        self._btn_set_all = self._make_button("Set All", self._change_sp_all)
        self._btn_set_min = self._make_button("Set All to Min", self._change_sp_min)
        self._btn_set_zero = self._make_button("Set All to Zero", self._change_sp_zero)
        self._btn_set_max = self._make_button("Set All to Max", self._change_sp_max)

        row_layout = QtGui.QHBoxLayout()
        for item in (caption, self._spin_box, self._btn_set_all,
                     self._btn_set_min, self._btn_set_zero, self._btn_set_max):
            row_layout.addWidget(item)

        row = QtGui.QWidget()
        row.setLayout(row_layout)
        return row

    def _build_channel_grid(self):
        """Create the per-channel grid and fill the widget lists."""
        centered = QtCore.Qt.AlignCenter
        grid = QtGui.QGridLayout()
        for column in range(1, 6):
            grid.setColumnStretch(column, 1)

        # Two header rows: group titles spanning two columns, then sub-titles.
        grid.addWidget(QtGui.QLabel("Instantaneous (V)"), 0, 2, 1, 2, centered)
        grid.addWidget(QtGui.QLabel("Ripple (V)"), 0, 4, 1, 2, centered)
        sub_titles = ("Monitor", "Feedback", "Monitor", "Feedback")
        for column, sub_title in enumerate(sub_titles, 2):
            grid.addWidget(QtGui.QLabel(sub_title), 1, column, centered)

        header_rows = 2
        readback_lists = (self._ch_mon_widgets, self._ch_fdbck_widgets,
                          self._ch_mon_r_widgets, self._ch_fdbck_r_widgets)
        for channel in range(ec.ChannelsControl.NUM_CHANNELS):
            row = channel + header_rows
            name = QtGui.QLabel("Channel " + str(channel) + " (V):")
            grid.addWidget(name, row, 0, QtCore.Qt.AlignRight)

            set_point = ChannelSPWidget(channel)
            self._ch_sp_widgets.append(set_point)
            grid.addWidget(set_point, row, 1)

            for column, store in enumerate(readback_lists, 2):
                readback = StyledLabel()
                readback.setAlignment(QtCore.Qt.AlignRight | QtCore.Qt.AlignCenter)
                store.append(readback)
                grid.addWidget(readback, row, column)

        holder = QtGui.QWidget()
        holder.setLayout(grid)
        holder_policy = holder.sizePolicy()
        holder_policy.setVerticalPolicy(QtGui.QSizePolicy.MinimumExpanding)
        holder.setSizePolicy(holder_policy)
        return holder

    def _change_sp_all(self):
        """Write the value of the shared spin box to every channel."""
        for set_point in self._ch_sp_widgets:
            set_point.change_set_point(self._spin_box.value())

    def _change_sp_min(self):
        """Set every channel to the spin box minimum."""
        lowest = self._spin_box.minimum()
        self._spin_box.setValue(lowest)
        self._change_sp_all()

    def _change_sp_zero(self):
        """Set every channel to zero."""
        self._spin_box.setValue(0)
        self._change_sp_all()

    def _change_sp_max(self):
        """Set every channel to the spin box maximum."""
        highest = self._spin_box.maximum()
        self._spin_box.setValue(highest)
        self._change_sp_all()

    def update(self, ch_control, mcor_type):
        """Refresh all channel readbacks, or blank them when disconnected.

        ch_control is the channels control object to read from, or None to
        disable the buttons and clear the labels.  mcor_type supplies the
        full-scale values used to convert the uA readbacks to volts.
        """
        for set_point in self._ch_sp_widgets:
            set_point.update(ch_control, mcor_type)

        connected = ch_control is not None
        for button in (self._btn_set_all, self._btn_set_min,
                       self._btn_set_zero, self._btn_set_max):
            button.setEnabled(connected)

        channels = range(ec.ChannelsControl.NUM_CHANNELS)
        if not connected:
            for i in channels:
                self._ch_mon_widgets[i].setText("")
                self._ch_fdbck_widgets[i].setText("")
                self._ch_mon_r_widgets[i].setText("")
                self._ch_fdbck_r_widgets[i].setText("")
            return

        adc_min = ec.ChannelsControl.ADC_V_MIN
        adc_max = ec.ChannelsControl.ADC_V_MAX
        fs_mon = mcor_type.fullscale_mon
        fs_fdbck = mcor_type.fullscale_fdbck
        # (reader, full scale in uA, labels) in the order the values are read.
        readbacks = (
            (ch_control.read_monitor, fs_mon, self._ch_mon_widgets),
            (ch_control.read_feedback, fs_fdbck, self._ch_fdbck_widgets),
            (ch_control.read_monitor_ripple, fs_mon, self._ch_mon_r_widgets),
            (ch_control.read_feedback_ripple, fs_fdbck, self._ch_fdbck_r_widgets),
        )
        for i in channels:
            for read, fullscale, labels in readbacks:
                ua = read(i)
                volts = ec.remap_value(ua, -fullscale, fullscale, adc_min, adc_max)
                labels[i].setText(F_CHANNEL.format(volts))
class ChannelSPWidget(QtGui.QWidget):
    """Widget for writing setpoints to just one channel."""

    def __init__(self, num):
        """Build the spin box and the (initially disabled) "Set" button.

        num is the channel number this widget writes to.
        """
        super(ChannelSPWidget, self).__init__()
        self._channel_num = num
        # Both are filled in by update() once a connection exists.
        self._ch_control = None
        self._fs_dac = 0

        self._spin_box = VoltageSpinBox()
        self._btn_set = QtGui.QPushButton("Set")
        self._btn_set.setEnabled(False)
        self._btn_set.clicked.connect(self._change_set_point)

        layout = QtGui.QHBoxLayout()
        layout.addWidget(self._spin_box)
        layout.addWidget(self._btn_set)
        layout.setMargin(0)
        self.setLayout(layout)

    def change_set_point(self, value):
        """Put value (in V) into the spin box and write it to the channel."""
        self._spin_box.setValue(value)
        self._change_set_point()

    def _change_set_point(self):
        """Can only write the setpoint if the widget was updated before."""
        if self._ch_control is not None:
            v = self._spin_box.value()
            # Map the DAC voltage range onto +/- the stored DAC full scale (uA).
            ua = ec.remap_value(v, ec.ChannelsControl.DAC_V_MIN, ec.ChannelsControl.DAC_V_MAX, -self._fs_dac, self._fs_dac)
            self._ch_control.write_set_point(self._channel_num, ua)

    def update(self, ch_control, mcor_type):
        """Saves the ch_control for later usage when setting the setpoints.

        On the first call with a control object the channel is configured
        (full-scale values, ramp mode and configure bit).  Passing None
        disables the "Set" button and forgets the control object, so the
        next connection configures the channel again.
        """
        # Explicit None test, consistent with _change_set_point() and the
        # other widgets in this module.
        if ch_control is not None:
            if self._ch_control is None:
                # The whole-ampere part of the DAC full scale, back in uA.
                self._fs_dac = int(mcor_type.fullscale_dac / MULT_MICRO) * MULT_MICRO
                # XXX: configure bit currently bugged, just always configure all
                #if not ch_control.read_configure_bit(self._channel_num):
                ch_control.write_fullscale_adc_monitor(self._channel_num, mcor_type.fullscale_mon)
                ch_control.write_fullscale_adc_feedback(self._channel_num, mcor_type.fullscale_fdbck)
                ch_control.write_fullscale_dac(self._channel_num, mcor_type.fullscale_dac)
                ch_control.write_ramp_mode(self._channel_num, True)
                ch_control.write_configure_bit(self._channel_num, True)
            self._btn_set.setEnabled(True)
        else:
            self._btn_set.setEnabled(False)
        self._ch_control = ch_control
class SysInfoWidget(QtGui.QGroupBox):
    """Group box showing the identification strings read from the board."""

    def __init__(self):
        super(SysInfoWidget, self).__init__()
        self.setTitle("System Info")

        self._lbl_sysid = StyledLabel()
        self._lbl_subtype = StyledLabel()
        self._lbl_frmver = StyledLabel()
        self._lbl_frmdate = StyledLabel()

        form = QtGui.QFormLayout()
        form.setLabelAlignment(QtCore.Qt.AlignRight)
        rows = (
            ("System ID:", self._lbl_sysid),
            ("Sub Type:", self._lbl_subtype),
            ("Firmware version:", self._lbl_frmver),
            ("Firmware date:", self._lbl_frmdate),
        )
        for caption, field in rows:
            form.addRow(caption, field)
        self.setLayout(form)

    def update(self, sys_info):
        """Show the values read from sys_info, or clear the fields for None."""
        if sys_info is None:
            for field in (self._lbl_sysid, self._lbl_subtype,
                          self._lbl_frmver, self._lbl_frmdate):
                field.setText("")
            return

        self._lbl_sysid.setText(sys_info.read_system_id())
        self._lbl_subtype.setText(sys_info.read_sub_type())
        self._lbl_frmver.setText(sys_info.read_firmware_version())
        self._lbl_frmdate.setText(sys_info.read_firmware_date())
class XilinxSysMonitorWidget(QtGui.QGroupBox):
    """Group box with current/min/max readings of the Xilinx system monitor."""

    def __init__(self):
        super(XilinxSysMonitorWidget, self).__init__()
        self.setTitle("Xilinx System Monitor")

        self._lbl_temp_cur = StyledLabel()
        self._lbl_temp_min = StyledLabel()
        self._lbl_temp_max = StyledLabel()
        self._lbl_vccint_cur = StyledLabel()
        self._lbl_vccint_min = StyledLabel()
        self._lbl_vccint_max = StyledLabel()
        self._lbl_vccaux_cur = StyledLabel()
        self._lbl_vccaux_min = StyledLabel()
        self._lbl_vccaux_max = StyledLabel()

        grid = QtGui.QGridLayout()
        for column in (1, 2, 3):
            grid.setColumnStretch(column, 1)
        for column, heading in enumerate(("Current", "Min", "Max"), 1):
            grid.addWidget(QtGui.QLabel(heading), 0, column, QtCore.Qt.AlignCenter)

        # One grid row per quantity: caption followed by cur/min/max labels.
        quantities = (
            ("Xilinx Temp (C):",
             (self._lbl_temp_cur, self._lbl_temp_min, self._lbl_temp_max)),
            ("Vcc Int (V):",
             (self._lbl_vccint_cur, self._lbl_vccint_min, self._lbl_vccint_max)),
            ("Vcc Aux (V):",
             (self._lbl_vccaux_cur, self._lbl_vccaux_min, self._lbl_vccaux_max)),
        )
        for row, (caption, fields) in enumerate(quantities, 1):
            grid.addWidget(QtGui.QLabel(caption), row, 0, QtCore.Qt.AlignRight)
            for column, field in enumerate(fields, 1):
                grid.addWidget(field, row, column)
        self.setLayout(grid)

    def update(self, sys_mon):
        """Show the readings of sys_mon, or clear all fields for None."""
        if sys_mon is None:
            for field in (self._lbl_temp_cur, self._lbl_temp_min, self._lbl_temp_max,
                          self._lbl_vccint_cur, self._lbl_vccint_min, self._lbl_vccint_max,
                          self._lbl_vccaux_cur, self._lbl_vccaux_min, self._lbl_vccaux_max):
                field.setText("")
            return

        # (label, format template, reader), listed in the order they are read.
        readings = (
            (self._lbl_temp_cur, F_TEMP, sys_mon.read_temp_cur),
            (self._lbl_temp_min, F_TEMP, sys_mon.read_temp_min),
            (self._lbl_temp_max, F_TEMP, sys_mon.read_temp_max),
            (self._lbl_vccint_cur, F_VOLTAGE, sys_mon.read_vint_cur),
            (self._lbl_vccint_min, F_VOLTAGE, sys_mon.read_vint_min),
            (self._lbl_vccint_max, F_VOLTAGE, sys_mon.read_vint_max),
            (self._lbl_vccaux_cur, F_VOLTAGE, sys_mon.read_vaux_cur),
            (self._lbl_vccaux_min, F_VOLTAGE, sys_mon.read_vaux_min),
            (self._lbl_vccaux_max, F_VOLTAGE, sys_mon.read_vaux_max),
        )
        for field, template, read in readings:
            field.setText(template.format(read()))
class VoltageMonitorWidget(QtGui.QGroupBox):
    """Group box with the supply-rail voltages, currents and a temperature."""

    def __init__(self):
        super(VoltageMonitorWidget, self).__init__()
        self.setTitle("Voltage Monitor")

        self._lbl_v15 = StyledLabel()
        self._lbl_v12 = StyledLabel()
        self._lbl_v5 = StyledLabel()
        self._lbl_v33 = StyledLabel()
        self._lbl_v33io = StyledLabel()
        self._lbl_v25 = StyledLabel()
        self._lbl_v1 = StyledLabel()
        self._lbl_vm15 = StyledLabel()
        self._lbl_c15 = StyledLabel()
        self._lbl_c12 = StyledLabel()
        self._lbl_c5 = StyledLabel()
        self._lbl_c33 = StyledLabel()
        self._lbl_c33io = StyledLabel()
        self._lbl_c25 = StyledLabel()
        self._lbl_c1 = StyledLabel()
        self._lbl_cm15 = StyledLabel()

        rail_grid = QtGui.QGridLayout()
        rail_grid.setColumnStretch(1, 1)
        rail_grid.setColumnStretch(2, 1)
        rail_grid.addWidget(QtGui.QLabel("Voltage (V)"), 0, 1, QtCore.Qt.AlignCenter)
        rail_grid.addWidget(QtGui.QLabel("Current (A)"), 0, 2, QtCore.Qt.AlignCenter)

        # One grid row per rail: caption, voltage label, current label.
        rails = (
            ("+15.0V:", self._lbl_v15, self._lbl_c15),
            ("+12.0V:", self._lbl_v12, self._lbl_c12),
            ("+5.0V:", self._lbl_v5, self._lbl_c5),
            ("+3.3V:", self._lbl_v33, self._lbl_c33),
            ("+3.3V IO:", self._lbl_v33io, self._lbl_c33io),
            ("+2.5V:", self._lbl_v25, self._lbl_c25),
            ("+1.0V:", self._lbl_v1, self._lbl_c1),
            ("-15.0V:", self._lbl_vm15, self._lbl_cm15),
        )
        for row, (caption, volt_field, amp_field) in enumerate(rails, 1):
            rail_grid.addWidget(QtGui.QLabel(caption), row, 0, QtCore.Qt.AlignRight)
            rail_grid.addWidget(volt_field, row, 1)
            rail_grid.addWidget(amp_field, row, 2)

        rail_box = QtGui.QWidget()
        rail_box.setLayout(rail_grid)
        rail_policy = rail_box.sizePolicy()
        rail_policy.setVerticalPolicy(QtGui.QSizePolicy.MinimumExpanding)
        rail_box.setSizePolicy(rail_policy)

        self._lbl_temp = StyledLabel()
        temp_form = QtGui.QFormLayout()
        temp_form.setLabelAlignment(QtCore.Qt.AlignRight)
        temp_form.addRow("Temperature (C):", self._lbl_temp)
        temp_box = QtGui.QWidget()
        temp_box.setLayout(temp_form)

        outer = QtGui.QVBoxLayout()
        outer.addWidget(rail_box)
        outer.addWidget(temp_box)
        outer.setMargin(0)
        self.setLayout(outer)

    def update(self, v_mon, sys_mon):
        """Show the current readings, or clear all fields.

        Both v_mon and sys_mon must be given to display values; the +2.5V
        and +1.0V voltages are taken from sys_mon, everything else from
        v_mon.  If either argument is None every field is blanked.
        """
        if v_mon is None or sys_mon is None:
            for field in (self._lbl_v15, self._lbl_v12, self._lbl_v5,
                          self._lbl_v33, self._lbl_v33io, self._lbl_v25,
                          self._lbl_v1, self._lbl_vm15,
                          self._lbl_c15, self._lbl_c12, self._lbl_c5,
                          self._lbl_c33, self._lbl_c33io, self._lbl_c25,
                          self._lbl_c1, self._lbl_cm15,
                          self._lbl_temp):
                field.setText("")
            return

        voltage_sources = (
            (self._lbl_v15, v_mon.read_voltage_15),
            (self._lbl_v12, v_mon.read_voltage_12),
            (self._lbl_v5, v_mon.read_voltage_5),
            (self._lbl_v33, v_mon.read_voltage_33),
            (self._lbl_v33io, v_mon.read_voltage_33io),
            (self._lbl_v25, sys_mon.read_vaux_cur),
            (self._lbl_v1, sys_mon.read_vint_cur),
            (self._lbl_vm15, v_mon.read_voltage_m15),
        )
        current_sources = (
            (self._lbl_c15, v_mon.read_current_15),
            (self._lbl_c12, v_mon.read_current_12),
            (self._lbl_c5, v_mon.read_current_5),
            (self._lbl_c33, v_mon.read_current_33),
            (self._lbl_c33io, v_mon.read_current_33io),
            (self._lbl_c25, v_mon.read_current_25),
            (self._lbl_c1, v_mon.read_current_1),
            (self._lbl_cm15, v_mon.read_current_m15),
        )
        for field, read in voltage_sources:
            field.setText(F_VOLTAGE.format(read()))
        for field, read in current_sources:
            field.setText(F_CURRENT.format(read()))
        self._lbl_temp.setText(F_TEMP.format(v_mon.read_temp()))
class UpdateThreadObj(QtCore.QObject):
    """Worker object that keeps the Xilinx system monitor, voltage monitor
    and channels displays up to date. It runs continuously while the
    connection to the controller is established.
    """

    # Emitted when do_run() returns, whether it was stopped or failed.
    finished = QtCore.pyqtSignal()

    # Granularity of the wait between two refresh cycles.
    SLEEP_SEC = 0.05
    # A refresh cycle is started at most once per this many seconds.
    MIN_UPDATE_TIME_SEC = 1

    def __init__(self, rpci, mcor_type, w_sysinfo, w_sysmon, w_vmon, w_channels):
        QtCore.QObject.__init__(self)
        self._stop = False
        self._exception = None
        self._mcor_type = mcor_type

        # Register accessors sharing the one remote PCI client.
        self._sysinfo = ec.SystemInfo(rpci)
        self._sysmon = ec.XilinxSystemMonitor(rpci)
        self._vmon = ec.VoltageMonitor(rpci)
        self._cc = ec.ChannelsControl(rpci)

        # Widgets to refresh.
        self._w_sys_info = w_sysinfo
        self._w_xilinx_sys_monitor = w_sysmon
        self._w_v_monitor = w_vmon
        self._w_channels = w_channels

        self.finished.connect(self._on_finish)

    def do_run(self):
        """Refresh the widgets in a loop until do_stop() is called.

        The system info is shown once; the other widgets are refreshed
        repeatedly.  Any exception ends the loop and is kept for
        _on_finish(); finished is emitted in every case.
        """
        # NOTE(review): MainWindow.connect starts this from a QThread, so the
        # widget updates below appear to run outside the GUI thread - confirm
        # that this is safe.
        refresh_steps = (
            lambda: self._w_xilinx_sys_monitor.update(self._sysmon),
            lambda: self._w_v_monitor.update(self._vmon, self._sysmon),
            lambda: self._w_channels.update(self._cc, self._mcor_type),
        )
        try:
            self._w_sys_info.update(self._sysinfo)
            while not self._stop:
                cycle_start = time.time()
                interrupted = False
                for refresh in refresh_steps:
                    refresh()
                    if self._stop:
                        interrupted = True
                        break
                if interrupted:
                    break
                # Wait out the rest of the cycle in short naps so that a
                # stop request is noticed quickly.
                while time.time() - cycle_start <= UpdateThreadObj.MIN_UPDATE_TIME_SEC:
                    time.sleep(UpdateThreadObj.SLEEP_SEC)
                    if self._stop:
                        break
        except Exception as error:
            self._exception = error
        finally:
            self.finished.emit()

    def do_stop(self):
        """Ask the loop in do_run() to end."""
        self._stop = True

    def _on_finish(self):
        """Report the exception that ended do_run(), if there was one."""
        if not self._exception:
            return
        dialog = QtGui.QMessageBox()
        dialog.setText("Communication error!")
        dialog.setInformativeText(str(self._exception))
        dialog.setIcon(QtGui.QMessageBox.Critical)
        dialog.exec_()
class MainWindow(QtGui.QMainWindow):
    """Main window: connection form and status boxes on the left, channel
    controls on the right.
    """

    def __init__(self):
        super(MainWindow, self).__init__()
        # Connection state; all three are None while disconnected.
        self._rpci = None
        self._exec_thread = None
        self._exec_obj = None
        self._exec_mutex = QtCore.QMutex()

        connect_form = self._build_connect_form()
        self._w_sys_info = SysInfoWidget()
        self._w_xilinx_sys_monitor = XilinxSysMonitorWidget()
        self._w_v_monitor = VoltageMonitorWidget()
        self._w_channels = ChannelsControlWidget()

        left_layout = QtGui.QVBoxLayout()
        left_layout.addWidget(connect_form)
        left_layout.addWidget(self._w_sys_info)
        left_layout.addWidget(self._w_xilinx_sys_monitor, stretch=2)
        left_layout.addWidget(self._w_v_monitor, stretch=5)
        left_layout.setMargin(0)
        left_column = QtGui.QWidget()
        left_column.setLayout(left_layout)

        central_layout = QtGui.QHBoxLayout()
        central_layout.addWidget(left_column)
        central_layout.addWidget(self._w_channels)
        central = QtGui.QWidget()
        central.setLayout(central_layout)

        self.setCentralWidget(central)
        self.setFixedSize(1000, 650)
        self.setWindowTitle("EMCOR Controller")
        self.center()
        self.show()

    def _build_connect_form(self):
        """Create the MCOR type / host name / connect button form."""
        self._combo_type = QtGui.QComboBox()
        for type_name in MCOR_TYPES:
            self._combo_type.addItem(type_name)
        self._txt_host = QtGui.QLineEdit()
        self._btn_connect = QtGui.QPushButton("Connect")
        self._btn_connect.clicked.connect(self.connect)

        form = QtGui.QFormLayout()
        form.setLabelAlignment(QtCore.Qt.AlignRight)
        form.addRow("MCOR Type:", self._combo_type)
        form.addRow("Hostname/IP:", self._txt_host)
        form.addRow(self._btn_connect)

        holder = QtGui.QWidget()
        holder.setLayout(form)
        holder_policy = holder.sizePolicy()
        holder_policy.setVerticalPolicy(QtGui.QSizePolicy.Fixed)
        holder.setSizePolicy(holder_policy)
        return holder

    def center(self):
        """Move the window to the middle of the available desktop area."""
        frame = self.frameGeometry()
        desktop_center = QtGui.QDesktopWidget().availableGeometry().center()
        frame.moveCenter(desktop_center)
        self.move(frame.topLeft())

    def connect(self):
        """Connect and start the update thread, or request a disconnect.

        When disconnected, a one-byte text read is used to test the
        connection; on failure a warning box is shown and nothing is
        started.  When already connected, the running update object is
        asked to stop.
        """
        with QtCore.QMutexLocker(self._exec_mutex):
            if self._rpci:
                self._exec_obj.do_stop()
                return

            try:
                self._btn_connect.setEnabled(False)
                self._rpci = RemotePCI(self._txt_host.text())
                # just simple tests to check if connection works
                self._rpci.read_text(0, 1)
                self._txt_host.setEnabled(False)
                self._combo_type.setEnabled(False)
                self._btn_connect.setText("Disconnect")
            except Exception as error:
                self._rpci = None
                warning = QtGui.QMessageBox()
                warning.setText("Cannot connect! Check if the controller is accessible on the network and if the PCI register server is running.")
                warning.setInformativeText(str(error))
                warning.setIcon(QtGui.QMessageBox.Warning)
                warning.exec_()
                self._txt_host.setFocus()
                return
            finally:
                self._btn_connect.setEnabled(True)

            selected_type = MCOR_TYPES[str(self._combo_type.currentText())]
            worker_thread = QtCore.QThread()
            worker = UpdateThreadObj(self._rpci, selected_type, self._w_sys_info,
                                     self._w_xilinx_sys_monitor, self._w_v_monitor,
                                     self._w_channels)
            self._exec_thread = worker_thread
            self._exec_obj = worker
            worker.moveToThread(worker_thread)
            worker.finished.connect(worker_thread.quit)
            worker_thread.started.connect(worker.do_run)
            worker_thread.finished.connect(self._stop_thread_callback)
            worker_thread.start()

    def _stop_thread_callback(self):
        """Reset the window to the disconnected state once the thread ended."""
        with QtCore.QMutexLocker(self._exec_mutex):
            self._w_sys_info.update(None)
            self._w_xilinx_sys_monitor.update(None)
            self._w_v_monitor.update(None, None)
            self._w_channels.update(None, None)

            self._txt_host.setEnabled(True)
            self._combo_type.setEnabled(True)
            self._btn_connect.setText("Connect")

            self._rpci = None
            self._exec_thread = None
            self._exec_obj = None
def main():
    """Create the application and the main window, then run the event loop.

    Returns the exit code of the Qt event loop.
    """
    application = QtGui.QApplication(sys.argv)
    # The window shows itself in its constructor; the name only holds a
    # reference to it while the event loop runs.
    window = MainWindow()
    return application.exec_()


if __name__ == "__main__":
    sys.exit(main())
emcor-python-package/EMCOR-Tests/emcor_tests_gui.py 0000664 0005333 0001756 00000000360 12464137637 021435 0 ustar luchini cd #!/usr/bin/env python
import sys
from PyQt4 import QtGui
import pyut_gui as pyut
def main():
    """Create the application and the unit-test GUI window, then run the
    event loop and return its exit code.
    """
    application = QtGui.QApplication(sys.argv)
    # The name only holds a reference to the window while the loop runs.
    window = pyut.PyUTMainWindow()
    return application.exec_()


if __name__ == "__main__":
    sys.exit(main())
emcor-python-package/EMCOR-Tests/setup.py 0000664 0005333 0001756 00000000614 12522064660 017372 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
# Package metadata passed to setup() as keyword arguments.
config = dict(
    name='EMCOR-Tests',
    version='1.4.0',
    description='Collection of EMCOR tests',
    url='slac.stanford.edu',
    author='Tomo Cesnik',
    author_email='tomo.cesnik@cosylab.com',
    py_modules=[],
    scripts=['emcor_channels_gui.py'],
)

setup(**config)
emcor-python-package/EMCOR-Tests/test_emcor_aio.py 0000664 0005333 0001756 00000015242 12511465330 021226 0 ustar luchini cd #!/usr/bin/env python
import unittest
import time
import emcor_controller as ec
import emcor_tester as et
from pyrpci import RemotePCI
# Address passed to RemotePCI (the controller under test).
IP_CONTROLLER = "134.79.218.113"
# Address passed to et.EmcorTester.
IP_TESTER = "134.79.218.58"
# Full-scale values written by configure_channel() after multiplication by
# MULT_MICRO (presumably given in A - confirm).
FULLSCALE_ADC_MON = 12
FULLSCALE_ADC_FDBCK = 12
FULLSCALE_DAC = 12.30768
# Allowed difference (delta) between expected and read-back voltage.
MAX_DIFF_V = 0.1
# Seconds slept after writing a set point before reading the feedback.
SETTLE_TIME = 0.01
# TIME_CONDITION_S is multiplied by this to get the wait after fault changes.
SLEEP_TIME_MULT = 10000
# Scale factor applied to the full-scale values and in the V <-> uA helpers.
MULT_MICRO = 1000000
def configure_channel(ch_control, chnum, ramp_rate):
    """Write the full-scale values, ramp settings and configure bit of a channel.

    A ramp_rate of 0 selects immediate mode; any other value is converted
    with convertVtoUA(), written as the ramp rate, and ramp mode is written
    as False.
    """
    fullscale_settings = (
        (ch_control.write_fullscale_adc_monitor, FULLSCALE_ADC_MON),
        (ch_control.write_fullscale_adc_feedback, FULLSCALE_ADC_FDBCK),
        (ch_control.write_fullscale_dac, FULLSCALE_DAC),
    )
    for write_fullscale, fullscale in fullscale_settings:
        write_fullscale(chnum, fullscale * MULT_MICRO)

    immediate = ramp_rate == 0
    if not immediate:
        ch_control.write_ramp_rate(chnum, convertVtoUA(ramp_rate))
    ch_control.write_ramp_mode(chnum, immediate)
    ch_control.write_configure_bit(chnum, True)
def convertVtoUA(v):
    """Map a voltage from the DAC voltage range onto +/- the DAC full scale.

    The full scale used is the integer part of FULLSCALE_DAC times MULT_MICRO.
    """
    limits = ec.ChannelsControl
    fullscale = int(FULLSCALE_DAC) * MULT_MICRO
    return ec.remap_value(v, limits.DAC_V_MIN, limits.DAC_V_MAX, -fullscale, fullscale)
def convertFeedbackUAtoV(ua):
    """Map a feedback readback from +/- the feedback full scale onto the
    ADC voltage range.
    """
    limits = ec.ChannelsControl
    fullscale = FULLSCALE_ADC_FDBCK * MULT_MICRO
    return ec.remap_value(ua, -fullscale, fullscale, limits.ADC_V_MIN, limits.ADC_V_MAX)
def range_float(val_min, val_max, val_step, val_fact):
    """Yield floats from val_min to val_max (inclusive) in val_step increments.

    The bounds and the step are multiplied by val_fact and converted to
    integers so that the iteration itself is exact; every integer is divided
    by val_fact again before it is yielded.  The inputs are expected to be
    multiples of 1 / val_fact.

    The scaled values are rounded to the nearest integer rather than
    truncated: a product such as 0.29 * 100 evaluates to
    28.999999999999996, and truncation would silently move it to 28.

    Raises ValueError (from range) if the scaled step rounds to zero.
    """
    first = int(round(val_min * val_fact))
    last = int(round(val_max * val_fact))
    step = int(round(val_step * val_fact))
    for scaled in range(first, last + 1, step):
        yield float(scaled) / val_fact
class AIOTests(unittest.TestCase):
    """Analog output/readback tests using the controller and the tester."""

    def setUp(self):
        self.cc = ec.ChannelsControl(RemotePCI(IP_CONTROLLER))
        # Start every test with all faults cleared on the tester.
        et.FaultControl(et.EmcorTester(IP_TESTER)).write_all_faults(False)
        time.sleep(self.cc.TIME_CONDITION_S * SLEEP_TIME_MULT)

    def test_channels(self):
        """Tests that the same value can be written and read on the channels."""
        channels = range(self.cc.NUM_CHANNELS)
        # configure channels first
        for channel in channels:
            configure_channel(self.cc, channel, 0)

        for active in channels:
            print("Channel: " + str(active))
            # Zero everything, then drive only the active channel to maximum.
            for channel in channels:
                self.cc.write_set_point(channel, convertVtoUA(0))
            self.cc.write_set_point(active, convertVtoUA(ec.ChannelsControl.DAC_V_MAX))
            time.sleep(SETTLE_TIME)

            for channel in channels:
                expected = ec.ChannelsControl.ADC_V_MAX if channel == active else 0
                measured = convertFeedbackUAtoV(self.cc.read_feedback(channel))
                print("Channel " + str(channel) + " has voltage " + str(measured))
                self.assertAlmostEqual(measured, expected, delta=MAX_DIFF_V)

    def test_linearity(self):
        """Tests that generated values correspond to read values linearly."""
        step_v = 0.5
        scale = 10
        lowest = ec.ChannelsControl.DAC_V_MIN
        highest = ec.ChannelsControl.DAC_V_MAX
        for channel in range(self.cc.NUM_CHANNELS):
            # configure channel first
            configure_channel(self.cc, channel, 0)
            print("Channel: " + str(channel))
            for target in range_float(lowest, highest, step_v, scale):
                self.cc.write_set_point(channel, convertVtoUA(target))
                time.sleep(SETTLE_TIME)
                measured = convertFeedbackUAtoV(self.cc.read_feedback(channel))
                print("Set V: " + str(target) + "; Read V: " + str(measured))
                # this is valid only if DAC and ADC min and max values are the same
                self.assertAlmostEqual(measured, target, delta=MAX_DIFF_V)
class AIOTimingTests(unittest.TestCase):
    """Timing-sensitive analog tests (linear ramping of the set point)."""

    # Duration in seconds of the ramp from 0 V to DAC_V_MAX.
    TIME_SEC = 5
    # Allowed difference between the expected and the read-back voltage.
    MAX_DIFF_V = 0.1

    def setUp(self):
        rpci = RemotePCI(IP_CONTROLLER)
        self.cc = ec.ChannelsControl(rpci)
        tester = et.EmcorTester(IP_TESTER)
        fault_control = et.FaultControl(tester)
        # Start with all faults cleared and give the change time to settle.
        fault_control.write_all_faults(False)
        time.sleep(self.cc.TIME_CONDITION_S * SLEEP_TIME_MULT)

    def test_linear_ramping(self):
        """Tests that linear ramping functionality works."""
        change_per_sec = float(ec.ChannelsControl.DAC_V_MAX) / self.TIME_SEC
        time_step = float(self.TIME_SEC) / 10
        time_fact = 100
        for i in range(self.cc.NUM_CHANNELS):
            # configure channel first for immediate mode to reset the value
            configure_channel(self.cc, i, 0)
            self.cc.write_set_point(i, convertVtoUA(0))
            time.sleep(SETTLE_TIME)
            configure_channel(self.cc, i, change_per_sec)
            print("Channel: " + str(i))
            # Wall-clock reference for the whole ramp.  time.clock() measures
            # CPU time on Unix (and no longer exists in Python 3.8+), so it
            # cannot be used to schedule the samples.
            ramp_start = time.time()
            self.cc.write_set_point(i, convertVtoUA(ec.ChannelsControl.DAC_V_MAX))
            for t in range_float(time_step, self.TIME_SEC, time_step, time_fact):
                # Sleep until the absolute sample time; this keeps read-out
                # and printing latency from accumulating over the ramp and
                # never passes a negative value to sleep().
                remaining = ramp_start + t - time.time()
                if remaining > 0:
                    time.sleep(remaining)
                rdbck_val = convertFeedbackUAtoV(self.cc.read_feedback(i))
                print("Time: " + str(t) + "; Voltage: " + str(rdbck_val))
                # Expected voltage of a linear ramp sampled t seconds in.
                val = (t / self.TIME_SEC) * ec.ChannelsControl.ADC_V_MAX
                self.assertAlmostEqual(rdbck_val, val, delta=self.MAX_DIFF_V)
class FunctionalTests(unittest.TestCase):
    """Tests of the channel output while the tester raises faults."""

    def setUp(self):
        self.rpci = RemotePCI(IP_CONTROLLER)
        self.cc = ec.ChannelsControl(self.rpci)
        self.fault_control = et.FaultControl(et.EmcorTester(IP_TESTER))

    def _wait_for_condition(self):
        """Sleep for TIME_CONDITION_S scaled by SLEEP_TIME_MULT."""
        time.sleep(self.cc.TIME_CONDITION_S * SLEEP_TIME_MULT)

    def test_fault_output(self):
        """Tests if channel output is set to 0 when the fault is detected on the channel."""
        for channel in range(self.cc.NUM_CHANNELS):
            self.fault_control.write_all_faults(False)
            self._wait_for_condition()
            # configure channel first
            configure_channel(self.cc, channel, 0)
            print("Channel: " + str(channel))
            self.cc.write_set_point(channel, convertVtoUA(ec.ChannelsControl.DAC_V_MAX))
            self.fault_control.write_fault(True, channel)
            self._wait_for_condition()
            measured = convertFeedbackUAtoV(self.cc.read_feedback(channel))
            self.assertAlmostEqual(measured, 0, delta=MAX_DIFF_V)

    def tearDown(self):
        # Clear the faults on the tester, then reset them on the controller.
        self.fault_control.write_all_faults(False)
        self._wait_for_condition()
        controller_faults = ec.FaultControl(self.rpci)
        controller_faults.reset_faults_all()
# Run all tests of this module with per-test output when executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)
emcor-python-package/EMCOR-Tests/test_emcor_basic.py 0000664 0005333 0001756 00000012474 12511465330 021543 0 ustar luchini cd #!/usr/bin/env python
import unittest
import emcor_controller as ec
from pyrpci import RemotePCI
# Address passed to RemotePCI by the setUp of every test case in this module.
IP_CONTROLLER = "134.79.218.113"
def temp_F_to_C(temp_F):
    """Convert a temperature from degrees Fahrenheit to degrees Celsius."""
    above_freezing = temp_F - 32
    return above_freezing * 5.0 / 9
def calc_val_diff(v_in, v_req):
    """Return the absolute deviation of v_in from v_req, relative to v_req."""
    required = float(v_req)
    deviation = required - v_in
    return abs(deviation / required)
class SystemInfoTests(unittest.TestCase):
    """Compares the identification strings of the board with fixed values."""

    # Expected register contents.
    SYSTEM_ID = "MCOR"
    SUB_TYPE = "REV02"
    FIRMWARE_VERSION = "8.51"
    FIRMWARE_DATE = "01/15/2015"

    def setUp(self):
        self.sys_info = ec.SystemInfo(RemotePCI(IP_CONTROLLER))

    def test_system_id(self):
        """Tests if device has correct system id."""
        self.assertEqual(self.sys_info.read_system_id(), self.SYSTEM_ID)

    def test_sub_type(self):
        """Tests if device has correct sub type."""
        self.assertEqual(self.sys_info.read_sub_type(), self.SUB_TYPE)

    def test_firmware_version(self):
        """Tests if device has correct firmware version."""
        self.assertEqual(self.sys_info.read_firmware_version(), self.FIRMWARE_VERSION)

    def test_firmware_date(self):
        """Tests if device has correct firmware date."""
        self.assertEqual(self.sys_info.read_firmware_date(), self.FIRMWARE_DATE)
class XilinxSystemMonitorTests(unittest.TestCase):
    """Range checks on the Xilinx system monitor temperatures and voltages."""

    # Accepted temperature window (given in F, compared in C).
    TEMP_MIN_C = temp_F_to_C(80)
    TEMP_MAX_C = temp_F_to_C(150)
    # Largest accepted relative deviation from the nominal voltage.
    VOLT_TOLERANCE = 0.05

    def setUp(self):
        self.sys_mon = ec.XilinxSystemMonitor(RemotePCI(IP_CONTROLLER))

    def _check_temperature(self, temp):
        """Print temp and assert that it lies strictly inside the window."""
        print("Temperature: " + str(temp))
        self.assertGreater(temp, self.TEMP_MIN_C)
        self.assertLess(temp, self.TEMP_MAX_C)

    def _check_voltage(self, volts, nominal):
        """Print volts and assert that it is within tolerance of nominal."""
        print("Voltage: " + str(volts))
        self.assertLess(calc_val_diff(volts, nominal), self.VOLT_TOLERANCE)

    def test_temp_cur(self):
        """Tests current temperature on Xilinx controller."""
        self._check_temperature(self.sys_mon.read_temp_cur())

    def test_temp_max(self):
        """Tests maximum temperature on Xilinx controller."""
        self._check_temperature(self.sys_mon.read_temp_max())

    def test_temp_min(self):
        """Tests minimum temperature on Xilinx controller."""
        self._check_temperature(self.sys_mon.read_temp_min())

    def test_vint_cur(self):
        """Tests INT voltage on Xilinx controller."""
        self._check_voltage(self.sys_mon.read_vint_cur(), 1.0)

    def test_vaux_cur(self):
        """Tests AUX voltage on Xilinx controller."""
        self._check_voltage(self.sys_mon.read_vaux_cur(), 2.5)
class VoltageMonitorTests(unittest.TestCase):
    """Range checks on the board supply voltages and the board temperature."""

    # Accepted temperature window (given in F, compared in C).
    TEMP_MIN_C = temp_F_to_C(60)
    TEMP_MAX_C = temp_F_to_C(100)
    # Largest accepted relative deviation from the nominal voltage.
    VOLT_TOLERANCE = 0.05

    def setUp(self):
        self.v_mon = ec.VoltageMonitor(RemotePCI(IP_CONTROLLER))

    def _check_rail(self, volts, nominal):
        """Print volts and assert that it is within tolerance of nominal."""
        print("Voltage: " + str(volts))
        self.assertLess(calc_val_diff(volts, nominal), self.VOLT_TOLERANCE)

    def test_voltage_15(self):
        """Tests +15.0V voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_15(), 15.0)

    def test_voltage_12(self):
        """Tests +12.0V voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_12(), 12.0)

    def test_voltage_5(self):
        """Tests +5.0V voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_5(), 5.0)

    def test_voltage_33(self):
        """Tests +3.3V voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_33(), 3.3)

    def test_voltage_33io(self):
        """Tests +3.3V IO voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_33io(), 3.3)

    def test_voltage_m15(self):
        """Tests -15.0V voltage on the board."""
        self._check_rail(self.v_mon.read_voltage_m15(), -15.0)

    def test_temperature(self):
        """Tests board temperature."""
        board_temp = self.v_mon.read_temp()
        print("Temperature: " + str(board_temp))
        self.assertGreater(board_temp, self.TEMP_MIN_C)
        self.assertLess(board_temp, self.TEMP_MAX_C)
class ADCTimeoutTests(unittest.TestCase):
    """Verifies that neither ADC timeout flag reported by ec.ADCControl is set."""

    def setUp(self):
        self.adc_control = ec.ADCControl(RemotePCI(IP_CONTROLLER))

    def test_adc_timeout(self):
        """Tests that ADCs are not in timeout state."""
        timed_out = self.adc_control.read_adc_timeout()
        self.assertFalse(timed_out)

    def test_bulk_adc_timeout(self):
        """Tests that bulk ADCs are not in timeout state."""
        timed_out = self.adc_control.read_bulk_adc_timeout()
        self.assertFalse(timed_out)
# Run every test case in this module when the file is executed as a script.
if __name__=='__main__':
    # verbosity=2 lists each test (by its docstring) together with its result.
    unittest.main(verbosity=2)
emcor-python-package/EMCOR-Tests/test_emcor_dio.py 0000664 0005333 0001756 00000011607 12522064660 021235 0 ustar luchini cd #!/usr/bin/env python
import unittest
import time
import emcor_controller as ec
import emcor_tester as et
from pyrpci import RemotePCI
# Address handed to RemotePCI to reach the controller under test.
IP_CONTROLLER = "134.79.218.113"
# Address handed to et.EmcorTester to reach the tester.
IP_TESTER = "134.79.218.58"
# Factor applied to a control's TIME_CONDITION_S to obtain the settle delay
# that the tests sleep between writing a signal and reading it back.
SLEEP_TIME_MULT = 10000
def bits_test(func_write_one, func_write_all, func_read_reg, num_bits, sleep_time):
    """Walks a single set bit through a register and records what is read back.

    For every bit position in range(num_bits) all bits are cleared with
    func_write_all(False), the bit is set with func_write_one(True, bit),
    the function sleeps for sleep_time seconds and the register is read
    with func_read_reg().

    Returns a tuple (stuck_lo, stuck_hi, bad_bits):
    - stuck_lo: OR of all readings (bits that were seen set at least once),
    - stuck_hi: AND of 0xFFFF and all readings (bits set in every reading),
    - bad_bits: mask of the positions whose reading differed from exactly
      that single bit being set.
    """
    seen_set = 0
    always_set = 0xFFFF
    mismatched = 0
    for position in range(num_bits):
        func_write_all(False)
        func_write_one(True, position)
        time.sleep(sleep_time)
        observed = func_read_reg()
        seen_set = seen_set | observed
        always_set = always_set & observed
        expected = 1 << position
        if observed != expected:
            mismatched = mismatched | expected
    return seen_set, always_set, mismatched
class DITests(unittest.TestCase):
def setUp(self):
self.rpci = RemotePCI(IP_CONTROLLER)
self.tester = et.EmcorTester(IP_TESTER)
def check_bits(self, num_bits, stuck_lo, stuck_hi, bad_bits):
for bit in range(num_bits):
bit_mask = 1 << bit
self.assertEqual(bit_mask & bad_bits, 0, "Bit " + str(bit) + " is bad")
self.assertEqual(bit_mask & stuck_hi, 0, "Bit " + str(bit) + " is stuck high")
self.assertGreater(bit_mask & stuck_lo, 0, "Bit " + str(bit) + " is stuck low")
def test_emcor_faults(self):
"""Tests that faults are correctly detected."""
cr = ec.FaultControl(self.rpci)
cw = et.FaultControl(self.tester)
stuck_lo, stuck_hi, bad_bits = bits_test(cw.write_fault, cw.write_all_faults, cr.read_status, cw.NUM_FAULTS, cr.TIME_CONDITION_S * SLEEP_TIME_MULT)
self.check_bits(cw.NUM_FAULTS, stuck_lo, stuck_hi, bad_bits)
def test_magnet_faults(self):
"""Tests that magnet faults are correctly detected."""
cr = ec.MagnetFaultControl(self.rpci)
cw = et.MagnetFaultControl(self.tester)
stuck_lo, stuck_hi, bad_bits = bits_test(cw.write_mag_fault, cw.write_all_mag_faults, cr.read_status, cw.NUM_MAG_FAULTS, cr.TIME_CONDITION_S * SLEEP_TIME_MULT)
self.check_bits(cw.NUM_MAG_FAULTS, stuck_lo, stuck_hi, bad_bits)
def tearDown(self):
fc = et.FaultControl(self.tester)
fc.write_all_faults(False)
mfc = et.MagnetFaultControl(self.tester)
mfc.write_all_mag_faults(False)
time.sleep(ec.FaultControl.TIME_CONDITION_S * SLEEP_TIME_MULT)
fc = ec.FaultControl(self.rpci)
fc.reset_faults_all()
mfc = ec.MagnetFaultControl(self.rpci)
mfc.reset_mag_faults_all()
class DOTests(unittest.TestCase):
def setUp(self):
self.rpci = RemotePCI(IP_CONTROLLER)
self.tester = et.EmcorTester(IP_TESTER)
def bit_test(self, func_write, func_read, sleep_time, invert=False):
func_write(False)
func_write(True)
time.sleep(sleep_time)
read_bit = func_read()
if invert:
self.assertEqual(read_bit, 0, "Bit should be cleared")
else:
self.assertGreater(read_bit, 0, "Bit should be set")
func_write(False)
time.sleep(sleep_time)
read_bit = func_read()
if invert:
self.assertGreater(read_bit, 0, "Bit should be set")
else:
self.assertEqual(read_bit, 0, "Bit should be cleared")
def test_interlocks(self):
"""Tests that interlocks are correctly generated."""
cr = et.StatusRegControl(self.tester)
cw = ec.InterlockControl(self.rpci)
stuck_lo, stuck_hi, bad_bits = bits_test(cw.write_interlock, cw.write_all_interlocks, cr.read_interlocks, cw.NUM_INTERLOCKS, cw.TIME_CONDITION_S * SLEEP_TIME_MULT)
for bit in range(cw.NUM_INTERLOCKS):
bit_mask = 1 << bit
self.assertEqual(bit_mask & bad_bits, 0, "Bit " + str(bit) + " is bad")
self.assertEqual(bit_mask & stuck_hi, 0, "Bit " + str(bit) + " is stuck high")
self.assertGreater(bit_mask & stuck_lo, 0, "Bit " + str(bit) + " is stuck low")
cw.write_all_interlocks(False)
def test_reset_bit(self):
"""Tests that reset signal is correctly generated."""
cr = et.StatusRegControl(self.tester)
cw = ec.FaultControl(self.rpci)
self.bit_test(cw.write_reset_bit, cr.read_reset_bit, cw.TIME_CONDITION_S * SLEEP_TIME_MULT)
def test_inhibit_bit(self):
"""Tests that inhibit signal is correctly generated."""
cr = et.StatusRegControl(self.tester)
cw = ec.FaultControl(self.rpci)
# set water fault to 1 (disable)
bit = 10
mfc = et.MagnetFaultControl(self.tester)
address = mfc._get_address(True)
self.tester.write(address, [(1 << bit) & 0xFFFF])
self.bit_test(cw.write_inhibit_bit, cr.read_inhibit_bit, cw.TIME_CONDITION_S * SLEEP_TIME_MULT, invert=True)
# Run every test case in this module when the file is executed as a script.
if __name__=='__main__':
    # verbosity=2 lists each test (by its docstring) together with its result.
    unittest.main(verbosity=2)
emcor-python-package/EMCOR-Tests/testconf.ini 0000664 0005333 0001756 00000000724 12475134265 020216 0 ustar luchini cd [*]
IP_CONTROLLER: eioc-b34-mg08
IP_TESTER: 134.79.218.58
[test_emcor_basic.SystemInfoTests]
SYSTEM_ID: MCOR
SUB_TYPE: REV00
FIRMWARE_VERSION: 8.71
FIRMWARE_DATE: 02/18/2015
[test_emcor_basic.XilinxSystemMonitorTests]
VOLT_TOLERANCE: 0.05
[test_emcor_basic.VoltageMonitorTests]
VOLT_TOLERANCE: 0.05
[test_emcor_aio]
FULLSCALE_ADC_MON: 12
FULLSCALE_ADC_FDBCK: 12
FULLSCALE_DAC: 12.30768
MAX_DIFF_V: 0.05
[test_emcor_aio.AIOTimingTests]
TIME_SEC: 5
MAX_DIFF_V: 0.1
emcor-python-package/EMCOR-Tests/.cram/ 0000775 0005333 0001756 00000000000 12664112126 016655 5 ustar luchini cd emcor-python-package/EMCOR-Tests/.cram/CVS/ 0000775 0005333 0001756 00000000000 12664112127 017311 5 ustar luchini cd emcor-python-package/EMCOR-Tests/.cram/CVS/Root 0000664 0005333 0001756 00000000025 12664112126 020153 0 ustar luchini cd /afs/slac/g/lcls/cvs
emcor-python-package/EMCOR-Tests/.cram/CVS/Repository 0000664 0005333 0001756 00000000044 12664112126 021410 0 ustar luchini cd tools/emcorTester/EMCOR-Tests/.cram
emcor-python-package/EMCOR-Tests/.cram/CVS/Entries 0000664 0005333 0001756 00000000056 12664112127 020646 0 ustar luchini cd /packageinfo/1.1/Tue Jun 16 20:14:04 2015//
D
emcor-python-package/EMCOR-Tests/.cram/packageinfo 0000664 0005333 0001756 00000000050 12540101614 021033 0 ustar luchini cd {"type": "Tools", "name": "EMCOR-Tests"} emcor-python-package/Python-RemotePCI/ 0000775 0005333 0001756 00000000000 12664112561 016761 5 ustar luchini cd emcor-python-package/Python-RemotePCI/.project 0000664 0005333 0001756 00000000562 12511467131 020430 0 ustar luchini cd
Python-RemotePCI
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/Python-RemotePCI/.pydevproject 0000664 0005333 0001756 00000000647 12511467131 021504 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/Python-RemotePCI/README 0000664 0005333 0001756 00000000157 12515410334 017636 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
emcor-python-package/Python-RemotePCI/pyrpci.py 0000664 0005333 0001756 00000012431 12511467131 020637 0 ustar luchini cd import socket
import threading
import ctypes
class RemotePCI(object):
    """Core class for communicating with the remote-pci-register-access server.

    Abstracts socket usage and message parsing away from the user.

    Requests are text messages of the form
    "<version>,<message id>,<offset>,<method>,<width>[,<value>[,<value1>]];"
    and replies have the form "<version>,<message id>,<status>[,<data>];".
    UDP is used unless tcp=True is passed to the constructor.
    """

    _DEFAULT_PORT = 4444
    _RECV_BUFFER_SIZE = 128
    _PROTOCOL_VERSION = 1
    _STATUS_OK = 0
    _METHOD_READ = 3
    _METHOD_WRITE = 4
    _METHOD_MODIFY = 5
    # Every protocol message ends with this character.
    _MSG_TERMINATOR = ";"

    def __init__(self, host, port=_DEFAULT_PORT, timeout=2, tcp=False):
        """Creates the socket and, for TCP, connects to (host, port).

        timeout is the socket timeout in seconds.
        """
        # Defined first so that __del__ is safe even if socket creation fails.
        self._socket = None
        self._message_id = 0
        self._host = host
        self._port = port
        self.tcp = tcp
        # Serializes request/response pairs issued from different threads.
        self._socklock = threading.Lock()
        socket_type = socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM
        self._socket = socket.socket(socket.AF_INET, socket_type)
        self._socket.settimeout(timeout)
        if tcp:
            self._socket.connect((host, port))

    def __del__(self):
        # The attribute may be missing or None when __init__ did not finish.
        sock = getattr(self, "_socket", None)
        if sock is not None:
            sock.close()

    def read(self, offset, width):
        """Returns data if it was read successfully, else throws an exception."""
        return self._process(offset, width, None, None)

    def write(self, offset, width, value):
        """Writes data or throws an exception."""
        self._process(offset, width, value, None)

    def modify(self, offset, width, mask_or, mask_and):
        """Modifies the data with OR and AND mask. Returns the change or throws an exception."""
        return self._process(offset, width, mask_or, mask_and)

    def _process(self, offset, width, value, value1):
        """Encodes protocol message and sends it to the server.
        Waits for the response, decodes and returns it.
        """
        with self._socklock:
            msg_send = self._encode_msg(offset, width, value, value1)
            msg_recv = self._communicate(msg_send)
            return self._decode_msg(msg_recv)

    def _encode_msg(self, offset, width, value, value1):
        """Checks input parameters and throws exception if they are invalid.
        Encodes and returns protocol message.

        The method is READ when value is None, WRITE when only value is
        given and MODIFY when value1 is given as well. Raises ValueError
        for a negative offset or a width other than 8, 16 or 32.
        """
        if offset < 0:
            raise ValueError("Invalid offset '" + str(offset) + "' specified! Value should be greater or equal to 0!")
        if width not in (8, 16, 32):
            raise ValueError("Invalid width '" + str(width) + "' specified! Allowed values: 8/16/32.")
        method = RemotePCI._METHOD_READ
        if value is not None:
            method = RemotePCI._METHOD_WRITE
        if value1 is not None:
            method = RemotePCI._METHOD_MODIFY
        # The id is only advanced once the arguments are known to be valid;
        # _decode_msg compares the reply against this value.
        self._message_id = self._message_id + 1
        fields = [RemotePCI._PROTOCOL_VERSION, self._message_id, offset, method, width]
        if value is not None:
            fields.append(value)
        if value1 is not None:
            fields.append(value1)
        return ",".join([str(field) for field in fields]) + RemotePCI._MSG_TERMINATOR

    def _decode_msg(self, message):
        """Decodes message and returns the read data passed through ctypes.c_long.
        If no data was read (in case of write method), returns None.
        Throws RPCIProtocolException in case that decoding fails, including
        when a field of the reply is not an integer.
        """
        # Anything after the terminator is ignored.
        message = message.split(RemotePCI._MSG_TERMINATOR)[0]
        data_parts = [s.strip() for s in message.split(',')]
        if len(data_parts) < 3:
            raise RPCIProtocolException("length", len(data_parts), 3)
        try:
            protocol_version = int(data_parts[0])
            message_id = int(data_parts[1])
            status = int(data_parts[2])
        except ValueError:
            raise RPCIProtocolException("message header", message, "integer fields")
        if protocol_version != RemotePCI._PROTOCOL_VERSION:
            raise RPCIProtocolException("protocol version", protocol_version, RemotePCI._PROTOCOL_VERSION)
        if message_id != self._message_id:
            raise RPCIProtocolException("message id", message_id, self._message_id)
        if status != RemotePCI._STATUS_OK:
            raise RPCIProtocolException("status", status, RemotePCI._STATUS_OK)
        if len(data_parts) > 3:
            try:
                val = int(data_parts[3])
            except ValueError:
                raise RPCIProtocolException("data", data_parts[3], "integer")
            # NOTE(review): c_long is wider than 32 bits on LP64 platforms,
            # so the value is only wrapped to a signed 32-bit integer where
            # a C long is 32 bits -- confirm which width is intended.
            return ctypes.c_long(val).value
        return None

    def _communicate(self, data):
        """Sends data over the socket and receives and returns response."""
        if self.tcp:
            self._socket.sendall(data)
            response = self._socket.recv(RemotePCI._RECV_BUFFER_SIZE)
            # A stream socket may deliver the reply in several pieces: keep
            # reading until the terminator arrives or the peer closes.
            while response and RemotePCI._MSG_TERMINATOR not in response:
                chunk = self._socket.recv(RemotePCI._RECV_BUFFER_SIZE)
                if not chunk:
                    break
                response += chunk
            return response
        self._socket.sendto(data, (self._host, self._port))
        recv_data, _ = self._socket.recvfrom(RemotePCI._RECV_BUFFER_SIZE)
        return recv_data

    def read_text(self, offset, length):
        """Reads from defined offset specified number of characters/bytes.
        Returns the characters as string.
        """
        # One 8-bit read per character.
        return "".join([chr(self.read(offset + i, 8)) for i in range(length)])
class RPCIProtocolException(Exception):
    """Raised when a protocol field does not have the value that was expected.

    The offending field name, the received value and the expected value
    are kept in var_name, var_value and var_expected.
    """

    def __init__(self, var_name, var_value, var_expected):
        self.var_name = var_name
        self.var_value = var_value
        self.var_expected = var_expected
        pieces = [
            "Value for '", var_name,
            "' is '", str(var_value),
            "', but should be '", str(var_expected),
            "'!",
        ]
        Exception.__init__(self, "".join(pieces))
emcor-python-package/Python-RemotePCI/pyrpci_example.py 0000664 0005333 0001756 00000000315 12511467131 022350 0 ustar luchini cd #!/usr/bin/env python
from pyrpci import RemotePCI
# Minimal usage example: read a 16-bit register at offset 0, write it back
# incremented by one, read it again and finally modify it with OR/AND masks.
IP = "127.0.0.1"
pci = RemotePCI(IP)
r = pci.read(0, 16)
print(r)
incremented = r + 1
pci.write(0, 16, incremented)
print(pci.read(0, 16))
modified = pci.modify(0, 16, 0x0003, 0x0007)
print(modified)
emcor-python-package/Python-RemotePCI/pyrpci_simulator.py 0000664 0005333 0001756 00000002446 12511467131 022743 0 ustar luchini cd #!/usr/bin/env python
import socket
# Address and port the simulator listens on.
UDP_IP = "127.0.0.1"
UDP_PORT = 4444
# Maximum size of a received datagram.
BUFFER_SIZE = 128
# Status code placed in every reply.
STATUS_OK = 0
# Method codes found in the fourth field of a request.
METHOD_READ = 3
METHOD_WRITE = 4
METHOD_MODIFY = 5

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))
try:
    while True:
        data, addr = sock.recvfrom(BUFFER_SIZE)
        print("Connection address: " + str(addr))
        if not data:
            # An empty datagram stops the simulator.
            break
        print("Received data: " + str(data))
        # Only the part before the terminator is parsed.
        data = data.split(';')[0]
        data_parts = [s.strip() for s in data.split(',')]
        if len(data_parts) < 5:
            # Ignore the bad request but keep serving; one malformed
            # datagram must not take the simulator down.
            print("MALFORMED DATA")
            continue
        print("Protocol version: " + data_parts[0])
        print("Message ID: " + data_parts[1])
        print("Offset: " + data_parts[2])
        print("Method: " + data_parts[3])
        print("Width: " + data_parts[4])
        if len(data_parts) > 5:
            print("Value: " + data_parts[5])
        if len(data_parts) > 6:
            print("Value1: " + data_parts[6])
        try:
            method = int(data_parts[3])
        except ValueError:
            # A non-numeric method is treated like any other malformed request.
            print("MALFORMED DATA")
            continue
        # The reply echoes version and message id, followed by the status.
        msg = data_parts[0] + "," + data_parts[1] + "," + str(STATUS_OK)
        if method == METHOD_READ:
            # No register is simulated: the method code doubles as the data.
            msg += "," + str(METHOD_READ)
        elif method == METHOD_WRITE:
            pass
        elif method == METHOD_MODIFY:
            msg += "," + str(METHOD_MODIFY)
        msg += ";"
        print("msg: " + msg)
        sock.sendto(msg, addr)
        print("--------------------------------------------------")
finally:
    # Release the port however the loop ends.
    sock.close()
emcor-python-package/Python-RemotePCI/setup.py 0000664 0005333 0001756 00000000705 12515410334 020467 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
# Distribution metadata: the pyrpci module is installed as a library and the
# simulator as an executable script.
config = dict(
    name='Python-RemotePCI',
    version='1.1.0',
    description='Python library for communicating with remote-pci-register-access server',
    url='slac.stanford.edu',
    author='Tomo Cesnik',
    author_email='tomo.cesnik@cosylab.com',
    py_modules=['pyrpci'],
    scripts=['pyrpci_simulator.py'],
)
setup(**config)
emcor-python-package/Python-RemotePCI-GUI/ 0000775 0005333 0001756 00000000000 12664112402 017375 5 ustar luchini cd emcor-python-package/Python-RemotePCI-GUI/CVS/ 0000775 0005333 0001756 00000000000 12664112402 020030 5 ustar luchini cd emcor-python-package/Python-RemotePCI-GUI/CVS/Root 0000664 0005333 0001756 00000000025 12664112402 020673 0 ustar luchini cd /afs/slac/g/lcls/cvs
emcor-python-package/Python-RemotePCI-GUI/CVS/Repository 0000664 0005333 0001756 00000000047 12664112402 022133 0 ustar luchini cd tools/python/slac/Python-RemotePCI-GUI
emcor-python-package/Python-RemotePCI-GUI/CVS/Entries 0000664 0005333 0001756 00000000327 12664112402 021366 0 ustar luchini cd /.project/1.1/Thu Apr 9 12:20:12 2015//
/.pydevproject/1.1/Thu Apr 9 12:20:12 2015//
/README/1.1/Tue Apr 21 08:57:29 2015//
/pyrpci_gui.py/1.1/Thu Apr 9 12:20:12 2015//
/setup.py/1.1/Tue Apr 21 08:57:29 2015//
D
emcor-python-package/Python-RemotePCI-GUI/.project 0000664 0005333 0001756 00000000566 12511467174 021065 0 ustar luchini cd
Python-RemotePCI-GUI
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/Python-RemotePCI-GUI/.pydevproject 0000664 0005333 0001756 00000000647 12511467174 022135 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/Python-RemotePCI-GUI/README 0000664 0005333 0001756 00000000212 12515410371 020251 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
- PyQt4
- Python-RemotePCI
emcor-python-package/Python-RemotePCI-GUI/pyrpci_gui.py 0000664 0005333 0001756 00000052776 12511467174 022154 0 ustar luchini cd #!/usr/bin/env python
import sys
import csv
from PyQt4 import QtGui, QtCore
from pyrpci import RemotePCI
# Width limit applied to the address/value text fields and the read label.
VALUE_WIDTH = 90
# Column indices of the register table.
COL_ADDRESS = 0
COL_NAME = 1
COL_WIDTH = 2
COL_VALUE = 3
COL_REMOVE = 4
# Field names used when a register map is saved to / loaded from CSV.
FIELD_ADDRESS = "address"
FIELD_NAME = "name"
FIELD_WIDTH = "width"
PERSIST_FIELDS = [FIELD_ADDRESS, FIELD_NAME, FIELD_WIDTH]
# Register widths offered in the width combo box (shown under "Width [bits]").
DATA_WIDTHS = [8, 16, 32]
# Font shared by all hexadecimal fields.
# NOTE(review): the family name looks like a placeholder so that the
# TypeWriter style hint selects the actual font -- confirm.
FONT_TYPEWRITER = QtGui.QFont("RPCI-GUI-FONT-TYPEWRITER")
FONT_TYPEWRITER.setStyleHint(QtGui.QFont.TypeWriter)
class StyledLabel(QtGui.QLabel):
    """Label drawn with a StyledPanel frame around it."""

    def __init__(self):
        QtGui.QLabel.__init__(self)
        self.setFrameShape(QtGui.QFrame.StyledPanel)
class AddressWidget(QtGui.QLineEdit):
    """Line edit for a register address.

    The input mask fixes the text to "0x" followed by eight hexadecimal
    digits.
    """

    def __init__(self, address):
        """address is the initial text; None selects "0x00000000"."""
        super(AddressWidget, self).__init__()
        self.setFont(FONT_TYPEWRITER)
        self.setMaxLength(8)
        self.setInputMask("\\0\\xHHHHHHHH")
        if address is None:
            self.setText("0x00000000")
        else:
            self.setText(address)
        self.setMaximumWidth(VALUE_WIDTH)
        self.setValidator(HexValidator())
        self.setAlignment(QtCore.Qt.AlignRight)
class TableSortItem(QtGui.QTableWidgetItem):
    """Table item holding the register name.

    It also keeps references to the row's address and width widgets so
    that sorting by this item's column orders the rows by register.
    """

    def __init__(self, name, w_address, cbox_width):
        super(TableSortItem, self).__init__(name, type=QtGui.QTableWidgetItem.UserType)
        self.w_address = w_address
        self.cbox_width = cbox_width

    def __lt__(self, other):
        """Sorts by address, width and name in this order."""
        if not isinstance(other, TableSortItem):
            return super(TableSortItem, self).__lt__(other)
        # Numeric address first, then the index selected in the width box.
        own_key = (int(str(self.w_address.text()), 0), self.cbox_width.currentIndex())
        other_key = (int(str(other.w_address.text()), 0), other.cbox_width.currentIndex())
        if own_key != other_key:
            return own_key < other_key
        # Same register: fall back to the name text.
        return self.text() < other.text()
class ValWidget(QtGui.QWidget):
    """Widget for reading/writing and modifying the value of the register.

    Lays out, left to right: write field and button, modify field and
    button, read label and button. The buttons are public (btn_write,
    btn_modify, btn_read) so the owner can connect them.
    """

    def __init__(self):
        super(ValWidget, self).__init__()
        self._text_write = self._make_hex_edit()
        self.btn_write = self._make_button("Write")
        self._text_modify = self._make_hex_edit()
        self.btn_modify = self._make_button("Modify")
        self._lbl_read = StyledLabel()
        self._lbl_read.setFont(FONT_TYPEWRITER)
        self._lbl_read.setMinimumWidth(VALUE_WIDTH)
        self._lbl_read.setAlignment(QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter)
        self.btn_read = self._make_button("Read")
        layout = QtGui.QHBoxLayout()
        for widget in (self._text_write, self.btn_write,
                       self._text_modify, self.btn_modify,
                       self._lbl_read, self.btn_read):
            layout.addWidget(widget)
        layout.setMargin(0)
        self.setLayout(layout)
        # Eight hexadecimal digits, i.e. a 32-bit value, until told otherwise.
        self.set_data_width(8)

    @staticmethod
    def _ignore_vertical_hint(widget):
        """Sets the vertical size policy of widget to Ignored."""
        policy = widget.sizePolicy()
        policy.setVerticalPolicy(QtGui.QSizePolicy.Ignored)
        widget.setSizePolicy(policy)

    @staticmethod
    def _make_hex_edit():
        """Creates a right-aligned, width-limited line edit for hex input."""
        edit = QtGui.QLineEdit()
        edit.setFont(FONT_TYPEWRITER)
        edit.setValidator(HexValidator())
        ValWidget._ignore_vertical_hint(edit)
        edit.setMaximumWidth(VALUE_WIDTH)
        edit.setAlignment(QtCore.Qt.AlignRight)
        return edit

    @staticmethod
    def _make_button(caption):
        """Creates a push button with the given caption."""
        button = QtGui.QPushButton(caption)
        ValWidget._ignore_vertical_hint(button)
        return button

    def set_read_data(self, data):
        """Shows data as zero-padded hexadecimal; None clears the label."""
        data_str = ""
        if data is not None:
            if data < 0:
                # RemotePCI may hand back a signed value; show it as the
                # two's-complement pattern of the current width instead of
                # a hexadecimal number with a minus sign.
                data &= (1 << (4 * self._data_width)) - 1
            data_str = "0x" + format(data, "0" + str(self._data_width) + "X")
        self._lbl_read.setText(data_str)

    def get_write_data(self):
        """Returns data as a number."""
        return int(str(self._text_write.text()), 0)

    def get_modify_data(self):
        """Returns data as a number."""
        return int(str(self._text_modify.text()), 0)

    def set_data_width(self, width):
        """Sets the number of hexadecimal digits of the value fields.

        width is a digit count (callers in this file pass 2, 4 or 8). The
        write field is reset to all zeros and the modify field to all Fs.
        """
        self._data_width = width
        input_mask = "\\0\\x" + width * "H"
        self._text_write.setInputMask(input_mask)
        self._text_write.setMaxLength(width)
        self._text_write.setText("0x" + width * "0")
        self._text_modify.setInputMask(input_mask)
        self._text_modify.setMaxLength(width)
        self._text_modify.setText("0x" + width * "F")
class PCIDataTableWidget(QtGui.QTableWidget):
    """Five-column table with one row per register.

    Cells are edited in place; the rows also host the widgets for
    read/write/modify/remove.
    """

    def __init__(self):
        super(PCIDataTableWidget, self).__init__(0, 5)
        self.setSelectionMode(QtGui.QAbstractItemView.NoSelection)
        self.verticalHeader().hide()
        self.horizontalHeader().setResizeMode(QtGui.QHeaderView.Fixed)
        titles = ["Address", "Name", "Width [bits]", "Value", ""]
        for column, title in enumerate(titles):
            self.setHorizontalHeaderItem(column, QtGui.QTableWidgetItem(title))
        # Only the width of the hint for the name column is set.
        name_hint = QtCore.QSize()
        name_hint.setWidth(150)
        self.horizontalHeaderItem(1).setSizeHint(name_hint)
        # Re-sort whenever an item changes.
        self.itemChanged.connect(self.sortCustom)

    def sortCustom(self):
        """Always sorts by name column, since it contains the item that specifies sorting."""
        self.sortItems(COL_NAME, QtCore.Qt.AscendingOrder)
class HexValidator(QtGui.QValidator):
    """Validator for hexadecimal text fields.

    Text containing a space, or a lower-case letter other than 'x', is
    reported as Intermediate; fixup() repairs such text.
    """

    def __init__(self):
        QtGui.QValidator.__init__(self)

    def validate(self, s, pos):
        text = str(s)
        has_space = " " in text
        has_lowercase = any(c != 'x' and c.islower() for c in text)
        if has_space or has_lowercase:
            return QtGui.QValidator.Intermediate, pos
        return QtGui.QValidator.Acceptable, pos

    def fixup(self, s):
        # Upper-case everything in place, restore the 'x' of the "0x"
        # prefix and fill blanks with zeros.
        s.replace(0, s.size(), s.toUpper())
        s.replace("X", "x")
        s.replace(" ", "0")
class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self._rpci = None
self._exec_thread = None
self._exec_obj = None
self._exec_mutex = QtCore.QMutex()
self._init_menus()
self._txt_host = QtGui.QLineEdit()
self._btn_connect = QtGui.QPushButton("Connect")
self._btn_connect.clicked.connect(self._connect)
w_connect = QtGui.QWidget()
wc_layout = QtGui.QFormLayout()
wc_layout.setLabelAlignment(QtCore.Qt.AlignRight)
wc_layout.addRow("Hostname/IP:", self._txt_host)
wc_layout.addRow(self._btn_connect)
w_connect.setLayout(wc_layout)
size_policy = w_connect.sizePolicy()
size_policy.setVerticalPolicy(QtGui.QSizePolicy.Fixed)
w_connect.setSizePolicy(size_policy)
self._table_view = PCIDataTableWidget()
self._add_address()
self._table_view.resizeColumnsToContents()
self._btn_add_address = QtGui.QPushButton("Add Address")
self._btn_add_address.clicked.connect(lambda: self._add_address())
self._btn_read_all = QtGui.QPushButton("Read All")
self._btn_read_all.clicked.connect(self._read_all)
self._btn_read_all.setEnabled(False)
main_widget = QtGui.QWidget()
layout = QtGui.QVBoxLayout()
layout.addWidget(w_connect)
layout.addWidget(self._table_view)
layout.addWidget(self._btn_add_address)
layout.addWidget(self._btn_read_all)
main_widget.setLayout(layout)
self.setCentralWidget(main_widget)
self.setFixedSize(1000, 600)
self.setWindowTitle("Remote PCI register access GUI")
self.center()
self.show()
def center(self):
qr = self.frameGeometry()
cp = QtGui.QDesktopWidget().availableGeometry().center()
qr.moveCenter(cp)
self.move(qr.topLeft())
def _init_menus(self):
self._act_new = QtGui.QAction("New", self)
self._act_new.setShortcut('Ctrl+N')
self._act_new.triggered.connect(self._remove_all)
self._act_open = QtGui.QAction("Open", self)
self._act_open.setShortcut('Ctrl+O')
self._act_open.triggered.connect(self._load_registers)
self._act_save = QtGui.QAction("Save", self)
self._act_save.setShortcut('Ctrl+S')
self._act_save.triggered.connect(self._save_registers)
menubar = self.menuBar()
reg_menu = menubar.addMenu("Register Map")
reg_menu.addAction(self._act_new)
reg_menu.addAction(self._act_open)
reg_menu.addAction(self._act_save)
def _enable_menus(self, enable):
self._act_new.setEnabled(enable)
self._act_open.setEnabled(enable)
self._act_save.setEnabled(enable)
def _save_registers(self):
"""Shows dialog and saves the data from the table to the CSV file."""
file_name = QtGui.QFileDialog.getSaveFileName(self, "Save register map", ".")
if not file_name:
return
with open(file_name, 'wb') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=PERSIST_FIELDS)
for row in range(self._table_view.rowCount()):
text_addr = self._table_view.cellWidget(row, COL_ADDRESS)
address = text_addr.text()
cbox_width = self._table_view.cellWidget(row, COL_WIDTH)
width = pow(2, cbox_width.currentIndex() + 3)
item = self._table_view.item(row, COL_NAME)
name = ""
if item:
name = item.text()
writer.writerow({FIELD_ADDRESS: address, FIELD_NAME: name, FIELD_WIDTH: width})
def _load_registers(self):
"""Shows dialog and loads the data from the CSV file to the table.
Table is populated only if the file contains valid/parsable data.
In the opposite case, error dialog with the details is shown.
"""
file_name = QtGui.QFileDialog.getOpenFileName(self, "Open register map", ".")
if not file_name:
return
load_success = True
items = []
with open(file_name, 'rb') as csv_file:
reader = csv.DictReader(csv_file, fieldnames=PERSIST_FIELDS)
for i, row in enumerate(reader):
try:
int(row[FIELD_ADDRESS], 0)
address = row[FIELD_ADDRESS]
name = row[FIELD_NAME]
if name is None:
raise Exception("Missing name attribute!")
width = int(row[FIELD_WIDTH])
if width not in DATA_WIDTHS:
raise Exception("Not supported width " + str(width))
items.append((address, name, width))
except Exception as e:
msg_box = QtGui.QMessageBox()
msg_box.setText("Cannot parse selected file!")
msg_box.setInformativeText("Error in line " + str(i) + ": " + str(e))
msg_box.setIcon(QtGui.QMessageBox.Warning)
msg_box.exec_()
load_success = False
break
if load_success:
self._table_view.setRowCount(0)
for address, name, width in items:
self._add_address(address, name, width)
def _on_address_edited(self, w_val):
w_val.set_read_data(None)
self._table_view.sortCustom()
def _on_width_changed(self, w_val, index):
w_val.set_read_data(None)
width = pow(2, index + 1)
w_val.set_data_width(width)
self._table_view.sortCustom()
def _add_address(self, address=None, name=None, width=None):
"""Adds new row to the table with appropriate widgets initialized."""
row_num = self._table_view.rowCount()
self._table_view.insertRow(row_num)
w_val = ValWidget()
if not self._rpci:
w_val.setEnabled(False)
self._table_view.setCellWidget(row_num, COL_VALUE, w_val)
w_address = AddressWidget(address)
w_address.editingFinished.connect(lambda: self._on_address_edited(w_val))
self._table_view.setCellWidget(row_num, COL_ADDRESS, w_address)
cbox_width = QtGui.QComboBox()
for i, dw in enumerate(DATA_WIDTHS):
cbox_width.addItem(str(dw))
if width is not None:
if dw == width:
cbox_width.setCurrentIndex(i)
calc_width = pow(2, i + 1)
w_val.set_data_width(calc_width)
else:
if i == len(DATA_WIDTHS) - 1:
cbox_width.setCurrentIndex(i)
cbox_width.currentIndexChanged.connect(lambda: self._on_width_changed(w_val, cbox_width.currentIndex()))
self._table_view.setCellWidget(row_num, COL_WIDTH, cbox_width)
btn_remove = QtGui.QPushButton("Remove")
btn_remove.clicked.connect(self._remove)
self._table_view.setCellWidget(row_num, COL_REMOVE, btn_remove)
# connect value buttons
w_val.btn_write.clicked.connect(lambda: self._write(w_val, w_address, cbox_width, btn_remove))
w_val.btn_modify.clicked.connect(lambda: self._modify(w_val, w_address, cbox_width, btn_remove))
w_val.btn_read.clicked.connect(lambda: self._read(w_val, w_address, cbox_width, btn_remove))
name_str = ""
if name is not None:
name_str = name
tw_item = TableSortItem(name_str, w_address, cbox_width)
# will automatically sort when the item is changed
self._table_view.setItem(row_num, COL_NAME, tw_item)
def _remove(self):
btn = self.sender()
index = self._table_view.indexAt(btn.pos())
if not index.isValid():
return
self._table_view.removeRow(index.row())
if self._table_view.rowCount() == 0:
self._add_address()
def _remove_all(self):
"""Enforces at least one element in the table."""
self._table_view.setRowCount(0)
self._add_address()
def _connect(self):
if not self._rpci:
try:
self._btn_connect.setEnabled(False)
self._rpci = RemotePCI(self._txt_host.text())
# just simple tests to check if the connection works
self._rpci.read_text(0, 1)
self._txt_host.setEnabled(False)
self._btn_connect.setText("Disconnect")
self._enable_val_widgets(True)
except Exception as e:
self._rpci = None
msg_box = QtGui.QMessageBox()
msg_box.setText("Cannot connect! Check if the PCI register server is running and it is accessible on the network.")
msg_box.setInformativeText(str(e))
msg_box.setIcon(QtGui.QMessageBox.Warning)
msg_box.exec_()
self._txt_host.setFocus()
#raise e, None, sys.exc_info()[2]
return
finally:
self._btn_connect.setEnabled(True)
else:
self._rpci = None
self._txt_host.setEnabled(True)
self._btn_connect.setText("Connect")
self._enable_val_widgets(False)
def _enable_table_edit(self, enable):
self._table_view.setEnabled(enable)
self._btn_add_address.setEnabled(enable)
self._btn_read_all.setEnabled(enable)
self._enable_menus(enable)
def _read_all(self):
self._enable_table_edit(False)
self._dev_communicate(self._dev_read_all, lambda: self._enable_table_edit(True))
def _get_target_register(self, w_address, cbox_width):
cid = cbox_width.currentIndex()
width = pow(2, cid + 3)
offset = int(str(w_address.text()), 0)
return (offset, width)
def _enable_row_edit(self, w_address, cbox_width, btn_remove, enable):
w_address.setEnabled(enable)
cbox_width.setEnabled(enable)
btn_remove.setEnabled(enable)
self._enable_val_widgets(enable)
self._enable_menus(enable)
def _read(self, w_val, w_address, cbox_width, btn_remove):
self._enable_row_edit(w_address, cbox_width, btn_remove, False)
offset, width = self._get_target_register(w_address, cbox_width)
self._dev_communicate(lambda: self._dev_read(w_val, offset, width), lambda: self._enable_row_edit(w_address, cbox_width, btn_remove, True))
def _write(self, w_val, w_address, cbox_width, btn_remove):
self._enable_row_edit(w_address, cbox_width, btn_remove, False)
offset, width = self._get_target_register(w_address, cbox_width)
self._dev_communicate(lambda: self._dev_write(w_val, offset, width), lambda: self._enable_row_edit(w_address, cbox_width, btn_remove, True))
def _modify(self, w_val, w_address, cbox_width, btn_remove):
self._enable_row_edit(w_address, cbox_width, btn_remove, False)
offset, width = self._get_target_register(w_address, cbox_width)
self._dev_communicate(lambda: self._dev_modify(w_val, offset, width), lambda: self._enable_row_edit(w_address, cbox_width, btn_remove, True))
def _enable_val_widgets(self, enable):
"""Enables/disables all the widgets for reading/writing the registers."""
self._btn_read_all.setEnabled(enable)
for row in range(self._table_view.rowCount()):
w_val = self._table_view.cellWidget(row, COL_VALUE)
w_val.setEnabled(enable)
def _dev_read_all(self):
for row in range(self._table_view.rowCount()):
w_address = self._table_view.cellWidget(row, COL_ADDRESS)
cbox_width = self._table_view.cellWidget(row, COL_WIDTH)
w_val = self._table_view.cellWidget(row, COL_VALUE)
offset, width = self._get_target_register(w_address, cbox_width)
self._dev_read(w_val, offset, width)
def _dev_read(self, widget_val, offset, width):
data = self._rpci.read(offset, width)
widget_val.set_read_data(data)
def _dev_write(self, widget_val, offset, width):
val_w = widget_val.get_write_data()
self._rpci.write(offset, width, val_w)
widget_val.set_read_data(None)
def _dev_modify(self, widget_val, offset, width):
val_w = widget_val.get_write_data()
val_m = widget_val.get_modify_data()
data = self._rpci.modify(offset, width, val_w, val_m)
widget_val.set_read_data(data)
def _dev_communicate(self, func_run, func_cleanup):
"""Executes passed functions in the background thread."""
with QtCore.QMutexLocker(self._exec_mutex):
if self._exec_obj:
return
self._exec_thread = QtCore.QThread()
self._exec_obj = ExecThreadObj(func_run)
self._exec_obj.moveToThread(self._exec_thread)
self._exec_obj.finished.connect(self._exec_thread.quit)
self._exec_thread.started.connect(self._exec_obj.do_run)
self._exec_thread.finished.connect(lambda: self._stop_thread_callback(func_cleanup, self._exec_obj.exc))
self._exec_thread.start()
def _stop_thread_callback(self, func_cleanup, exc):
    """Report a failed device operation, drop the thread state and run cleanup.

    `exc` is the exception captured by the worker object (or None); when
    set, it is shown in a modal warning box before the state is cleared.
    """
    if exc:
        warning_box = QtGui.QMessageBox()
        warning_box.setIcon(QtGui.QMessageBox.Warning)
        warning_box.setText("Device operation failed!")
        warning_box.setInformativeText(str(exc))
        warning_box.exec_()
    with QtCore.QMutexLocker(self._exec_mutex):
        # Clearing these allows _dev_communicate to start a new operation.
        self._exec_thread = None
        self._exec_obj = None
    func_cleanup()
class ExecThreadObj(QtCore.QObject):
    """Worker object that runs a single callable, meant to live on a background thread.

    Any exception raised by the callable is stored in `exc`; the
    `finished` signal is emitted whether or not the callable succeeded.
    """

    finished = QtCore.pyqtSignal()

    def __init__(self, func_run):
        super(ExecThreadObj, self).__init__()
        self.exc = None
        self._func_run = func_run

    def do_run(self):
        """Invoke the wrapped callable, remember a failure, always emit `finished`."""
        try:
            self._func_run()
        except Exception as error:
            self.exc = error
        finally:
            self.finished.emit()
def main():
    """Create the Qt application and the main window, then run the event loop.

    Returns the exit code produced by the Qt event loop.
    """
    application = QtGui.QApplication(sys.argv)
    # The window stays referenced by this local for as long as the loop runs.
    window = MainWindow()
    return application.exec_()


if __name__ == '__main__':
    sys.exit(main())
emcor-python-package/Python-RemotePCI-GUI/setup.py 0000664 0005333 0001756 00000000615 12515410371 021112 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
# Package metadata passed to setup() as keyword arguments.
config = dict(
    name='Python-RemotePCI-GUI',
    version='1.1.0',
    description='Python GUI pyrpci library',
    url='slac.stanford.edu',
    author='Tomo Cesnik',
    author_email='tomo.cesnik@cosylab.com',
    py_modules=[],
    scripts=['pyrpci_gui.py'],
)

setup(**config)
emcor-python-package/Python-UnitTestGUI/ 0000775 0005333 0001756 00000000000 12664112621 017313 5 ustar luchini cd emcor-python-package/Python-UnitTestGUI/.project 0000664 0005333 0001756 00000000564 12511467241 020771 0 ustar luchini cd
Python-UnitTestGUI
org.python.pydev.PyDevBuilder
org.python.pydev.pythonNature
emcor-python-package/Python-UnitTestGUI/.pydevproject 0000664 0005333 0001756 00000000647 12511467241 022043 0 ustar luchini cd
/${PROJECT_DIR_NAME}
python 2.7
Default
emcor-python-package/Python-UnitTestGUI/README 0000664 0005333 0001756 00000000167 12515410441 020173 0 ustar luchini cd Please see setup.py file for module details.
INSTALLATION:
python setup.py install
PYTHON: 2.7
DEPENDENCIES:
- PyQt4
emcor-python-package/Python-UnitTestGUI/pyut_framework.py 0000664 0005333 0001756 00000032627 12511467241 022757 0 ustar luchini cd import sys
import unittest
import time
import traceback
import ConfigParser
# Name of the configuration section whose options are offered to the module
# of every test case (the "omni-section").
CONF_SECTION_ALL = "*"
def parse_test_case_id(test_case):
    """Constructs TestCaseId object from the passed TestCase id.

    The id is expected to look like "<module>.<class>.<function>". It is
    split from the right so that a dotted module name (a test module that
    lives inside a package) stays intact in module_name instead of being
    mistaken for the class and function parts.

    If the id has fewer than three dot-separated parts, the missing leading
    parts are returned as empty strings.
    """
    parts = test_case.id().rsplit(".", 2)
    # Left-pad so the last part is always treated as the function name.
    parts = [""] * (3 - len(parts)) + parts
    return TestCaseId(parts[0], parts[1], parts[2])
def get_test_item_id(test_item):
    """Builds a TestCaseId by walking from the TestItem up through its parents.

    Starting at a function item fills all three names; starting at a class
    or module item leaves the lower-level names empty, and any other item
    yields an entirely empty id.
    """
    test_case_id = TestCaseId()
    node = test_item
    # Consume the function and class levels (if present), moving up each time.
    for level, attr_name in ((TestItemType.FUNCTION, "function_name"),
                             (TestItemType.CLASS, "class_name")):
        if node.type == level:
            setattr(test_case_id, attr_name, node.name)
            node = node.get_parent()
    if node.type == TestItemType.MODULE:
        test_case_id.module_name = node.name
    return test_case_id
def load_tests_configuration(path, ini_file="testconf.ini"):
    """Reads `ini_file` from directory `path` into a RawConfigParser and returns it.

    Option names are kept case sensitive.
    """
    parser = ConfigParser.RawConfigParser()
    # Replace the option-name transform with str so that names keep their case.
    parser.optionxform = str
    parser.read("/".join((path, ini_file)))
    return parser
def _set_obj_attr(obj, k, v):
"""Dynamically casts the value of the attribute to the existing type.
Applies it only if the cast was successful.
Returns true if the attribute was successfully injected.
"""
if hasattr(obj, k):
val = getattr(obj, k)
try:
v = type(val)(v)
except (TypeError, ValueError):
return False
setattr(obj, k, v)
return True
return False
def _apply_configuration(test_case, config):
    """Injects configuration values into the test case and its module.

    Three sections are consulted: the omni-section and the section named
    after the module (both applied to the module object), and the
    "<module>.<class>" section (applied to the test case itself). A value is
    only injected when the target already has an attribute of that name.

    Options of the module and class sections that could not be applied are
    removed from `config`. Omni-section options are left in place; the set
    of their keys that could not be applied is returned instead.
    """
    module_name = test_case.__module__
    class_section = module_name + "." + test_case.__class__.__name__
    not_applied = set()

    if config.has_section(CONF_SECTION_ALL):
        module_obj = sys.modules[module_name]
        for key, value in config.items(CONF_SECTION_ALL):
            if not _set_obj_attr(module_obj, key, value):
                not_applied.add(key)

    # (section, target object getter) pairs whose unused options get dropped.
    targeted = (
        (module_name, lambda: sys.modules[module_name]),
        (class_section, lambda: test_case),
    )
    for section, get_target in targeted:
        if not config.has_section(section):
            continue
        target = get_target()
        for key, value in config.items(section):
            if not _set_obj_attr(target, key, value):
                config.remove_option(section, key)

    return not_applied
def build_tests_tree(path, config):
    """Uses unittest.TestLoader to discover the tests under `path`.

    Creates a tree of TestItem objects (all -> module -> class -> function)
    and returns its root. Classes and modules without any test case are
    left out of the tree.

    Every test case gets longMessage enabled and has the configuration
    applied to it. Options of the omni-section that could not be applied to
    any test case are removed from `config`.
    """
    # The omni-section is optional. Calling options() on a missing section
    # raises NoSectionError, e.g. when no ini file exists in `path`.
    if config.has_section(CONF_SECTION_ALL):
        not_applied_keys = set(config.options(CONF_SECTION_ALL))
    else:
        not_applied_keys = set()
    test_case_id = TestCaseId()
    all_tests = unittest.TestLoader().discover(path)
    root_ti = TestItem("All tests", all_tests, TestItemType.ALL)
    for module_tests in all_tests:
        tests_class = []
        for class_tests in module_tests:
            tests_case = []
            for case_test in class_tests:
                test_case_id = parse_test_case_id(case_test)
                ti = TestItem(test_case_id.function_name, case_test, TestItemType.FUNCTION)
                tests_case.append(ti)
                ti.description = case_test.shortDescription()
                if ti.description is None:
                    ti.description = ""
                # always set longMessage to True to get more output with asserts
                setattr(case_test, 'longMessage', True)
                na_keys = _apply_configuration(case_test, config)
                # a key stays "not applied" only if no test case accepted it
                not_applied_keys &= na_keys
            if tests_case:
                # test_case_id still holds the id of the last case of this class
                ti = TestItem(test_case_id.class_name, class_tests, TestItemType.CLASS)
                ti.add_children(tests_case)
                tests_class.append(ti)
        if tests_class:
            ti = TestItem(test_case_id.module_name, module_tests, TestItemType.MODULE)
            ti.add_children(tests_class)
            root_ti.add_child(ti)
    # remove not applied keys from omni-section
    for na_key in not_applied_keys:
        config.remove_option(CONF_SECTION_ALL, na_key)
    return root_ti
def aggregate_results(test_item):
    """Recursively counts the results in the tree below the passed TestItem.

    Returns the tuple (tests, passed, skipped, failed, errors). A function
    item counts as one test; its other counters stay zero until it has a
    status.
    """
    if test_item.type != TestItemType.FUNCTION:
        totals = [0, 0, 0, 0, 0]
        for child in test_item.get_children():
            child_counts = aggregate_results(child)
            totals = [total + count for total, count in zip(totals, child_counts)]
        return tuple(totals)

    outcome = test_item.status.status if test_item.status else None
    return (
        1,
        int(outcome == TestStatus.PASS),
        int(outcome == TestStatus.SKIP),
        int(outcome == TestStatus.FAIL),
        int(outcome == TestStatus.ERROR),
    )
class TestExecutor(unittest.TestResult):
    """Works as aggregator for unittest results, but works on TestItem objects.

    Takes care of timing the tests and propagating results to the TestItem
    tree. An optional callback object is notified through on_start_all,
    on_stop_all, on_start and on_stop.

    Results that cannot be matched to an item of the tree (for example when
    the reported test has an id that is not "<module>.<class>.<function>")
    are still counted by the unittest.TestResult base class, but do not
    update any TestItem.
    """

    def __init__(self, root_test_item, callback=None):
        unittest.TestResult.__init__(self)
        self._root_test_item = root_test_item
        self._callback = callback
        self._start_time = 0
        self.successes = []
        # Capture stdout/stderr of the tests so it can be stored per item.
        self.buffer = True

    def startTestRun(self):
        super(TestExecutor, self).startTestRun()
        if self._callback:
            self._callback.on_start_all()

    def stopTestRun(self):
        super(TestExecutor, self).stopTestRun()
        if self._callback:
            self._callback.on_stop_all(self.shouldStop, self.testsRun, len(self.successes), len(self.skipped), len(self.failures), len(self.errors), len(self.unexpectedSuccesses), len(self.expectedFailures))

    def startTest(self, test):
        super(TestExecutor, self).startTest(test)
        if self._callback:
            self._callback.on_start(test)
        self._start_time = time.time()

    def stopTest(self, test):
        super(TestExecutor, self).stopTest(test)
        duration_time = time.time() - self._start_time
        test_item = self._lookup_test_item(test)
        if test_item is not None:
            test_item.time = duration_time
            self._propagate_result_up(test_item)
        if self._callback:
            self._callback.on_stop(test)

    def addError(self, test, err):
        super(TestExecutor, self).addError(test, err)
        self._mirrorOutput = False
        self._update_test_case_status(test, self._parse_test_err(err, True), TestStatus(TestStatus.ERROR))

    def addFailure(self, test, err):
        super(TestExecutor, self).addFailure(test, err)
        self._mirrorOutput = False
        self._update_test_case_status(test, self._parse_test_err(err, False), TestStatus(TestStatus.FAIL))

    def addUnexpectedSuccess(self, test):
        super(TestExecutor, self).addUnexpectedSuccess(test)
        self._update_test_case_status(test, "", TestStatus(TestStatus.FAIL, "Unexpected Success"))

    def addSuccess(self, test):
        super(TestExecutor, self).addSuccess(test)
        self.successes.append(test)
        self._update_test_case_status(test, "", TestStatus(TestStatus.PASS))

    def addExpectedFailure(self, test, err):
        super(TestExecutor, self).addExpectedFailure(test, err)
        self._update_test_case_status(test, self._parse_test_err(err, False), TestStatus(TestStatus.PASS, "Expected Failure"))

    def addSkip(self, test, reason):
        super(TestExecutor, self).addSkip(test, reason)
        # The base class stored (test, reason) as the last skipped entry.
        self._update_test_case_status(test, self.skipped[-1][1], TestStatus(TestStatus.SKIP))

    def run(self):
        """Executes the root item's tests, always closing the run with stopTestRun()."""
        test = self._root_test_item.executable
        self.startTestRun()
        try:
            test(self)
        finally:
            self.stopTestRun()

    def _lookup_test_item(self, test):
        """Returns the TestItem that belongs to the passed test, or None.

        None is returned when the test id cannot be parsed or when no item
        of the tree matches it.
        """
        try:
            tc_id = parse_test_case_id(test)
        except (IndexError, ValueError):
            return None
        return self._find_test_item(self._root_test_item, tc_id)

    def _find_test_item(self, test_item, tc_id):
        """Recursive function that does depth first search for the TestItem
        with specified id that resides in the tree that is defined by passed test item.

        Only branches whose module/class name matches tc_id are descended.
        Returns None if no matching function item exists.
        """
        if (test_item.type == TestItemType.FUNCTION) and (test_item.name == tc_id.function_name):
            return test_item
        if ((test_item.type == TestItemType.ALL) or
                ((test_item.type == TestItemType.MODULE) and (test_item.name == tc_id.module_name)) or
                ((test_item.type == TestItemType.CLASS) and (test_item.name == tc_id.class_name))):
            for ti in test_item.get_children():
                ret_ti = self._find_test_item(ti, tc_id)
                if ret_ti:
                    return ret_ti
        return None

    def _update_test_case_status(self, test_case, test_outcome, status):
        """Finds test item from the test_case id and updates its status and output values.

        Does nothing when no matching test item exists.
        """
        test_item = self._lookup_test_item(test_case)
        if test_item is None:
            return
        test_item.status = status
        test_item.outcome = test_outcome
        # With buffer enabled, sys.stdout is a buffer only between startTest()
        # and stopTest(); outside of that it may be a stream without getvalue().
        getvalue = getattr(sys.stdout, "getvalue", None)
        test_item.output = getvalue() if getvalue else ""

    def _propagate_result_up(self, test_item):
        """Propagates result of passed TestItem to its parents and updates them accordingly.

        Each ancestor's time becomes the sum of its children's times. An
        ancestor's status is only recomputed while every child seen on the
        way up has a status: ERROR wins over FAIL, which wins over PASS.
        """
        parent = test_item.get_parent()
        propagate_status = True
        while parent:
            parent.time = 0
            statuses = []
            for ti in parent.get_children():
                parent.time += ti.time
                statuses.append(ti.status)
            if None in statuses:
                # Once an incomplete level is found, no higher level is updated.
                propagate_status = False
            if propagate_status:
                act_statuses = [s.status for s in statuses]
                parent.status = TestStatus(TestStatus.PASS)
                if TestStatus.FAIL in act_statuses:
                    parent.status = TestStatus(TestStatus.FAIL)
                if TestStatus.ERROR in act_statuses:
                    parent.status = TestStatus(TestStatus.ERROR)
            parent = parent.get_parent()

    def _parse_test_err(self, err, stack_trace):
        """Creates more readable exception output by omitting stack trace if specified.

        `err` is an (exception type, value, traceback) tuple.
        """
        exctype, value, tb = err
        if stack_trace:
            exc_text = traceback.format_exception(exctype, value, tb)
        else:
            exc_text = traceback.format_exception_only(exctype, value)
        return ''.join(exc_text)
class TestCaseId(object):
    """Identifies a test by module, class and function name.

    Empty names make the id partial: such an id designates a whole module,
    a whole class, or (with everything empty) all tests.
    """

    def __init__(self, module_name="", class_name="", function_name=""):
        self.module_name = module_name
        self.class_name = class_name
        self.function_name = function_name

    def _key(self):
        """Returns the three names as a tuple for comparisons."""
        return (self.module_name, self.class_name, self.function_name)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self._key() == other._key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return ".".join(self._key())

    def is_superset_of(self, other):
        """Returns True if `other` lies inside the scope designated by this id.

        Names are compared from module down to function; the first empty
        name on this id ends the comparison with a match.
        """
        if not isinstance(other, self.__class__):
            return False
        if not self.module_name:
            return True
        if self.module_name != other.module_name:
            return False
        if not self.class_name:
            return True
        if self.class_name != other.class_name:
            return False
        if not self.function_name:
            return True
        return self.function_name == other.function_name
class TestStatus(object):
    """Outcome of a test (one of the class constants) plus optional detail text."""

    PASS, SKIP, FAIL, ERROR = "Pass", "Skip", "Fail", "Error"

    def __init__(self, status, details=None):
        self.status = status
        self.details = details

    def __str__(self):
        """Returns the status, followed by the details in parentheses if there are any."""
        if not self.details:
            return self.status
        return "".join([self.status, " (", self.details, ")"])
class TestItemType(object):
    """Levels of the test tree, numbered from the root (ALL) down to FUNCTION."""

    ALL, MODULE, CLASS, FUNCTION = range(4)
class TestItem(object):
    """Node of the test tree holding the data collected for one test or suite.

    Besides its own data, every item keeps a list of its children and a
    reference to its parent.
    """

    def __init__(self, name, executable, typ):
        # identity
        self.name = name
        self.type = typ
        self.executable = executable
        self.description = ""
        # results, empty until filled in after a run
        self.status = None
        self.time = 0
        self.outcome = ""
        self.output = ""
        # tree links
        self._parent = None
        self._children = []

    def add_child(self, child):
        """Appends `child` and makes this item its parent."""
        child._parent = self
        self._children.append(child)

    def add_children(self, children):
        """Adds every item of `children`, in order, via add_child."""
        for item in children:
            self.add_child(item)

    def get_children(self):
        """Returns the list of children."""
        return self._children

    def get_parent(self):
        """Returns the parent item, or None if the item has no parent."""
        return self._parent
emcor-python-package/Python-UnitTestGUI/pyut_gui.py 0000664 0005333 0001756 00000037560 12511467241 021547 0 ustar luchini cd #!/usr/bin/env python
import sys
from datetime import datetime
from PyQt4 import QtGui, QtCore
import pyut_framework as pyutf
import pyut_xml as pyutx
class StyledLabel(QtGui.QLabel):
    """QLabel drawn with a styled-panel frame."""

    def __init__(self):
        QtGui.QLabel.__init__(self)
        self.setFrameShape(QtGui.QFrame.StyledPanel)
class TreeNode(object):
    """Wraps a data object and, recursively, its children for use in a tree model.

    `row` is the position of the node among its siblings; `data` must
    provide get_children().
    """

    def __init__(self, parent, row, data):
        self.parent = parent
        self.row = row
        self.data = data
        children = []
        for position, child_data in enumerate(data.get_children()):
            children.append(TreeNode(self, position, child_data))
        self.subnodes = children
class TreeModel(QtCore.QAbstractItemModel):
    """Item model that exposes a tree of TestItem objects to a tree view.

    Each model index carries the TreeNode it stands for as its internal
    pointer. The columns are the item's name, status and duration.
    """

    HEADERS = ["Name", "Status", "Duration[s]"]

    def __init__(self, root_nodes):
        QtCore.QAbstractItemModel.__init__(self)
        self._root_nodes = []
        for position, item in enumerate(root_nodes):
            self._root_nodes.append(TreeNode(None, position, item))

    def index(self, row, column, parent):
        if parent.isValid():
            siblings = parent.internalPointer().subnodes
        else:
            # An invalid parent designates the top level.
            siblings = self._root_nodes
        return self.createIndex(row, column, siblings[row])

    def parent(self, index):
        if index.isValid():
            parent_node = index.internalPointer().parent
            if parent_node is not None:
                return self.createIndex(parent_node.row, 0, parent_node)
        return QtCore.QModelIndex()

    def rowCount(self, parent):
        if parent.isValid():
            return len(parent.internalPointer().subnodes)
        return len(self._root_nodes)

    def columnCount(self, parent):
        return len(TreeModel.HEADERS)

    def data(self, index, role):
        """Returns the display text of a cell, or None for other roles and empty cells."""
        if not index.isValid() or role != QtCore.Qt.DisplayRole:
            return None
        item = index.internalPointer().data
        column = index.column()
        if column == 0:
            return item.name
        if column == 1:
            # Items without a status leave the cell empty.
            return str(item.status) if item.status else None
        if column == 2:
            return "{0:.3f}".format(item.time)
        return None

    def headerData(self, section, orientation, role):
        wants_title = (orientation == QtCore.Qt.Horizontal
                       and role == QtCore.Qt.DisplayRole)
        return TreeModel.HEADERS[section] if wants_title else None
class GuiCallback(QtCore.QObject):
    """Passed to TestExecutor for obtaining the current status of the tests
    and immediately updating the GUI. GUI updates are done in the GUI thread.

    The on_* methods store what has to be shown and emit a signal; the
    connected slots then update the status bar and the data views.
    """

    start = QtCore.pyqtSignal()
    start_all = QtCore.pyqtSignal()
    stop = QtCore.pyqtSignal()
    stop_all = QtCore.pyqtSignal()

    def __init__(self, tree_view, desc_widget, status_bar):
        QtCore.QObject.__init__(self)
        self._tree_view = tree_view
        self._desc_widget = desc_widget
        self._status_bar = status_bar
        self.start.connect(self._update_status_bar)
        self.stop.connect(self._update_data_views)
        self.stop_all.connect(self._update_status_bar)
        self._message = ""
        self._test = None

    def on_start_all(self):
        self.start_all.emit()

    def on_stop_all(self, should_stop, num_tests, num_success, num_skipped, num_failed, num_errors, num_unexp_success, num_exp_failures):
        """Builds the run summary for the status bar and signals the end of the run.

        When `should_stop` is set, the summary starts with
        "Execution interrupted! ".
        """
        # The whole message is built before it is stored, so that the
        # interruption notice is part of the summary and not overwritten by it.
        parts = []
        if should_stop:
            parts.append("Execution interrupted! ")
        parts.append("Ran " + str(num_tests) + " test")
        if num_tests != 1:
            parts.append("s")
        parts.append(": Passed " + str(num_success))
        parts.append(", Skipped " + str(num_skipped))
        parts.append(", Failures " + str(num_failed))
        parts.append(", Errors " + str(num_errors))
        if num_unexp_success > 0:
            parts.append(", Unexpectedly passed " + str(num_unexp_success))
        if num_exp_failures > 0:
            parts.append(", Expected failures " + str(num_exp_failures))
        self._message = "".join(parts)
        self.stop_all.emit()

    def on_start(self, test):
        self._message = "Running " + str(test)
        self.start.emit()

    def on_stop(self, test):
        self._test = test
        self.stop.emit()

    def _update_status_bar(self):
        """Executes in the GUI thread."""
        self._status_bar.showMessage(self._message)

    def _update_data_views(self):
        """Executes in the GUI thread.

        Refreshes the tree view and, if the finished test belongs to the
        currently selected item, the description widget as well.
        """
        self._tree_view.dataChanged(QtCore.QModelIndex(), QtCore.QModelIndex())
        selected_test_item = self._tree_view.currentIndex().internalPointer().data
        s_tc_id = pyutf.get_test_item_id(selected_test_item)
        tc_id = pyutf.parse_test_case_id(self._test)
        if s_tc_id.is_superset_of(tc_id):
            self._desc_widget.currentWidget().update(selected_test_item)
class TestExecThreadObj(QtCore.QObject):
    """Just a dummy object for running TestExecutor in the background thread."""

    finished = QtCore.pyqtSignal()

    def __init__(self, test_executor):
        QtCore.QObject.__init__(self)
        self._test_executor = test_executor

    def do_run(self):
        """Runs the test executor and emits `finished` afterwards.

        The signal is emitted even if the executor raises, so that whatever
        is connected to it (such as quitting the thread) still happens.
        The exception itself is not swallowed.
        """
        try:
            self._test_executor.run()
        finally:
            self.finished.emit()
class TestCaseDescWidget(QtGui.QWidget):
    """Form that shows name, description, status, duration and output of one test case."""

    def __init__(self):
        QtGui.QWidget.__init__(self)

        self._lbl_name = StyledLabel()
        self._lbl_desc = StyledLabel()
        for wrapped_label in (self._lbl_name, self._lbl_desc):
            wrapped_label.setWordWrap(True)
        self._lbl_status = StyledLabel()
        self._lbl_time = StyledLabel()

        # The output can get long, so its label lives inside a scroll area.
        self._lbl_output = QtGui.QLabel()
        self._lbl_output.setWordWrap(True)
        self._lbl_output.setMargin(2)
        self._lbl_output.setAlignment(QtCore.Qt.AlignLeft)

        output_area = QtGui.QScrollArea()
        output_area.setFrameShape(QtGui.QFrame.StyledPanel)
        area_policy = output_area.sizePolicy()
        area_policy.setVerticalStretch(1)
        area_policy.setHorizontalStretch(0)
        output_area.setSizePolicy(area_policy)
        output_area.setAlignment(QtCore.Qt.AlignLeft)
        output_area.setWidget(self._lbl_output)
        output_area.setWidgetResizable(True)

        form = QtGui.QFormLayout()
        form.setLabelAlignment(QtCore.Qt.AlignRight)
        rows = (
            ("Name:", self._lbl_name),
            ("Description:", self._lbl_desc),
            ("Status:", self._lbl_status),
            ("Duration[s]:", self._lbl_time),
            ("Output:", output_area),
        )
        for caption, field in rows:
            form.addRow(caption, field)
        self.setLayout(form)

    def update(self, test_item):
        """Fills the form with the data of the passed test item.

        The output field shows the captured output followed by the outcome
        text, separated by a newline if the output does not end with one.
        """
        status_text = str(test_item.status) if test_item.status else ""
        self._lbl_name.setText(test_item.name)
        self._lbl_desc.setText(test_item.description)
        self._lbl_status.setText(status_text)
        self._lbl_time.setText("{0:.3f}".format(test_item.time))
        text = test_item.output
        if text and not text.endswith('\n'):
            text += '\n'
        self._lbl_output.setText(text + test_item.outcome)
class TestSuiteDescWidget(QtGui.QWidget):
    """Form that shows name, status, duration and result counters of a test suite."""

    def __init__(self):
        QtGui.QWidget.__init__(self)

        self._lbl_name = StyledLabel()
        self._lbl_name.setWordWrap(True)
        self._lbl_status = StyledLabel()
        self._lbl_status.setWordWrap(True)
        self._lbl_time = StyledLabel()
        self._lbl_num_tests = StyledLabel()
        self._lbl_num_passed = StyledLabel()
        self._lbl_num_skipped = StyledLabel()
        self._lbl_num_failed = StyledLabel()
        self._lbl_num_errors = StyledLabel()

        form = QtGui.QFormLayout()
        form.setLabelAlignment(QtCore.Qt.AlignRight)
        rows = (
            ("Name:", self._lbl_name),
            ("Status:", self._lbl_status),
            ("Duration[s]:", self._lbl_time),
            ("Tests:", self._lbl_num_tests),
            ("Passed:", self._lbl_num_passed),
            ("Skipped:", self._lbl_num_skipped),
            ("Failed:", self._lbl_num_failed),
            ("Errors:", self._lbl_num_errors),
        )
        for caption, field in rows:
            form.addRow(caption, field)
        self.setLayout(form)

    def update(self, test_item):
        """Fills the form with the passed item's data and its aggregated counters."""
        status_text = str(test_item.status) if test_item.status else ""
        self._lbl_name.setText(test_item.name)
        self._lbl_status.setText(status_text)
        self._lbl_time.setText("{0:.3f}".format(test_item.time))
        # aggregate_results returns (tests, passed, skipped, failed, errors).
        num_tests, num_passed, num_skipped, num_failed, num_errors = pyutf.aggregate_results(test_item)
        counters = (
            (self._lbl_num_tests, num_tests),
            (self._lbl_num_passed, num_passed),
            (self._lbl_num_skipped, num_skipped),
            (self._lbl_num_failed, num_failed),
            (self._lbl_num_errors, num_errors),
        )
        for label, count in counters:
            label.setText(str(count))
class PyUTMainWindow(QtGui.QMainWindow):
    """Main window that contains the whole GUI.

    The left side holds the tree of tests and the Run/Stop button, the
    right side shows details about the selected test case or suite. Tests
    are executed on a background thread.
    """

    def __init__(self):
        super(PyUTMainWindow, self).__init__()
        self._tests_configuration = None
        self._tests_path = "."

        # dialog for choosing the directory of a new test session
        self._ns_dialog = QtGui.QFileDialog(self)
        self._ns_dialog.setWindowTitle("Choose tests location")
        self._ns_dialog.setFileMode(QtGui.QFileDialog.Directory)
        self._ns_dialog.setViewMode(QtGui.QFileDialog.List)
        self._ns_dialog.setDirectory(".")

        self._act_new = QtGui.QAction("New", self)
        self._act_new.setShortcut('Ctrl+N')
        self._act_new.triggered.connect(self.new_session)
        self._act_save = QtGui.QAction("Save", self)
        self._act_save.setShortcut('Ctrl+S')
        self._act_save.triggered.connect(self.save_tests)
        act_exit = QtGui.QAction("Exit", self)
        act_exit.setShortcut('Ctrl+Q')
        act_exit.triggered.connect(QtGui.qApp.quit)

        menubar = self.menuBar()
        test_menu = menubar.addMenu("Test Session")
        test_menu.addAction(self._act_new)
        test_menu.addAction(self._act_save)
        test_menu.addSeparator()
        test_menu.addAction(act_exit)
        self.statusBar().showMessage("Ready")

        main_widget = QtGui.QWidget()

        # state of the background execution, guarded by _exec_mutex
        self._test_executor = None
        self._test_exec_thread = None
        self._test_exec_obj = None
        self._exec_mutex = QtCore.QMutex()

        self._tree_view = QtGui.QTreeView()
        self._btn_run = QtGui.QPushButton("Run")
        self._btn_run.clicked.connect(self.run_tests)
        left_widget = QtGui.QWidget()
        wl_layout = QtGui.QVBoxLayout()
        wl_layout.addWidget(self._tree_view)
        wl_layout.addWidget(self._btn_run)
        left_widget.setLayout(wl_layout)

        # index 0: test case details, index 1: test suite details
        tcase_widget = TestCaseDescWidget()
        tsuite_widget = TestSuiteDescWidget()
        self._desc_widget = QtGui.QStackedWidget()
        self._desc_widget.addWidget(tcase_widget)
        self._desc_widget.addWidget(tsuite_widget)

        layout = QtGui.QHBoxLayout()
        layout.addWidget(left_widget)
        layout.addWidget(self._desc_widget)
        layout.setMargin(0)
        main_widget.setLayout(layout)

        self._gui_callback = GuiCallback(self._tree_view, self._desc_widget, self.statusBar())
        self.load_tests()

        self.setCentralWidget(main_widget)
        self.setFixedSize(1000, 600)
        self.setWindowTitle("Tests GUI")
        self.center()
        self.show()

    def center(self):
        """Moves the window to the center of the available desktop area."""
        qr = self.frameGeometry()
        cp = QtGui.QDesktopWidget().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def show_desc_widget(self):
        """Selects which widget to show in the GUI based on the selected test item."""
        test_item = self._tree_view.currentIndex().internalPointer().data
        if test_item.type == pyutf.TestItemType.FUNCTION:
            self._desc_widget.setCurrentIndex(0)
        else:
            self._desc_widget.setCurrentIndex(1)
        self._desc_widget.currentWidget().update(test_item)

    def new_session(self):
        """Asks for a tests directory and loads the tests found there."""
        if self._ns_dialog.exec_() == QtGui.QDialog.Accepted:
            self.load_tests(str(self._ns_dialog.selectedFiles()[0]))

    def load_tests(self, path="."):
        """Loads the tests and fills the tree view with them."""
        self._tests_path = path
        self._tests_configuration = pyutf.load_tests_configuration(path)
        root_test_item = pyutf.build_tests_tree(path, self._tests_configuration)
        self._tree_view.setModel(TreeModel([root_test_item]))
        self._tree_view.header().setStretchLastSection(False)
        self._tree_view.header().setResizeMode(0, QtGui.QHeaderView.Stretch)
        self._tree_view.expandAll()
        self._tree_view.selectionModel().selectionChanged.connect(self.show_desc_widget)
        # select the first element immediately
        root_index = self._tree_view.model().index(0, 0, QtCore.QModelIndex())
        self._tree_view.selectionModel().setCurrentIndex(root_index, QtGui.QItemSelectionModel.NoUpdate)
        self.show_desc_widget()

    def run_tests(self):
        """Creates background execution thread for running the tests or stops if it's already running."""
        with QtCore.QMutexLocker(self._exec_mutex):
            if not self._test_executor:
                self._btn_run.setText("Stop")
                self._act_new.setEnabled(False)
                self._act_save.setEnabled(False)
                test_item = self._tree_view.currentIndex().internalPointer().data
                self._test_executor = pyutf.TestExecutor(test_item, self._gui_callback)
                self._test_exec_thread = QtCore.QThread()
                self._test_exec_obj = TestExecThreadObj(self._test_executor)
                self._test_exec_obj.moveToThread(self._test_exec_thread)
                self._test_exec_obj.finished.connect(self._test_exec_thread.quit)
                self._test_exec_thread.started.connect(self._test_exec_obj.do_run)
                self._test_exec_thread.finished.connect(self._stop_tests_callback)
                self._test_exec_thread.start()
            else:
                self._test_executor.stop()

    def _stop_tests_callback(self):
        """Called automatically when the background execution thread is stopped."""
        with QtCore.QMutexLocker(self._exec_mutex):
            self._btn_run.setText("Run")
            self._act_new.setEnabled(True)
            self._act_save.setEnabled(True)
            self._test_executor = None
            self._test_exec_thread = None
            self._test_exec_obj = None

    def save_tests(self):
        """Saves the tests to the XML report file.

        Shows an information box instead if no test has a result yet.
        """
        root_index = self._tree_view.model().index(0, 0, QtCore.QModelIndex())
        root_test_item = root_index.internalPointer().data
        _, np, ns, nf, ne = pyutf.aggregate_results(root_test_item)
        tests_run = np + ns + nf + ne
        if tests_run <= 0:
            msg_box = QtGui.QMessageBox()
            msg_box.setText("At least one test needs to be run to save the report.")
            msg_box.setIcon(QtGui.QMessageBox.Information)
            msg_box.exec_()
            return
        dt = datetime.now()
        dt = dt.replace(microsecond=0)
        default_file = "test-report-" + dt.strftime('%Y%m%dT%H%M%S') + ".xml"
        default_path = self._tests_path + "/" + default_file
        # Convert the dialog result to a Python string, as new_session does,
        # because the report writer passes the name on to open().
        filename = str(QtGui.QFileDialog.getSaveFileName(self, "Choose report location", default_path, "XML files (*.xml)"))
        if filename:
            xml_report = pyutx.TestXmlReport()
            xml_report.save(filename, root_test_item, self._tests_configuration)
def main():
    """Create the Qt application and the tests window, then run the event loop.

    Returns the exit code produced by the Qt event loop.
    """
    application = QtGui.QApplication(sys.argv)
    # The window stays referenced by this local for as long as the loop runs.
    window = PyUTMainWindow()
    return application.exec_()


if __name__ == '__main__':
    sys.exit(main())
emcor-python-package/Python-UnitTestGUI/pyut_xml.py 0000664 0005333 0001756 00000007773 12511467241 021566 0 ustar luchini cd from xml.dom.minidom import Document
import pyut_framework as pyut
class TestXmlReport(object):
    """Writes the results stored in a tree of TestItem objects to an XML file."""

    # element that carries the outcome text for each non-passing status
    _OUTCOME_TAGS = {
        pyut.TestStatus.SKIP: 'skipped',
        pyut.TestStatus.FAIL: 'failure',
        pyut.TestStatus.ERROR: 'error',
    }

    def save(self, filename, root_test_item, tests_configuration):
        """Creates XML report from the tests that are contained in the tree under passed root TestItem."""
        document = Document()
        root_elem = self._create_test_item_elem(document, root_test_item, tests_configuration)
        if root_elem:
            document.appendChild(root_elem)
        xml_data = document.toprettyxml()
        with open(filename, 'w') as report_file:
            report_file.write(xml_data)

    def _create_test_item_elem(self, doc, test_item, tests_configuration):
        """Recursively converts a test item into a testcase or testsuite element.

        Returns None for a test function without a status and for a suite
        that contains no reportable test.
        """
        if test_item.type == pyut.TestItemType.FUNCTION:
            return self._create_testcase_elem(doc, test_item)
        return self._create_testsuite_elem(doc, test_item, tests_configuration)

    def _create_testcase_elem(self, doc, test_item):
        """Creates the testcase element of a test function that has a status."""
        if not test_item.status:
            return None
        case_elem = doc.createElement('testcase')
        case_elem.setAttribute('name', test_item.name)
        case_elem.setAttribute('time', '%.3f' % test_item.time)
        outcome_tag = self._OUTCOME_TAGS.get(test_item.status.status)
        if outcome_tag:
            outcome_elem = doc.createElement(outcome_tag)
            outcome_elem.appendChild(doc.createCDATASection(test_item.outcome))
            case_elem.appendChild(outcome_elem)
        if test_item.output:
            case_elem.appendChild(doc.createTextNode(test_item.output))
        return case_elem

    def _create_testsuite_elem(self, doc, test_item, tests_configuration):
        """Creates the testsuite element of a non-function item, children included."""
        # non-optimal, could aggregate in this recursion
        _, passed, skipped, failed, errors = pyut.aggregate_results(test_item)
        suite_elem = doc.createElement('testsuite')
        suite_elem.setAttribute('name', test_item.name)
        suite_elem.setAttribute('time', '%.3f' % test_item.time)
        suite_elem.setAttribute('tests', str(passed + skipped + failed + errors))
        suite_elem.setAttribute('failures', str(failed))
        suite_elem.setAttribute('errors', str(errors))
        suite_elem.setAttribute('skip', str(skipped))
        for child in test_item.get_children():
            child_elem = self._create_test_item_elem(doc, child, tests_configuration)
            if child_elem:
                suite_elem.appendChild(child_elem)
        if not suite_elem.hasChildNodes():
            return None
        if tests_configuration:
            # Pick the configuration section that belongs to this suite level.
            item_id = pyut.get_test_item_id(test_item)
            if test_item.type == pyut.TestItemType.MODULE:
                section = item_id.module_name
            elif test_item.type == pyut.TestItemType.CLASS:
                section = item_id.module_name + "." + item_id.class_name
            else:
                section = pyut.CONF_SECTION_ALL
            props_elem = self._create_properties_elem(doc, tests_configuration, section)
            if props_elem:
                suite_elem.appendChild(props_elem)
        return suite_elem

    def _create_properties_elem(self, doc, tests_configuration, section):
        """Creates a properties element from the options of the given section.

        Returns None if the section does not exist or has no options.
        """
        if not tests_configuration.has_section(section):
            return None
        props_elem = doc.createElement('properties')
        for option, value in tests_configuration.items(section):
            prop_elem = doc.createElement('property')
            prop_elem.setAttribute('name', option)
            prop_elem.setAttribute('value', value)
            props_elem.appendChild(prop_elem)
        return props_elem if props_elem.hasChildNodes() else None
emcor-python-package/Python-UnitTestGUI/setup.py 0000664 0005333 0001756 00000000676 12515410441 021032 0 ustar luchini cd try:
from setuptools import setup
except ImportError:
from distutils.core import setup
# Package metadata passed to setup() as keyword arguments.
config = dict(
    name='Python-UnitTestGUI',
    version='1.2.0',
    description='Python GUI for execution of unit tests',
    url='slac.stanford.edu',
    author='Tomo Cesnik',
    author_email='tomo.cesnik@cosylab.com',
    py_modules=['pyut_framework', 'pyut_xml', 'pyut_gui'],
    scripts=['pyut_gui.py'],
)

setup(**config)