Welcome to Hackster!
Hackster is a community dedicated to learning hardware, from beginner to pro. Join us, it's free!
Yeshvanth Muniraj
Published

Interfacing Red Pitaya with Python

Interfacing Red Pitaya with Python

IntermediateProtip3 hours1,649
Interfacing Red Pitaya with Python

Things used in this project

Hardware components

STEMLab 125-10
Red Pitaya STEMLab 125-10
×1
SMA to BNC Converter
×1
BNC Oscilloscope x1/x10 Probes (Pair)
Digilent BNC Oscilloscope x1/x10 Probes (Pair)
×1
Rotary potentiometer (generic)
Rotary potentiometer (generic)
×1
IC 555 Pulse Generator Circuit
×1

Story

Read more

Code

toggle_led

Python
Toggle LED
import sys
import time
import redpitaya_scpi as scpi

rp_s = scpi.scpi(sys.argv[1])

led1 = 0
led2 = 1

print ("Toggling LED["+str(led1)+"] and LED["+str(led2)+"]")

period = 1 # seconds

while 1:
    time.sleep(period/2.0)
    rp_s.tx_txt('DIG:PIN LED' + str(led1) + ',' + str(1))
    rp_s.tx_txt('DIG:PIN LED' + str(led2) + ',' + str(0))
    time.sleep(period/2.0)
    rp_s.tx_txt('DIG:PIN LED' + str(led1) + ',' + str(0))
    rp_s.tx_txt('DIG:PIN LED' + str(led2) + ',' + str(1))

clear_led

Python
Clear LED
import sys
import time
import redpitaya_scpi as scpi

rp_s = scpi.scpi(sys.argv[1])

led1 = 0
led2 = 1

print ("Clearing LED["+str(led1)+"] and LED["+str(led2)+"]")

rp_s.tx_txt('DIG:PIN LED' + str(led1) + ',' + str(0))
rp_s.tx_txt('DIG:PIN LED' + str(led2) + ',' + str(0))

analog_input

Python
Analog Input
import sys
import time
import redpitaya_scpi as scpi

rp_s = scpi.scpi(sys.argv[1])

period = 1 # seconds

while 1:
    rp_s.tx_txt('ANALOG:PIN? AIN' + str(3))
    value = float(rp_s.rx_txt())
    print ("Measured voltage on AI["+str(3)+"] = "+str(value)+"V")
    time.sleep(period/2.0)

redpitaya_scpi

Python
Python Module for RP SCPI
"""SCPI access to Red Pitaya."""

import socket

__author__ = "Luka Golinar, Iztok Jeras"
__copyright__ = "Copyright 2015, Red Pitaya"

class scpi (object):
    """SCPI class used to access Red Pitaya over an IP network."""
    delimiter = '\r\n'

    def __init__(self, host, timeout=None, port=5000):
        """Initialize object and open IP connection.
        Host IP should be a string in parentheses, like '192.168.1.100'.
        """
        self.host    = host
        self.port    = port
        self.timeout = timeout

        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            if timeout is not None:
                self._socket.settimeout(timeout)

            self._socket.connect((host, port))

        except socket.error as e:
            print('SCPI >> connect({:s}:{:d}) failed: {:s}'.format(host, port, e))

    def __del__(self):
        if self._socket is not None:
            self._socket.close()
        self._socket = None

    def close(self):
        """Close IP connection."""
        self.__del__()

    def rx_txt(self, chunksize = 4096):
        """Receive text string and return it after removing the delimiter."""
        msg = ''
        while 1:
            chunk = self._socket.recv(chunksize + len(self.delimiter)).decode('utf-8') # Receive chunk size of 2^n preferably
            msg += chunk
            if (len(chunk) and chunk[-2:] == self.delimiter):
                break
        return msg[:-2]

    def rx_arb(self):
        numOfBytes = 0
        """ Recieve binary data from scpi server"""
        str=''
        while (len(str) != 1):
            str = (self._socket.recv(1))
        if not (str == '#'):
            return False
        str=''
        while (len(str) != 1):
            str = (self._socket.recv(1))
        numOfNumBytes = int(str)
        if not (numOfNumBytes > 0):
            return False
        str=''
        while (len(str) != numOfNumBytes):
            str += (self._socket.recv(1))
        numOfBytes = int(str)
        str=''
        while (len(str) != numOfBytes):
            str += (self._socket.recv(1))
        return str

    def tx_txt(self, msg):
        """Send text string ending and append delimiter."""
        return self._socket.send((msg + self.delimiter).encode('utf-8'))

    def txrx_txt(self, msg):
        """Send/receive text string."""
        self.tx_txt(msg)
        return self.rx_txt()

# IEEE Mandated Commands

    def cls(self):
        """Clear Status Command"""
        return self.tx_txt('*CLS')

    def ese(self, value: int):
        """Standard Event Status Enable Command"""
        return self.tx_txt('*ESE {}'.format(value))

    def ese_q(self):
        """Standard Event Status Enable Query"""
        return self.txrx_txt('*ESE?')

    def esr_q(self):
        """Standard Event Status Register Query"""
        return self.txrx_txt('*ESR?')

    def idn_q(self):
        """Identification Query"""
        return self.txrx_txt('*IDN?')

    def opc(self):
        """Operation Complete Command"""
        return self.tx_txt('*OPC')

    def opc_q(self):
        """Operation Complete Query"""
        return self.txrx_txt('*OPC?')

    def rst(self):
        """Reset Command"""
        return self.tx_txt('*RST')

    def sre(self):
        """Service Request Enable Command"""
        return self.tx_txt('*SRE')

    def sre_q(self):
        """Service Request Enable Query"""
        return self.txrx_txt('*SRE?')

    def stb_q(self):
        """Read Status Byte Query"""
        return self.txrx_txt('*STB?')

# :SYSTem

    def err_c(self):
        """Error count."""
        return rp.txrx_txt('SYST:ERR:COUN?')

    def err_c(self):
        """Error next."""
        return rp.txrx_txt('SYST:ERR:NEXT?')

Credits

Yeshvanth Muniraj
21 projects • 38 followers
Hands-on experience in Embedded Systems and IoT. Good knowledge of FPGAs and Microcontrollers.
Contact

Comments

Please log in or sign up to comment.