Autonomous Objects

Date 2023
Team Gavin Ruedisueli, Binita Gupta, Andrew Witt
Collaborators Laboratory for Design Technologies, Harvard GSD
Platform iRobot
Languages Ros2, Python, C#
Materials 3d printed hardware, foam upper
, , , , , , ,

In the future, the home will become more dynamic. Smart furniture will respond to your needs and reconfigure your space for different use cases. Conducted under the supervision of Andrew Witt / the Harvard Graduate School of Design’s Laboratory for Design Technologies and in collaboration with Binita Gupta, this project explored the concept of autonomous furniture and reconfigurable spaces.

The design process started with simple prototypes and culminated in a choreographed performance in which four objects pirouetted around each other in ways suggestive of ballet.

The platform for this design was the iRobot Create, which out-of-the-box is capable of communicating via Ros2. We used a central computer to choreograph the simultaneous sending of commands to each robot, keeping their motions in-time. We developed a C# Grasshopper plugin for Rhino that allows a user to draw curves in the viewport, then turn these curves into a custom, self-defined, intermediate language that allowed for plain-text writing of instructions that could by parsed by a Python script and sent to the robots via Ros2.

I also presented the project to students in the Master of Design Engineering program during their fall 2023 studio, and gave an introductory Ros2 tutorial and demo. From this information, one group of students created robot to aid the blind in navigating architectural spaces.

Here is an example script, which contains the simple series of instructions written by the Grasshopper plugin and copied into this script. These commands are then interpreted by our Python scripts and sent via Ros2 to the robots:

'''
This is a sample script for a robot scene
'''

#!/usr/bin/env python3

from .sync_drive import SequenceStep
from .sync_drive import MessageDriveStraight
from .sync_drive import MessageDriveArc
from .sync_drive import MessageRotate
from .sync_drive import runscene

#moves is a list of SequenceSteps. 
#replace this with your programs
#each SequenceStep constructor takes four arguments, one per robot
#if a robot is not moving during a specific step, set it to "None"


moves = [
    SequenceStep(
        None,
        MessageDriveStraight(-0.212823082351256, 0.1),
        None,
        MessageDriveStraight(-0.216952092162623, 0.1)),
    SequenceStep(
        MessageDriveStraight(0.349108058221493, 0.1),
        None,
        MessageDriveStraight(0.348723182425761, 0.1),
        None),
    SequenceStep(
        MessageDriveStraight(-0.349108058221493, 0.1),
        None,
        MessageDriveStraight(-0.348723182425761, 0.1),
        None),
    SequenceStep(
        None,
        MessageDriveStraight(0.212823082351256, 0.1),
        None,
        MessageDriveStraight(0.216952092162623, 0.1)),
]

def main(args = None):
    #setting the booleans to true allows the script to override whatever speed settings you are copying into the program above
    #if for some reason you want to explicitly control speeds, set these to False.
    runscene(moves, True, True)

    

if __name__ == '__main__':
    main()

Which was then sent to “runscene” here:

'''
This is a sample script for a robot scene
'''

#!/usr/bin/env python3

from .sync_drive import SequenceStep
from .sync_drive import MessageDriveStraight
from .sync_drive import MessageDriveArc
from .sync_drive import MessageRotate
from .sync_drive import runscene

#moves is a list of SequenceSteps. 
#replace this with your programs
#each SequenceStep constructor takes four arguments, one per robot
#if a robot is not moving during a specific step, set it to "None"

moves = [
    SequenceStep(
        None,
        MessageDriveStraight(-0.212823082351256, 0.1),
        None,
        MessageDriveStraight(-0.216952092162623, 0.1)),
    SequenceStep(
        MessageDriveStraight(0.349108058221493, 0.1),
        None,
        MessageDriveStraight(0.348723182425761, 0.1),
        None),
    SequenceStep(
        MessageDriveStraight(-0.349108058221493, 0.1),
        None,
        MessageDriveStraight(-0.348723182425761, 0.1),
        None),
    SequenceStep(
        None,
        MessageDriveStraight(0.212823082351256, 0.1),
        None,
        MessageDriveStraight(0.216952092162623, 0.1)),
]

def main(args = None):
    #setting the booleans to true allows the script to override whatever speed settings you are copying into the program above
    #if for some reason you want to explicitly control speeds, set these to False.
    runscene(moves, True, True)

    

if __name__ == '__main__':
    main()

which was then sent to "runscene" here:

'''
This script allows control of multiple robots at once, executing actions in sequence
'''

#!/usr/bin/env python3

