In this section you’ll build two device runtimes on the same mesh, with the Raspberry Pi 5 as the primary edge device:

  • a sensor that runs on the Raspberry Pi 5 and publishes temperature and humidity readings on a schedule
  • a threshold monitor that runs on your development machine, subscribes to those readings, and raises an alert when the temperature crosses a configurable threshold

This mirrors a real edge scenario, a room supervisor watching environmental sensors for out-of-bounds conditions, and exercises every Device Connect primitive across two cooperating devices.

Note

This Learning Path uses a Raspberry Pi 5 as the example primary device. You can use any device that can run the Device Connect Python packages, and you can also use your development machine as a simulated device by running the sensor and monitor in separate terminals.

Install uv

This walkthrough uses uv to manage the project and its Python dependencies. Install uv on both machines:

  • the Raspberry Pi 5 or other device that runs the sensor runtime
  • your development machine, which runs the monitor and the agent-tools client

uv will resolve a compatible Python interpreter, create a virtual environment, and install packages for you, so no manual venv or pip steps are needed.

    

        
        

curl -LsSf https://astral.sh/uv/install.sh | sh
    

    
    

        
        

powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
    

    

Alternative install methods (Homebrew, pipx, and others) are listed in the uv installation docs . Verify the install with:

    

        
        
uv --version

    

Create the project

Create the same project on the sensor device and your development machine, and add the Device Connect packages:

    

        
        
mkdir ~/device-connect-d2d
cd ~/device-connect-d2d
uv init --python 3.11
uv add device-connect-edge device-connect-agent-tools

    

The device-connect-edge package is the device runtime SDK. It is what turns a Python class into a live peer on the messaging mesh. The device-connect-agent-tools package is the client side: it lets an agent or script discover devices and invoke their RPCs. In production you might consume devices from a different client, but for this walkthrough it is the fastest way to confirm that discovery and RPC are working.

Keep both runtimes on the same local network. D2D discovery uses local network discovery, so VPNs, guest Wi-Fi isolation, or firewall rules that block local traffic can prevent devices from seeing each other. If you run everything on your development machine, open separate terminals for the sensor, monitor, and agent-tools client.

Write the sensor device

On the Raspberry Pi 5, another device, or your development machine, create a file called sensor.py:

    

        
        
import argparse
import asyncio
import random

from device_connect_edge import DeviceRuntime
from device_connect_edge.drivers import DeviceDriver, emit, periodic, rpc
from device_connect_edge.types import DeviceIdentity, DeviceStatus


class SimulatedSensor(DeviceDriver):
    device_type = "simulated_sensor"

    def __init__(self):
        super().__init__()
        self._last = {"temperature": 24.0, "humidity": 45.0}

    @property
    def identity(self) -> DeviceIdentity:
        return DeviceIdentity(
            device_type="simulated_sensor",
            manufacturer="Device Connect",
            model="SIM-TH-100",
            description="Raspberry Pi 5 temperature and humidity sensor example",
        )

    @property
    def status(self) -> DeviceStatus:
        return DeviceStatus(availability="available", location="raspberry-pi-5")

    @rpc()
    async def get_reading(self) -> dict:
        return self._last

    @emit()
    async def reading_ready(self, temperature: float, humidity: float):
        return None

    @periodic(interval=5.0)
    async def publish_reading(self):
        self._last = {
            "temperature": round(random.uniform(22.0, 30.0), 1),
            "humidity": round(random.uniform(35.0, 55.0), 1),
        }
        print(f"publishing {self._last}")
        await self.reading_ready(**self._last)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--device-id", required=True)
    args = parser.parse_args()

    runtime = DeviceRuntime(
        driver=SimulatedSensor(),
        device_id=args.device_id,
        allow_insecure=True,
    )
    await runtime.run()


if __name__ == "__main__":
    asyncio.run(main())

    

This driver uses three of the decorators introduced in the overview:

  • @rpc exposes get_reading as a function that other peers or agents can call
  • @emit declares reading_ready as an event the device publishes to the mesh
  • @periodic runs publish_reading every five seconds so the sensor produces fresh data on its own

The identity and status properties are what other peers see during discovery. They are how the sensor runtime advertises itself as a simulated_sensor with a known manufacturer, model, and availability. The readings are generated so you can focus on Device Connect behavior first; you can replace publish_reading() with a real sensor read later without changing the runtime structure.

If you are not using a Raspberry Pi 5, update the description and location values to match your device, for example location="development-machine" when you run the sensor locally.

Write a threshold monitor device

Now create a second device that consumes the sensor’s data. On your development machine, create a file called monitor.py:

    

        
        
import argparse
import asyncio
from collections import deque

from device_connect_edge import DeviceRuntime
from device_connect_edge.drivers import DeviceDriver, emit, on, rpc
from device_connect_edge.types import DeviceIdentity, DeviceStatus


