Franklin
Created December 31, 2019 © Apache-2.0

The Child Dance Party

Join The Child as he has a dance party. His favorite dance is the Hokey Pokey.

Intermediate • Showcase (no instructions) • 4 hours • 7
The Child Dance Party

Things used in this project

Story

Read more

Schematics

None

No schematics per se; just an EV3 Brick, plus two medium motors and two large motors.

Code

Skill Code

Python
This is the code you use for the Alexa Skill
# The Child Dance Party
# Copyright Franklin Lobb 2019

import logging.handlers
import requests
import uuid

from ask_sdk_core.skill_builder import SkillBuilder
from ask_sdk_core.utils import is_request_type, is_intent_name, get_slot_value, get_slot
from ask_sdk_core.handler_input import HandlerInput
from ask_sdk_core.serialize import DefaultSerializer

from ask_sdk_model import IntentRequest
from ask_sdk_model.ui import PlayBehavior
from ask_sdk_model.slot import Slot

from ask_sdk_model.interfaces.custom_interface_controller import (
    StartEventHandlerDirective, EventFilter, Expiration, FilterMatchAction,
    StopEventHandlerDirective,
    SendDirectiveDirective,
    Header,
    Endpoint,
    EventsReceivedRequest,
    ExpiredRequest
)

# Root logger used by every handler below. The level is INFO, so the
# logger.debug(...) calls in this file are not emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Serializer used by the logging interceptors to dump requests/responses.
serializer = DefaultSerializer()
# Builder that the handlers below register themselves on via decorators.
skill_builder = SkillBuilder()

# The namespace of the custom directive to be sent by this skill
NAMESPACE = "Custom.Mindstorms.Gadget"

# The name of the custom directive to be sent by this skill
NAME_CONTROL = "control"

# The audio tag to include background music
BG_MUSIC = '<audio src="soundbank://soundlibrary/ui/gameshow/amzn_ui_sfx_gameshow_waiting_loop_30s_01"></audio>'

@skill_builder.request_handler(can_handle_func=is_request_type("LaunchRequest"))
def launch_request_handler(handler_input: HandlerInput):
    """Handle LaunchRequest: find the connected gadget and start listening.

    Looks up the gadget endpoints connected to the Echo device. When none
    is found the session is ended with an explanatory message. Otherwise
    the first endpoint id, the remaining extension count and the event
    handler token are stored in the session attributes, and an event
    handler directive is returned together with the welcome speech.
    """
    logger.info("== Launch Intent ==")

    builder = handler_input.response_builder
    system = handler_input.request_envelope.context.system

    # Ask the endpoint API which gadgets are connected to this Echo device.
    endpoints = get_connected_endpoints(system.api_endpoint, system.api_access_token)
    logger.debug("Checking endpoint..")
    if not endpoints:
        logger.debug("No connected gadget endpoints available.")
        builder.speak("I couldn't find an EV3 Brick connected to this Echo device. Please check to make sure your EV3 Brick is connected, and try again.")
        builder.set_should_end_session(True)
        return builder.response

    # Only the first reported endpoint is used.
    endpoint_id = endpoints[0].get("endpointId", "")
    logger.debug("Received endpoints. Storing Endpoint Id: %s", endpoint_id)

    # The launch request id doubles as the event handler token.
    token = handler_input.request_envelope.request.request_id

    session_attr = handler_input.attributes_manager.session_attributes
    # Kept so later intents can address custom directives to the gadget.
    session_attr["endpoint_id"] = endpoint_id
    # Number of times the expiration handler may restart the event handler;
    # each event handler window started by this skill lasts 60000 ms.
    session_attr["duration"] = 10
    session_attr["token"] = token

    start_directive = build_start_event_handler_directive(
        token, 60000, NAMESPACE, NAME_CONTROL, 'SEND', {})
    welcome = "Welcome, let's get this dance party started!"

    builder.speak(welcome + BG_MUSIC)
    builder.add_directive(start_directive)
    return builder.response

def get_hokey_pokey(appendage):
    """Build the SSML text for one Hokey Pokey verse.

    :param appendage: spoken name of the body part (e.g. "left arm"), or
        None / empty string when no body part was given
    :return: the "put your ... in" lines for the appendage (omitted when no
        appendage is given) followed by the chorus and the follow-up prompt
    """
    appendage_info = ''
    # Truthiness check: both None and "" mean "no appendage". Formatting an
    # empty name would produce the garbled "put your  in".
    if appendage:
        appendage_info = (
            '<break time="1s"/> put your {0} in, <break time="1s"/> put your {0} out,'
            ' <break time="1s"/> put your {0} in and shake it all about.'
        ).format(appendage)
    return (
        appendage_info + ' do the hokey pokey and turn yourself around. that\'s what it\'s all about! <break time="3s"/>'
        + ' what should we do next?')

