Adam Taylor
Published © GPL3+

AMD MicroBlaze™ V Processor Health and Usage Monitoring

A small FPGA combined with the AMD MicroBlaze™ V processor can be used to create HUMS to monitor the environmental effects on the systems.

IntermediateFull instructions provided3 hours1,700
AMD MicroBlaze™ V Processor Health and Usage Monitoring

Things used in this project

Story

Read more

Code

Python - Logging one channel

Python
import serial
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Function to initialize the serial connection
def initialize_serial(serial_port, baud_rate):
    try:
        ser = serial.Serial(serial_port, baud_rate, timeout=1)
        time.sleep(2)  # Wait for the serial connection to initialize
        return ser
    except serial.SerialException as e:
        print(f"Serial error: {e}")
        return None

# Function to send a command and receive the response
def send_command(ser, command):
    try:
        ser.write(command.encode('utf-8'))
        time.sleep(0.5)
        response = ser.readline().decode('utf-8').strip()
        response = response.replace(' ', '')  # Remove any spaces
        #print(f"Received: {response}")  # Print the received value for debugging
        if response.isdigit():
            response = int(response)
        elif response.replace('.', '', 1).isdigit():
            response = float(response)
        return float(response)
    except Exception as e:
        print(f"Error during send/receive: {e}")
        return None

# Function to update the plot
def update_plot(frame, ser, command, x_data, y_data, line, start_time):
    response = send_command(ser, command)
    #print("sending")
    if response is not None:
        #print(f"Received: {response}")  # Print the received value for debugging
        x_data.append(time.time() - start_time)
        y_data.append(response)
        line.set_data(x_data, y_data)
        ax.relim()
        ax.autoscale_view()
    return line,

# Main function
def main(serial_port, baud_rate, command):
    ser = initialize_serial(serial_port, baud_rate)
    if ser is None:
        return

    # Initialize plot data
    x_data, y_data = [], []
    start_time = time.time()

    # Create plot
    global fig, ax  # Ensure these remain in scope
    fig, ax = plt.subplots()
    line, = ax.plot([], [], 'r-')
    ax.set_title('Real-Time Serial Data')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Received Value')

    # Set up plot to call update_plot function periodically
    global ani  # Ensure the animation object remains in scope
    ani = FuncAnimation(fig, update_plot, fargs=(ser, command, x_data, y_data, line, start_time), interval=1000, blit=False, cache_frame_data=False)

    # Display the plot
    plt.show()

    # Close the serial connection when the plot is closed
    ser.close()

if __name__ == "__main__":
    serial_port = 'COM15'  # Replace with your serial port
    baud_rate = 9600       # Replace with your baud rate
    command = 'read_accel_z\r'  # Replace with the command you want to send

    try:
        main(serial_port, baud_rate, command)
    except KeyboardInterrupt:
        print("Interrupted!")

Python - Three channel logginc

Python
import serial
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Function to initialize the serial connection
def initialize_serial(serial_port, baud_rate):
    try:
        ser = serial.Serial(serial_port, baud_rate, timeout=1)
        time.sleep(2)  # Wait for the serial connection to initialize
        return ser
    except serial.SerialException as e:
        print(f"Serial error: {e}")
        return None

# Function to send a command and receive the response
def send_command(ser, command):
    try:
        ser.write(command.encode('utf-8'))
        time.sleep(0.5)
        response = ser.readline().decode('utf-8').strip()
        response = response.replace(' ', '')  # Remove any spaces
        try:
            # Convert response to a float if possible
            return float(response)
        except ValueError:
            print(f"Non-numeric response received: {response}")
            return None
    except Exception as e:
        print(f"Error during send/receive: {e}")
        return None

# Function to update the plot
def update_plot(frame, ser, commands, x_data, y_data, lines, start_time):
    for i, command in enumerate(commands):
        response = send_command(ser, command)
        if response is not None:
            print(f"Received for {command}: {response}")  # Print the received value for debugging
            x_data[i].append(time.time() - start_time)
            y_data[i].append(response)
            lines[i].set_data(x_data[i], y_data[i])
    ax.relim()
    ax.autoscale_view()
    return lines

# Main function
def main(serial_port, baud_rate, commands):
    ser = initialize_serial(serial_port, baud_rate)
    if ser is None:
        return

    # Initialize plot data for multiple series
    x_data = [[] for _ in commands]
    y_data = [[] for _ in commands]
    start_time = time.time()

    # Create plot
    global fig, ax  # Ensure these remain in scope
    fig, ax = plt.subplots()
    lines = [ax.plot([], [], label=labels[i])[0] for i in range(len(commands))]
    ax.set_title('Real-Time Serial Data')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Received Value')
    ax.legend()

    # Set up plot to call update_plot function periodically
    global ani  # Ensure the animation object remains in scope
    ani = FuncAnimation(fig, update_plot, fargs=(ser, commands, x_data, y_data, lines, start_time), interval=1000, blit=False, cache_frame_data=False)

    # Display the plot
    plt.show()

    # Close the serial connection when the plot is closed
    ser.close()

if __name__ == "__main__":
    serial_port = 'COM15'  # Replace with your serial port
    baud_rate = 9600       # Replace with your baud rate
    commands = ['read_accel_x\r', 'read_accel_y\r', 'read_accel_z\r']  # Replace with the commands you want to send
    labels = ['Accel X', 'Accel Y', 'Accel Z']  # Labels for each data series
    try:
        main(serial_port, baud_rate, commands)
    except KeyboardInterrupt:
        print("Interrupted!")

CLI.c

C/C++
#include "../master_include.h"

