Adam Taylor
Published © GPL3+

AMD MicroBlaze™ V Processor Health and Usage Monitoring

A small FPGA combined with the AMD MicroBlaze™ V processor can be used to create HUMS to monitor the environmental effects on the systems.

IntermediateFull instructions provided3 hours2,582
AMD MicroBlaze™ V Processor Health and Usage Monitoring

Things used in this project

Story

Read more

Code

Python - Logging one channel

Python
import serial
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Function to initialize the serial connection
def initialize_serial(serial_port, baud_rate):
    try:
        ser = serial.Serial(serial_port, baud_rate, timeout=1)
        time.sleep(2)  # Wait for the serial connection to initialize
        return ser
    except serial.SerialException as e:
        print(f"Serial error: {e}")
        return None

# Function to send a command and receive the response
def send_command(ser, command):
    try:
        ser.write(command.encode('utf-8'))
        time.sleep(0.5)
        response = ser.readline().decode('utf-8').strip()
        response = response.replace(' ', '')  # Remove any spaces
        #print(f"Received: {response}")  # Print the received value for debugging
        if response.isdigit():
            response = int(response)
        elif response.replace('.', '', 1).isdigit():
            response = float(response)
        return float(response)
    except Exception as e:
        print(f"Error during send/receive: {e}")
        return None

# Function to update the plot
def update_plot(frame, ser, command, x_data, y_data, line, start_time):
    response = send_command(ser, command)
    #print("sending")
    if response is not None:
        #print(f"Received: {response}")  # Print the received value for debugging
        x_data.append(time.time() - start_time)
        y_data.append(response)
        line.set_data(x_data, y_data)
        ax.relim()
        ax.autoscale_view()
    return line,

# Main function
def main(serial_port, baud_rate, command):
    ser = initialize_serial(serial_port, baud_rate)
    if ser is None:
        return

    # Initialize plot data
    x_data, y_data = [], []
    start_time = time.time()

    # Create plot
    global fig, ax  # Ensure these remain in scope
    fig, ax = plt.subplots()
    line, = ax.plot([], [], 'r-')
    ax.set_title('Real-Time Serial Data')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Received Value')

    # Set up plot to call update_plot function periodically
    global ani  # Ensure the animation object remains in scope
    ani = FuncAnimation(fig, update_plot, fargs=(ser, command, x_data, y_data, line, start_time), interval=1000, blit=False, cache_frame_data=False)

    # Display the plot
    plt.show()

    # Close the serial connection when the plot is closed
    ser.close()

if __name__ == "__main__":
    serial_port = 'COM15'  # Replace with your serial port
    baud_rate = 9600       # Replace with your baud rate
    command = 'read_accel_z\r'  # Replace with the command you want to send

    try:
        main(serial_port, baud_rate, command)
    except KeyboardInterrupt:
        print("Interrupted!")

Python - Three channel logginc

Python
import serial
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Function to initialize the serial connection
def initialize_serial(serial_port, baud_rate):
    try:
        ser = serial.Serial(serial_port, baud_rate, timeout=1)
        time.sleep(2)  # Wait for the serial connection to initialize
        return ser
    except serial.SerialException as e:
        print(f"Serial error: {e}")
        return None

# Function to send a command and receive the response
def send_command(ser, command):
    try:
        ser.write(command.encode('utf-8'))
        time.sleep(0.5)
        response = ser.readline().decode('utf-8').strip()
        response = response.replace(' ', '')  # Remove any spaces
        try:
            # Convert response to a float if possible
            return float(response)
        except ValueError:
            print(f"Non-numeric response received: {response}")
            return None
    except Exception as e:
        print(f"Error during send/receive: {e}")
        return None

# Function to update the plot
def update_plot(frame, ser, commands, x_data, y_data, lines, start_time):
    for i, command in enumerate(commands):
        response = send_command(ser, command)
        if response is not None:
            print(f"Received for {command}: {response}")  # Print the received value for debugging
            x_data[i].append(time.time() - start_time)
            y_data[i].append(response)
            lines[i].set_data(x_data[i], y_data[i])
    ax.relim()
    ax.autoscale_view()
    return lines

# Main function
def main(serial_port, baud_rate, commands):
    ser = initialize_serial(serial_port, baud_rate)
    if ser is None:
        return

    # Initialize plot data for multiple series
    x_data = [[] for _ in commands]
    y_data = [[] for _ in commands]
    start_time = time.time()

    # Create plot
    global fig, ax  # Ensure these remain in scope
    fig, ax = plt.subplots()
    lines = [ax.plot([], [], label=labels[i])[0] for i in range(len(commands))]
    ax.set_title('Real-Time Serial Data')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Received Value')
    ax.legend()

    # Set up plot to call update_plot function periodically
    global ani  # Ensure the animation object remains in scope
    ani = FuncAnimation(fig, update_plot, fargs=(ser, commands, x_data, y_data, lines, start_time), interval=1000, blit=False, cache_frame_data=False)

    # Display the plot
    plt.show()

    # Close the serial connection when the plot is closed
    ser.close()

if __name__ == "__main__":
    serial_port = 'COM15'  # Replace with your serial port
    baud_rate = 9600       # Replace with your baud rate
    commands = ['read_accel_x\r', 'read_accel_y\r', 'read_accel_z\r']  # Replace with the commands you want to send
    labels = ['Accel X', 'Accel Y', 'Accel Z']  # Labels for each data series
    try:
        main(serial_port, baud_rate, commands)
    except KeyboardInterrupt:
        print("Interrupted!")

CLI.c

C/C++
#include "../master_include.h"

