Yeshvanth Muniraj
Published

Interfacing Red Pitaya with Python

Interfacing Red Pitaya with Python

Intermediate · Protip · 3 hours · 1,479
Interfacing Red Pitaya with Python

Things used in this project

Hardware components

STEMLab 125-10
Red Pitaya STEMLab 125-10
×1
SMA to BNC Converter
×1
BNC Oscilloscope x1/x10 Probes (Pair)
Digilent BNC Oscilloscope x1/x10 Probes (Pair)
×1
Rotary potentiometer (generic)
Rotary potentiometer (generic)
×1
IC 555 Pulse Generator Circuit
×1

Story

Read more

Code

toggle_led

Python
Toggle LED
import sys
import time
import redpitaya_scpi as scpi

# Open the SCPI connection to the host given as the first CLI argument.
rp_s = scpi.scpi(sys.argv[1])

led1 = 0
led2 = 1

print("Toggling LED[{}] and LED[{}]".format(led1, led2))

period = 1  # seconds for one complete on/off cycle

half_period = period / 2.0
state = 1  # value written to led1 next; led2 always receives the opposite

# Alternate the two LEDs forever: each half period the pair swaps state.
while True:
    time.sleep(half_period)
    rp_s.tx_txt('DIG:PIN LED{},{}'.format(led1, state))
    rp_s.tx_txt('DIG:PIN LED{},{}'.format(led2, 1 - state))
    state = 1 - state

clear_led

Python
Clear LED
import sys
import time
import redpitaya_scpi as scpi

# Open the SCPI connection to the host given as the first CLI argument.
rp_s = scpi.scpi(sys.argv[1])

led1 = 0
led2 = 1

print("Clearing LED[{}] and LED[{}]".format(led1, led2))

# Write 0 to each LED pin in turn, led1 first.
for led in (led1, led2):
    rp_s.tx_txt('DIG:PIN LED{},{}'.format(led, 0))

analog_input

Python
Analog Input
import sys
import time
import redpitaya_scpi as scpi

# Open the SCPI connection to the host given as the first CLI argument.
rp_s = scpi.scpi(sys.argv[1])

period = 1  # seconds

# The query text and the pause between readings never change, so build
# them once outside the loop.
query = 'ANALOG:PIN? AIN{}'.format(3)
pause = period / 2.0

# Poll the analog input forever, printing each reading as it arrives.
while True:
    rp_s.tx_txt(query)
    value = float(rp_s.rx_txt())
    print("Measured voltage on AI[{}] = {}V".format(3, value))
    time.sleep(pause)

redpitaya_scpi

Python
Python Module for RP SCPI
"""SCPI access to Red Pitaya."""

import socket

__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"