import rclpy
from threading import Thread
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.executors import MultiThreadedExecutor
from irobot_create_msgs.action import DriveDistance
from irobot_create_msgs.action import RotateAngle
from irobot_create_msgs.action import DriveArc
from irobot_create_msgs.action import AudioNoteSequence
from irobot_create_msgs.msg import AudioNoteVector
from irobot_create_msgs.msg import AudioNote
from builtin_interfaces.msg import Duration

from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import ParameterType
from rcl_interfaces.msg import Parameter

robotA = 'Maeve'
robotB = 'Dolores'
robotC = 'Teddy'
robotD = 'Bernard'
STANDARD_DRIVE_SPEED = 0.075
STANDARD_ROTATE_SPEED = 0.2

class RobotMessage:
    pass

class MessageDriveStraight(RobotMessage):
    def __init__(self, distanceMeters:float, maxSpeedMetersPerSecond:float):
        self._distanceMeters = distanceMeters
        self._maxSpeedMetersPerSecond = maxSpeedMetersPerSecond

class MessageRotate(RobotMessage):
    def __init__(self, angleRadians:float, maxRotationSpeedRadiansPerSecond:float):
        self._angleRadians = angleRadians
        self._maxRotationSpeedRadiansPerSecond = maxRotationSpeedRadiansPerSecond

class MessageDriveArc(RobotMessage):
    def __init__(self, arcAngleRadians:float, arcRadiusMeters:float, maxSpeedMetersPerSecond:float, isForwards:bool):
        self._arcAngleRadians = arcAngleRadians
        self._arcRadiusMeters = arcRadiusMeters
        self._maxSpeedMetersPerSecond = maxSpeedMetersPerSecond
        self._isForwards = isForwards

class SequenceStep:
    def __init__(self, msgA:RobotMessage, msgB:RobotMessage, msgC:RobotMessage, msgD:RobotMessage):
        self._msgA = msgA
        self._msgB = msgB
        self._msgC = msgC
        self._msgD = msgD

class RobotActionClient(Node):
    def __init__(self, robotNamespace:str):
        super().__init__(robotNamespace + '_robot_action_client')
        self._robot = robotNamespace
        self._driveDistanceClient = ActionClient(self, DriveDistance, robotNamespace + '/drive_distance')
        self._rotateAngleClient = ActionClient(self, RotateAngle, robotNamespace + '/rotate_angle')
        self._driveArcClient = ActionClient(self, DriveArc, robotNamespace + '/drive_arc')
        self._sendSongClient = ActionClient(self, AudioNoteSequence, robotNamespace + '/audio_note_sequence')
        self._isReady = True
        self._client = self.create_client(SetParameters, robotNamespace + '/motion_control/set_parameters')
        print (self._robot + ': initialized')

    def sendMsg(self, robotMsg:RobotMessage):
        if isinstance(robotMsg, MessageDriveStraight):
            self.sendDrive(robotMsg)
        elif isinstance(robotMsg, MessageRotate):
            self.sendRotate(robotMsg)
        elif isinstance(robotMsg, MessageDriveArc):
            self.sendArc(robotMsg)

    def sendSong(self, frequencyHz, durationNanoSec):
        print (self._robot + ": sending song")
        self._isReady = False
        goalMsg = AudioNoteSequence.Goal()
        vector = AudioNoteVector()
        note = AudioNote()
        note.frequency = frequencyHz
        duration = Duration()
        duration.sec = 0
        duration.nanosec = durationNanoSec
        note.max_runtime = duration
        vector.notes = [note]
        goalMsg.note_sequence = vector
        print (self._robot + ': waiting for audio not sequence server to become available...')
        self._sendSongClient.wait_for_server()
        self._send_goal_future = self._sendSongClient.send_goal_async(goalMsg)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def sendDrive(self, driveStraightMessage:MessageDriveStraight):
        print (self._robot + ": sending drive info")
        self._isReady = False
        goalMsg = DriveDistance.Goal()
        goalMsg.distance = driveStraightMessage._distanceMeters
        goalMsg._max_translation_speed = driveStraightMessage._maxSpeedMetersPerSecond
        print (self._robot + ': waiting for drive distance server to become available...')
        self._driveDistanceClient.wait_for_server()
        self._send_goal_future = self._driveDistanceClient.send_goal_async(goalMsg)
        self._send_goal_future.add_done_callback(self.goal_response_callback)
        
    def sendRotate(self, rotateMessage:MessageRotate):
        print (self._robot + ": sending rotation info")
        self._isReady = False
        goalMsg = RotateAngle.Goal()
        goalMsg.max_rotation_speed = rotateMessage._maxRotationSpeedRadiansPerSecond
        goalMsg.angle = rotateMessage._angleRadians
        print (self._robot + ': waiting for rotate angle server to become available...')
        self._rotateAngleClient.wait_for_server()
        self._send_goal_future = self._rotateAngleClient.send_goal_async(goalMsg)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def sendArc(self, arcMessage:MessageDriveArc):
        print (self._robot + ": sending arc info")
        self._isReady = False
        goalMsg = DriveArc.Goal()
        goalMsg.angle = arcMessage._arcAngleRadians
        goalMsg.radius = arcMessage._arcRadiusMeters
        goalMsg.max_translation_speed = arcMessage._maxSpeedMetersPerSecond
        goalMsg.translate_direction = 1 if arcMessage._isForwards else -1
        print (self._robot + ': waiting for drive arc server to become available...')
        self._driveArcClient.wait_for_server()
        self._send_goal_future = self._driveArcClient.send_goal_async(goalMsg)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        print(self._robot + ': checking if goal was accepted or rejected...')
        goalHandle = future.result()
        if not goalHandle.accepted:
            self.get_logger().info('goal rejected')
            return
        self.get_logger().info(self._robot + ': goal accepted')
        self._get_result_future = goalHandle.get_result_async() #log the result
        self._get_result_future.add_done_callback(self.get_result_callback) #new callback for when we have result logged

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info(self._robot + ': Result: {0}'.format(result))
        self._isReady = True

    def set_params(self, mode:str):
        request = SetParameters.Request()
        param = Parameter()
        param.name = "safety_override"
        param.value.type = ParameterType.PARAMETER_STRING
        param.value.string_value = mode
        request.parameters.append(param)

        print('waiting for service to override safeties')

        self._client.wait_for_service()
        print('request sent')
        self._future = self._client.call_async(request)

    def shutdown(self):
        self.set_params('none')
        print('reset safety overrides')

