Hackster is hosting Hackster Holidays, Ep. 6: Livestream & Giveaway Drawing. Watch previous episodes or stream live on Monday!Stream Hackster Holidays, Ep. 6 on Monday!
Massimiliano
Published © MIT

Remote Gesture controller with MQTT

Wouldn’t be nice to control a robotic arm on the other side of the world and make it follow your gestures in the most natural manner?

BeginnerWork in progress1 hour527

Things used in this project

Hardware components

The Tactigon Skin (T-SKIN)
The Tactigon Skin (T-SKIN)
×1

Software apps and online services

MQTT
MQTT

Story

Read more

Code

Application_Gesture_to_MQTT.py

Python
import multiprocessing
import logging
import time
import paho.mqtt.client as mqtt

import sys
from os import path

# resource directory - need to be referenced to exe path when used with exe bundle
if getattr(sys, "frozen", False):
    CLIENT_PY_PATH = sys._MEIPASS
else:
    CLIENT_PY_PATH = path.join(path.dirname(__file__), "../../")
    sys.path.insert(0, path.join(CLIENT_PY_PATH, "utilities"))
    sys.path.insert(0, path.join(CLIENT_PY_PATH, "hal"))
    sys.path.insert(0, path.join(CLIENT_PY_PATH, "middleware"))

from Config_Manager import Config_Manager
from Tactigon_BLE import Tactigon_BLE
from Tactigon_Gesture import Tactigon_Gesture


class Application_MQTT(multiprocessing.Process):
    def __init__(self, gesture_r_pipe=False, gesture_l_pipe=False):

        super(Application_MQTT, self).__init__(
            target=self.loop_iterator, args=(gesture_r_pipe, gesture_l_pipe)
        )

    def loop_iterator(self, gesture_r_pipe, gesture_l_pipe):

        self.gesture_r_pipe = gesture_r_pipe
        self.gesture_l_pipe = gesture_l_pipe
        self.app_config = Config_Manager.from_file(
            "apps/application_gesture_to_mqtt.json"
        )

        self.client = mqtt.Client()
        self.client.connect(
            self.app_config.get("MQTT_URL"), self.app_config.get("MQTT_PORT"), 60
        )
        self.client.loop_start()

        print("Application process started, type any key to close the application")

        while True:
            self.loop()

    def loop(self):
        """Gesture recognition loop routine"""

        if (self.gesture_r_pipe != False) and (self.gesture_r_pipe.poll()):
            r_gesture, _ = self.gesture_r_pipe.recv()
            print("Publishing RIGHT topic")
            self.client.publish(self.app_config.get("MQTT_RIGHT_TOPIC"), r_gesture)

        if (self.gesture_l_pipe != False) and (self.gesture_l_pipe.poll()):
            l_gesture, _ = self.gesture_l_pipe.recv()
            print("Publishing LEFT topic")
            self.client.publish(self.app_config.get("MQTT_LEFT_TOPIC"), l_gesture)


def run(debug=False):

    if debug:
        # logging
        multiprocessing.log_to_stderr()
        logger = multiprocessing.get_logger()
        logger.setLevel(logging.DEBUG)

    # import json config
    hal_config = Config_Manager.from_file("hal.json")

    R_add = hal_config.get("BLE_RIGHT_ADDRESS")
    L_add = hal_config.get("BLE_LEFT_ADDRESS")
    R_en = hal_config.get("BLE_RIGHT_ENABLE")
    L_en = hal_config.get("BLE_LEFT_ENABLE")
    num_sample = hal_config.get("NUM_SAMPLE")

    # data pipe variables
    if R_en == "True":
        rx_sensor_r_pipe, tx_sensor_r_pipe = multiprocessing.Pipe(duplex=False)
        rx_gesture_r_pipe, tx_gesture_r_pipe = multiprocessing.Pipe(duplex=False)
    else:
        rx_sensor_r_pipe = False
        rx_gesture_r_pipe = False

    if L_en == "True":
        rx_sensor_l_pipe, tx_sensor_l_pipe = multiprocessing.Pipe(duplex=False)
        rx_gesture_l_pipe, tx_gesture_l_pipe = multiprocessing.Pipe(duplex=False)
    else:
        rx_sensor_l_pipe = False
        rx_gesture_l_pipe = False

    # create  processes
    if R_en == "True":
        pro_in_r = Tactigon_BLE("RIGHT", R_add, tx_sensor_r_pipe, debug=debug)
        pro_g_r = Tactigon_Gesture(
            "RIGHT",
            num_sample,
            rx_sensor_r_pipe,
            tx_gesture_r_pipe,
            confidence_th=5,
            debug=debug,
        )

    if L_en == "True":
        pro_in_l = Tactigon_BLE("LEFT", L_add, tx_sensor_l_pipe, debug=debug)
        pro_g_l = Tactigon_Gesture(
            "LEFT",
            num_sample,
            rx_sensor_l_pipe,
            tx_gesture_l_pipe,
            confidence_th=5,
            debug=debug,
        )

    pro_app = Application_MQTT(rx_gesture_r_pipe, rx_gesture_l_pipe)

    pro_app.start()

    if R_en == "True":
        pro_g_r.start()
        pro_in_r.start()

    if L_en == "True":
        pro_g_l.start()
        pro_in_l.start()

    input()

    if R_en == "True":
        pro_in_r.terminate()
        pro_g_r.terminate()

    if L_en == "True":
        pro_in_l.terminate()
        pro_g_l.terminate()

    pro_app.terminate()


# Start point of the application
if __name__ == "__main__":

    run(False)

Credits

Massimiliano

Massimiliano

29 projects • 115 followers
I'm an engineer and I like robotics, embedded systems and linux. I spent a lot of time in automation and firmware development.

Comments