Hackster is hosting Hackster Holidays, Ep. 7: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Friday!Stream Hackster Holidays, Ep. 7 on Friday!
Dhimiter D
Published © MIT

Kria260 mavlink autopilot with AI object tracking

Build a teleoperated robot with AI using Kria KR260 and QGroundControl station

AdvancedWork in progressOver 5 days105
Kria260 mavlink autopilot with AI object tracking

Things used in this project

Story

Read more

Code

main.py

Python
import threading
import queue
import serial
import time
import serial
from autopilot import KriaAP
from DifferentialController import TwoWheelSteeringThrottle

sysid = 1
compid = 1
chan = 1
udp_ip = '0.0.0.0'  # Kria IP
udp_port = 24550    # desired port

    
def main():
    sysid = 1
    compid = 1
    chan = 1
    sys_stat_interval = 1.0  # Set the interval for sending attitude messages        
    
    Kriarover = KriaAP(sysid, compid, chan, udp_ip, udp_port)
    Kriarover.send_debug_message_to_ground_control(time_boot_ms=123456789, ind=1, value=42)
   
    steering_throttle_controller = TwoWheelSteeringThrottle(steering_zero=10)
    
    try:
        while True:
            Kriarover.send_heartbeat()
            Kriarover.receive_manual_control()
            while not Kriarover.data_queue.empty():
                mvl_joy = Kriarover.data_queue.get()
                throttle = mvl_joy['z']
                steering = mvl_joy['r']
                print("z =", throttle, "r =", steering)
                
                # Run the steering and throttle controller
                left_pwm, right_pwm = steering_throttle_controller.run(throttle, steering)
                 
                left_direction = 1 if left_pwm >= 0 else -1
                right_direction = 1 if right_pwm >= 0 else -1

                left_motor_pwm = int(abs(left_pwm))
                right_motor_pwm = int(abs(right_pwm))

                # Print the resulting throttle values for left and right wheels
                print(f"Left Throttle: {left_pwm}, Right Throttle: {right_pwm}, left_direction : {left_direction}, right_direction: {right_direction}")
        

    
    except KeyboardInterrupt:
        # Handle Ctrl+C to gracefully exit the loop
        pass

if __name__ == "__main__":
    main()

DifferentialController.py

Python
import math
import time
from typing import Tuple

def is_number_type(value):
    return isinstance(value, int)

def clamp(value, min_val, max_val):
    return max(min(value, max_val), min_val)

class TwoWheelSteeringThrottle:
    def __init__(self, steering_zero: int = 10) -> None:
        if not is_number_type(steering_zero):
            raise ValueError("steering_zero must be an integer")
        if steering_zero > 1000 or steering_zero < 0:
            raise ValueError(f"steering_zero {steering_zero}, but must be between 0 and 1000.")
        self.steering_zero = steering_zero

    def convert_to_motor_range(self, value: int) -> int:
        # Map the input value to the range 0-255
        return int((value + 1000) * 255 / 2000)
        
    def differential_steering(self, steering: int, throttle: int, steering_zero: int = 10) -> Tuple[int, int]:
        if not is_number_type(throttle):
            print("throttle must be an integer")
            return 0, 0
        if throttle > 1000 or throttle < -1000:
            print(f"throttle = {throttle}, but must be between 1000(right) and -1000(left)")
        throttle = clamp(throttle, -1000, 1000)

        if not is_number_type(steering):
            print("steering must be an integer")
            return 0, 0
        if steering > 1000 or steering < -1000:
            print(f"steering = {steering}, but must be between 1000(right) and -1000(left)")
        steering = clamp(steering, -1000, 1000)

        # Convert steering and throttle to the range -1 to 1
        steering /= 1000
        throttle /= 1000

        # Calculate left and right throttle values
        left_throttle = throttle + steering
        right_throttle = throttle - steering

        #print(f"left_throttle: {left_throttle}, right_throttle: {right_throttle} ")
                
        # Scale the values to the range 0-255
        left_pwm = int((left_throttle) * 127.5)
        right_pwm = int((right_throttle) * 127.5)
        
        # left_direction = 1 if left_pwm >= 0 else -1
        # right_direction = 1 if right_pwm >= 0 else -1
        
        return left_pwm, right_pwm 
    
    def run(self, throttle, steering):
        return self.differential_steering(throttle, steering, self.steering_zero)

