Caravan

Distributor

As of Aug 10, 2025

The distributor is a gRPC server responsible for:

  1. handling worker admins
  2. facilitating the creation and modification of worker groups and worker machines
  3. marking worker machines as available/unavailable based on their GPU utilization (WIP)
  4. querying worker machines according to a client's request for GPUs (WIP)
  5. acting as a signaling server for peer connection negotiation
    1. performing inter-instance communication with Pub/Sub between clients and worker machines
  6. hot-swapping GPUs during program execution (future)
    1. reconstructing program state using database-like recovery
    2. establishing peer connections on the fly for new queries or fallback
  7. handling pricing (future)

Build and start the distributor with (you may need gcloud permissions):

```sh
# from thava/
cd crates/distributor
cargo run

# or, again from thava/
cargo run --manifest-path crates/distributor/Cargo.toml
```

Architecture

```
distributor/
    src/
        interceptor/
        controller/
        service/
        domain/
        dao/
    Cargo.toml
```

Interceptor/Controller

gRPC request bodies are defined in proto/. Internally, the tonic crate uses the prost crate to parse the protobuf files and generates code for interfacing with these messages from the distributor. To generate the code, run:

```sh
# from thava/
cargo build
```

Note that thava/build.rs specifies where the generated code for the protobuf files should reside.

gRPC requests to the distributor are first automatically deserialized into the generated Rust structs, then intercepted for authentication in interceptor/, and finally reach the corresponding controllers. There, the relevant services are created to handle the request. The return value is similarly serialized automatically and sent back to the caller.
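The authentication step can be sketched as a plain function over request metadata. This is only an illustration of the shape of such a check: the `authorization` key, the `Bearer` scheme, and the `authenticate` name are assumptions, not the actual logic in interceptor/.

```rust
use std::collections::HashMap;

/// Sketch of an interceptor's authentication check, run before a request
/// reaches its controller. Returns the caller's token on success.
/// (Key name and "Bearer " scheme are illustrative assumptions.)
fn authenticate(metadata: &HashMap<String, String>) -> Result<String, String> {
    let value = metadata
        .get("authorization")
        .ok_or_else(|| "missing authorization metadata".to_string())?;
    let token = value
        .strip_prefix("Bearer ")
        .ok_or_else(|| "malformed authorization value".to_string())?;
    // A real interceptor would verify the token (e.g. against a session
    // store); here any non-empty token passes, purely for illustration.
    if token.is_empty() {
        Err("empty token".to_string())
    } else {
        Ok(token.to_string())
    }
}
```

In tonic, a function like this would run inside an interceptor attached to the service, rejecting the request with an unauthenticated status on `Err`.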

The controllers are added (unfortunately confusingly!) as asynchronous "services" in distributor/src/main.rs.

Service/Domain

Services perform a series of actions grounded in domain logic. The line between service and domain logic is currently a bit blurry to avoid premature abstraction, but business logic will eventually be abstracted out as needed. One principle for differentiating the two: data enters a service generically typed and is parsed (and validated) before leaving the service for domain logic.
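That principle can be sketched as follows. The types and function names here are hypothetical, not the actual ones in service/ or domain/: the service accepts generically typed data and parses it into a validated domain type before calling into domain logic.

```rust
/// Domain-level type: constructing one requires validation, so domain
/// logic never sees a raw, unchecked string. (Illustrative only.)
#[derive(Debug, PartialEq)]
struct Email(String);

impl Email {
    fn parse(raw: &str) -> Result<Email, String> {
        let trimmed = raw.trim();
        if trimmed.contains('@') && !trimmed.starts_with('@') {
            Ok(Email(trimmed.to_string()))
        } else {
            Err(format!("invalid email: {raw}"))
        }
    }
}

/// Service-level entry point: receives generically typed request data
/// and parses/validates it at the boundary.
fn queue_client_service(raw_email: String) -> Result<String, String> {
    let email = Email::parse(&raw_email)?; // boundary: String -> Email
    queue_client_domain(&email)
}

/// Domain logic can assume its inputs are already valid.
fn queue_client_domain(email: &Email) -> Result<String, String> {
    Ok(format!("queued {}", email.0))
}
```

The payoff is that invalid data is rejected once, at the service boundary, instead of being re-checked throughout the domain code.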

Core services to be noted:

  1. service/client_service.rs
```rust
/// Sets up a relayer (i.e. WebRTC signaling server) to communicate
/// back and forth between the worker through other distributor instances
/// over Pub/Sub and back to the client through the client's controller.
pub async fn start_relayer(
    &self,
    controller_sender: Sender<PeerRtcMessage>,
) -> Result<Sender<PeerRtcMessage>> {}

/// Verifies that a client is allowed to access a worker group.
pub async fn verify_client(
    &self,
    email: String,
    sharing_key: String,
    group: String,
) -> Result<()> {}

/// Queues a client and returns the client ID along with a list of
/// [worker machine ID, GPU offset] pairs.
pub async fn queue_client(&self, email: String) -> Result<(i64, Vec<[i64; 2]>)> {}
```
  2. service/worker_admin_service.rs, service/worker_group_service.rs, service/worker_machine_service.rs

Most of the services here are either CRUD operations or the worker-side counterparts of the WebRTC relaying functions above. Please let me know if you have questions here.

Core domain logic to be noted:

  1. domain/datastore.rs: Initializes a connection to Firestore and defines transaction capabilities.
  2. domain/rendezvous.rs: The rendezvous task is used to pair a client to worker machine(s) (currently one machine) when a client attempts to queue, creating a Connection object with all information needed to start the negotiation process. Similarly, when a worker machine runs caravan start, it makes itself available here. The rendezvous task may be slightly outdated from previous iterations of WebRTC negotiation and can be removed in the future.
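The pairing step performed by the rendezvous task can be sketched roughly as follows. The queue shape and the `Connection` fields are illustrative assumptions, not the actual definitions in domain/rendezvous.rs.

```rust
use std::collections::VecDeque;

/// Information needed to start the negotiation process between a
/// paired client and worker machine. (Field set is illustrative.)
#[derive(Debug, PartialEq)]
struct Connection {
    client_id: i64,
    worker_machine_id: i64,
}

/// Sketch of the rendezvous step: worker machines that ran `caravan start`
/// wait in a queue of available machines, and a queueing client is paired
/// with the next one (currently one machine per client). Returns None
/// when no machine is available.
fn rendezvous(available_workers: &mut VecDeque<i64>, client_id: i64) -> Option<Connection> {
    let worker_machine_id = available_workers.pop_front()?;
    Some(Connection { client_id, worker_machine_id })
}
```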

DAO (Data Access Object)

DAOs are proxies between datastore objects and domain-level objects. Relevant functions here are used to interface with Firestore such that the database can be swapped out in the future if needed.

For example:

```rust
// worker_machine_dao.rs

/// Stages a write of this worker machine into the "worker_machine"
/// collection on the given transaction; nothing is committed here.
pub async fn create_worker_machine(&self, transaction: &mut Transaction<'_>) -> Result<()> {
    // Note: opens a fresh Firestore connection per call.
    let datastore = Datastore::new()?;
    datastore
        .db
        .fluent()
        .update()
        .in_col("worker_machine")
        .document_id(self.id.to_string())
        .object(self)
        .add_to_transaction(&mut transaction.transaction)?;
    Ok(())
}
```
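Since the DAOs exist so the database can be swapped out, the boundary can be pictured as a trait with interchangeable implementations. The trait and the in-memory implementation below are an illustrative sketch (useful e.g. in tests), not the actual code in dao/.

```rust
use std::collections::HashMap;

/// The DAO boundary as a trait: domain code depends only on this
/// interface, so Firestore could later be replaced by another store by
/// writing a new implementation. (Names here are hypothetical.)
trait WorkerMachineDao {
    fn create_worker_machine(&mut self, id: i64, group: String) -> Result<(), String>;
    fn get_group(&self, id: i64) -> Option<&String>;
}

/// In-memory stand-in for the Firestore-backed DAO.
#[derive(Default)]
struct InMemoryWorkerMachineDao {
    machines: HashMap<i64, String>,
}

impl WorkerMachineDao for InMemoryWorkerMachineDao {
    fn create_worker_machine(&mut self, id: i64, group: String) -> Result<(), String> {
        if self.machines.contains_key(&id) {
            return Err(format!("worker machine {id} already exists"));
        }
        self.machines.insert(id, group);
        Ok(())
    }

    fn get_group(&self, id: i64) -> Option<&String> {
        self.machines.get(&id)
    }
}
```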