Hardware components | ||||||
| × | 1 | ||||
| × | 1 | ||||
| × | 1 | ||||
Software apps and online services | ||||||
| ||||||
|
The story here is pretty simple. Familiarize myself with what the EV3 could do, decide on a use case, then make it.
In Part 1, Familiarize myself with the EV3, I went through the Tutorials. I tried it out with different configurations, made other tweaks to see how things would work. All in all, pretty straightforward. In addition, after creating the Alexa Skills using the provided Node.js samples, I ported the samples to Python and published it on GitHub, thus allowing someone to only have to work in one programming language (Python) instead of having to work with both Node.js and Python. It works with Alexa Hosted, and it's had a few visitors, so hopefully it's helping someone.
Then came the slow part - deciding on what to make. I went through a couple of iterations, but the main focus was to find something that would work well with voice as well as within the capabilities of the EV3. I didn't want something more cumbersome to do via voice, and ultimately settled on doing the Hokey Pokey since you could choose different variations via voice. With four motors, there were four options - left and right arms, and left and right legs. Those would all be needed to do the hokey pokey itself (and turn yourself around), so it seemed it was settled. Then...a cute character was revealed which seemed like a good fit, although how to make The Child out of Lego, well that's a different challenge.
I decided to start with the Yoda Lego Set (#75255) since that was likely to be the right color, even if some of the other parts needed to be modified. Once the head and ears were complete, I considered how to make the cloak. Given all the movement required, that was tabled and just a facade of a cloak was built.
Then came the coding. The overall mechanics were straightfoward - move an arm in, out, back in, and shake it all about. Repeat that with the other appendages, and then link that up with the payloads sent by the skill, and The Mandalorian is your uncle!
There were some challenges getting the movements to seem right - from needing to reverse the speed for left vs. right (since the gears needed to turn the other direction), and needing to tune how far, or how long in order to move the arms or to go in a circle (to turn yourself around). I also found that the surface material made a difference on the experience. Since it's a little top-heavy, turning around on carpet sometimes caused the figure to fall over. Oops! I suspect a slower speed and longer turn might fix that.
Regardless, I couldn't find any suitable music, so I went with Alexa simply speaking the song. (You don't want to hear me sing - that's not a requirement, right?)
There you have it. Now it could be the model build could be improved, and I could potentially see adding another EV3 brick to expand the bits which could be put in and out, but that could easily get complicated and might not be worth it.
In the end, my son and I had fun with this (he's not officially on the team since he's still a few months from turning 13). Having the EV3 also helped him with his FLL (First Lego League activites).
Video: https://www.amazon.com/photos/shared/dPtB1WEsRxS2q2HzAG5jow.xxQTLycJkNmxHJRtqab8Hr
# The Child Dance Party
# Copyright Franklin Lobb 2019
import logging.handlers
import requests
import uuid
from ask_sdk_core.skill_builder import SkillBuilder
from ask_sdk_core.utils import is_request_type, is_intent_name, get_slot_value, get_slot
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_core.serialize import DefaultSerializer
from ask_sdk_model import IntentRequest
from ask_sdk_model.ui import PlayBehavior
from ask_sdk_model.slot import Slot
from ask_sdk_model.interfaces.custom_interface_controller import (
StartEventHandlerDirective, EventFilter, Expiration, FilterMatchAction,
StopEventHandlerDirective,
SendDirectiveDirective,
Header,
Endpoint,
EventsReceivedRequest,
ExpiredRequest
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
serializer = DefaultSerializer()
skill_builder = SkillBuilder()
# The namespace of the custom directive to be sent by this skill
NAMESPACE = "Custom.Mindstorms.Gadget"
# The name of the custom directive to be sent this skill
NAME_CONTROL = "control"
# The audio tag to include background music
BG_MUSIC = '<audio src="soundbank://soundlibrary/ui/gameshow/amzn_ui_sfx_gameshow_waiting_loop_30s_01"></audio>'
@skill_builder.request_handler(can_handle_func=is_request_type("LaunchRequest"))
def launch_request_handler(handler_input: HandlerInput):
logger.info("== Launch Intent ==")
response_builder = handler_input.response_builder
system = handler_input.request_envelope.context.system
api_access_token = system.api_access_token
api_endpoint = system.api_endpoint
# Get connected gadget endpoint ID.
endpoints = get_connected_endpoints(api_endpoint, api_access_token)
logger.debug("Checking endpoint..")
if not endpoints:
logger.debug("No connected gadget endpoints available.")
return (response_builder
.speak("I couldn't find an EV3 Brick connected to this Echo device. Please check to make sure your EV3 Brick is connected, and try again.")
.set_should_end_session(True)
.response)
endpoint_id = endpoints[0].get("endpointId", "")
# Store endpoint ID for using it to send custom directives later.
logger.debug("Received endpoints. Storing Endpoint Id: %s", endpoint_id)
session_attr = handler_input.attributes_manager.session_attributes
session_attr["endpoint_id"] = endpoint_id
# Set skill duration to 5 minutes (ten 30-seconds interval)
session_attr["duration"] = 10
# Set the token to track the event handler
token = handler_input.request_envelope.request.request_id
session_attr["token"] = token
speak_output = "Welcome, let\'s get this dance party started!"
return (response_builder
.speak(speak_output + BG_MUSIC)
.add_directive(build_start_event_handler_directive(token, 60000, NAMESPACE, NAME_CONTROL, 'SEND', {}))
.response)
def get_hokey_pokey(appendage):
appendage_info = ''
if appendage != None:
appendage_info = ('<break time="1s"/> put your {} in, <break time="1s"/> put your {} out, <break time="1s"/> put your {} in and shake it all about.'
).format(appendage, appendage, appendage)
return (
appendage_info + ' do the hokey pokey and turn yourself around. that\'s what it\'s all about! <break time="3s"/>'
+ ' what should we do next?')
def get_id_from_slot(slot: Slot):
""" returns the slot's id if there was a match, else the spoken value
"""
if slot is None:
return ""
try:
# grab the first id from the first match (assumes only static entities)
return slot.resolutions.resolutions_per_authority[0].values[0].value.id
except Exception as e:
logger.error('Failed to get id: '+ str(e))
# if it doesn't work, then there is no id available, so just return the slot's value
try:
return slot.value
except:
# if for some reason the slot is present, but there is no value
return ""
@skill_builder.request_handler(can_handle_func=is_intent_name("DanceIntent"))
def dance_intent_handler(handler_input: HandlerInput):
# Construct and send a custom directive to the connected gadget with
# data from the MoveIntent request.
logger.info("DanceIntent received.")
appendage = get_slot(handler_input, "appendage" )
appendage_id = get_id_from_slot(appendage)
appendage_name = appendage.value
# Get data from session attribute
session_attr = handler_input.attributes_manager.session_attributes
endpoint_id = session_attr.get("endpoint_id", [])
# Construct the directive with the payload containing the move parameters
payload = {
"type": "dance",
"appendage": appendage_id
}
directive = build_send_directive(NAMESPACE, NAME_CONTROL, endpoint_id, payload)
speak_output = "Whoa! Stopping here!" if (appendage_id == "stop") else (
get_hokey_pokey(appendage_name)
)
return (handler_input.response_builder
.speak(speak_output)
.add_directive(directive)
.set_should_end_session(False)
.response)
def has_valid_token(handler_input):
if not is_request_type('CustomInterfaceController.EventsReceived')(handler_input):
return False
session_attr = handler_input.attributes_manager.session_attributes
request = handler_input.request_envelope.request
# Validate event token
if session_attr.get("token", None) != request.token:
logger.info("Event token doesn't match. Ignoring this event")
return False
return True
def has_valid_endpoint(handler_input):
if not is_request_type('CustomInterfaceController.EventsReceived')(handler_input):
return False
session_attr = handler_input.attributes_manager.session_attributes
request = handler_input.request_envelope.request
custom_event = request.events[0]
# Validate endpoint
request_endpoint = custom_event.endpoint.endpoint_id
if request_endpoint != session_attr.get("endpoint_id", None):
logger.info("Event endpoint id doesn't match. Ignoring this event")
return False
return True
@skill_builder.request_handler(can_handle_func=lambda handler_input:
is_request_type("CustomInterfaceController.EventsReceived") and
has_valid_token(handler_input) and
has_valid_endpoint(handler_input)
)
def events_received_request_handler(handler_input: HandlerInput):
logger.info("== Received Custom Event ==")
custom_event = handler_input.request_envelope.request.events[0]
payload = custom_event.payload
name = custom_event.header.name
speak_output = ""
if name == 'Speech':
speak_output = payload.get("speechOut", "")
else:
speak_output = "Event not recognized. Awaiting new command."
return (handler_input.response_builder
.speak(speak_output + BG_MUSIC, "REPLACE_ALL")
.response)
@skill_builder.request_handler(can_handle_func=is_request_type("CustomInterfaceController.Expired"))
def custom_interface_expiration_handler(handler_input):
logger.info("== Custom Event Expiration Input ==")
session_attr = handler_input.attributes_manager.session_attributes
# Set the token to track the event handler
token = handler_input.request_envelope.request.request_id
session_attr["token"] = token
duration = session_attr.get("duration", 0)
if duration > 0:
session_attr["duration"] = duration - 1
# extends skill session
timeout = 60000
directive = build_start_event_handler_directive(token, timeout, NAMESPACE, NAME_CONTROL, 'SEND', {})
return (handler_input.response_builder
.add_directive(directive)
.response
)
else:
# End skill session
return (handler_input.response_builder
.speak("Skill duration expired. Goodbye.")
.set_should_end_session(True)
.response)
@skill_builder.request_handler(can_handle_func=lambda handler_input:
is_intent_name("AMAZON.CancelIntent")(handler_input) or
is_intent_name("AMAZON.StopIntent")(handler_input))
def stop_and_cancel_intent_handler(handler_input):
logger.info("Received a Stop or a Cancel Intent..")
session_attr = handler_input.attributes_manager.session_attributes
response_builder = handler_input.response_builder
# When the user stops the skill, stop the EventHandler
if 'token' in session_attr.keys():
logger.debug("Active session detected, sending stop EventHandlerDirective.")
directive = build_stop_event_handler_directive(session_attr["token"])
response_builder.add_directive(directive)
return (response_builder
.speak("Goodbye!")
.set_should_end_session(True)
.response)
@skill_builder.request_handler(can_handle_func=is_request_type("SessionEndedRequest"))
def session_ended_request_handler(handler_input):
logger.info("Session ended with reason: " +
handler_input.request_envelope.request.reason.to_str())
return handler_input.response_builder.response
@skill_builder.exception_handler(can_handle_func=lambda i, e: True)
def error_handler(handler_input, exception):
logger.info("==Error==")
logger.error(exception, exc_info=True)
return (handler_input.response_builder
.speak("I'm sorry, something went wrong!").response)
@skill_builder.global_request_interceptor()
def log_request(handler_input):
# Log the request for debugging purposes.
logger.info("==Request==\r" +
str(serializer.serialize(handler_input.request_envelope)))
@skill_builder.global_response_interceptor()
def log_response(handler_input, response):
# Log the response for debugging purposes.
logger.info("==Response==\r" + str(serializer.serialize(response)))
logger.info("==Session Attributes==\r" +
str(serializer.serialize(handler_input.attributes_manager.session_attributes)))
def get_connected_endpoints(api_endpoint, api_access_token):
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(api_access_token)
}
api_url = api_endpoint + "/v1/endpoints"
endpoints_response = requests.get(api_url, headers=headers)
if endpoints_response.status_code == requests.codes.get("ok", ""):
return endpoints_response.json()["endpoints"]
def build_send_directive(namespace, name, endpoint_id, payload):
return SendDirectiveDirective(
header=Header(
name=name,
namespace=namespace
),
endpoint=Endpoint(
endpoint_id=endpoint_id
),
payload=payload
)
def build_start_event_handler_directive(token, duration_ms, namespace,
name, filter_match_action, expiration_payload):
return StartEventHandlerDirective(
token=token,
# event_filter=EventFilter(
# filter_expression={
# 'and': [
# {'==': [{'var': 'header.namespace'}, namespace]},
# {'==': [{'var': 'header.name'}, name]}
# ]
# },
# filter_match_action=filter_match_action
# ),
expiration=Expiration(
duration_in_milliseconds=duration_ms,
expiration_payload=expiration_payload))
def build_stop_event_handler_directive(token):
return StopEventHandlerDirective(token=token)
lambda_handler = skill_builder.lambda_handler()
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# You may not use this file except in compliance with the terms and conditions
# set forth in the accompanying LICENSE.TXT file.
#
# THESE MATERIALS ARE PROVIDED ON AN "AS IS" BASIS. AMAZON SPECIFICALLY DISCLAIMS, WITH
# RESPECT TO THESE MATERIALS, ALL WARRANTIES, EXPRESS, IMPLIED, OR STATUTORY, INCLUDING
# THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.
import time
import logging
import json
import random
import threading
from enum import Enum
from agt import AlexaGadget
from ev3dev2.led import Leds
from ev3dev2.sound import Sound
from ev3dev2.motor import OUTPUT_A, OUTPUT_B, OUTPUT_C, OUTPUT_D, MoveTank, MotorSet, SpeedPercent, MediumMotor, LargeMotor
from ev3dev2.sensor.lego import InfraredSensor
# Set the logging level to INFO to see messages from AlexaGadget
logging.basicConfig(level=logging.INFO)
class EventName(Enum):
"""
The list of custom event name sent from this gadget
"""
SPEECH = "Speech"
class MindstormsGadget(AlexaGadget):
"""
A Mindstorms gadget that can perform bi-directional interaction with an Alexa skill.
"""
def __init__(self):
"""
Performs Alexa Gadget initialization routines and ev3dev resource allocation.
"""
super().__init__()
# Connect two large motors on output ports B and C
self.left_leg = LargeMotor(OUTPUT_C)
self.right_leg = LargeMotor(OUTPUT_B)
self.left_arm = MediumMotor(OUTPUT_D)
self.right_arm = MediumMotor(OUTPUT_A)
#self.arms = MotorSet(OUTPUT_A, OUTPUT_D)
self.tank = MoveTank(OUTPUT_B, OUTPUT_C)
self.sound = Sound()
self.leds = Leds()
def on_connected(self, device_addr):
"""
Gadget connected to the paired Echo device.
:param device_addr: the address of the device we connected to
"""
self.leds.set_color("LEFT", "GREEN")
self.leds.set_color("RIGHT", "GREEN")
print("{} connected to Echo device".format(self.friendly_name))
def on_disconnected(self, device_addr):
"""
Gadget disconnected from the paired Echo device.
:param device_addr: the address of the device we disconnected from
"""
self.leds.set_color("LEFT", "BLACK")
self.leds.set_color("RIGHT", "BLACK")
print("{} disconnected from Echo device".format(self.friendly_name))
def on_custom_mindstorms_gadget_control(self, directive):
"""
Handles the Custom.Mindstorms.Gadget control directive.
:param directive: the custom directive with the matching namespace and name
"""
try:
payload = json.loads(directive.payload.decode("utf-8"))
print("Control payload: {}".format(payload))
control_type = payload["type"]
if control_type == "dance":
if 'appendage' in payload:
self._dance(payload['appendage'])
else:
self._hokey_pokey()
except KeyError:
print("Missing expected parameters: {}".format(directive))
def _shake(self):
# Perform Shuffle posture
self.tank.on_for_seconds(SpeedPercent(80), SpeedPercent(-80), 0.2)
time.sleep(0.3)
self.tank.on_for_seconds(SpeedPercent(-40), SpeedPercent(40), 0.2)
def _dance(self, appendage, is_blocking=False):
motor = self.tank
direction = 0
speed = 50
rotations = 0.4
duration = 1
if appendage == 'left_arm':
motor = self.left_arm
direction = 1
elif appendage == 'right_arm':
motor = self.right_arm
direction = -1
elif appendage == 'left_leg':
motor = self.left_leg
direction = -1
elif appendage == 'right_leg':
motor = self.right_leg
direction = 1
if appendage == 'whole_body':
speed = 50
# forward
self.tank.on_for_seconds(SpeedPercent(speed), SpeedPercent(speed), duration, block=is_blocking)
# back
self.tank.on_for_seconds(SpeedPercent(-speed), SpeedPercent(-speed), 2 * duration, block=is_blocking)
# forward
self.tank.on_for_seconds(SpeedPercent(speed), SpeedPercent(speed), 2 * duration, block=is_blocking)
# shake
self.tank.on_for_seconds(SpeedPercent(-100), SpeedPercent(100), 0.2, block=is_blocking)
self.tank.on_for_seconds(SpeedPercent(100), SpeedPercent(-100), 0.2, block=is_blocking)
self.tank.on_for_seconds(SpeedPercent(-100), SpeedPercent(100), 0.2, block=is_blocking)
self.tank.on_for_seconds(SpeedPercent(100), SpeedPercent(-100), 0.2, block=is_blocking)
# back
self.tank.on_for_seconds(SpeedPercent(-speed), SpeedPercent(-speed), duration, block=is_blocking)
elif appendage != '':
# in
motor.on_for_rotations(SpeedPercent(speed), 1 * rotations * direction)
# out
motor.on_for_rotations(SpeedPercent(speed), -2 * rotations * direction)
# in
motor.on_for_rotations(SpeedPercent(speed), 2 * rotations * direction)
# shake
motor.on_for_rotations(SpeedPercent(100), -0.1 * direction)
motor.on_for_rotations(SpeedPercent(100), 0.1 * direction)
motor.on_for_rotations(SpeedPercent(100), -0.1 * direction)
motor.on_for_rotations(SpeedPercent(100), 0.1 * direction)
# out
motor.on_for_rotations(SpeedPercent(speed), -1 * rotations * direction)
self._hokey_pokey()
def _hokey_pokey(self):
speed = 50
rotations = 0.4
mini_rotations = 0.1
turn_around_speed = 50
turn_around_duration = 5.25
# arms up
#self.arms(SpeedPercent(speed), SpeedPercent(-1 * speed), rotations)
self.left_arm.on_for_rotations(SpeedPercent(speed), rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(-speed), rotations, block=False)
# turn around
self.tank.on_for_seconds(SpeedPercent(0), SpeedPercent(turn_around_speed), turn_around_duration)
# boogie
self.left_arm.on_for_rotations(SpeedPercent(-speed), mini_rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(speed), mini_rotations, block=False)
self.left_arm.on_for_rotations(SpeedPercent(speed), mini_rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(-speed), mini_rotations, block=False)
self.left_arm.on_for_rotations(SpeedPercent(-speed), mini_rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(speed), mini_rotations, block=False)
self.left_arm.on_for_rotations(SpeedPercent(speed), mini_rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(-speed), mini_rotations, block=False)
# arms down
# self.arms(SpeedPercent(speed), SpeedPercent(-1 * speed), -1 * rotations)
self.left_arm.on_for_rotations(SpeedPercent(-speed), rotations, block=False)
self.right_arm.on_for_rotations(SpeedPercent(speed), rotations, block=False)
def _send_event(self, name: EventName, payload):
"""
Sends a custom event to trigger a sentry action.
:param name: the name of the custom event
:param payload: the sentry JSON payload
"""
self.send_custom_event('Custom.Mindstorms.Gadget', name.value, payload)
if __name__ == '__main__':
# Startup sequence
gadget = MindstormsGadget()
gadget.sound.play_song((('C4', 'e'), ('D4', 'e'), ('E5', 'q')))
gadget.leds.set_color("LEFT", "GREEN")
gadget.leds.set_color("RIGHT", "GREEN")
# Gadget main entry point
gadget.main()
# Shutdown sequence
gadget.sound.play_song((('E5', 'e'), ('C4', 'e')))
gadget.leds.set_color("LEFT", "BLACK")
gadget.leds.set_color("RIGHT", "BLACK")
{
"interactionModel": {
"languageModel": {
"invocationName": "child dance party",
"intents": [
{
"name": "AMAZON.CancelIntent",
"samples": []
},
{
"name": "AMAZON.HelpIntent",
"samples": []
},
{
"name": "AMAZON.StopIntent",
"samples": []
},
{
"name": "AMAZON.NavigateHomeIntent",
"samples": []
},
{
"name": "DanceIntent",
"slots": [
{
"name": "appendage",
"type": "AppendageType"
}
],
"samples": [
"do the hokey pokey",
"put your {appendage} in",
"put your {appendage} out",
"use your {appendage}",
"{appendage}"
]
}
],
"types": [
{
"name": "AppendageType",
"values": [
{
"id": "left_arm",
"name": {
"value": "left arm",
"synonyms": [
]
}
},
{
"id": "right_arm",
"name": {
"value": "right arm",
"synonyms": [
]
}
},
{
"id": "left_leg",
"name": {
"value": "left leg",
"synonyms": [
]
}
},
{
"id": "right_leg",
"name": {
"value": "right leg",
"synonyms": [
]
}
},
{
"id": "whole_body",
"name": {
"value": "whole body",
"synonyms": [
]
}
}
]
}
]
}
}
}
Comments