def get_id_from_slot(slot: Slot):
    """Return the slot's resolved entity id, else its spoken value.

    :param slot: the slot taken from the intent request, or None when the
        slot is not present
    :return: the id of the first value of the first resolution authority
        when available; otherwise the slot's spoken value; "" when the slot
        is None or carries no value
    """
    if slot is None:
        return ""
    try:
        # grab the first id from the first match (assumes only static entities)
        return slot.resolutions.resolutions_per_authority[0].values[0].value.id
    except (AttributeError, IndexError, TypeError) as e:
        # Any link in the chain may be None or empty when nothing matched;
        # only those lookup failures are treated as "no id available".
        logger.error('Failed to get id: %s', e)

    # Fall back to the spoken value. A slot without a value yields "" rather
    # than None, so callers always receive a string.
    return getattr(slot, "value", None) or ""

@skill_builder.request_handler(can_handle_func=is_intent_name("DanceIntent"))
def dance_intent_handler(handler_input: HandlerInput):
    """Handle DanceIntent: send a "dance" directive to the gadget and sing.

    The "appendage" slot is resolved to an id for the gadget payload, and
    its spoken value is used in the Hokey Pokey verse. If no gadget
    endpoint was stored in the session, no directive is sent and the
    session ends with an explanatory message.
    """
    logger.info("DanceIntent received.")

    appendage = get_slot(handler_input, "appendage")
    appendage_id = get_id_from_slot(appendage)
    # get_slot returns None when the request carries no such slot, so guard
    # the attribute access instead of raising AttributeError.
    appendage_name = appendage.value if appendage is not None else None

    # Get data from session attribute
    session_attr = handler_input.attributes_manager.session_attributes
    endpoint_id = session_attr.get("endpoint_id")

    if not endpoint_id:
        # Without an endpoint id (e.g. the intent arrived before a launch
        # stored one) a directive cannot be addressed to the gadget.
        logger.info("No gadget endpoint stored in the session.")
        return (handler_input.response_builder
                .speak("I couldn't find an EV3 Brick connected to this Echo device. Please check to make sure your EV3 Brick is connected, and try again.")
                .set_should_end_session(True)
                .response)

    # Construct the directive with the payload containing the dance parameters
    payload = {
        "type": "dance",
        "appendage": appendage_id
    }
    directive = build_send_directive(NAMESPACE, NAME_CONTROL, endpoint_id, payload)

    if appendage_id == "stop":
        speak_output = "Whoa! Stopping here!"
    else:
        speak_output = get_hokey_pokey(appendage_name)

    return (handler_input.response_builder
            .speak(speak_output)
            .add_directive(directive)
            .set_should_end_session(False)
            .response)

def has_valid_token(handler_input):
    """Tell whether an EventsReceived request carries the session's token.

    Returns False for any other request type, and False (after logging)
    when the request token differs from the "token" session attribute.
    """
    is_events_received = is_request_type('CustomInterfaceController.EventsReceived')
    if is_events_received(handler_input):
        expected_token = handler_input.attributes_manager.session_attributes.get("token", None)
        received_token = handler_input.request_envelope.request.token
        if expected_token == received_token:
            return True
        logger.info("Event token doesn't match. Ignoring this event")
    return False

def has_valid_endpoint(handler_input):
    """Tell whether an EventsReceived request comes from the stored endpoint.

    Returns False for any other request type, and False (after logging)
    when the endpoint id of the first event differs from the "endpoint_id"
    session attribute.
    """
    is_events_received = is_request_type('CustomInterfaceController.EventsReceived')
    if is_events_received(handler_input):
        session_attr = handler_input.attributes_manager.session_attributes
        # Only the first event of the request is inspected.
        first_event = handler_input.request_envelope.request.events[0]
        sender_id = first_event.endpoint.endpoint_id
        if sender_id == session_attr.get("endpoint_id", None):
            return True
        logger.info("Event endpoint id doesn't match. Ignoring this event")
    return False