XUartLite Uart;

char receive_buffer[CLI_BUFFER_SIZE];    /* Buffer for Receiving Data */
char receive_buffer_lst[CLI_BUFFER_SIZE];    /* Buffer for Receiving Data */
int mesg_ready;

int test_id = 0;
int test_stop = 0;
int test_oneshot = 0;
char command_delim[] = " ";

extern PmodNAV nav;

int init_uart0()
{
    int Status;

    Status = XUartLite_Initialize(&Uart, XPAR_XUARTLITE_0_BASEADDR);
    if (Status != XST_SUCCESS) {
        return XST_FAILURE;
    }

    return XST_SUCCESS;

}

int xil_printf_float(float x){
    int integer, fraction, abs_frac;
    integer = x;
    fraction = (x - integer) * 100;
    abs_frac = abs(fraction);
    xil_printf("%d.%3d\n\r", integer, abs_frac);

}

int read_serial(char *buffer)
{
	static u8 read_val[3] = {0,0,0};
    unsigned int received_count = 0;
    static int fisrt = 1;
    static int index = 0;
    static int index_lst = 0;



    received_count =    XUartLite_Recv(&Uart, &read_val[received_count], 1);
    if (received_count > 0)
    {

        if (read_val[0] == '\r'){
            buffer[index] = '\0';

            // Save the entered command for when we use Arrow up
            index_lst = index;
            strcpy(receive_buffer_lst,receive_buffer);

            index = 0;
            fisrt = 0;
            //xil_printf("\n\r");

            return 1;
        }

        // Handles "arrow up" key press
        if (read_val[2] == 27 && read_val[1] == 91 && read_val[0] == 65 && fisrt == 0){
            // restore last entered command
        	strcpy(receive_buffer,receive_buffer_lst);
            index = index_lst;

            xil_printf("\b%s",receive_buffer);

            return 0;
        }

        // Handles backspace
        if (read_val[0] == 8)
        {
        	xil_printf("\b \b");
        	if (index > 0)
        	{
        		index--;
        	}

        }

        // Prevents buffer overflow
        if (index >= CLI_BUFFER_SIZE-1)
        {
            buffer[index] = '\0';
            index = 0;
            xil_printf("\n\r");
            return 1;
        }

        // Prevents not printable characters from being added to the buffer
        if (read_val[0] >= 32 && read_val[0] <= 126)
        {
            buffer[index++] = read_val[0];
            //xil_printf("%c", read_val[0]);
        }

        read_val[2] = read_val[1];
        read_val[1] = read_val[0];
    }
    return 0;
}

void cli_parse_command()
{
    // /static u32 block = 0;
    static u32 val = 0;

    mesg_ready = read_serial(receive_buffer);
    if (mesg_ready == 1)
    {
        //xil_printf("cmd> %s\n\r", receive_buffer);

        if (strcmp(receive_buffer, "") == 0)
		{
            val = 0; // NOP
		}
        else if (strcmp(receive_buffer, "read_accel_x") == 0)
        {
            //xil_printf("read_accel_x\n\r");
            NAV_GetData(&nav);
            xil_printf_float(nav.acclData.X);
      
      
        }
        else if (strcmp(receive_buffer, "read_accel_y") == 0)
        {
                //xil_printf("read_accel_y\n\r");
                NAV_GetData(&nav);
                xil_printf_float(nav.acclData.Y);

        }
        else if (strcmp(receive_buffer, "read_accel_z") == 0)
        {
            //xil_printf("read_accel_z\n\r");
            NAV_GetData(&nav);
            xil_printf_float(nav.acclData.Z);
        }
        else
        {
            char *ptr = strtok(receive_buffer, command_delim);
            if (strcmp(ptr, "r") == 0)
            {
                ptr = strtok(NULL, command_delim);
                u32 addr = char_to_int(strlen(ptr), ptr);
                val = Xil_In32( XPAR_XGPIO_0_BASEADDR + addr);
                xil_printf("Addr: 0x%x (%d)  Val: 0x%x (%d)\r\n",addr,addr, val, val);

            }
            else if (strcmp(ptr, "w") == 0)
            {
                ptr = strtok(NULL, command_delim);
                u32 addr = char_to_int(strlen(ptr), ptr);

                ptr = strtok(NULL, command_delim);
                val = char_to_int(strlen(ptr), ptr);

                Xil_Out32(XPAR_XGPIO_0_BASEADDR + addr,val);

                val = Xil_In32( XPAR_XGPIO_0_BASEADDR + addr);
                xil_printf("Addr: 0x%x (%d)  Val: 0x%x (%d)\r\n",addr,addr, val, val);

            }
            else
            {
                xil_printf("Command not found.\n\r");
            }

        }
    }

}

int char_to_int(int len, char* buffer)
{

    int value = 0, char_val, scalar = 1;

    for (int i = len - 1; i >= 0; i--) {
        char_val = (int)(buffer[i] - '0');
        value = value + (char_val * scalar);
        scalar = scalar * 10;
    }

    return value;
}

Cli.h

C/C++
#ifndef _CLI_H_
#define _CLI_H_


#define CLI_BUFFER_SIZE 32
#define NUM_OF_BYTE 1

extern int test_id;
extern int test_stop;
extern int test_oneshot;

int read_serial(char* buffer);
int init_uart0();
void cli_parse_command();
int char_to_int(int len, char* buffer);
#endif

Credits

Adam Taylor
138 projects • 2373 followers
Adam Taylor is an expert in design and development of embedded systems and FPGA’s for several end applications (Space, Defense, Automotive)
Contact

Comments

Please log in or sign up to comment.