class ThresholdMonitor(DeviceDriver):
    device_type = "threshold_monitor"

    def __init__(self, threshold: float):
        super().__init__()
        self._threshold = threshold
        self._recent_alerts: deque = deque(maxlen=10)

    @property
    def identity(self) -> DeviceIdentity:
        return DeviceIdentity(
            device_type="threshold_monitor",
            manufacturer="Device Connect",
            model="MON-T-100",
            description="Temperature threshold monitor",
        )

    @property
    def status(self) -> DeviceStatus:
        return DeviceStatus(availability="available", location="development-machine")

    @on(event_name="reading_ready")
    async def on_reading(self, device_id: str, event_name: str, payload: dict):
        temperature = payload["temperature"]
        print(f"received {temperature} C from {device_id}")
        if temperature > self._threshold:
            alert = {"device_id": device_id, "temperature": temperature}
            self._recent_alerts.append(alert)
            print(f"ALERT: {device_id} at {temperature} C (threshold {self._threshold})")
            await self.alert_raised(**alert)

    @emit()
    async def alert_raised(self, device_id: str, temperature: float):
        return None

    @rpc()
    async def get_recent_alerts(self) -> list:
        return list(self._recent_alerts)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--device-id", required=True)
    parser.add_argument("--threshold", type=float, default=27.0)
    args = parser.parse_args()

    runtime = DeviceRuntime(
        driver=ThresholdMonitor(threshold=args.threshold),
        device_id=args.device_id,
        allow_insecure=True,
    )
    await runtime.run()


if __name__ == "__main__":
    asyncio.run(main())

    

The monitor adds one primitive you haven’t seen yet:

  • @on(event_name="reading_ready") subscribes to reading_ready events from any device on the mesh. Whenever the sensor emits, the runtime delivers the event to on_reading as a method call with three arguments: the source device id, the event name, and a payload dict carrying the emitted fields. This is device-to-device communication without either side knowing the other’s address. You can also narrow the subscription by device_id= or device_type=; the device_type filter matches when the source device id starts with {device_type}-.
  • @emit alert_raised lets the monitor publish its own events when a threshold is crossed, so another peer or agent could subscribe to alerts in turn.
  • @rpc get_recent_alerts exposes the monitor’s recent history so an external caller can query what it has seen.

Run the sensor and the monitor

Open a terminal on the Raspberry Pi 5, another device, or your development machine in the project directory (~/device-connect-d2d). Start the sensor:

    

        
        
uv run python sensor.py --device-id sensor-001

    

On your development machine, open a terminal in the project directory (~/device-connect-d2d). Start the monitor with a threshold below the sensor’s typical temperature range so you see alerts quickly:

    

        
        
uv run python monitor.py --device-id monitor-001 --threshold 27.0

    

uv run executes the command inside the project’s managed environment, so you do not need to activate a virtual environment manually.

Within a few seconds, the monitor terminal should start printing received ... from sensor-001 lines, and an ALERT: line each time the temperature rises above 27.0 degrees C. This is the sensor invoking the monitor across the mesh through its emitted event, and you did not configure any address or pairing between them.

Query the monitor from agent tools

Open a second terminal on your development machine in the project directory and run:

    

        
        
uv run python - <<'PY'
from device_connect_agent_tools import connect, discover_devices, invoke_device

connect()

devices = discover_devices()
print(f"Found {len(devices)} device(s)")
for device in devices:
    print(f"  {device['device_id']} ({device['device_type']})")

sensor_id = next(d["device_id"] for d in devices if d["device_type"] == "simulated_sensor")
monitor_id = next(d["device_id"] for d in devices if d["device_type"] == "threshold_monitor")

print("latest reading:", invoke_device(sensor_id, "get_reading"))
print("recent alerts:", invoke_device(monitor_id, "get_recent_alerts"))
PY

    

The script discovers both devices, invokes get_reading on the sensor, and invokes get_recent_alerts on the monitor. The alert list should contain every breach the monitor has observed since it started:

    

        
        
Found 2 device(s)
  monitor-001 (threshold_monitor)
  sensor-001 (simulated_sensor)
latest reading: {'success': True, 'result': {'temperature': 27.5, 'humidity': 53.5}}
recent alerts: {'success': True, 'result': [{'device_id': 'sensor-001', 'temperature': 27.9}, {'device_id': 'sensor-001', 'temperature': 28.7}, {'device_id': 'sensor-001', 'temperature': 28.2}, {'device_id': 'sensor-001', 'temperature': 28.1}, {'device_id': 'sensor-001', 'temperature': 29.2}, {'device_id': 'sensor-001', 'temperature': 28.6}, {'device_id': 'sensor-001', 'temperature': 27.7}, {'device_id': 'sensor-001', 'temperature': 28.9}, {'device_id': 'sensor-001', 'temperature': 29.0}, {'device_id': 'sensor-001', 'temperature': 27.5}]}

    

What happened

You have exercised every Device Connect primitive across two cooperating devices:

  • Discovery: the monitor found the sensor by device_type automatically, and discover_devices from agent tools found both peers
  • RPC: agent tools called get_reading on the sensor and get_recent_alerts on the monitor
  • Events: the sensor emitted reading_ready; the monitor reacted through @on and emitted its own alert_raised
  • Status: both runtimes advertised themselves as available through the status property

The sensor and monitor did not know about each other before they started. They found each other on the local network and communicated purely through typed events and RPCs, with no broker, no registry, and no cloud service.

Outcome

You now have a working D2D deployment where an example primary device cooperates with a monitor on the same mesh: the sensor publishes data, and the monitor reacts to it and exposes its own state. This same driver pattern (a class, a handful of decorators, and a runtime) is how you would describe a real sensor, actuator, or monitor on the network.

Back
Next