Distributor
As of Aug 10, 2025
The distributor is a gRPC server responsible for:
- handling worker admins
- facilitating the creation and modification of worker groups and worker machines
- queueing worker machines as available/unavailable based on their GPU utilization (wip)
- querying worker machines according to a client's request for GPUs (wip)
- acting as a signaling server for peer connection negotiation
- performing inter-instance communication over Pub/Sub between clients and worker machines
- hot-swapping GPUs during program execution (fut.)
- reconstructing program state using database-like recovery
- establishing peer connections on the fly for new queries or fallback
- handling pricing (fut.)
Build and start the distributor with the following (you may need gcloud permissions):
# from thava/
cd crates/distributor
cargo run
# or, again from thava/
cargo run --manifest-path crates/distributor/Cargo.toml
Architecture
distributor/
    src/
        interceptor/
        controller/
        service/
        domain/
        dao/
    Cargo.toml
Interceptor/Controller
gRPC request bodies are defined in proto/. Internally, the tonic crate uses the prost crate to parse the protobuf files and generate code for interfacing with these files from the distributor. To generate the code, run:
# from thava/
cargo build
Note that thava/build.rs specifies where the generated code for the protobuf files should reside.
gRPC requests to the distributor are first automatically deserialized into generated Rust structs, intercepted for authentication in interceptor/, and then routed to the corresponding controllers. Here, the relevant services are created for handling the request. The return value is similarly serialized automatically and sent to the caller.
The controllers are added (unfortunately confusingly!) as asynchronous "services" in distributor/src/main.rs.
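The request path described above can be sketched without any gRPC machinery. This is only an illustration using the standard library: Request, Status, EchoService, intercept, controller, and dispatch are hypothetical names, not the distributor's actual types, and the real code is asynchronous and built on tonic.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a request that has already been deserialized.
#[derive(Debug)]
pub struct Request {
    pub metadata: HashMap<String, String>,
    pub body: String,
}

/// Hypothetical error type; the real distributor would return a gRPC status.
#[derive(Debug, PartialEq)]
pub enum Status {
    Unauthenticated(String),
}

/// Interceptor step: reject requests that carry no "authorization" entry.
/// (The metadata key and the check itself are assumptions for illustration.)
pub fn intercept(request: Request) -> Result<Request, Status> {
    match request.metadata.get("authorization") {
        Some(token) if !token.is_empty() => Ok(request),
        _ => Err(Status::Unauthenticated("missing authorization".to_string())),
    }
}

/// Service step: where the request-specific work happens.
pub struct EchoService;

impl EchoService {
    pub fn handle(&self, body: &str) -> String {
        format!("handled: {body}")
    }
}

/// Controller step: creates the relevant service and delegates to it.
pub fn controller(request: Request) -> Result<String, Status> {
    let service = EchoService;
    Ok(service.handle(&request.body))
}

/// Full path: interceptor first, then the controller.
pub fn dispatch(request: Request) -> Result<String, Status> {
    intercept(request).and_then(controller)
}
```

Calling dispatch with an "authorization" entry in the metadata reaches the controller; without one, the request is rejected before any service is created.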
Service/Domain
Services perform a series of actions based on domain logic. The line between service and domain logic is currently a bit blurry to avoid premature abstraction, but ideally business logic will be abstracted out as the need arises. One principle that could differentiate the two: data is generically typed when it enters a service, and is parsed (and validated) before the service hands it to domain logic.
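One way to picture that boundary is a service that accepts a raw String and hands the domain a type that can only exist once validated. The sketch below is hypothetical: GroupName, ParseError, the 64-byte limit, and both functions are invented for illustration and do not come from the distributor's code.

```rust
/// Hypothetical domain type: a worker group name that is known to be valid.
#[derive(Debug, Clone, PartialEq)]
pub struct GroupName(String);

#[derive(Debug, PartialEq)]
pub enum ParseError {
    Empty,
    TooLong(usize),
}

impl GroupName {
    /// Assumed limit, chosen only for this example.
    const MAX_LEN: usize = 64;

    /// The only way to build a GroupName, so every instance is validated.
    pub fn parse(raw: &str) -> Result<Self, ParseError> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            return Err(ParseError::Empty);
        }
        if trimmed.len() > Self::MAX_LEN {
            return Err(ParseError::TooLong(trimmed.len()));
        }
        Ok(GroupName(trimmed.to_string()))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

/// Domain logic: accepts only the validated type.
pub fn describe_group(group: &GroupName) -> String {
    format!("worker group '{}'", group.as_str())
}

/// Service boundary: takes generically typed input, parses it, then
/// passes the parsed value on to domain logic.
pub fn lookup_group(raw_group: String) -> Result<String, ParseError> {
    let group = GroupName::parse(&raw_group)?;
    Ok(describe_group(&group))
}
```

With this split, domain functions never need to re-check their inputs, since an invalid group name cannot be constructed in the first place.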
Core services to be noted:
service/client_service.rs
/// Sets up a relayer (i.e. WebRTC signaling server) to communicate
/// back and forth between the worker through other distributor instances
/// over Pub/Sub and back to the client through the client's controller.
pub async fn start_relayer(
    &self,
    controller_sender: Sender<PeerRtcMessage>,
) -> Result<Sender<PeerRtcMessage>> {}

/// Verifies if a client is allowed to access a worker group.
pub async fn verify_client(
    &self,
    email: String,
    sharing_key: String,
    group: String,
) -> Result<()> {}

/// Queues a client and returns a client id and list of worker machine IDs and
/// the corresponding GPU offsets.
pub async fn queue_client(&self, email: String) -> Result<(i64, Vec<[i64; 2]>)> {}
service/worker_admin_service.rs, service/worker_group_service.rs, service/worker_machine_service.rs
Most of the services here are either CRUD operations or the corresponding functions for worker-side WebRTC relaying. Please let me know if you have questions here.
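The channel shape of start_relayer (take the controller's sender, return a sender the controller can write to) can be sketched with standard-library channels and a thread. This is a simplification under stated assumptions: the real function is async and relays over Pub/Sub, whereas here the worker side is replaced by a hypothetical simulated_worker_reply function, and PeerRtcMessage is reduced to a single payload field.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Simplified stand-in for the real PeerRtcMessage.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerRtcMessage {
    pub payload: String,
}

/// Hypothetical stand-in for the worker on the far side of Pub/Sub.
fn simulated_worker_reply(message: &PeerRtcMessage) -> PeerRtcMessage {
    PeerRtcMessage {
        payload: format!("answer to {}", message.payload),
    }
}

/// Returns a sender for client-to-worker messages; replies from the
/// (simulated) worker are forwarded back through `controller_sender`.
pub fn start_relayer(controller_sender: Sender<PeerRtcMessage>) -> Sender<PeerRtcMessage> {
    let (relayer_sender, relayer_receiver) = channel::<PeerRtcMessage>();
    thread::spawn(move || {
        // The loop ends once every clone of `relayer_sender` is dropped.
        for message in relayer_receiver {
            let reply = simulated_worker_reply(&message);
            // Stop relaying if the controller has gone away.
            if controller_sender.send(reply).is_err() {
                break;
            }
        }
    });
    relayer_sender
}
```

The controller keeps the receiving half of its own channel and the returned sender, so signaling messages can flow in both directions without the controller knowing how they reach the worker.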
Core domain logic to be noted:
domain/datastore.rs: Initializes a connection to Firestore and defines transaction capabilities.
domain/rendezvous.rs: The rendezvous task is used to pair a client to worker machine(s) (currently one machine) when a client attempts to queue, creating a Connection object with all information needed to start the negotiation process. Similarly, when a worker machine runs caravan start, it makes itself available here. The rendezvous task may be slightly outdated from previous iterations of WebRTC negotiation and can be removed in the future.
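The pairing idea behind the rendezvous task can be sketched as two queues that are matched whenever both sides are present. Everything here is an assumption made for illustration: the Rendezvous struct, its method names, the first-in-first-out matching, and the two fields on Connection (the real Connection object carries whatever the negotiation needs).

```rust
use std::collections::VecDeque;

/// Hypothetical, reduced Connection: just the two IDs being paired.
#[derive(Debug, Clone, PartialEq)]
pub struct Connection {
    pub client_id: i64,
    pub worker_machine_id: i64,
}

/// Holds whichever side arrived first until a counterpart shows up.
#[derive(Default)]
pub struct Rendezvous {
    available_workers: VecDeque<i64>,
    waiting_clients: VecDeque<i64>,
}

impl Rendezvous {
    pub fn new() -> Self {
        Self::default()
    }

    /// Called when a worker machine makes itself available.
    pub fn worker_available(&mut self, worker_machine_id: i64) -> Option<Connection> {
        match self.waiting_clients.pop_front() {
            Some(client_id) => Some(Connection { client_id, worker_machine_id }),
            None => {
                self.available_workers.push_back(worker_machine_id);
                None
            }
        }
    }

    /// Called when a client attempts to queue.
    pub fn client_queued(&mut self, client_id: i64) -> Option<Connection> {
        match self.available_workers.pop_front() {
            Some(worker_machine_id) => Some(Connection { client_id, worker_machine_id }),
            None => {
                self.waiting_clients.push_back(client_id);
                None
            }
        }
    }
}
```

Each call either returns a Connection immediately or parks the caller's ID until the other side arrives, which mirrors the one-client-to-one-machine pairing described above.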
DAO (Data Access Object)
DAOs are proxies between datastore objects and domain-level objects. Relevant functions here are used to interface with Firestore such that the database can be swapped out in the future if needed.
For example:
pub async fn create_worker_machine(&self, transaction: &mut Transaction<'_>) -> Result<()> {
    let datastore = Datastore::new()?;
    // Stage the write in the caller's transaction; nothing is persisted
    // until that transaction is committed.
    datastore
        .db
        .fluent()
        .update()
        .in_col("worker_machine")
        .document_id(self.id.to_string())
        .object(self)
        .add_to_transaction(&mut transaction.transaction)?;
    Ok(())
}
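To illustrate how a DAO layer keeps the database swappable, the sketch below puts persistence behind a trait and provides an in-memory backend. This is not how the distributor is currently written (its DAOs call Firestore directly, as in the example above); WorkerMachineDao, InMemoryWorkerMachineDao, register_machine, and the gpu_count field are all hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical, reduced domain object.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkerMachine {
    pub id: i64,
    pub gpu_count: u32,
}

/// Persistence operations the rest of the code depends on.
pub trait WorkerMachineDao {
    fn upsert(&mut self, machine: &WorkerMachine) -> Result<(), String>;
    fn get(&self, id: i64) -> Result<Option<WorkerMachine>, String>;
}

/// In-memory backend, keyed by the stringified ID like a document store.
#[derive(Default)]
pub struct InMemoryWorkerMachineDao {
    documents: HashMap<String, WorkerMachine>,
}

impl WorkerMachineDao for InMemoryWorkerMachineDao {
    fn upsert(&mut self, machine: &WorkerMachine) -> Result<(), String> {
        self.documents.insert(machine.id.to_string(), machine.clone());
        Ok(())
    }

    fn get(&self, id: i64) -> Result<Option<WorkerMachine>, String> {
        Ok(self.documents.get(&id.to_string()).cloned())
    }
}

/// Caller that only knows about the trait, not the backing store.
pub fn register_machine(
    dao: &mut dyn WorkerMachineDao,
    id: i64,
    gpu_count: u32,
) -> Result<WorkerMachine, String> {
    let machine = WorkerMachine { id, gpu_count };
    dao.upsert(&machine)?;
    Ok(machine)
}
```

A Firestore-backed implementation of the same trait could then replace the in-memory one without touching callers such as register_machine, and the in-memory version remains handy for tests.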