# The request-type predicate must be *called* with handler_input; a bare
# is_request_type(...) is a function object and therefore always truthy.
@skill_builder.request_handler(can_handle_func=lambda handler_input:
    is_request_type("CustomInterfaceController.EventsReceived")(handler_input) and
    has_valid_token(handler_input) and
    has_valid_endpoint(handler_input)
)
def events_received_request_handler(handler_input: HandlerInput):
    """Handle a custom event sent by the gadget.

    Only the first event of the request is read. A "Speech" event is
    answered with the text from its "speechOut" payload field; any other
    event name gets a generic "not recognized" reply. The background music
    tag is appended to the speech in both cases.
    """
    logger.info("== Received Custom Event ==")

    custom_event = handler_input.request_envelope.request.events[0]
    # Treat a missing payload as empty so the lookup below cannot fail.
    payload = custom_event.payload or {}
    name = custom_event.header.name

    if name == 'Speech':
        speak_output = payload.get("speechOut", "")
    else:
        speak_output = "Event not recognized. Awaiting new command."

    return (handler_input.response_builder
        .speak(speak_output + BG_MUSIC, "REPLACE_ALL")
        .response)

@skill_builder.request_handler(can_handle_func=is_request_type("CustomInterfaceController.Expired"))
def custom_interface_expiration_handler(handler_input):
    """Handle expiry of the event handler: extend the session or end it.

    The current request id becomes the new token in the session. While the
    "duration" session attribute is above zero it is decremented and a new
    60000 ms event handler is started; otherwise the session is ended with
    a goodbye message.
    """
    logger.info("== Custom Event Expiration Input ==")

    attrs = handler_input.attributes_manager.session_attributes

    # The token is refreshed on every expiration, whether or not we extend.
    new_token = handler_input.request_envelope.request.request_id
    attrs["token"] = new_token

    remaining = attrs.get("duration", 0)
    if not remaining > 0:
        # No extensions left: close the skill session.
        closing = handler_input.response_builder
        closing.speak("Skill duration expired. Goodbye.")
        closing.set_should_end_session(True)
        return closing.response

    # Use up one extension and start a fresh event handler window.
    attrs["duration"] = remaining - 1
    extension = build_start_event_handler_directive(
        new_token, 60000, NAMESPACE, NAME_CONTROL, 'SEND', {})
    return handler_input.response_builder.add_directive(extension).response

@skill_builder.request_handler(can_handle_func=lambda handler_input: any(
    is_intent_name(intent)(handler_input)
    for intent in ("AMAZON.CancelIntent", "AMAZON.StopIntent")))
def stop_and_cancel_intent_handler(handler_input):
    """Handle Stop/Cancel: stop the event handler (if any) and say goodbye."""
    logger.info("Received a Stop or a Cancel Intent..")
    attrs = handler_input.attributes_manager.session_attributes
    builder = handler_input.response_builder

    # A stored token means an event handler was started; stop it explicitly.
    if 'token' in attrs:
        logger.debug("Active session detected, sending stop EventHandlerDirective.")
        builder.add_directive(build_stop_event_handler_directive(attrs["token"]))

    builder.speak("Goodbye!")
    builder.set_should_end_session(True)
    return builder.response

@skill_builder.request_handler(can_handle_func=is_request_type("SessionEndedRequest"))
def session_ended_request_handler(handler_input):
    """Log why the session ended and return an empty response."""
    reason = handler_input.request_envelope.request.reason
    logger.info("Session ended with reason: " + reason.to_str())
    builder = handler_input.response_builder
    return builder.response

@skill_builder.exception_handler(can_handle_func=lambda i, e: True)
def error_handler(handler_input, exception):
    """Catch-all exception handler: log the exception and apologise."""
    logger.info("==Error==")
    # exc_info=True adds the traceback to the log record.
    logger.error(exception, exc_info=True)
    builder = handler_input.response_builder
    builder.speak("I'm sorry, something went wrong!")
    return builder.response

@skill_builder.global_request_interceptor()
def log_request(handler_input):
    """Log the serialized request envelope for debugging purposes."""
    envelope_dump = str(serializer.serialize(handler_input.request_envelope))
    logger.info("==Request==\r" + envelope_dump)

@skill_builder.global_response_interceptor()
def log_response(handler_input, response):
    """Log the serialized response and session attributes for debugging."""
    response_dump = str(serializer.serialize(response))
    logger.info("==Response==\r" + response_dump)

    session_attr = handler_input.attributes_manager.session_attributes
    attributes_dump = str(serializer.serialize(session_attr))
    logger.info("==Session Attributes==\r" + attributes_dump)

