Peer Connections
As of Aug 10, 2025
The client and worker state machines and their transitions are defined in `{client/worker}/src/connection/rtc.rs`. Other shared code is defined in `thava-types/src/rtc.rs`.

The best place to see how peer connections are established, end to end and in one place, is in `crates/client/src/connection/rtc.rs`:
```rust
#[tokio::test]
async fn test_negotiation() -> Result<()> {}

#[tokio::test]
async fn test_streaming() -> Result<()> {}
```
These tests import the `PolitePeerConnection` from our worker crate and exercise both the negotiation process and maximum sending speeds.
The entrypoint for both client and worker peer connections is `{Impol/Pol}itePeerConnection::negotiate`, which does the following:
- Creates an `RTCPeerConnection` object (from the `webrtc-rs` crate).
- Creates a data channel and sets up handlers for WebRTC state changes, local ICE candidate events, and the data channel open event.
- Sets up a `Ping` heartbeat with the distributor (for the worker).
- Adds the connection state to an `AllConnections` object, which is like a lock-less distributed `HashMap` that keeps track of all peer connections a worker or client has (in any state).
- Begins the negotiation process (`external_sender` and `external_receiver` send and receive messages from the distributor, or any other signaling server).
- As messages pop up internally (e.g. locally generated ICE candidates) or externally (e.g. messages from the opposing peer), internal/external transitions occur, modifying the state stored in the `AllConnections` object.
- Once the data channel is opened, the worker hands off all incoming messages to the `handle_fn`. Since every message is handled serially, we pass a mutable reference to the `ObjectCache` along with the message bytes (this may have to adapt to the `AllConnections` message-passing structure when reordering is added).
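The polite/impolite split above is the standard WebRTC "perfect negotiation" pattern. As a rough, hypothetical sketch of its collision rule (the class and method names here are illustrative, not the actual `rtc.rs` API):

```python
# Hypothetical sketch of the "perfect negotiation" collision rule that the
# Polite/ImpolitePeerConnection split implements. Names are illustrative and
# do not mirror the real crate API.

class PeerSketch:
    def __init__(self, polite: bool):
        self.polite = polite
        self.making_offer = False   # True while a local offer is in flight
        self.state = "stable"

    def start_local_offer(self) -> None:
        self.making_offer = True
        self.state = "have-local-offer"

    def on_remote_offer(self) -> str:
        # An "offer collision" happens when both sides offer at once.
        collision = self.making_offer or self.state != "stable"
        if collision and not self.polite:
            return "ignored"        # impolite peer keeps its own offer
        # Polite peer rolls back its own offer and accepts the remote one.
        self.making_offer = False
        self.state = "have-remote-offer"
        return "accepted"
```

Under a simultaneous-offer collision the polite peer yields while the impolite peer wins, which lets both sides negotiate without a central tie-breaker.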
Handling PyTorch Function Requests
When `remote_call` is called from a Python client, it is first intercepted at `thava-types/src/rtorch.rs` in the `RemoteCall` struct's `__call__` method. Here, for calls with `Future` return types, remote IDs are generated and immediately returned. The `PyMessage` is then passed into the actual `remote_call` implementation (defined in `client/src/pycaravan/init.rs`), which sends a request to the peer.
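A minimal sketch of that interception step, under stated assumptions: `intercept_call`, the counter-based id scheme, and the dict payload are all invented for illustration and do not mirror the real `RemoteCall`/`PyMessage` layout:

```python
# Hypothetical sketch of the __call__ interception: calls whose return type is
# a Future get a remote id generated up front and returned immediately, while
# the serialized message is shipped off asynchronously. Field names and the
# id scheme are assumptions, not the real rtorch.rs layout.
import itertools
import pickle

_next_rid = itertools.count(1)

def intercept_call(func_name: str, args: tuple, is_future: bool, outbox: list):
    rid = next(_next_rid) if is_future else None
    message = pickle.dumps({"func": func_name, "args": args, "rid": rid})
    outbox.append(message)      # stands in for sending the PyMessage to the peer
    return rid                  # Future calls return the fresh remote id immediately
```

A `Future` call returns its remote id without waiting; a blocking call would instead wait for the worker's reply before returning.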
`thava-types/src/biject/` contains `payload.rs`, which holds the full Rust worker code that handles every remote call payload. Here is how a standard call may be run:
1. The client encounters a PyTorch function call resembling:

   ```python
   # train.py
   bar = foo(*args, **kwargs)
   ```

   where `foo` may be defined as:

   ```python
   # torch.py
   def foo(other: Tensor, device: DeviceLikeType | None = None) -> Tensor: ...
   ```
2. `foo` has already been bijected when `caravan` was imported and/or built, so `foo` is our function. We must inspect the arguments and their types to determine whether a remote call should be made. Here are the conditions we check (found in `get_parameterized_call` in `client/src/rtorch/rtorch/biject.py`):

   - If our arguments contain at least one `RObject` (an object representing a remote value) and the parameters include `device="cpu"`, route to remote.
   - If our arguments do not contain `RObject`s but the parameters include `device="cpu"`, route to local.
   - If our arguments contain at least one `RObject` and a `device` parameter intended for a remote GPU, route to remote.
   - If our arguments do not contain `RObject`s but have a `device` parameter intended for a remote GPU, route to remote.
   - If our arguments contain at least one `RObject` and no `device` parameter, route to remote.
   - If our arguments contain neither `RObject`s nor a `device` parameter, route to local.

   Additionally, we make a remote call a `Future` or a `Blocker` call based on whether the return annotation is a `torch` object (e.g. `Tensor`, `Parameter`, etc.).

   To facilitate these checks, we use a type-checking system. `jedi` is a static analysis library used in `jedi-language-server` (which I use for autocomplete in `nvim`). For each `torch` function, we use `jedi` to find the function signature. A few notes:

   - We tried the standard `inspect` library, but it cannot find types for the `torch` stub files generated from the C++ `libtorch` source code.
   - For internal `torch` functions that are not typed, or for any edge cases that `jedi` does not accurately resolve, we manually provide annotations in `client/src/rtorch/rtorch/known.py`.

   See `get_jedi_signatures` in `biject.py` for details on how `jedi` infers function signatures. These `jedi` signatures are parsed into standard `inspect` library signatures, which are then type-checked against the given function arguments to detect the presence of a `device` parameter (functions can be overloaded in Python if they have different signatures, and the type stubs come from the C++ implementation, so functions like `torch.Tensor.to` have three implementations that we must check against).

   Similarly, we parse the return annotations (with some tooling to unwrap generic alias types) to determine `Future` status.

   For detecting `RObject`s inside arguments, we recurse through tuple/list items, dict values, and `__dict__` values.

   Once these checks have run, we can create a `RemoteCallPayload` and pass it as an argument to `remote_call` (the Rust function).
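The routing rules above can be condensed into a small decision function. This is a hedged sketch, not the real `get_parameterized_call`: `RObject` is a stand-in class, and the remote-GPU case is simplified to "any non-`"cpu"` device value", while the real code type-checks against parsed signatures.

```python
# Hypothetical condensation of the six routing rules from biject.py.
# RObject is a stand-in; the real device check parses torch device types
# rather than comparing strings.

class RObject:
    """Stand-in for an object representing a remote value."""

def contains_robject(value) -> bool:
    # Recurse through tuple/list items, dict values, and __dict__ values.
    if isinstance(value, RObject):
        return True
    if isinstance(value, (tuple, list)):
        return any(contains_robject(v) for v in value)
    if isinstance(value, dict):
        return any(contains_robject(v) for v in value.values())
    if hasattr(value, "__dict__"):
        return any(contains_robject(v) for v in vars(value).values())
    return False

def route(args: tuple, kwargs: dict) -> str:
    has_robject = contains_robject(list(args)) or contains_robject(kwargs)
    device = kwargs.get("device")
    if device == "cpu":
        # device="cpu": only go remote if a remote value is already involved.
        return "remote" if has_robject else "local"
    if device is not None:
        return "remote"             # device aimed at a remote GPU
    return "remote" if has_robject else "local"
```

The recursion in `contains_robject` mirrors the tuple/list/dict/`__dict__` traversal described above.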
3. The payload is intercepted to create `RId`s if there are one or more `Future` return types. The remaining payload is serialized (with `pickle`, which can be insecure with untrusted arguments) and sent to the worker as a remote call.
4. The Python process described above happens almost in reverse in `handle_bytes` in `payload.rs`. All `RObject` arguments are converted back to their true `torch` object representations by retrieving them from the `ObjectCache`. The function is run with these arguments. If the resulting object is a `torch` object, we save it in the cache and return an `RObject` (for a `Blocker` return; otherwise we return an empty `Vec`).
5. The resulting bytes are relayed to the client, which deserializes them as needed and parses them back into the appropriate object or exception. Program execution continues as expected.
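Putting the worker side together, here is a stripped-down model of the cache round-trip described in the last two steps. The dict-backed `ObjectCache` and the `handle_call` signature are simplifications invented for illustration, not the real `handle_bytes`:

```python
# Hypothetical model of the worker-side flow in payload.rs: remote-id
# arguments are swapped for their cached objects, the function runs, and the
# result is cached and handed back as a fresh id for Blocker returns.
import itertools

class ObjectCache:
    """Toy stand-in for the worker's object cache: remote id -> live object."""
    def __init__(self):
        self._store = {}
        self._ids = itertools.count(1)

    def put(self, obj) -> int:
        rid = next(self._ids)
        self._store[rid] = obj
        return rid

    def get(self, rid: int):
        return self._store[rid]

def handle_call(cache, func, args, arg_rid_positions, blocker):
    # Swap remote-id arguments for their cached objects.
    resolved = [cache.get(a) if i in arg_rid_positions else a
                for i, a in enumerate(args)]
    result = func(*resolved)
    rid = cache.put(result)          # the real code only caches torch objects
    return rid if blocker else None  # Blocker returns an id; Future returns nothing
```

Because the real `handle_fn` processes messages serially, a single mutable cache like this can be threaded through every call without extra locking.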