class scpi (object):
    """SCPI class used to access Red Pitaya over an IP network.

    Text commands are sent and received as UTF-8 strings terminated by
    ``delimiter``; binary blocks are received as ``bytes``.
    """
    delimiter = '\r\n'

    def __init__(self, host, timeout=None, port=5000):
        """Initialize object and open IP connection.

        Host IP should be a quoted string, like '192.168.1.100'.

        Args:
            host: Host name or IP address to connect to.
            timeout: Optional socket timeout in seconds; None leaves the
                socket in its default blocking mode.
            port: TCP port to connect to.

        A socket error while connecting is reported on stdout and not
        re-raised, so construction itself does not fail.
        """
        self.host    = host
        self.port    = port
        self.timeout = timeout
        # Set before the try block so close()/__del__ stay safe even when
        # socket creation itself fails.
        self._socket = None

        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            if timeout is not None:
                self._socket.settimeout(timeout)

            self._socket.connect((host, port))

        except socket.error as e:
            # Plain '{}' fields: an exception object rejects the 's' format
            # spec, which made the original handler raise TypeError.
            print('SCPI >> connect({}:{}) failed: {}'.format(host, port, e))

    def __del__(self):
        """Release the socket when the object is destroyed."""
        self.close()

    def close(self):
        """Close IP connection. Safe to call more than once."""
        # getattr: the attribute is missing on instances that never ran
        # __init__ to completion.
        sock = getattr(self, '_socket', None)
        if sock is not None:
            sock.close()
        self._socket = None

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes, raising ConnectionError on EOF."""
        parts = []
        remaining = count
        while remaining > 0:
            chunk = self._socket.recv(remaining)
            if not chunk:
                # recv() returns b'' once the peer has closed; without this
                # check the read loop would never terminate.
                raise ConnectionError('SCPI >> connection closed by peer')
            parts.append(chunk)
            remaining -= len(chunk)
        return b''.join(parts)

    def rx_txt(self, chunksize = 4096):
        """Receive text string and return it after removing the delimiter.

        Args:
            chunksize: Payload size requested per recv() call (a power of
                two is preferable); the delimiter length is added to it.

        Returns:
            The received text without the trailing delimiter.

        Raises:
            ConnectionError: The peer closed the connection before a
                delimiter-terminated message arrived.
        """
        delim = self.delimiter.encode('utf-8')
        msg = bytearray()
        # Test the accumulated message, not the last chunk, so a delimiter
        # split across two recv() calls is still recognised.
        while not msg.endswith(delim):
            chunk = self._socket.recv(chunksize + len(delim))
            if not chunk:
                raise ConnectionError('SCPI >> connection closed by peer')
            msg += chunk
        # Decode once at the end so a multi-byte character split across
        # chunks cannot break decoding.
        return msg[:-len(delim)].decode('utf-8')

    def rx_arb(self):
        """Receive binary data from scpi server.

        The expected layout is '#', one digit N, N digits holding the
        payload length, then the payload itself.

        Returns:
            The payload as bytes, or False when the block does not start
            with '#' or the digit N is not positive.

        Raises:
            ConnectionError: The peer closed the connection mid-block.
            ValueError: A length field is not a decimal number.
        """
        # Compare against bytes: recv() never yields str on Python 3.
        if self._recv_exact(1) != b'#':
            return False
        numOfNumBytes = int(self._recv_exact(1))
        if not (numOfNumBytes > 0):
            return False
        numOfBytes = int(self._recv_exact(numOfNumBytes))
        # NOTE(review): nothing after the payload is read here (unchanged
        # from the original) - confirm whether the server appends a
        # delimiter that the caller must drain.
        return self._recv_exact(numOfBytes)

    def tx_txt(self, msg):
        """Send text string and append delimiter.

        Returns:
            The number of bytes transmitted, delimiter included.
        """
        data = (msg + self.delimiter).encode('utf-8')
        # sendall() retries until everything is written; send() may stop
        # after a partial write and silently truncate the command.
        self._socket.sendall(data)
        return len(data)

    def txrx_txt(self, msg):
        """Send/receive text string."""
        self.tx_txt(msg)
        return self.rx_txt()

    # IEEE Mandated Commands

    def cls(self):
        """Clear Status Command"""
        return self.tx_txt('*CLS')

    def ese(self, value: int):
        """Standard Event Status Enable Command"""
        return self.tx_txt('*ESE {}'.format(value))

    def ese_q(self):
        """Standard Event Status Enable Query"""
        return self.txrx_txt('*ESE?')

    def esr_q(self):
        """Standard Event Status Register Query"""
        return self.txrx_txt('*ESR?')

    def idn_q(self):
        """Identification Query"""
        return self.txrx_txt('*IDN?')

    def opc(self):
        """Operation Complete Command"""
        return self.tx_txt('*OPC')

    def opc_q(self):
        """Operation Complete Query"""
        return self.txrx_txt('*OPC?')

    def rst(self):
        """Reset Command"""
        return self.tx_txt('*RST')

    def sre(self, value=None):
        """Service Request Enable Command

        Args:
            value: Optional value appended after the command. When
                omitted, the bare '*SRE' is sent as before.
        """
        if value is None:
            return self.tx_txt('*SRE')
        return self.tx_txt('*SRE {}'.format(value))

    def sre_q(self):
        """Service Request Enable Query"""
        return self.txrx_txt('*SRE?')

    def stb_q(self):
        """Read Status Byte Query"""
        return self.txrx_txt('*STB?')

    # :SYSTem

    def err_c(self):
        """Error count."""
        return self.txrx_txt('SYST:ERR:COUN?')

    def err_n(self):
        """Error next."""
        return self.txrx_txt('SYST:ERR:NEXT?')

Credits

Yeshvanth Muniraj

Yeshvanth Muniraj

19 projects • 32 followers
Hands-on experience in Embedded Systems and IoT. Good knowledge of FPGAs and Microcontrollers.

Comments