XUartLite Uart;

char receive_buffer[CLI_BUFFER_SIZE];    /* Buffer for Receiving Data */
char receive_buffer_lst[CLI_BUFFER_SIZE];    /* Buffer for Receiving Data */
int mesg_ready;

int test_id = 0;
int test_stop = 0;
int test_oneshot = 0;
char command_delim[] = " ";

extern PmodNAV nav;

int init_uart0()
{
    int Status;

    Status = XUartLite_Initialize(&Uart, XPAR_XUARTLITE_0_BASEADDR);
    if (Status != XST_SUCCESS) {
        return XST_FAILURE;
    }

    return XST_SUCCESS;

}

int xil_printf_float(float x){
    int integer, fraction, abs_frac;
    integer = x;
    fraction = (x - integer) * 100;
    abs_frac = abs(fraction);
    xil_printf("%d.%3d\n\r", integer, abs_frac);

}

int read_serial(char *buffer)
{
	static u8 read_val[3] = {0,0,0};
    unsigned int received_count = 0;
    static int fisrt = 1;
    static int index = 0;
    static int index_lst = 0;



    received_count =    XUartLite_Recv(&Uart, &read_val[received_count], 1);
    if (received_count > 0)
    {

        if (read_val[0] == '\r'){
            buffer[index] = '\0';

            // Save the entered command for when we use Arrow up
            index_lst = index;
            strcpy(receive_buffer_lst,receive_buffer);

            index = 0;
            fisrt = 0;
            //xil_printf("\n\r");

            return 1;
        }

        // Handles "arrow up" key press
        if (read_val[2] == 27 && read_val[1] == 91 && read_val[0] == 65 && fisrt == 0){
            // restore last entered command
        	strcpy(receive_buffer,receive_buffer_lst);
            index = index_lst;

            xil_printf("\b%s",receive_buffer);

            return 0;
        }

        // Handles backspace
        if (read_val[0] == 8)
        {
        	xil_printf("\b \b");
        	if (index > 0)
        	{
        		index--;
        	}

        }

        // Prevents buffer overflow
        if (index >= CLI_BUFFER_SIZE-1)
        {
            buffer[index] = '\0';
            index = 0;
            xil_printf("\n\r");
            return 1;
        }

        // Prevents not printable characters from being added to the buffer
        if (read_val[0] >= 32 && read_val[0] <= 126)
        {
            buffer[index++] = read_val[0];
            //xil_printf("%c", read_val[0]);
        }

        read_val[2] = read_val[1];
        read_val[1] = read_val[0];
    }
    return 0;
}

void cli_parse_command()
{
    // /static u32 block = 0;
    static u32 val = 0;

    mesg_ready = read_serial(receive_buffer);
    if (mesg_ready == 1)
    {
        //xil_printf("cmd> %s\n\r", receive_buffer);

        if (strcmp(receive_buffer, "") == 0)
		{
            val = 0; // NOP
		}
        else if (strcmp(receive_buffer, "read_accel_x") == 0)
        {
            //xil_printf("read_accel_x\n\r");
            NAV_GetData(&nav);
            xil_printf_float(nav.acclData.X);
      
      
        }
        else if (strcmp(receive_buffer, "read_accel_y") == 0)
        {
                //xil_printf("read_accel_y\n\r");
                NAV_GetData(&nav);
                xil_printf_float(nav.acclData.Y);

        }
        else if (strcmp(receive_buffer, "read_accel_z") == 0)
        {
            //xil_printf("read_accel_z\n\r");
            NAV_GetData(&nav);
            xil_printf_float(nav.acclData.Z);
        }
        else
        {
            char *ptr = strtok(receive_buffer, command_delim);
            if (strcmp(ptr, "r") == 0)
            {
                ptr = strtok(NULL, command_delim);
                u32 addr = char_to_int(strlen(ptr), ptr);
                val = Xil_In32( XPAR_XGPIO_0_BASEADDR + addr);
                xil_printf("Addr: 0x%x (%d)  Val: 0x%x (%d)\r\n",addr,addr, val, val);

            }
            else if (strcmp(ptr, "w") == 0)
            {
                ptr = strtok(NULL, command_delim);
                u32 addr = char_to_int(strlen(ptr), ptr);

                ptr = strtok(NULL, command_delim);
                val = char_to_int(strlen(ptr), ptr);

                Xil_Out32(XPAR_XGPIO_0_BASEADDR + addr,val);

                val = Xil_In32( XPAR_XGPIO_0_BASEADDR + addr);
                xil_printf("Addr: 0x%x (%d)  Val: 0x%x (%d)\r\n",addr,addr, val, val);

            }
            else
            {
                xil_printf("Command not found.\n\r");
            }

        }
    }

}

int char_to_int(int len, char* buffer)
{

    int value = 0, char_val, scalar = 1;

    for (int i = len - 1; i >= 0; i--) {
        char_val = (int)(buffer[i] - '0');
        value = value + (char_val * scalar);
        scalar = scalar * 10;
    }

    return value;
}

Cli.h

C/C++
#ifndef _CLI_H_
#define _CLI_H_


#define CLI_BUFFER_SIZE 32
#define NUM_OF_BYTE 1

extern int test_id;
extern int test_stop;
extern int test_oneshot;

int read_serial(char* buffer);
int init_uart0();
void cli_parse_command();
int char_to_int(int len, char* buffer);
#endif

Credits

Adam Taylor

Adam Taylor

129 projects • 2214 followers
Adam Taylor is an expert in design and development of embedded systems and FPGA’s for several end applications (Space, Defense, Automotive)

Comments