def runscene(moves, useStandardDriveSpeed:bool, useStandardRotateSpeed: bool):
    if useStandardDriveSpeed or useStandardRotateSpeed:
        for step in moves:
            msgA = step._msgA
            msgB = step._msgB
            msgC = step._msgC
            msgD = step._msgD
            allMsgs = [msgA, msgB, msgC, msgD]
            for msg in allMsgs:
                if isinstance(msg, MessageDriveStraight) and useStandardDriveSpeed:
                    msg._maxSpeedMetersPerSecond = STANDARD_DRIVE_SPEED
                elif isinstance(msg, MessageRotate) and useStandardRotateSpeed:
                    msg._maxRotationSpeedRadiansPerSecond = STANDARD_ROTATE_SPEED
                elif isinstance(msg, MessageDriveArc) and useStandardDriveSpeed:
                    msg._maxSpeedMetersPerSecond = STANDARD_DRIVE_SPEED

    rclpy.init()

    a = RobotActionClient(robotA)
    b = RobotActionClient(robotB)
    c = RobotActionClient(robotC)
    d = RobotActionClient(robotD)

    executor = MultiThreadedExecutor()
    executor.add_node(a)
    executor.add_node(b)
    executor.add_node(c)
    executor.add_node(d)

    executorThread = Thread(target=executor.spin, daemon=True)
    executorThread.start()
    step = 0
    maxStep = len(moves)
    print("MOVES",moves)
    print("MAX STEP:",maxStep)

    #safety overrides so we can drive backwards without them stopping
    a.set_params('full')
    b.set_params('full')
    c.set_params('full')
    d.set_params('full')

    #sort of "wake up" the robots. we observed that the first command can response can be delayed causing synchronization issues
    a.sendSong(50, 1000000000)
    b.sendSong(50, 1000000000)
    c.sendSong(50, 1000000000)
    d.sendSong(50, 1000000000)

    startStep = True
    try:
        while (rclpy.ok):
            #send song at start of each step.
            #bugs associated with sending multiple rotates in a row.
            #this hack gets around this
            if(a._isReady and b._isReady and c._isReady and d._isReady and startStep):
                a.sendSong(50, 100000000)
                b.sendSong(50, 100000000)
                c.sendSong(50, 100000000)
                d.sendSong(50, 100000000)
                startStep = False

            elif(a._isReady and b._isReady and c._isReady and d._isReady and not startStep):
                a.sendMsg(moves[step]._msgA)
                b.sendMsg(moves[step]._msgB)
                c.sendMsg(moves[step]._msgC)
                d.sendMsg(moves[step]._msgD)
                step = step + 1
                startStep = True
                if step >= maxStep:
                    break

    except KeyboardInterrupt:
        print('Caught keyboard interrupt')
        pass

    print('Done')
    a.shutdown
    b.shutdown
    c.shutdown
    d.shutdown

    rclpy.shutdown()
    print('rclpy shutdown')
    executorThread.join()
    print('thread joined')

For more information see our Repos: