In the future, the home will become more dynamic. Smart furniture will respond to your needs and reconfigure your space for different use cases. Conducted under the supervision of Andrew Witt / the Harvard Graduate School of Design’s Laboratory for Design Technologies and in collaboration with Binita Gupta, this project explored the concept of autonomous furniture and reconfigurable spaces.
The design process started with simple prototypes and culminated in a choreographed performance in which four objects pirouetted around each other in ways suggestive of ballet.
The platform for this design was the iRobot Create, which out-of-the-box is capable of communicating via Ros2. We used a central computer to choreograph the simultaneous sending of commands to each robot, keeping their motions in-time. We developed a C# Grasshopper plugin for Rhino that allows a user to draw curves in the viewport, then turn these curves into a custom, self-defined, intermediate language that allowed for plain-text writing of instructions that could by parsed by a Python script and sent to the robots via Ros2.
I also presented the project to students in the Master of Design Engineering program during their fall 2023 studio, and gave an introductory Ros2 tutorial and demo. From this information, one group of students created robot to aid the blind in navigating architectural spaces.
Here is an example script, which contains the simple series of instructions written by the Grasshopper plugin and copied into this script. These commands are then interpreted by our Python scripts and sent via Ros2 to the robots:
'''
This is a sample script for a robot scene
'''
#!/usr/bin/env python3
from .sync_drive import SequenceStep
from .sync_drive import MessageDriveStraight
from .sync_drive import MessageDriveArc
from .sync_drive import MessageRotate
from .sync_drive import runscene
#moves is a list of SequenceSteps.
#replace this with your programs
#each SequenceStep constructor takes four arguments, one per robot
#if a robot is not moving during a specific step, set it to "None"
moves = [
SequenceStep(
None,
MessageDriveStraight(-0.212823082351256, 0.1),
None,
MessageDriveStraight(-0.216952092162623, 0.1)),
SequenceStep(
MessageDriveStraight(0.349108058221493, 0.1),
None,
MessageDriveStraight(0.348723182425761, 0.1),
None),
SequenceStep(
MessageDriveStraight(-0.349108058221493, 0.1),
None,
MessageDriveStraight(-0.348723182425761, 0.1),
None),
SequenceStep(
None,
MessageDriveStraight(0.212823082351256, 0.1),
None,
MessageDriveStraight(0.216952092162623, 0.1)),
]
def main(args = None):
#setting the booleans to true allows the script to override whatever speed settings you are copying into the program above
#if for some reason you want to explicitly control speeds, set these to False.
runscene(moves, True, True)
if __name__ == '__main__':
main()
Which was then sent to “runscene” here:
'''
This is a sample script for a robot scene
'''
#!/usr/bin/env python3
from .sync_drive import SequenceStep
from .sync_drive import MessageDriveStraight
from .sync_drive import MessageDriveArc
from .sync_drive import MessageRotate
from .sync_drive import runscene
#moves is a list of SequenceSteps.
#replace this with your programs
#each SequenceStep constructor takes four arguments, one per robot
#if a robot is not moving during a specific step, set it to "None"
moves = [
SequenceStep(
None,
MessageDriveStraight(-0.212823082351256, 0.1),
None,
MessageDriveStraight(-0.216952092162623, 0.1)),
SequenceStep(
MessageDriveStraight(0.349108058221493, 0.1),
None,
MessageDriveStraight(0.348723182425761, 0.1),
None),
SequenceStep(
MessageDriveStraight(-0.349108058221493, 0.1),
None,
MessageDriveStraight(-0.348723182425761, 0.1),
None),
SequenceStep(
None,
MessageDriveStraight(0.212823082351256, 0.1),
None,
MessageDriveStraight(0.216952092162623, 0.1)),
]
def main(args = None):
#setting the booleans to true allows the script to override whatever speed settings you are copying into the program above
#if for some reason you want to explicitly control speeds, set these to False.
runscene(moves, True, True)
if __name__ == '__main__':
main()
which was then sent to "runscene" here:
'''
This script allows control of multiple robots at once, executing actions in sequence
'''
#!/usr/bin/env python3
import rclpy
from threading import Thread
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.executors import MultiThreadedExecutor
from irobot_create_msgs.action import DriveDistance
from irobot_create_msgs.action import RotateAngle
from irobot_create_msgs.action import DriveArc
from irobot_create_msgs.action import AudioNoteSequence
from irobot_create_msgs.msg import AudioNoteVector
from irobot_create_msgs.msg import AudioNote
from builtin_interfaces.msg import Duration
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import ParameterType
from rcl_interfaces.msg import Parameter
robotA = 'Maeve'
robotB = 'Dolores'
robotC = 'Teddy'
robotD = 'Bernard'
STANDARD_DRIVE_SPEED = 0.075
STANDARD_ROTATE_SPEED = 0.2
class RobotMessage:
pass
class MessageDriveStraight(RobotMessage):
def __init__(self, distanceMeters:float, maxSpeedMetersPerSecond:float):
self._distanceMeters = distanceMeters
self._maxSpeedMetersPerSecond = maxSpeedMetersPerSecond
class MessageRotate(RobotMessage):
def __init__(self, angleRadians:float, maxRotationSpeedRadiansPerSecond:float):
self._angleRadians = angleRadians
self._maxRotationSpeedRadiansPerSecond = maxRotationSpeedRadiansPerSecond
class MessageDriveArc(RobotMessage):
def __init__(self, arcAngleRadians:float, arcRadiusMeters:float, maxSpeedMetersPerSecond:float, isForwards:bool):
self._arcAngleRadians = arcAngleRadians
self._arcRadiusMeters = arcRadiusMeters
self._maxSpeedMetersPerSecond = maxSpeedMetersPerSecond
self._isForwards = isForwards
class SequenceStep:
def __init__(self, msgA:RobotMessage, msgB:RobotMessage, msgC:RobotMessage, msgD:RobotMessage):
self._msgA = msgA
self._msgB = msgB
self._msgC = msgC
self._msgD = msgD
class RobotActionClient(Node):
def __init__(self, robotNamespace:str):
super().__init__(robotNamespace + '_robot_action_client')
self._robot = robotNamespace
self._driveDistanceClient = ActionClient(self, DriveDistance, robotNamespace + '/drive_distance')
self._rotateAngleClient = ActionClient(self, RotateAngle, robotNamespace + '/rotate_angle')
self._driveArcClient = ActionClient(self, DriveArc, robotNamespace + '/drive_arc')
self._sendSongClient = ActionClient(self, AudioNoteSequence, robotNamespace + '/audio_note_sequence')
self._isReady = True
self._client = self.create_client(SetParameters, robotNamespace + '/motion_control/set_parameters')
print (self._robot + ': initialized')
def sendMsg(self, robotMsg:RobotMessage):
if isinstance(robotMsg, MessageDriveStraight):
self.sendDrive(robotMsg)
elif isinstance(robotMsg, MessageRotate):
self.sendRotate(robotMsg)
elif isinstance(robotMsg, MessageDriveArc):
self.sendArc(robotMsg)
def sendSong(self, frequencyHz, durationNanoSec):
print (self._robot + ": sending song")
self._isReady = False
goalMsg = AudioNoteSequence.Goal()
vector = AudioNoteVector()
note = AudioNote()
note.frequency = frequencyHz
duration = Duration()
duration.sec = 0
duration.nanosec = durationNanoSec
note.max_runtime = duration
vector.notes = [note]
goalMsg.note_sequence = vector
print (self._robot + ': waiting for audio not sequence server to become available...')
self._sendSongClient.wait_for_server()
self._send_goal_future = self._sendSongClient.send_goal_async(goalMsg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def sendDrive(self, driveStraightMessage:MessageDriveStraight):
print (self._robot + ": sending drive info")
self._isReady = False
goalMsg = DriveDistance.Goal()
goalMsg.distance = driveStraightMessage._distanceMeters
goalMsg._max_translation_speed = driveStraightMessage._maxSpeedMetersPerSecond
print (self._robot + ': waiting for drive distance server to become available...')
self._driveDistanceClient.wait_for_server()
self._send_goal_future = self._driveDistanceClient.send_goal_async(goalMsg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def sendRotate(self, rotateMessage:MessageRotate):
print (self._robot + ": sending rotation info")
self._isReady = False
goalMsg = RotateAngle.Goal()
goalMsg.max_rotation_speed = rotateMessage._maxRotationSpeedRadiansPerSecond
goalMsg.angle = rotateMessage._angleRadians
print (self._robot + ': waiting for rotate angle server to become available...')
self._rotateAngleClient.wait_for_server()
self._send_goal_future = self._rotateAngleClient.send_goal_async(goalMsg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def sendArc(self, arcMessage:MessageDriveArc):
print (self._robot + ": sending arc info")
self._isReady = False
goalMsg = DriveArc.Goal()
goalMsg.angle = arcMessage._arcAngleRadians
goalMsg.radius = arcMessage._arcRadiusMeters
goalMsg.max_translation_speed = arcMessage._maxSpeedMetersPerSecond
goalMsg.translate_direction = 1 if arcMessage._isForwards else -1
print (self._robot + ': waiting for drive arc server to become available...')
self._driveArcClient.wait_for_server()
self._send_goal_future = self._driveArcClient.send_goal_async(goalMsg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
print(self._robot + ': checking if goal was accepted or rejected...')
goalHandle = future.result()
if not goalHandle.accepted:
self.get_logger().info('goal rejected')
return
self.get_logger().info(self._robot + ': goal accepted')
self._get_result_future = goalHandle.get_result_async() #log the result
self._get_result_future.add_done_callback(self.get_result_callback) #new callback for when we have result logged
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(self._robot + ': Result: {0}'.format(result))
self._isReady = True
def set_params(self, mode:str):
request = SetParameters.Request()
param = Parameter()
param.name = "safety_override"
param.value.type = ParameterType.PARAMETER_STRING
param.value.string_value = mode
request.parameters.append(param)
print('waiting for service to override safeties')
self._client.wait_for_service()
print('request sent')
self._future = self._client.call_async(request)
def shutdown(self):
self.set_params('none')
print('reset safety overrides')
def runscene(moves, useStandardDriveSpeed:bool, useStandardRotateSpeed: bool):
if useStandardDriveSpeed or useStandardRotateSpeed:
for step in moves:
msgA = step._msgA
msgB = step._msgB
msgC = step._msgC
msgD = step._msgD
allMsgs = [msgA, msgB, msgC, msgD]
for msg in allMsgs:
if isinstance(msg, MessageDriveStraight) and useStandardDriveSpeed:
msg._maxSpeedMetersPerSecond = STANDARD_DRIVE_SPEED
elif isinstance(msg, MessageRotate) and useStandardRotateSpeed:
msg._maxRotationSpeedRadiansPerSecond = STANDARD_ROTATE_SPEED
elif isinstance(msg, MessageDriveArc) and useStandardDriveSpeed:
msg._maxSpeedMetersPerSecond = STANDARD_DRIVE_SPEED
rclpy.init()
a = RobotActionClient(robotA)
b = RobotActionClient(robotB)
c = RobotActionClient(robotC)
d = RobotActionClient(robotD)
executor = MultiThreadedExecutor()
executor.add_node(a)
executor.add_node(b)
executor.add_node(c)
executor.add_node(d)
executorThread = Thread(target=executor.spin, daemon=True)
executorThread.start()
step = 0
maxStep = len(moves)
print("MOVES",moves)
print("MAX STEP:",maxStep)
#safety overrides so we can drive backwards without them stopping
a.set_params('full')
b.set_params('full')
c.set_params('full')
d.set_params('full')
#sort of "wake up" the robots. we observed that the first command can response can be delayed causing synchronization issues
a.sendSong(50, 1000000000)
b.sendSong(50, 1000000000)
c.sendSong(50, 1000000000)
d.sendSong(50, 1000000000)
startStep = True
try:
while (rclpy.ok):
#send song at start of each step.
#bugs associated with sending multiple rotates in a row.
#this hack gets around this
if(a._isReady and b._isReady and c._isReady and d._isReady and startStep):
a.sendSong(50, 100000000)
b.sendSong(50, 100000000)
c.sendSong(50, 100000000)
d.sendSong(50, 100000000)
startStep = False
elif(a._isReady and b._isReady and c._isReady and d._isReady and not startStep):
a.sendMsg(moves[step]._msgA)
b.sendMsg(moves[step]._msgB)
c.sendMsg(moves[step]._msgC)
d.sendMsg(moves[step]._msgD)
step = step + 1
startStep = True
if step >= maxStep:
break
except KeyboardInterrupt:
print('Caught keyboard interrupt')
pass
print('Done')
a.shutdown
b.shutdown
c.shutdown
d.shutdown
rclpy.shutdown()
print('rclpy shutdown')
executorThread.join()
print('thread joined')
For more information see our Repos: