Caravan


As of Aug 10, 2025

Peer Connections

The client and worker state machines and their transitions are defined in {client/worker}/src/connection/rtc.rs. Other shared code lives in thava-types/src/rtc.rs. The most self-contained place to see how peer connections are established is the tests in crates/client/src/connection/rtc.rs:

rtc.rs
#[tokio::test]
async fn test_negotiation() -> Result<()> {}

#[tokio::test]
async fn test_streaming() -> Result<()> {}

These tests import PolitePeerConnection from our worker crate and exercise both the negotiation process and maximum sending speeds.

The entrypoint for either client or worker peer connections is {Impol/Pol}itePeerConnection::negotiate, which does the following:

  1. Creates an RTCPeerConnection object (from the webrtc-rs crate).
  2. Creates a data channel and sets up handlers for WebRTC state changes, local ICE candidate events, and the data channel open event.
  3. Sets up a Ping heartbeat with the distributor (for the worker).
  4. Adds the connection state to an AllConnections object, which is like a lock-less distributed HashMap to keep track of all peer connections a worker or client has (in any state).
  5. Begins the negotiation process (external_sender and external_receiver send messages to and receive messages from the distributor, or any other signaling server).
  6. As messages pop up internally (e.g. locally-generated ICE candidates) or externally (e.g. messages from the opposing peer), internal/external transitions occur, modifying the state stored in the AllConnections object.
  7. Once the data channel opens, the worker hands all incoming messages to the handle_fn. Because every message is handled serially, we pass a mutable reference to the ObjectCache along with the message bytes (this may have to adapt to the AllConnections message-passing structure once reordering is added).
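
A rough model of this flow, written as Python for brevity (the real state machine is Rust in {client/worker}/src/connection/rtc.rs; the states, message shapes, and the send_external callback below are illustrative stand-ins, not the actual types):

negotiation_model.py
from dataclasses import dataclass, field
from enum import Enum, auto


class State(Enum):
    NEW = auto()              # RTCPeerConnection and data channel created
    OFFER_SENT = auto()       # local offer forwarded to the distributor
    ANSWER_RECEIVED = auto()  # remote answer applied
    CONNECTED = auto()        # data channel open; messages go to handle_fn


@dataclass
class Connection:
    peer_id: str
    state: State = State.NEW
    remote_candidates: list = field(default_factory=list)


# Stand-in for AllConnections: every peer connection this process holds, in any state.
all_connections: dict[str, Connection] = {}


def on_internal_event(conn: Connection, kind: str, body, send_external) -> None:
    """Locally generated events, e.g. a new local ICE candidate or the channel opening."""
    if kind == "ice_candidate":
        send_external(conn.peer_id, {"kind": "candidate", "body": body})
    elif kind == "data_channel_open":
        conn.state = State.CONNECTED


def on_external_event(conn: Connection, msg: dict, send_external) -> None:
    """Messages from the opposing peer, relayed through the distributor (or any signaling server)."""
    if msg["kind"] == "offer":
        # A polite peer accepts the incoming offer (rolling back its own, if any) and answers.
        send_external(conn.peer_id, {"kind": "answer", "body": "..."})
    elif msg["kind"] == "answer":
        conn.state = State.ANSWER_RECEIVED
    elif msg["kind"] == "candidate":
        conn.remote_candidates.append(msg["body"])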

Handling PyTorch Function Requests

When remote_call is called from a Python client, it is first intercepted in thava-types/src/rtorch.rs, in the RemoteCall struct's __call__ method. There, for calls with Future return types, remote IDs are generated and returned immediately. The PyMessage is then passed to the actual remote_call implementation (defined in client/src/pycaravan/init.rs), which sends the request to the peer.
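
A small Python model of that hand-off (the real __call__ is implemented on the RemoteCall struct in Rust; _rids, send_to_worker, and wait_for_result below are hypothetical stand-ins):

remote_call_model.py
import itertools

_rids = itertools.count()   # stand-in for remote ID generation


def send_to_worker(payload: dict) -> None:
    """Hypothetical: serialize the PyMessage and hand it to the Rust remote_call."""


def wait_for_result(payload: dict):
    """Hypothetical: block until the worker's response arrives."""


def remote_call_entry(payload: dict, returns_future: bool):
    """Future calls get a remote ID and return immediately; Blocker calls wait."""
    if returns_future:
        rid = next(_rids)
        payload["result_rid"] = rid
        send_to_worker(payload)
        return {"rid": rid}          # handle returned before the worker replies
    send_to_worker(payload)
    return wait_for_result(payload)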

thava-types/src/biject/ contains payload.rs, the full Rust worker code that handles every remote call payload. Here is how a standard call runs:

  1. The client encounters a PyTorch function call resembling:

    train.py
    bar = foo(*args, **kwargs)

    where foo may be defined as:

    torch.py
    def foo(other: Tensor, device: DeviceLikeType | None = None) -> Tensor: ...
  2. foo has already been bijected when caravan was imported and/or built, so the foo being called here is our wrapper. We must inspect the arguments and their types to determine whether a remote call should be made. These are the conditions we check (implemented in get_parameterized_call in client/src/rtorch/rtorch/biject.py; a condensed sketch follows the list):

    1. If the arguments contain at least one RObject (an object representing a remote value) and the parameters include device="cpu", route to remote.
    2. If the arguments contain no RObjects but the parameters include device="cpu", route to local.
    3. If the arguments contain at least one RObject and the device parameter points at a remote GPU, route to remote.
    4. If the arguments contain no RObjects but the device parameter points at a remote GPU, route to remote.
    5. If the arguments contain at least one RObject and no device parameter, route to remote.
    6. If the arguments contain neither RObjects nor a device parameter, route to local.
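
    A condensed sketch of this decision (the real logic is get_parameterized_call in biject.py; the boolean inputs and the assumption that any non-"cpu" device names a remote GPU are simplifications):

    routing_sketch.py
    def route(has_robject: bool, device: str | None) -> str:
        """Collapse the six conditions above into a single decision."""
        if device is not None and device != "cpu":
            return "remote"                              # conditions 3 and 4: remote GPU device
        return "remote" if has_robject else "local"      # conditions 1, 2, 5, 6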

    Additionally, we make a remote call a Future or a Blocker call based on whether the return annotation is a torch object type (e.g. Tensor, Parameter).

    To facilitate these checks, we use a type-checking system. jedi is a static analysis library used by jedi-language-server (which I use for autocomplete in nvim). For each torch function, we use jedi to find the function signature. A few notes:

    1. We first tried the standard inspect library, but it cannot find types in the torch stub files generated for the C++ libtorch source code.

    2. For internal torch functions that are not typed, or for any edge cases that jedi does not resolve accurately, we manually provide annotations in client/src/rtorch/rtorch/known.py.

    See get_jedi_signatures in biject.py for details on how jedi infers function signatures. The jedi signatures are then parsed into standard inspect library signatures, which are type-checked against the given function parameters to detect the presence of a device parameter (stub functions can declare multiple overloads when their signatures differ, and since the type stubs come from the C++ implementation, functions like torch.Tensor.to have three signatures that we must check against).
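
    As an illustration of the lookup (assuming jedi's Script.get_signatures API and that torch is importable in the environment jedi inspects; the real logic is get_jedi_signatures in biject.py):

    jedi_lookup_sketch.py
    import jedi

    # Place the cursor just inside the call parentheses so jedi reports the callee's signatures.
    source = "import torch\ntorch.Tensor.to("
    signatures = jedi.Script(source).get_signatures(line=2, column=len("torch.Tensor.to("))

    # One Signature per overload; torch.Tensor.to yields several because the stubs
    # mirror the C++ implementation.
    for sig in signatures:
        print(sig.to_string())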

    Similarly, we parse the return annotations (with some tooling to unwrap generic alias types) to determine Future status.
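
    A minimal sketch of that unwrapping, assuming a fixed set of torch type names (the real check in biject.py covers more cases):

    future_check_sketch.py
    import typing

    TORCH_TYPE_NAMES = {"Tensor", "Parameter"}   # assumption: names that mark a call as Future

    def returns_torch_object(annotation) -> bool:
        """True if the (possibly generic) return annotation names a torch object."""
        args = typing.get_args(annotation)       # unwraps e.g. tuple[Tensor, Tensor] or Optional[Tensor]
        if args:
            return any(returns_torch_object(a) for a in args)
        name = getattr(annotation, "__name__", str(annotation))
        return name in TORCH_TYPE_NAMES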

    To detect RObjects inside arguments, we recurse through tuple/list items, dict values, and __dict__ values.
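
    A condensed version of that recursion (RObject here is a stand-in for the real class):

    robject_scan_sketch.py
    class RObject:
        """Stand-in for the client-side handle to a remote value."""

    def contains_robject(value) -> bool:
        """Recursively look for RObject instances inside an argument."""
        if isinstance(value, RObject):
            return True
        if isinstance(value, (tuple, list)):
            return any(contains_robject(v) for v in value)
        if isinstance(value, dict):
            return any(contains_robject(v) for v in value.values())
        if hasattr(value, "__dict__"):
            return any(contains_robject(v) for v in vars(value).values())
        return False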

    Once these checks have occurred, we can create a RemoteCallPayload and pass it as an argument to remote_call (the Rust function).

  3. The payload is intercepted to create RIds if there are one or more Future return types. The remaining payload is serialized (with pickle, which can be insecure when arguments come from untrusted sources) and sent to the worker as a remote call.

  4. The Python process described above happens almost in reverse in handle_bytes in payload.rs. All RObject arguments are converted back to their true torch object representations by retrieving from the ObjectCache. The function is run with these arguments. If the resulting object is a torch object, we save it in the cache and return an RObject (if Blocker return, otherwise we return an empty Vec).
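
    A simplified Python model of that worker-side flow (the actual code is Rust in payload.rs; object_cache, RObject, and is_torch_object below are illustrative stand-ins):

    worker_flow_sketch.py
    object_cache: dict[int, object] = {}   # stand-in for the worker's ObjectCache

    class RObject:
        def __init__(self, rid: int):
            self.rid = rid                 # ID of a value living in object_cache

    def is_torch_object(value) -> bool:
        return type(value).__name__ in {"Tensor", "Parameter"}   # illustrative check

    def handle_call(func, args, is_blocker: bool, result_rid: int):
        """Resolve RObject arguments from the cache, run the call, and cache torch results."""
        real_args = [object_cache[a.rid] if isinstance(a, RObject) else a for a in args]
        result = func(*real_args)
        if is_torch_object(result):
            object_cache[result_rid] = result
            return RObject(result_rid) if is_blocker else b""    # Future calls return empty bytes
        return result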

  5. The resulting bytes are relayed to the client, who will deserialize them as needed and parse them back into the appropriate object or exception. Program execution continues as expected.