autopilot.py

Python
import math
import time
import threading
import random
import queue
from pymavlink import mavutil
from pyquaternion import Quaternion
from pymavlink.dialects.v20 import common


class KriaAP:
    def __init__(self, sysid, compid, chan, udp_ip, udp_port):
        self.sysid = sysid
        self.compid = compid
        self.chan = chan
        self.mvl_armed = False
        self.t_last_hb = 0
        self.t_last_parameters = 0
        self.t_last_sys_stat = 0
        self.t_last_joystick = 0
        self.sys_stat_count = 0
        self.sys_stat_interval = 5
        self.sys_param_interval = 5
        self.hb_interval = 1
        self.data_queue = queue.Queue()
        udp_connection_string = f'udpin:{udp_ip}:{udp_port}'
        self.mav = mavutil.mavlink_connection(udp_connection_string, source_system=self.sysid, source_component=self.compid)
        print(f"Connected to: {udp_connection_string}")

    def log(self, message):
        print(message)

    def set_value(self, control, value):
        print(f"Setting {control} to {value}")

    def transmit_message(self, message):
        self.mav.mav.send(message)
        print(f"Transmitting message: {message}")

    def handle_manual_control(self, mvl_msg):
        self.log("handleManualControl")
        mvl_joy = mavutil.mavlink.MAVLink_manual_control_message(mvl_msg)
        
        # Values are normalized to the range [-1000, 1000] -> we divide it by 10 to convert it to %
        self.set_value("THROTTLE", 0.1 * mvl_joy.z)
        self.set_value("YAW", 0.1 * mvl_joy.r)
        self.set_value("PITCH", 0.1 * mvl_joy.x)
        self.set_value("ROLL", 0.1 * mvl_joy.y)

        # Retransmit the manual control message
        mvl_joy.pack(mavutil.mavlink.MAVLink_message(), self.sysid, self.compid, self.chan)
        self.transmit_message(mvl_joy)

    def handle_command_long(self, mvl_msg):
        self.log("handleCommandLong")
        mvl_cmd = mavutil.mavlink.MAVLink_command_long_message(mvl_msg)
        if mvl_cmd.command == mavutil.mavlink.MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES:        #and mvl_cmd.param1 == 1
            mvl_apv = mavutil.mavlink.MAVLink_autopilot_version_message(2, 1, 1, 10101, 20202, 0, 0)
            mvl_apv.capabilities |= mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_SET_ATTITUDE_TARGET
            mvl_apv.capabilities |= mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_MAVLINK2
            mvl_apv.pack(mavutil.mavlink.MAVLink_message(), self.sysid, self.compid, self.chan)
            self.transmit_message(mvl_apv)

        elif mvl_cmd.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
            self.mvl_armed = True if mvl_cmd.param1 == 1 else False
            mvl_ack = mavutil.mavlink.MAVLink_command_ack_message(mvl_cmd.command, mavutil.mavlink.MAV_RESULT_ACCEPTED)
            mvl_ack.pack(mavutil.mavlink.MAVLink_message(), self.sysid, self.compid, self.chan)
            self.transmit_message(mvl_ack)

        else:
            self.log(f"Unprocessed command: {mvl_cmd.command}")

    def handle_mission_request_list(self, mvl_msg):
        self.log("handleMissionRequestList")
        mvl_mc = mavutil.mavlink.MAVLink_mission_count_message(0, self.sysid, self.compid, self.chan)
        mvl_mc.pack(mavutil.mavlink.MAVLink_message(), self.sysid, self.compid, self.chan)
        self.transmit_message(mvl_mc)

    def process_heartbeat(self):
        self.log("processHeartbeat")
        mvl_hb = mavutil.mavlink.MAVLink_heartbeat_message(1, 1, mavutil.mavlink.MAV_TYPE_GENERIC, mavutil.mavlink.MAV_AUTOPILOT_GENERIC, 1, 1)
        mvl_hb.base_mode |= mavutil.mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED
        mvl_hb.system_status = mavutil.mavlink.MAV_STATE_ACTIVE

        if self.mvl_armed:
            mvl_hb.base_mode |= mavutil.mavlink.MAV_MODE_MANUAL_ARMED
        else:
            mvl_hb.base_mode |= mavutil.mavlink.MAV_MODE_MANUAL_DISARMED

        mvl_hb.pack(mavutil.mavlink.MAVLink_message(), self.sysid, self.compid, self.chan)
        self.transmit_message(mvl_hb)
        self.t_last_hb = time.time()

    def send_system_status(self):
        if time.time() - self.t_last_sys_stat > self.sys_stat_interval:
            angle_as_mV = self.sys_stat_count * 100 + 10000
            self.sys_stat_count += 1
            self.sys_stat_count %= 60

            mvl_sys_stat = mavutil.mavlink.MAVLink_sys_status_message(
                onboard_control_sensors_present=1,
                onboard_control_sensors_enabled=1,
                onboard_control_sensors_health=1,
                load=1,
                voltage_battery=angle_as_mV,
                current_battery=-1,
                battery_remaining=-1,
                drop_rate_comm=0,
                errors_comm=0,
                errors_count1=0,
                errors_count2=0,
                errors_count3=0,
                errors_count4=0
            )
            self.transmit_message(mvl_sys_stat)
            self.t_last_sys_stat = time.time()



    def send_heartbeat(self):
        current_time = time.time()
        if current_time - self.t_last_hb > self.hb_interval:
            mvl_hb = mavutil.mavlink.MAVLink_heartbeat_message(
                mavutil.mavlink.MAV_TYPE_GROUND_ROVER,
                mavutil.mavlink.MAV_AUTOPILOT_GENERIC,
                self.sysid,
                self.compid,
                mavutil.mavlink.MAV_MODE_MANUAL_ARMED,
                mavutil.mavlink.MAV_STATE_ACTIVE
            )
            self.transmit_message(mvl_hb)
            self.t_last_hb = current_time
      
    def send_autopilot_version_message(self):   
        mvl_apv = mavutil.mavlink.MAVLink_autopilot_version_message(flight_sw_version=2,
            middleware_sw_version=1, 
            os_sw_version=1,
            board_version=1, 
            flight_custom_version=[0, 0, 0, 0, 3, 3, 3, 3],  # 
            middleware_custom_version=[0, 0, 0, 2, 2, 0, 0, 0],  # 
            os_custom_version=[0, 0, 0, 0, 0, 0, 1, 1],   
            vendor_id=10101, 
            product_id=20202, 
            uid=0, 
            capabilities=(mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_SET_ATTITUDE_TARGET |mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_MAVLINK2) )
        self.transmit_message(mvl_apv)


    
    def receive_param_request_list(self, mvl_msg_ptr):
        msg = self.mav.recv_match(type='MAVLINK_MSG_ID_PARAM_REQUEST_LIST', blocking=False)
        if msg is not None:
            self.log("handleParamRequestList")
            mvl_param = mavutil.mavlink.MAVLink_param_value_message()
            mvl_param.param_count = 1
            mvl_param.param_index = 1
            mavutil.mavlink_msg_param_value_encode_chan(self.mvl_sysid, self.mvl_compid, self.mvl_chan, self.mvl_tx_message, mvl_param)
            self.transmit_message(mvl_param)    
      


    def receive_manual_control(self):
        try:
            msg = self.mav.recv_match(type='MANUAL_CONTROL', blocking=False)
            
            if msg:
                # Check if z and r are not zero
                if msg.z != 0 or msg.r != 0:
                    # Extract relevant data from the MANUAL_CONTROL message
                    manual_control_data = {
                        'x': msg.x,
                        'y': msg.y,
                        'z': msg.z,
                        'r': msg.r
                    }
                    
                    # Print the received data
                    print("Received MANUAL_CONTROL data:")
                    print(f"X: {manual_control_data['x']}")
                    print(f"Y: {manual_control_data['y']}")
                    print(f"Z: {manual_control_data['z']}")
                    print(f"R: {manual_control_data['r']}")
                    
                    # Optionally, you can add the data to a queue for further processing
                    # data_queue.put(manual_control_data)
        except Exception as e:
            print(f"An error occurred: {e}")


    def receive_command_long(self):
        msg = self.mav.recv_match(type='COMMAND_LONG', blocking=False)
        if msg is not None:
            print(f"Received COMMAND_LONG message: {msg}")

    def send_heartbeat(self):
        current_time = time.time()
        if current_time - self.t_last_hb > self.hb_interval:
            mvl_hb = mavutil.mavlink.MAVLink_heartbeat_message(
                mavutil.mavlink.MAV_TYPE_GROUND_ROVER,
                mavutil.mavlink.MAV_AUTOPILOT_GENERIC,
                self.sysid,
                self.compid,
                mavutil.mavlink.MAV_MODE_MANUAL_ARMED,
                mavutil.mavlink.MAV_STATE_ACTIVE
            )
            self.transmit_message(mvl_hb)
            self.t_last_hb = current_time

    def send_command_long(self):
        # Define command_long_encode message to send MAV_CMD_SET_MESSAGE_INTERVAL command
        # param1: MAVLINK_MSG_ID_BATTERY_STATUS (message to stream)
        # param2: 1000000 (Stream interval in microseconds)
        message = self.mav.mav.command_long_encode(
            self.mav.target_system,  # Target system ID
            self.mav.target_component,  # Target component ID
            mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,  # ID of command to send
            0,  # Confirmation
            mavutil.mavlink.MAVLINK_MSG_ID_BATTERY_STATUS,  # param1: Message ID to be streamed
            1000000,  # param2: Interval in microseconds
            0,  # param3 (unused)
            0,  # param4 (unused)
            0,  # param5 (unused)
            0,  # param5 (unused)
            0  # param6 (unused)
        )
        self.transmit_message(message)


    def receive_commands(self, data_queue):
        while True:
            self.receive_manual_control(data_queue)
            # self.receive_command_long()
            # self.receive_param_request_list
            time.sleep(0.1)


    def send_system_telemetry(self):
        while True:
            current_time = time.time()
            if current_time - self.t_last_sys_stat > 2:
                #self.send_autopilot_version_message()
                #self.send_gps_position()
                #self.send_attitude_message()
                self.send_system_status()
                self.t_last_sys_stat = current_time
        
    def loop_telem_hb(self):
        self.send_heartbeat()

        self.send_system_telemetry()




    def get_timestamps(self):
        # Get current time in microseconds since UNIX epoch
        time_unix_usec = int(time.time() * 1e6)

        # Get current time in milliseconds since boot time
        time_boot_ms = int((datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds() * 1e3)

        return time_unix_usec, time_boot_ms

    def mvl_handle_manual_control(self, mvl_msg):
        mvl_joy = mavutil.mavlink.MAVLink_manual_control_message(mvl_msg)
        self.transmit_message(mvl_joy)

    def mvl_handle_param_request_list(self, mvl_msg):
        mvl_param = mavutil.mavlink.MAVLink_param_value_message(0, 0, 'a_param', 123.456, mavutil.mavlink.MAV_PARAM_TYPE_REAL32, 1, 0)
        self.transmit_message(mvl_param)

    def mvl_handle_command_long(self, mvl_msg):
        mvl_cmd = mavutil.mavlink.MAVLink_command_long_message(mvl_msg)
        if mvl_cmd.command == mavutil.mavlink.MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES and mvl_cmd.param1 == 1:
            mvl_apv = mavutil.mavlink.MAVLink_autopilot_version_message(1, 1, 2, 1, 1, 10101, 20202, 0, 0)
            mvl_apv.capabilities = mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_SET_ATTITUDE_TARGET | mavutil.mavlink.MAV_PROTOCOL_CAPABILITY_MAVLINK2
            self.transmit_message(mvl_apv)
        elif mvl_cmd.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
            mvl_ack = mavutil.mavlink.MAVLink_command_ack_message(mvl_msg)
            if mvl_cmd.param1 == 1:
                mvl_armed = True
            else:
                mvl_armed = False
            mvl_ack.command = mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM
            mvl_ack.result = mavutil.mavlink.MAV_RESULT_ACCEPTED
            self.transmit_message(mvl_ack)
            
    def handle_rc_raw(msg):
        channels = (msg.chan1_raw, msg.chan2_raw, msg.chan3_raw, msg.chan4_raw, msg.chan5_raw, msg.chan6_raw, msg.chan7_raw, msg.chan8_raw)


    def handle_hud(msg):
        hud_data = (msg.airspeed, msg.groundspeed, msg.heading, msg.throttle, msg.alt, msg.climb)
        print ("Aspd\tGspd\tHead\tThro\tAlt\tClimb")
        print ("%0.2f\t%0.2f\t%0.2f\t%0.2f\t%0.2f\t%0.2f" % hud_data)

    def handle_attitude(msg):
        attitude_data = (msg.roll, msg.pitch, msg.yaw, msg.rollspeed, msg.pitchspeed, msg.yawspeed)
        print ("Roll\tPit\tYaw\tRSpd\tPSpd\tYSpd")
        print ("%0.2f\t%0.2f\t%0.2f\t%0.2f\t%0.2f\t%0.2f\t" % attitude_data )
    


    def send_set_mode_message(target_system, base_mode, custom_mode):
        # Create a MAVLink message for the SET_MODE command
        msg = mavutil.mavlink.MAVLink_command_long_message(
            mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_LONG,
            0,  # System ID (0 for broadcast)
            0,  # Component ID (0 for broadcast)
            mavutil.mavlink.MAV_CMD_DO_SET_MODE,  # Command ID
            0,  # Confirmation
            target_system,  # Target System ID
            0,  # Target Component ID (0 for all components)
            base_mode,  # New base mode
            custom_mode,  # New custom mode
            0,  # Parameter 1 (not used for SET_MODE)
            0  # Parameter 2 (not used for SET_MODE)
            # 0,  # Parameter 3 (not used for SET_MODE)
            # 0,  # Parameter 4 (not used for SET_MODE)
            # 0,  # Parameter 5 (not used for SET_MODE)
            # 0   # Parameter 6 (not used for SET_MODE)
        )
        mvl_transmit_message(msg)
        # Send the message
        # mavutil.mavlink.MAVLink_message(mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_LONG, "COMMAND_LONG").pack(mav.mav)


    def send_global_position_int(self, latitude, longitude, altitude, target_system, target_component):
        # Create a MAVLink message for the GLOBAL_POSITION_INT message
        msg = mavutil.mavlink.MAVLink_global_position_int_message(
            time_boot_ms=0,  # Set to 0 or provide the appropriate boot time in milliseconds
            lat=int(latitude * 1e7),  # Latitude in degrees * 1e7
            lon=int(longitude * 1e7),  # Longitude in degrees * 1e7
            alt=int(altitude * 1e3),  # Altitude in meters * 1e3
            relative_alt=0,  # Relative altitude in meters (optional)
            vx=0,  # Ground speed in X (optional)
            vy=0,  # Ground speed in Y (optional)
            vz=0,  # Ground speed in Z (optional)
            hdg=0,  # Compass heading in degrees * 100 (optional)
        )

        # Send the message
        # msg.pack(mavutil.mavlink.MAVLink_message(mavutil.mavlink.MAVLINK_MSG_ID_GLOBAL_POSITION_INT, "GLOBAL_POSITION_INT"))
        self.transmit_message(msg)




    def send_attitude_message(self, time_boot_ms, roll, pitch, yaw, rollspeed, pitchspeed, yawspeed, target_system, target_component):
        # Create a MAVLink message for the ATTITUDE message
        msg = mavutil.mavlink.MAVLink_attitude_message(
            time_boot_ms=time_boot_ms,  # Set the time_boot_ms value
            roll=roll,  # Roll angle in radians
            pitch=pitch,  # Pitch angle in radians
            yaw=yaw,  # Yaw angle in radians
            rollspeed=rollspeed,  # Roll angular speed in radians/second
            pitchspeed=pitchspeed,  # Pitch angular speed in radians/second
            yawspeed=yawspeed  # Yaw angular speed in radians/second
        )

        # Send the message
        self.transmit_message(msg)




    def send_param_set_message(self,param_id, param_value, target_system, target_component):
        # Create a MAVLink message for the PARAM_SET message
        msg = mavutil.mavlink.MAVLink_param_set_message(
            target_system=target_system,  # Target System ID
            target_component=target_component,  # Target Component ID
            param_id=param_id.encode('utf-8'),  # Parameter name (UTF-8 encoded)
            param_value=param_value,  # Parameter value
            param_type=9  # Parameter type (assuming it's a real number)
        )

        # Send the message
        self.transmit_message(msg)

    def send_param_value_message(self,param_id, param_value):
        # Create a MAVLink message for the PARAM_VALUE message
        msg = mavutil.mavlink.MAVLink_param_value_message(
            param_id=param_id.encode('utf-8'),  # Parameter name (UTF-8 encoded)
            param_value=param_value,  # Parameter value
            param_type=9,  # Parameter type (assuming it's a real number)
            param_count=1,  # Number of parameters
            param_index=0  # Index of this parameter
        )
        # Send the message
        self.transmit_message(msg)

    def reply_to_preflight_storage(command, result):
        # Create a MAVLink message for COMMAND_ACK
        msg = mavutil.mavlink.MAVLink_command_ack_message(
            command=command,  # The command that is being acknowledged
            result=result  # The result of the command execution
        )
        # Send the message
        mav.mav.send(msg)

    def send_param_request_read(self,param_id, target_system, target_component):
        # Create a MAVLink message for PARAM_REQUEST_READ
        msg = mavutil.mavlink.MAVLink_param_request_read_message(
            target_system=target_system,  # Target System ID
            target_component=target_component,  # Target Component ID
            param_id=param_id.encode('utf-8'),  # Parameter name (UTF-8 encoded)
            param_index=-1  # Request parameter by name, not by index
        )

        # Send the message
        self.transmit_message(msg)

    def send_param_request_list(self, target_system, target_component):
        # Create a MAVLink message for PARAM_REQUEST_LIST
        msg = mavutil.mavlink.MAVLink_param_request_list_message(
            target_system=target_system,  # Target System ID
            target_component=target_component  # Target Component ID
        )
        # Send the message
        self.transmit_message(msg)



    def send_system_time_message(self,time_unix_usec, time_boot_ms):
        # Create a MAVLink message for SYSTEM_TIME
        msg = mavutil.mavlink.MAVLink_system_time_message(
            time_unix_usec=time_unix_usec,  # Timestamp of the master clock in microseconds since UNIX epoch
            time_boot_ms=time_boot_ms  # Timestamp of the component clock since boot time in milliseconds
        )

        # Send the message
        mav.mav.send(msg)

    def send_change_operator_control_message(target_system, control_request, version, passkey):
        # Create a MAVLink message for CHANGE_OPERATOR_CONTROL
        msg = mavutil.mavlink.MAVLink_change_operator_control_message(
            target_system=target_system,  # System ID of the target
            control_request=control_request,  # Control request type (MAV_ODID_CONTROL_REQUEST)
            version=version,  # Version number of the operator control request
            passkey=passkey.encode('utf-8')  # Passkey for the operator control request (UTF-8 encoded)
        )
        # Send the message
        self.transmit_message(msg)

    def send_local_position_ned_message(self,time_boot_ms, x, y, z, vx, vy, vz):
        # Create a MAVLink message for LOCAL_POSITION_NED
        msg = mavutil.mavlink.MAVLink_local_position_ned_message(
            time_boot_ms=time_boot_ms,  # Timestamp of the component clock since boot time in milliseconds
            x=x,  # X position in meters
            y=y,  # Y position in meters
            z=z,  # Z position in meters
            vx=vx,  # X velocity in meters per second
            vy=vy,  # Y velocity in meters per second
            vz=vz  # Z velocity in meters per second
        )
        # Send the message
        self.transmit_message(msg)


    def send_sys_autoconfig_parameter(self, target_system, target_component, sys_autoconfig):
        # Create a MAVLink message for PARAM_SET
        msg = mavutil.mavlink.MAVLink_param_set_message(
            target_system=target_system,         # Target System ID
            target_component=target_component,   # Target Component ID
            param_id=b'SYS_AUTOCONFIG',          # Parameter name (UTF-8 encoded)
            param_value=sys_autoconfig,          # Parameter value
            param_type=mavutil.mavlink.MAV_PARAM_TYPE_UINT8  # Parameter type (assuming it's an unsigned 8-bit integer)
        )
        # Send the message
        self.transmit_message(msg)

    def send_statustext_to_ground_control(self,severity, text):
        # Create a MAVLink message for STATUSTEXT
        msg = mavutil.mavlink.MAVLink_statustext_message(
            severity=severity,  # Severity of the status message
            text=text.encode('utf-8')  # Status message text (UTF-8 encoded)
        )
        # Send the message
        self.transmit_message(msg)

    def send_debug_message_to_ground_control(self, time_boot_ms, ind, value):
        # Create a MAVLink message for DEBUG
        msg = mavutil.mavlink.MAVLink_debug_message(
            time_boot_ms=time_boot_ms,  # Timestamp of the component clock since boot time in milliseconds
            ind=ind,  # Index of the debug variable
            value=value  # Debug value
        )
        # Send the message
        self.transmit_message(msg)


    def send_roll_pitch_yaw_rates_thrust_setpoint_message(self, time_boot_ms, roll_rate, pitch_rate, yaw_rate, thrust):
        # Create a MAVLink message for ROLL_PITCH_YAW_RATES_THRUST_SETPOINT
        msg = mavutil.mavlink.MAVLink_roll_pitch_yaw_rates_thrust_setpoint_message(
            time_boot_ms=time_boot_ms,  # Target System ID
            roll_rate=roll_rate,    # Roll rate (rad/s)
            pitch_rate=pitch_rate,  # Pitch rate (rad/s)
            yaw_rate=yaw_rate,      # Yaw rate (rad/s)
            thrust=thrust           # Throttle value (0 to 1)
        )
        # Send the message
        self.transmit_message(msg)


    def set_mav_parameters(self, name, value, retries=3, parm_type=None):
        '''set a parameter on a MAVLink connection'''
        got_ack = False

        if parm_type is not None and parm_type != mavutil.mavlink.MAV_PARAM_TYPE_REAL32:
            # need to encode as a float for sending
            if parm_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT8:
                vstr = struct.pack(">xxxB", int(value))
            elif parm_type == mavutil.mavlink.MAV_PARAM_TYPE_INT8:
                vstr = struct.pack(">xxxb", int(value))
            elif parm_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT16:
                vstr = struct.pack(">xxH", int(value))
            elif parm_type == mavutil.mavlink.MAV_PARAM_TYPE_INT16:
                vstr = struct.pack(">xxh", int(value))
            elif parm_type == mavutil.mavlink.MAV_PARAM_TYPE_UINT32:
                vstr = struct.pack(">I", int(value))
            elif parm_type == mavutil.mavlink.MAV_PARAM_TYPE_INT32:
                vstr = struct.pack(">i", int(value))
            else:
                print("can't send %s of type %u" % (name, parm_type))
                return False
            vfloat, = struct.unpack(">f", vstr)
        else:
            vfloat = float(value)

        while retries > 0 and not got_ack:
            retries -= 1
            mav.mav.param_set_send(name.upper(), vfloat, parm_type=parm_type)
            tstart = time.time()
            while time.time() - tstart < 1:
                ack = mav.recv_match(type='PARAM_VALUE', blocking=False)
                if ack is None:
                    time.sleep(0.1)
                    continue
                if str(name).upper() == str(ack.param_id).upper():
                    got_ack = True
                    print("Received PARAM_VALUE for {}: {}".format(ack.param_id, ack.param_value))
                    break

        if not got_ack:
            print("timeout setting %s to %f" % (name, vfloat))
            return False

        return True

Credits

Dhimiter D
1 project • 1 follower
Software developer

Comments