def get_connected_endpoints(api_endpoint, api_access_token):
    """Fetch the gadget endpoints connected to the Echo device.

    :param api_endpoint: base URL of the Alexa API taken from the request
    :param api_access_token: bearer token taken from the request
    :return: the "endpoints" list from the response body, or an empty list
        when the call does not return HTTP 200 or the body has no such key
    :raises requests.exceptions.RequestException: on network errors,
        including the timeout set below
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer {}'.format(api_access_token)
    }

    api_url = api_endpoint + "/v1/endpoints"
    # Without a timeout requests waits indefinitely on an unresponsive server.
    endpoints_response = requests.get(api_url, headers=headers, timeout=5)

    if endpoints_response.status_code != requests.codes.ok:
        logger.info("Endpoint lookup failed with status %s",
                    endpoints_response.status_code)
        # Explicit empty result; callers only test the result for truthiness.
        return []

    return endpoints_response.json().get("endpoints", [])

def build_send_directive(namespace, name, endpoint_id, payload):
    """Build a SendDirectiveDirective addressed to one gadget endpoint.

    :param namespace: namespace placed in the directive header
    :param name: name placed in the directive header
    :param endpoint_id: id of the endpoint that should receive the directive
    :param payload: payload forwarded unchanged to the gadget
    """
    directive_header = Header(name=name, namespace=namespace)
    target = Endpoint(endpoint_id=endpoint_id)
    return SendDirectiveDirective(
        header=directive_header,
        endpoint=target,
        payload=payload,
    )

def build_start_event_handler_directive(token, duration_ms, namespace,
                                        name, filter_match_action, expiration_payload):
    """Build a StartEventHandlerDirective with an expiration and no filter.

    :param token: token identifying the event handler
    :param duration_ms: lifetime of the event handler in milliseconds
    :param namespace: unused while the event filter below stays disabled
    :param name: unused while the event filter below stays disabled
    :param filter_match_action: unused while the event filter stays disabled
    :param expiration_payload: payload attached to the expiration
    """
    expiry = Expiration(
        duration_in_milliseconds=duration_ms,
        expiration_payload=expiration_payload)

    # The event filter is currently disabled; it was written as:
    # event_filter=EventFilter(
    #     filter_expression={
    #         'and': [
    #             {'==': [{'var': 'header.namespace'}, namespace]},
    #             {'==': [{'var': 'header.name'}, name]}
    #         ]
    #     },
    #     filter_match_action=filter_match_action
    # ),
    return StartEventHandlerDirective(token=token, expiration=expiry)

def build_stop_event_handler_directive(token):
    """Build the directive that stops the event handler identified by token."""
    stop_directive = StopEventHandlerDirective(token=token)
    return stop_directive

# Module-level handler produced by the skill builder after all handlers and
# interceptors above have been registered on it.
lambda_handler = skill_builder.lambda_handler()

EV3 Code

Python
This is the code you run on the EV3 device.
# Copyright 2019 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
# 
# You may not use this file except in compliance with the terms and conditions 
# set forth in the accompanying LICENSE.TXT file.
#
# THESE MATERIALS ARE PROVIDED ON AN "AS IS" BASIS. AMAZON SPECIFICALLY DISCLAIMS, WITH 
# RESPECT TO THESE MATERIALS, ALL WARRANTIES, EXPRESS, IMPLIED, OR STATUTORY, INCLUDING 
# THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.

import time
import logging
import json
import random
import threading
from enum import Enum

from agt import AlexaGadget

from ev3dev2.led import Leds
from ev3dev2.sound import Sound
from ev3dev2.motor import OUTPUT_A, OUTPUT_B, OUTPUT_C, OUTPUT_D, MoveTank, MotorSet, SpeedPercent, MediumMotor, LargeMotor
from ev3dev2.sensor.lego import InfraredSensor

# Set the logging level to INFO to see messages from AlexaGadget.
# This runs once, when the module is loaded.
logging.basicConfig(level=logging.INFO)

class EventName(Enum):
    """
    The names of the custom events sent from this gadget.
    """
    # Passed (via .value) as the event name by MindstormsGadget._send_event.
    SPEECH = "Speech"


class MindstormsGadget(AlexaGadget):
    """
    A Mindstorms gadget that can perform bi-directional interaction with an Alexa skill.

    Motors: large motors on ports B (right leg) and C (left leg), medium
    motors on ports A (right arm) and D (left arm).
    """

    def __init__(self):
        """
        Performs Alexa Gadget initialization routines and ev3dev resource allocation.
        """
        super().__init__()

        # Legs: large motors on output ports C (left) and B (right)
        self.left_leg = LargeMotor(OUTPUT_C)
        self.right_leg = LargeMotor(OUTPUT_B)
        # Arms: medium motors on output ports D (left) and A (right)
        self.left_arm = MediumMotor(OUTPUT_D)
        self.right_arm = MediumMotor(OUTPUT_A)
        # Both leg motors driven together for whole-body moves
        self.tank = MoveTank(OUTPUT_B, OUTPUT_C)
        self.sound = Sound()
        self.leds = Leds()

    def on_connected(self, device_addr):
        """
        Gadget connected to the paired Echo device.
        :param device_addr: the address of the device we connected to
        """
        self.leds.set_color("LEFT", "GREEN")
        self.leds.set_color("RIGHT", "GREEN")
        print("{} connected to Echo device".format(self.friendly_name))

    def on_disconnected(self, device_addr):
        """
        Gadget disconnected from the paired Echo device.
        :param device_addr: the address of the device we disconnected from
        """
        self.leds.set_color("LEFT", "BLACK")
        self.leds.set_color("RIGHT", "BLACK")
        print("{} disconnected from Echo device".format(self.friendly_name))

    def on_custom_mindstorms_gadget_control(self, directive):
        """
        Handles the Custom.Mindstorms.Gadget control directive.
        :param directive: the custom directive with the matching namespace and name
        """
        # Only parsing sits inside the try block, so an error raised while
        # dancing is not misreported as a payload problem.
        try:
            payload = json.loads(directive.payload.decode("utf-8"))
            print("Control payload: {}".format(payload))
            control_type = payload["type"]
        except KeyError:
            print("Missing expected parameters: {}".format(directive))
            return
        except ValueError:
            # Raised for undecodable bytes and for malformed JSON.
            print("Malformed control payload: {}".format(directive))
            return

        if control_type == "dance":
            if 'appendage' in payload:
                # Blocking, so the whole-body moves run one after another
                # instead of each replacing the previous one.
                self._dance(payload['appendage'], is_blocking=True)
            else:
                self._hokey_pokey()

    def _shake(self):
        """Perform a quick left/right shuffle with both legs."""
        self.tank.on_for_seconds(SpeedPercent(80), SpeedPercent(-80), 0.2)
        time.sleep(0.3)
        self.tank.on_for_seconds(SpeedPercent(-40), SpeedPercent(40), 0.2)

    def _dance(self, appendage, is_blocking=False):
        """
        Performs the "in, out, in, shake" moves for one appendage, then the finale.
        :param appendage: 'left_arm', 'right_arm', 'left_leg', 'right_leg' or
            'whole_body'; any other value (including None and '') skips
            straight to the finale
        :param is_blocking: passed as ``block`` to the whole-body moves; when
            False each call returns at once, so a later move is expected to
            replace the one still running (per the ev3dev2 motor API)
        """
        speed = 50
        rotations = 0.4
        duration = 1

        # Motor and "in" direction for each single limb.
        limbs = {
            'left_arm': (self.left_arm, 1),
            'right_arm': (self.right_arm, -1),
            'left_leg': (self.left_leg, -1),
            'right_leg': (self.right_leg, 1),
        }
        # Non-string values (e.g. a JSON null) can never name a limb.
        limb = limbs.get(appendage) if isinstance(appendage, str) else None

        if appendage == 'whole_body':
            # forward
            self.tank.on_for_seconds(SpeedPercent(speed), SpeedPercent(speed), duration, block=is_blocking)
            # back
            self.tank.on_for_seconds(SpeedPercent(-speed), SpeedPercent(-speed), 2 * duration, block=is_blocking)
            # forward
            self.tank.on_for_seconds(SpeedPercent(speed), SpeedPercent(speed), 2 * duration, block=is_blocking)
            # shake
            for _ in range(2):
                self.tank.on_for_seconds(SpeedPercent(-100), SpeedPercent(100), 0.2, block=is_blocking)
                self.tank.on_for_seconds(SpeedPercent(100), SpeedPercent(-100), 0.2, block=is_blocking)
            # back
            self.tank.on_for_seconds(SpeedPercent(-speed), SpeedPercent(-speed), duration, block=is_blocking)
        elif limb is not None:
            # Only known limbs reach this branch: the single-motor call
            # signature below does not fit the tank drive.
            motor, direction = limb
            # in
            motor.on_for_rotations(SpeedPercent(speed), 1 * rotations * direction)
            # out
            motor.on_for_rotations(SpeedPercent(speed), -2 * rotations * direction)
            # in
            motor.on_for_rotations(SpeedPercent(speed), 2 * rotations * direction)
            # shake
            for _ in range(2):
                motor.on_for_rotations(SpeedPercent(100), -0.1 * direction)
                motor.on_for_rotations(SpeedPercent(100), 0.1 * direction)
            # out
            motor.on_for_rotations(SpeedPercent(speed), -1 * rotations * direction)

        self._hokey_pokey()

    def _hokey_pokey(self):
        """Performs the finale: arms up, turn around, boogie, arms down."""
        speed = 50
        rotations = 0.4
        mini_rotations = 0.1
        turn_around_speed = 50
        turn_around_duration = 5.25

        # arms up (non-blocking, so they rise while the turn below runs)
        self.left_arm.on_for_rotations(SpeedPercent(speed), rotations, block=False)
        self.right_arm.on_for_rotations(SpeedPercent(-speed), rotations, block=False)

        # turn around
        self.tank.on_for_seconds(SpeedPercent(0), SpeedPercent(turn_around_speed), turn_around_duration)

        # boogie: the left arm starts without blocking and the right arm
        # blocks, so each pair moves together and finishes before the next.
        for sign in (-1, 1, -1, 1):
            self.left_arm.on_for_rotations(SpeedPercent(sign * speed), mini_rotations, block=False)
            self.right_arm.on_for_rotations(SpeedPercent(-sign * speed), mini_rotations, block=True)

        # arms down
        self.left_arm.on_for_rotations(SpeedPercent(-speed), rotations, block=False)
        self.right_arm.on_for_rotations(SpeedPercent(speed), rotations, block=False)

    def _send_event(self, name: EventName, payload):
        """
        Sends a custom event to the skill.
        :param name: the name of the custom event
        :param payload: the JSON payload of the event
        """
        self.send_custom_event('Custom.Mindstorms.Gadget', name.value, payload)

# Script entry point: runs only when this file is executed directly.
if __name__ == '__main__':
    # Startup sequence: create the gadget, play a short tune and turn both
    # LEDs green.
    gadget = MindstormsGadget()
    gadget.sound.play_song((('C4', 'e'), ('D4', 'e'), ('E5', 'q')))
    gadget.leds.set_color("LEFT", "GREEN")
    gadget.leds.set_color("RIGHT", "GREEN")

    # Gadget main entry point
    gadget.main()

    # Shutdown sequence: runs once gadget.main() has returned.
    gadget.sound.play_song((('E5', 'e'), ('C4', 'e')))
    gadget.leds.set_color("LEFT", "BLACK")
    gadget.leds.set_color("RIGHT", "BLACK")

Alexa Skill Interaction Model

JSON
This is the interaction model used in the Alexa Skill
{
  "interactionModel": {
      "languageModel": {
          "invocationName": "child dance party",
          "intents": [
              {
                  "name": "AMAZON.CancelIntent",
                  "samples": []
              },
              {
                  "name": "AMAZON.HelpIntent",
                  "samples": []
              },
              {
                  "name": "AMAZON.StopIntent",
                  "samples": []
              },
              {
                  "name": "AMAZON.NavigateHomeIntent",
                  "samples": []
              },
              {
                  "name": "DanceIntent",
                  "slots": [
                      {
                          "name": "appendage",
                          "type": "AppendageType"
                      }
                  ],
                  "samples": [
                      "do the hokey pokey",
                      "put your {appendage} in",
                      "put your {appendage} out",
                      "use your {appendage}",
                      "{appendage}"
                  ]
              }
          ],
          "types": [
              {
                  "name": "AppendageType",
                  "values": [
                      {
                          "id": "left_arm",
                          "name": {
                              "value": "left arm",
                              "synonyms": [
                              ]
                          }
                      },
                      {
                          "id": "right_arm",
                          "name": {
                              "value": "right arm",
                              "synonyms": [
                              ]
                          }
                      },
                      {
                          "id": "left_leg",
                          "name": {
                              "value": "left leg",
                              "synonyms": [
                              ]
                          }
                      },
                      {
                          "id": "right_leg",
                          "name": {
                              "value": "right leg",
                              "synonyms": [
                              ]
                          }
                      },
                      {
                          "id": "whole_body",
                          "name": {
                              "value": "whole body",
                              "synonyms": [
                              ]
                          }
                      }
                  ]
              }
          ]
      }
  }
}

Credits

Franklin

Franklin

1 project • 1 follower
Thanks to Lego.

Comments