Peer Connections
As of Aug 10, 2025
The client and worker state machines and their transitions are defined
in `{client/worker}/src/connection/rtc.rs`. Other shared code is defined
in `thava-types/src/rtc.rs`.
The best place to see how peer connections are established, end to end and in one place, is in
`crates/client/src/connection/rtc.rs`:
```rust
#[tokio::test]
async fn test_negotiation() -> Result<()> {}

#[tokio::test]
async fn test_streaming() -> Result<()> {}
```

These tests import the `PolitePeerConnection` from our worker crate and test both the negotiation process and maximum sending speeds.
The entrypoint for either client or worker peer connections is
`{Impol/Pol}itePeerConnection::negotiate`, which does the following:

- Creates an `RTCPeerConnection` object (from the `webrtc-rs` crate).
- Creates a data channel and sets up handlers for WebRTC state changes, local ICE candidate events, and the data channel open event.
- Sets up a `Ping` heartbeat with the distributor (for the worker).
- Adds the connection state to an `AllConnections` object, which is like a lock-less, distributed `HashMap` that keeps track of all peer connections a worker or client has (in any state).
- Begins the negotiation process (`external_sender` and `external_receiver` send and receive messages from the distributor, or any signaling server).
- As messages arrive internally (e.g. locally generated ICE candidates) or externally (e.g. messages from the opposing peer), internal/external transitions occur, modifying the state stored in the `AllConnections` object.
- Once the data channel is opened, the worker hands off all incoming messages to the `handle_fn`. Given that every message is handled serially, we pass a mutable reference to the `ObjectCache` along with the message bytes (this may have to adapt to the `AllConnections` message-passing structure when reordering is added).
Handling PyTorch Function Requests
When `remote_call` is called from a Python client, it is first
intercepted by the `RemoteCall` struct's `__call__` method in
`thava-types/src/rtorch.rs`. Here, for calls with `Future` return types,
remote IDs are generated and immediately returned. The `PyMessage`
is then passed into the actual `remote_call` implementation
(defined in `client/src/pycaravan/init.rs`), which sends a request
to the peer.
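To make the `Future` path concrete, here is a minimal, hypothetical Python sketch of the behavior described above; `RObject`, `future_call`, and `rust_remote_call` are invented names for illustration, not the real API:

```python
import pickle
from dataclasses import dataclass
from itertools import count

_rid_counter = count()


@dataclass
class RObject:
    """Placeholder handle for a value that lives in a worker's ObjectCache."""
    rid: int


def rust_remote_call(message: bytes) -> None:
    """Stand-in for the Rust remote_call, which ships the request to the peer."""


def future_call(func_name: str, args: tuple, kwargs: dict) -> RObject:
    """A Future-style call: generate the remote ID up front and return
    immediately; the worker materializes the real value later."""
    rid = next(_rid_counter)
    payload = {"func": func_name, "rid": rid, "args": args, "kwargs": kwargs}
    rust_remote_call(pickle.dumps(payload))  # request goes out asynchronously
    return RObject(rid)                      # caller gets a handle right away
```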
`thava-types/src/biject/` contains `payload.rs`, which holds the full Rust
worker code that handles every remote-call payload. Here is how a standard call
may be run:
- The client encounters a PyTorch function call resembling:

  ```python
  # train.py
  bar = foo(*args, **kwargs)
  ```

  where `foo` may be defined as:

  ```python
  # torch.py
  def foo(other: Tensor, device: DeviceLikeType | None = None) -> Tensor: ...
  ```

- `foo` has already been bijected when `caravan` was imported and/or built. Therefore, `foo` is our function. We must inspect the arguments and their types to determine whether a remote call should be made. Here are the conditions we must check (included in `get_parameterized_call` in `client/src/rtorch/rtorch/biject.py`):

  - If our arguments contain at least one `RObject` (an object representing a remote value) and the parameters include `device="cpu"`, route to remote.
  - If our arguments do not contain `RObject`s but the parameters include `device="cpu"`, route to local.
  - If our arguments contain at least one `RObject` and a `device` parameter intended for a remote GPU, route to remote.
  - If our arguments do not contain `RObject`s but do have a `device` parameter intended for a remote GPU, route to remote.
  - If our arguments contain at least one `RObject` and no `device` parameter, route to remote.
  - If our arguments contain neither `RObject`s nor a `device` parameter, route to local.
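
  A minimal sketch of this routing decision, assuming a boolean `has_robject` and the resolved `device` argument (helper names here are hypothetical; the real logic lives in `get_parameterized_call`):

  ```python
  from enum import Enum, auto


  class Route(Enum):
      LOCAL = auto()
      REMOTE = auto()


  def choose_route(has_robject: bool, device: str | None) -> Route:
      """Encode the routing conditions listed above.

      `device` is the call's resolved device argument, or None if the call
      did not pass one. For simplicity, this sketch treats any non-"cpu"
      device as one intended for a remote GPU.
      """
      if device == "cpu":
          # device="cpu" routes remote only when a remote value is involved.
          return Route.REMOTE if has_robject else Route.LOCAL
      if device is not None:
          # A device intended for a remote GPU always routes remote.
          return Route.REMOTE
      # No device parameter: route remote only when an RObject is present.
      return Route.REMOTE if has_robject else Route.LOCAL
  ```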

  Additionally, we choose to make a remote call a `Future` or a `Blocker` call based on whether or not the return annotation resolves to a `torch` object (e.g. `Tensor`, `Parameter`, etc.).
  To facilitate these checks, we use a type-checking system. `jedi` is a static-analysis library used in the `jedi-language-server` (which I use for autocomplete in `nvim`). For each `torch` function, we use `jedi` to find the function signature. A few notes:

  - We tried the standard `inspect` library, which cannot find types for the `torch` stub files generated for the C++ `libtorch` source code.
  - For internal `torch` functions that are not typed, or for any edge cases that `jedi` does not resolve accurately, we manually provide annotations in `client/src/rtorch/rtorch/known.py`.

  See `get_jedi_signatures` in `biject.py` for details on how `jedi` infers function signatures. These `jedi` signatures are then parsed into standard `inspect` library signatures, which are then type-checked against the given function parameters to find the existence of the `device` parameter (functions can be overloaded in Python if they have different signatures, and the type stubs come from the C++ implementation, so functions like `torch.Tensor.to` have three implementations that we must check against).

  Similarly, we parse the return annotations (with some tooling to unwrap generic alias types) to determine `Future` status.
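
  As an illustration, here is a rough sketch of both checks using only the standard `inspect` and `typing` machinery (the helper names and the name-based `torch` check are simplifications for this sketch, not the real implementation):

  ```python
  import inspect
  import typing


  def find_matching_signature(overloads, args, kwargs):
      """Return the first overload whose parameters accept this call."""
      for sig in overloads:
          try:
              sig.bind(*args, **kwargs)
              return sig
          except TypeError:
              continue
      return None


  def has_device_parameter(sig: inspect.Signature) -> bool:
      """The existence check that drives device-based routing."""
      return "device" in sig.parameters


  def returns_torch_object(annotation, torch_names=("Tensor", "Parameter")) -> bool:
      """Unwrap generic aliases (e.g. Optional[Tensor], list[Tensor]) and ask
      whether any leaf type looks like a torch object (checked by name here)."""
      origin = typing.get_origin(annotation)
      if origin is not None:
          return any(returns_torch_object(arg, torch_names)
                     for arg in typing.get_args(annotation))
      name = getattr(annotation, "__name__", str(annotation))
      return name in torch_names
  ```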

  For detecting `RObject`s inside arguments, we recurse through tuple/list items, dict values, and `__dict__` values.
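
  A sketch of that recursion, reusing a stand-in `RObject` type:

  ```python
  class RObject:
      """Stand-in for the real remote-value handle."""


  def contains_robject(value) -> bool:
      """Recursively search tuple/list items, dict values, and __dict__ values
      for an RObject, mirroring the detection described above."""
      if isinstance(value, RObject):
          return True
      if isinstance(value, (tuple, list)):
          return any(contains_robject(item) for item in value)
      if isinstance(value, dict):
          return any(contains_robject(v) for v in value.values())
      if hasattr(value, "__dict__"):
          return any(contains_robject(v) for v in vars(value).values())
      return False
  ```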

  Once these checks have occurred, we can create a `RemoteCallPayload` and pass it as an argument to `remote_call` (the Rust function).
- The payload is intercepted to create `RId`s if there are one or more `Future` return types. The remaining payload is serialized (with `pickle`, which can be insecure when arguments are untrusted) and sent to the worker as a remote call.
- The Python process described above happens almost in reverse in `handle_bytes` in `payload.rs`. All `RObject` arguments are converted back to their true `torch` object representations by retrieving them from the `ObjectCache`. The function is then run with these arguments. If the resulting object is a `torch` object, we save it in the cache and return an `RObject` for a `Blocker` return; otherwise we return an empty `Vec`.
- The resulting bytes are relayed to the client, which deserializes them as needed and parses them back into the appropriate object or exception. Program execution continues as expected.
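
For intuition, here is a Python-flavored sketch of the worker-side round trip; the real implementation is the Rust `handle_bytes` in `payload.rs`, so every name below is hypothetical and the details (payload shape, error handling, the non-`torch` return path) are assumptions:

```python
import pickle
from itertools import count

_rid_counter = count()
object_cache: dict[int, object] = {}  # stand-in for the worker's ObjectCache


class RObject:
    def __init__(self, rid: int):
        self.rid = rid


def is_torch_object(obj) -> bool:
    # Hypothetical check; the real code inspects actual torch types.
    return type(obj).__name__ in ("Tensor", "Parameter")


def resolve(value):
    """Swap RObject handles for the real cached values before calling."""
    if isinstance(value, RObject):
        return object_cache[value.rid]
    if isinstance(value, (tuple, list)):
        return type(value)(resolve(v) for v in value)
    if isinstance(value, dict):
        return {k: resolve(v) for k, v in value.items()}
    return value


def handle_call(message: bytes) -> bytes:
    # pickle.loads on untrusted bytes is unsafe; noted earlier as a known concern.
    func, args, kwargs, future_rid = pickle.loads(message)
    result = func(*resolve(args), **resolve(kwargs))
    if not is_torch_object(result):
        # Assumption: non-torch results are serialized back directly.
        return pickle.dumps(result)
    if future_rid is not None:
        object_cache[future_rid] = result  # Future: client already holds this rid
        return b""                         # nothing to send back
    rid = next(_rid_counter)
    object_cache[rid] = result             # Blocker: cache and hand back a handle
    return pickle.dumps(RObject(rid))
```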