This section demonstrates Device Connect’s device-to-device discovery. There are two ways to walk through this setup. Optionally, you can connect an external device.
For a conceptual implementation, follow the steps using two terminal windows on your machine with the virtual environment set up.
If you have access to an external device, you can use it with this setup as well. You’ll need:
You’ll need two terminal windows open at the same time: one to keep the simulated robot running, and one to invoke it from the agent side. Both terminals need the virtual environment activated.
| Machine | Terminal | Purpose |
|---|---|---|
| Host or Target | 1 | Simulated robot - keep running throughout |
| Host | 2 | Agent tool invocations |
Make sure you are in the repository directory and that your virtual environment is activated:
cd ~/strands-device-connect/robots
source .venv/bin/activate
In terminal 1, run the following command to create and start the simulated SO-100 robot arm:
python <<'PY'
import logging
logging.basicConfig(level=logging.INFO)
from strands_robots import Robot
r = Robot('so100', peer_id='so100-abc123')
r.run()
PY
Two things happen when this script runs.
Robot('so100') calls the factory function, which checks for USB servo hardware and, finding none, creates a MuJoCo Simulation instance. On the first run it downloads the SO-100 MJCF physics model from Hugging Face — this can take up to 20 minutes depending on your connection. Subsequent runs use the local cache and start immediately.
r.run() calls init_device_connect_sync(), which does the following:
SimulationDeviceDriver — a Device Connect DeviceDriver adapter that wraps the simulation and maps its methods to structured RPCs.DeviceRuntime with the Zenoh D2D backend. No broker or environment variables are needed; devices discover each other on the LAN via Zenoh multicast scouting.device-connect.default.<PEER_ID>.cmd.execute, getStatus, getFeatures, step, reset, and stop.stateUpdate and observationUpdate events to any listener on the mesh.
Call flow inside r.run(): device registration and event publishing over Zenoh
The output is similar to:
device_connect_sdk.device.so100-abc123 - INFO - Using ZENOH messaging backend
device_connect_sdk.device.so100-abc123 - INFO - Connected to ZENOH broker: []
device_connect_sdk.device.so100-abc123 - INFO - Driver connected: strands_sim
device_connect_sdk.device.so100-abc123 - INFO - Subscribed to commands on device-connect.default.so100-abc123.cmd
🤖 so100-abc123 is online. Ctrl+C to stop.
Leave this process running. The simulated robot is only discoverable as long as this process is alive.
Leave terminal 1 running with the robot process. Open a new terminal window, navigate to the repository, and activate the virtual environment:
cd ~/strands-device-connect/robots
source .venv/bin/activate
Run all remaining commands in this section from this second terminal.
The robot_mesh tool wraps the same discovery and invocation primitives as a Strands agent tool. You can call it directly from a Python script or attach it to an LLM agent; the API is identical either way.
Start by confirming which robots are currently visible on the mesh:
python <<'PY'
from strands_robots.tools.robot_mesh import robot_mesh
print(robot_mesh(action='peers'))
PY
When this runs, robot_mesh calls _ensure_connected(), which sets MESSAGING_BACKEND=zenoh (if the environment variable is not already set) and opens the agent-side Zenoh connection via device_connect_agent_tools. It then calls conn.list_devices(), which queries the Zenoh network for all registered Device Connect devices. Each device registers its device_id, device_type, availability from its DeviceStatus, and the list of RPC functions it exposes. The tool formats this into the human-readable summary below.
The output is similar to:
Discovered 1 device(s):
[robot] so100-abc123 - idle
Functions: execute, getFeatures, getStatus, reset, step, stop
The peer ID (for example so100-abc123) is assigned at startup and changes each run. Note the actual ID shown in your terminal - you’ll need it in the next step.
Send a task to one of the discovered robots. Replace so100-abc123 with the peer ID shown in your peers output:
python <<'PY'
from strands_robots.tools.robot_mesh import robot_mesh
print(robot_mesh(
action='tell',
target='so100-abc123',
instruction='pick up the cube',
policy_provider='mock',
))
PY
Under the hood, robot_mesh calls conn.invoke(target, "execute", params), which serializes the arguments and routes them over Zenoh to the device’s command topic: device-connect.default.<PEER_ID>.cmd. The SimulationDeviceDriver.execute() RPC handler on the robot side receives the call, resolves the robot name inside the simulation world, and calls sim.start_policy(instruction=..., policy_provider='mock', ...). With the mock policy provider, the handler returns immediately with a success result without running real motion — the call is a connectivity and RPC round-trip test. The stateUpdate events you’ll see in terminal 1 are published by the separate 10Hz background loop that was started by r.run().
You will see the following output:
-> so100-abc123: pick up the cube
{"status": "success", "content": [...]}
The robot also logs event updates as it processes the task. If you switch back to terminal 1, it logs the execution of the task, similar to:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*** EVENT so100-abc123::stateUpdate [111aaabb]
payload: sim_time=4.34, step_count=2070, running_policies={'so100': {'steps': 814, 'instruction': 'pick up the cube'}}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The emergency stop broadcasts a halt command to every device on the mesh simultaneously. This is useful when you want to stop all robots without knowing their individual peer IDs:
python <<'PY'
from strands_robots.tools.robot_mesh import robot_mesh
print(robot_mesh(action='emergency_stop'))
PY
This doesn’t send a single broadcast message. Instead, robot_mesh first calls conn.list_devices() to enumerate every device currently on the mesh, then calls conn.invoke(device_id, "stop", timeout=3.0) on each one in sequence. On the device side, SimulationDeviceDriver.stop() sets policy_running = False for every robot in the simulation world and returns immediately. Failures per device are swallowed so that a single unresponsive device doesn’t block the rest from stopping.
The output is similar to:
E-STOP: 1/1 devices stopped
The device-connect-agent-tools package gives you direct programmatic access to the mesh, without involving an LLM. This is useful for testing, scripting, or validating the stack before wiring it up to an agent. Open terminal 2, activate the virtual environment, then run:
python <<'PY'
from device_connect_agent_tools import connect, discover_devices, invoke_device
connect()
devices = discover_devices(device_type='')
print(f'Found {len(devices)} robot(s):')
for d in devices:
print(f' {d["device_id"]}')
if devices:
result = invoke_device(
devices[0]['device_id'],
'execute',
{'instruction': 'pick up the cube', 'policy_provider': 'mock'},
)
print(f'Execute result: {result}')
status = invoke_device(devices[0]['device_id'], 'getStatus')
print(f'Status: {status}')
PY
When an agent calls execute(instruction="pick up the cup", policy_provider="groot"), Device Connect handles the RPC delivery, and the policy handles the actual arm movement.
discover_devices(device_type='') returns all devices on the mesh regardless of type. If you pass device_type='strands_robot' you can filter to only Robot() instances. invoke_device sends an RPC to the named device; here policy_provider='mock' tells the robot to accept the task without executing real motion, which is appropriate for this connectivity test.
The output is similar to:
Found 1 robot(s):
so100-abc123 - idle
Execute result: {'success': True, 'result': {'status': 'success', 'content': [...]}}
Status: {'success': True, 'result': {...}} # full sim state dict
In this section you’ve:
device-connect-agent-tools to discover the robot and invoke an RPC call against it.robot_mesh Strands tool to list peers, send an instruction, and trigger an emergency stop.This showcases the ease of setting up a mesh on a local network. In the next section, you can extend the configuration to a Docker-based approach, opening up a new category of possibilities with agent integration.