Struct ControllerService

pub struct ControllerService {
    workers: Arc<Mutex<Vec<WorkerSender<Result<Task, Status>>>>>,
    cli_sender: Arc<Mutex<Option<Sender<Result<TaskResult, Status>>>>>,
    open_measurements: Arc<Mutex<HashMap<u32, u32>>>,
    m_id: Arc<Mutex<u32>>,
    unique_id: Arc<Mutex<u32>>,
    is_active: Arc<Mutex<bool>>,
    is_responsive: Arc<AtomicBool>,
    is_latency: Arc<AtomicBool>,
    worker_config: Option<HashMap<String, u32>>,
    worker_stacks: Arc<Mutex<HashMap<u32, VecDeque<Address>>>>,
}

Struct for the orchestrator service

§Fields

  • ‘workers’ - a list of WorkerSender objects that are connected to this orchestrator
  • ‘cli_sender’ - the sender that connects to the CLI; used to stream TaskResults
  • ‘open_measurements’ - a mapping of each currently open measurement to the number of workers that are still working on it
  • ‘m_id’ - keeps track of the last used measurement ID
  • ‘unique_id’ - keeps track of the last used worker ID and is used to assign a unique worker ID to a new connecting worker
  • ‘is_active’ - a boolean value that is set to true when there is an active measurement
  • ‘is_responsive’ - a boolean value that is set to true when the orchestrator is in responsive probing mode
  • ‘is_latency’ - a boolean value that is set to true when the orchestrator is in latency probing mode
  • ‘worker_config’ - optional mapping of hostname to worker IDs (to enforce static worker IDs)
  • ‘worker_stacks’ - a mapping of worker IDs to a stack of addresses that are used for follow-up probes (used for responsive and latency probing)
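
As a rough illustration of the ‘worker_stacks’ field, the sketch below keeps a per-worker queue of follow-up addresses. It is not the crate's implementation: `WorkerStacks`, `push`, and `take_batch` are hypothetical names, `std::net::IpAddr` stands in for the crate's `Address` type, and first-in-first-out draining is an assumption.

```rust
use std::collections::{HashMap, VecDeque};
use std::net::IpAddr;

/// Hypothetical stand-in for the `worker_stacks` field.
/// `IpAddr` replaces the crate's `Address` type for illustration only.
#[derive(Default)]
struct WorkerStacks {
    stacks: HashMap<u32, VecDeque<IpAddr>>,
}

impl WorkerStacks {
    /// Queues an address for a follow-up probe by the given worker.
    fn push(&mut self, worker_id: u32, addr: IpAddr) {
        self.stacks.entry(worker_id).or_default().push_back(addr);
    }

    /// Removes and returns up to `max` queued addresses for a worker,
    /// oldest first (the ordering is an assumption of this sketch).
    fn take_batch(&mut self, worker_id: u32, max: usize) -> Vec<IpAddr> {
        match self.stacks.get_mut(&worker_id) {
            Some(queue) => {
                let n = max.min(queue.len());
                queue.drain(..n).collect()
            }
            None => Vec::new(),
        }
    }
}
```

In the real service the map sits behind `Arc<Mutex<...>>`, so callers would lock it before pushing or draining.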

Fields§

§workers: Arc<Mutex<Vec<WorkerSender<Result<Task, Status>>>>>
§cli_sender: Arc<Mutex<Option<Sender<Result<TaskResult, Status>>>>>
§open_measurements: Arc<Mutex<HashMap<u32, u32>>>
§m_id: Arc<Mutex<u32>>
§unique_id: Arc<Mutex<u32>>
§is_active: Arc<Mutex<bool>>
§is_responsive: Arc<AtomicBool>
§is_latency: Arc<AtomicBool>
§worker_config: Option<HashMap<String, u32>>
§worker_stacks: Arc<Mutex<HashMap<u32, VecDeque<Address>>>>

Implementations§

impl ControllerService

fn get_unique_id(&self) -> u32

Returns a unique worker ID for a newly connecting worker, then increments the unique ID counter so that the next worker receives a different ID.
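
A minimal sketch of this return-then-increment pattern, assuming the counter is the `Arc<Mutex<u32>>` shown in the struct definition. `IdCounter` is a hypothetical wrapper used only for illustration, and the wrapping increment is an assumption rather than documented behavior.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical wrapper around the `unique_id` counter.
struct IdCounter {
    unique_id: Arc<Mutex<u32>>,
}

impl IdCounter {
    fn new(start: u32) -> Self {
        Self { unique_id: Arc::new(Mutex::new(start)) }
    }

    /// Returns the current ID and advances the counter for the next caller.
    fn get_unique_id(&self) -> u32 {
        let mut next = self.unique_id.lock().unwrap();
        let id = *next;
        // Assumption: wrap around on overflow instead of panicking.
        *next = id.wrapping_add(1);
        id
    }
}
```

Holding the lock across both the read and the write keeps two concurrently connecting workers from receiving the same ID.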

fn get_worker_id(&self, hostname: &str) -> Result<(u32, bool), Box<Status>>

Gets a worker ID for a connecting worker based on its hostname. If the hostname is already known and no connected worker is currently using it, the existing worker ID is returned. If the hostname is not known, a statically configured ID is used when one exists; otherwise a new unique ID is generated.

§Arguments
  • ‘hostname’ - the hostname of the worker
§Returns

Returns a tuple containing the worker ID and a boolean flag.

§Errors

Returns an error if the hostname already exists and is used by a connected worker.
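
The lookup order described above can be sketched as follows. This is a simplified model, not the crate's code: `Registry` and `KnownWorker` are hypothetical types, the error is a plain `String` rather than `Box<Status>`, and the boolean in the real return tuple is left out because its meaning is not documented here.

```rust
use std::collections::HashMap;

/// Hypothetical record of a worker the orchestrator has seen before.
struct KnownWorker {
    hostname: String,
    worker_id: u32,
    connected: bool,
}

/// Hypothetical, simplified view of the state consulted by `get_worker_id`.
struct Registry {
    known: Vec<KnownWorker>,
    worker_config: Option<HashMap<String, u32>>,
    next_id: u32,
}

impl Registry {
    fn get_worker_id(&mut self, hostname: &str) -> Result<u32, String> {
        // 1. Known hostname: reuse its ID unless a connected worker holds it.
        if let Some(w) = self.known.iter_mut().find(|w| w.hostname == hostname) {
            if w.connected {
                return Err(format!("hostname {} is already in use", hostname));
            }
            w.connected = true;
            return Ok(w.worker_id);
        }
        // 2. Unknown hostname: prefer a statically configured ID.
        let static_id = self
            .worker_config
            .as_ref()
            .and_then(|cfg| cfg.get(hostname).copied());
        // 3. Otherwise fall back to the next generated ID.
        // Collisions between static and generated IDs are not handled here.
        let worker_id = match static_id {
            Some(id) => id,
            None => {
                let id = self.next_id;
                self.next_id += 1;
                id
            }
        };
        self.known.push(KnownWorker {
            hostname: hostname.to_string(),
            worker_id,
            connected: true,
        });
        Ok(worker_id)
    }
}
```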

Trait Implementations§

impl Controller for ControllerService

Implementation of the Controller trait for the ControllerService. Handles communication with the workers and the CLI.

fn measurement_finished<'life0, 'async_trait>( &'life0 self, request: Request<Finished>, ) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Called by the worker when it has finished its current measurement.

Once all connected workers have finished this measurement, the orchestrator notifies the CLI that the measurement is finished.

§Arguments
  • ‘request’ - a Finished message containing the measurement ID of the measurement that has finished
§Errors

Returns an error if the measurement ID is unknown.
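
One way to picture the bookkeeping, assuming `open_measurements` maps a measurement ID to the number of workers that have not yet finished. `OpenMeasurements` and `worker_finished` are hypothetical names, and the `String` error stands in for a gRPC `Status`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical wrapper around the `open_measurements` map.
struct OpenMeasurements {
    open: Arc<Mutex<HashMap<u32, u32>>>,
}

impl OpenMeasurements {
    /// Records that one worker finished measurement `m_id`.
    /// Returns Ok(true) when it was the last outstanding worker (the point
    /// at which the CLI would be notified), Ok(false) otherwise, and an
    /// error for an unknown measurement ID.
    fn worker_finished(&self, m_id: u32) -> Result<bool, String> {
        let mut open = self.open.lock().unwrap();
        let remaining = open
            .get_mut(&m_id)
            .ok_or_else(|| format!("unknown measurement ID {}", m_id))?;
        *remaining = (*remaining).saturating_sub(1);
        if *remaining == 0 {
            // Last worker reported in: the measurement is no longer open.
            open.remove(&m_id);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
```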

fn worker_connect<'life0, 'async_trait>( &'life0 self, request: Request<Worker>, ) -> Pin<Box<dyn Future<Output = Result<Response<Self::WorkerConnectStream>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Handles a worker's connection request to this orchestrator.

Ensures the hostname is unique and assigns a unique worker ID.

Returns the receiver side of a stream over which the orchestrator will send tasks.

§Arguments
  • ‘request’ - a Worker message containing the hostname of the worker

fn do_measurement<'life0, 'async_trait>( &'life0 self, request: Request<ScheduleMeasurement>, ) -> Pin<Box<dyn Future<Output = Result<Response<Self::DoMeasurementStream>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Handles the do_measurement command from the CLI.

Instructs all workers to perform the measurement and returns the receiver side of a stream in which TaskResults will be streamed.

Sets is_active to true, so that no other measurement can start.

Checks that all workers are still connected and removes the senders of any that are not.

Assigns a unique ID to the measurement.

Streams tasks to the workers in a round-robin fashion, with 1-second delays between workers.

Furthermore, lets the workers know of the desired probing rate (defined by the CLI).

§Arguments
  • ‘request’ - a ScheduleMeasurement message containing information about the measurement that the CLI wants to perform
§Errors

Returns an error if there is already an active measurement, or if there are no connected workers to perform the measurement.
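
The two error conditions and the ID assignment can be sketched as a small gate, assuming the `is_active` and `m_id` fields behave as described in the field list. `MeasurementGate` and `try_start` are hypothetical names, the `String` error stands in for a gRPC `Status`, and incrementing before returning is an assumption based on ‘m_id’ holding the last used ID.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical wrapper around the `is_active` flag and `m_id` counter.
struct MeasurementGate {
    is_active: Arc<Mutex<bool>>,
    m_id: Arc<Mutex<u32>>,
}

impl MeasurementGate {
    /// Marks a measurement as active and returns its new ID, or an error
    /// if one is already running or no workers are connected.
    fn try_start(&self, connected_workers: usize) -> Result<u32, String> {
        let mut active = self.is_active.lock().unwrap();
        if *active {
            return Err("there is already an active measurement".to_string());
        }
        if connected_workers == 0 {
            return Err("no connected workers".to_string());
        }
        *active = true;
        // `m_id` stores the last used ID, so the new ID is the successor.
        let mut last = self.m_id.lock().unwrap();
        let next = (*last).wrapping_add(1);
        *last = next;
        Ok(next)
    }
}
```

Checking and setting the flag under a single lock acquisition is what prevents two CLI requests from both starting a measurement.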

fn list_workers<'life0, 'async_trait>( &'life0 self, _request: Request<Empty>, ) -> Pin<Box<dyn Future<Output = Result<Response<ServerStatus>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Handles the list_workers command from the CLI.

Returns the connected workers.

fn send_result<'life0, 'async_trait>( &'life0 self, request: Request<TaskResult>, ) -> Pin<Box<dyn Future<Output = Result<Response<Ack>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Receives a TaskResult from the worker and puts it in the stream towards the CLI.

§Arguments
  • ‘request’ - a TaskResult message containing the results of a task
§Errors

Returns an error if the CLI has disconnected.
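
A simplified sketch of the forwarding step, assuming `cli_sender` holds `None` when no CLI is attached. To stay self-contained it uses `std::sync::mpsc::Sender` in place of the service's async sender; `ResultForwarder` and `forward` are hypothetical names, and clearing the stored sender after a failed send is an assumption.

```rust
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};

/// Hypothetical wrapper around the `cli_sender` field.
struct ResultForwarder<T> {
    cli_sender: Arc<Mutex<Option<Sender<T>>>>,
}

impl<T> ResultForwarder<T> {
    /// Pushes a result into the stream towards the CLI.
    fn forward(&self, result: T) -> Result<(), String> {
        let mut guard = self.cli_sender.lock().unwrap();
        let sent = match guard.as_ref() {
            Some(tx) => tx.send(result).is_ok(),
            None => return Err("no CLI is connected".to_string()),
        };
        if sent {
            Ok(())
        } else {
            // The receiving side is gone; drop the stale sender.
            *guard = None;
            Err("CLI has disconnected".to_string())
        }
    }
}
```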

type WorkerConnectStream = WorkerReceiver<Result<Task, Status>>

Server streaming response type for the WorkerConnect method.
type DoMeasurementStream = CLIReceiver<Result<TaskResult, Status>>

Server streaming response type for the DoMeasurement method.
impl Debug for ControllerService

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,