pub struct ControllerService {
workers: Arc<Mutex<Vec<WorkerSender<Result<Task, Status>>>>>,
cli_sender: Arc<Mutex<Option<Sender<Result<TaskResult, Status>>>>>,
open_measurements: Arc<Mutex<HashMap<u32, u32>>>,
m_id: Arc<Mutex<u32>>,
unique_id: Arc<Mutex<u32>>,
is_active: Arc<Mutex<bool>>,
is_responsive: Arc<AtomicBool>,
is_latency: Arc<AtomicBool>,
worker_config: Option<HashMap<String, u32>>,
worker_stacks: Arc<Mutex<HashMap<u32, VecDeque<Address>>>>,
}
Expand description
Struct for the orchestrator service
§Fields
- ‘workers’ - a list of WorkerSender objects that are connected to this orchestrator
- ‘cli_sender’ - the sender that connects to the CLI; used to stream TaskResults
- ‘open_measurements’ - a list of the current open measurements, and the number of clients that are currently working on it
- ‘m_id’ - keeps track of the last used measurement ID
- ‘unique_id’ - keeps track of the last used worker ID and is used to assign a unique worker ID to a new connecting worker
- ‘is_active’ - a boolean value that is set to true when there is an active measurement
- ‘is_responsive’ - a boolean value that is set to true when the orchestrator is in responsive probing mode
- ‘is_latency’ - a boolean value that is set to true when the orchestrator is in latency probing mode
- ‘worker_config’ - optional mapping of hostname to worker IDs (to enforce static worker IDs)
- ‘worker_stacks’ - a mapping of worker IDs to a stack of addresses that are used for follow-up probes (used for responsive and latency probing)
Fields§
§workers: Arc<Mutex<Vec<WorkerSender<Result<Task, Status>>>>>
§cli_sender: Arc<Mutex<Option<Sender<Result<TaskResult, Status>>>>>
§open_measurements: Arc<Mutex<HashMap<u32, u32>>>
§m_id: Arc<Mutex<u32>>
§unique_id: Arc<Mutex<u32>>
§is_active: Arc<Mutex<bool>>
§is_responsive: Arc<AtomicBool>
§is_latency: Arc<AtomicBool>
§worker_config: Option<HashMap<String, u32>>
§worker_stacks: Arc<Mutex<HashMap<u32, VecDeque<Address>>>>
Implementations§
Source§impl ControllerService
impl ControllerService
Sourcefn get_unique_id(&self) -> u32
fn get_unique_id(&self) -> u32
Gets a unique worker ID for a new connecting worker. Increments the unique ID counter after returning the ID (for the next worker).
Sourcefn get_worker_id(&self, hostname: &str) -> Result<(u32, bool), Box<Status>>
fn get_worker_id(&self, hostname: &str) -> Result<(u32, bool), Box<Status>>
Gets a worker ID for a connecting worker based on its hostname. If the hostname already exists, it returns the existing worker ID. If the hostname does not exist, it checks for a statically configured ID or generates a new unique ID.
§Arguments
- ‘hostname’ - the hostname of the worker
§Returns
Returns the worker ID
§Errors
Returns an error if the hostname already exists and is used by a connected worker.
Trait Implementations§
Source§impl Controller for ControllerService
Implementation of the Controller trait for the ControllerService
Handles communication with the workers and the CLI
impl Controller for ControllerService
Implementation of the Controller trait for the ControllerService Handles communication with the workers and the CLI
Source§fn measurement_finished<'life0, 'async_trait>(
&'life0 self,
request: Request<Finished>,
) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn measurement_finished<'life0, 'async_trait>(
&'life0 self,
request: Request<Finished>,
) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Called by the worker when it has finished its current measurement.
When all connected workers have finished this measurement, it will notify the CLI that the measurement is finished.
§Arguments
- ‘request’ - a Finished message containing the measurement ID of the measurement that has finished
§Errors
Returns an error if the measurement ID is unknown.
Source§fn worker_connect<'life0, 'async_trait>(
&'life0 self,
request: Request<Worker>,
) -> Pin<Box<dyn Future<Output = Result<Response<Self::WorkerConnectStream>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn worker_connect<'life0, 'async_trait>(
&'life0 self,
request: Request<Worker>,
) -> Pin<Box<dyn Future<Output = Result<Response<Self::WorkerConnectStream>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Handles a worker connecting to this orchestrator formally.
Ensures the hostname is unique and returns a unique worker ID
Returns the receiver side of a stream to which the orchestrator will send tasks
§Arguments
- ‘request’ - a Metadata message containing the hostname of the worker
Source§fn do_measurement<'life0, 'async_trait>(
&'life0 self,
request: Request<ScheduleMeasurement>,
) -> Pin<Box<dyn Future<Output = Result<Response<Self::DoMeasurementStream>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn do_measurement<'life0, 'async_trait>(
&'life0 self,
request: Request<ScheduleMeasurement>,
) -> Pin<Box<dyn Future<Output = Result<Response<Self::DoMeasurementStream>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Handles the do_measurement command from the CLI.
Instructs all workers to perform the measurement and returns the receiver side of a stream in which TaskResults will be streamed.
Will lock active to true, such that no other measurement can start.
Makes sure all workers are still connected, removes their senders if not.
Assigns a unique ID to the measurement.
Streams tasks to the workers, in a round-robin fashion, with 1-second delays between clients.
Furthermore, lets the workers know of the desired probing rate (defined by the CLI).
§Arguments
- ‘request’ - a ScheduleMeasurement message containing information about the measurement that the CLI wants to perform
§Errors
Returns an error if there is already an active measurement, or if there are no connected workers to perform the measurement.
Source§fn list_workers<'life0, 'async_trait>(
&'life0 self,
_request: Request<Empty>,
) -> Pin<Box<dyn Future<Output = Result<Response<ServerStatus>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_workers<'life0, 'async_trait>(
&'life0 self,
_request: Request<Empty>,
) -> Pin<Box<dyn Future<Output = Result<Response<ServerStatus>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Handle the list_clients command from the CLI.
Returns the connected clients.
Source§fn send_result<'life0, 'async_trait>(
&'life0 self,
request: Request<TaskResult>,
) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn send_result<'life0, 'async_trait>(
&'life0 self,
request: Request<TaskResult>,
) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§type WorkerConnectStream = WorkerReceiver<Result<Task, Status>>
type WorkerConnectStream = WorkerReceiver<Result<Task, Status>>
Source§type DoMeasurementStream = CLIReceiver<Result<TaskResult, Status>>
type DoMeasurementStream = CLIReceiver<Result<TaskResult, Status>>
Auto Trait Implementations§
impl Freeze for ControllerService
impl RefUnwindSafe for ControllerService
impl Send for ControllerService
impl Sync for ControllerService
impl Unpin for ControllerService
impl UnwindSafe for ControllerService
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<L> LayerExt<L> for L
impl<L> LayerExt<L> for L
§fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
Layered
].