manycast/cli/
mod.rs

1mod writer;
2
3use std::collections::HashSet;
4use std::error::Error;
5use std::fs;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::str::FromStr;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use bimap::BiHashMap;
15use chrono::Local;
16use clap::ArgMatches;
17use flate2::read::GzDecoder;
18use indicatif::{ProgressBar, ProgressStyle};
19use prettytable::{color, format, row, Attr, Cell, Row, Table};
20use rand::seq::SliceRandom;
21use tokio::sync::mpsc::unbounded_channel;
22use tonic::codec::CompressionEncoding;
23use tonic::transport::{Certificate, Channel, ClientTlsConfig};
24use tonic::Request;
25
26use custom_module::manycastr::{
27    controller_client::ControllerClient, Address, Configuration, Empty, Origin,
28    ScheduleMeasurement, Targets, TaskResult,
29};
30use custom_module::Separated;
31
32use crate::cli::writer::write_results;
33use crate::cli::writer::write_results_parquet;
34use crate::cli::writer::{MetadataArgs, WriteConfig};
35use crate::{custom_module, ALL_ID, A_ID, CHAOS_ID, ICMP_ID, TCP_ID};
36
37/// A CLI client that creates a connection with the 'orchestrator' and sends the desired commands based on the command-line input.
38pub struct CliClient {
39    grpc_client: ControllerClient<Channel>,
40}
41
42/// Execute the command-line arguments and send the desired commands to the orchestrator.
43///
44/// # Arguments
45///
46/// * 'args' - the user-defined command-line arguments
47#[tokio::main]
48pub async fn execute(args: &ArgMatches) -> Result<(), Box<dyn Error>> {
49    let server_address = args.get_one::<String>("orchestrator").unwrap();
50    let fqdn = args.get_one::<String>("tls");
51
52    // Connect with orchestrator
53    println!("[CLI] Connecting to orchestrator - {server_address}");
54    let mut grpc_client = CliClient::connect(server_address, fqdn)
55        .await
56        .expect("Unable to connect to orchestrator")
57        .send_compressed(CompressionEncoding::Zstd);
58
59    // Obtain connected worker information
60    let response = grpc_client
61        .list_workers(Request::new(Empty::default()))
62        .await
63        .expect("Connection to orchestrator failed");
64
65    let mut cli_client = CliClient { grpc_client };
66
67    if args.subcommand_matches("worker-list").is_some() {
68        // Perform the worker-list command
69        println!("[CLI] Requesting workers list from orchestrator");
70        // Pretty print to command-line
71        let mut table = Table::new();
72        table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
73        table.add_row(Row::new(vec![
74            Cell::new("Hostname")
75                .with_style(Attr::Bold)
76                .with_style(Attr::ForegroundColor(color::GREEN)),
77            Cell::new("Worker ID")
78                .with_style(Attr::Bold)
79                .with_style(Attr::ForegroundColor(color::GREEN)),
80            Cell::new("Status")
81                .with_style(Attr::Bold)
82                .with_style(Attr::ForegroundColor(color::GREEN)),
83            Cell::new("Unicast IPv4")
84                .with_style(Attr::Bold)
85                .with_style(Attr::ForegroundColor(color::GREEN)),
86            Cell::new("Unicast IPv6")
87                .with_style(Attr::Bold)
88                .with_style(Attr::ForegroundColor(color::GREEN)),
89        ]));
90
91        let mut connected_workers = 0;
92        let mut workers = response.into_inner().workers;
93        workers.sort_by(|a, b| a.worker_id.cmp(&b.worker_id));
94
95        for worker in workers {
96            let unicast_v4 = if let Some(addr) = &worker.unicast_v4 {
97                addr.to_string()
98            } else {
99                "N/A".to_string()
100            };
101
102            let unicast_v6 = if let Some(addr) = &worker.unicast_v6 {
103                addr.to_string()
104            } else {
105                "N/A".to_string()
106            };
107
108            table.add_row(row![
109                worker.hostname,
110                worker.worker_id,
111                worker.status,
112                unicast_v4,
113                unicast_v6
114            ]);
115            if worker.status != "DISCONNECTED" {
116                connected_workers += 1;
117            }
118        }
119
120        table.printstd();
121        println!("[CLI] Total connected workers: {connected_workers}");
122
123        Ok(())
124    } else if let Some(matches) = args.subcommand_matches("start") {
125        // Start a MAnycastR measurement
126        let is_unicast = matches.get_flag("unicast");
127        let is_divide = matches.get_flag("divide");
128        let is_responsive = matches.get_flag("responsive");
129        let mut is_latency = matches.get_flag("latency");
130
131        if is_responsive && is_divide {
132            panic!("Incompatible flags: Responsive mode cannot be combined with divide-and-conquer measurements.");
133        } else if is_latency && (is_divide || is_responsive) {
134            panic!("Incompatible flags: Latency mode cannot be combined with divide-and-conquer or responsive measurements.");
135        } else if is_unicast && is_latency {
136            is_latency = false; // Unicast mode is latency by design
137        }
138
139        // Map to convert hostnames to worker IDs and vice versa
140        let worker_map: BiHashMap<u32, String> = response
141            .into_inner()
142            .workers
143            .into_iter()
144            .map(|worker| (worker.worker_id, worker.hostname)) // .clone() is no longer needed on hostname
145            .collect();
146
147        // Get optional opt-out URL
148        let url = matches.get_one::<String>("url").unwrap().clone();
149
150        // Source IP for the measurement
151        let src = matches.get_one::<String>("address").map(Address::from);
152
153        // Get the measurement type
154        let m_type = match matches
155            .get_one::<String>("type")
156            .unwrap()
157            .to_lowercase()
158            .as_str()
159        {
160            "icmp" => ICMP_ID,
161            "dns" => A_ID,
162            "tcp" => TCP_ID,
163            "chaos" => CHAOS_ID,
164            "all" => ALL_ID,
165            _ => panic!("Invalid measurement type! (can be either ICMP, DNS, TCP, all, or CHAOS)"),
166        };
167
168        // TODO TCP and --latency are currently broken
169        if is_latency && m_type == TCP_ID {
170            panic!("TCP measurements are not supported in latency mode!");
171        }
172
173        // Temporarily broken
174        if is_responsive && is_unicast && m_type == TCP_ID {
175            panic!("Responsive mode not supported for unicast TCP measurements");
176        }
177
178        let is_config = matches.contains_id("configuration");
179
180        // Get the workers that have to send out probes
181        let sender_ids: Vec<u32> = matches.get_one::<String>("selective").map_or_else(
182            || {
183                println!(
184                    "[CLI] Probes will be sent out from all ({}) workers",
185                    worker_map.len()
186                );
187                Vec::new()
188            },
189            |worker_entries_str| {
190                println!("[CLI] Selective probing using specified workers...");
191                worker_entries_str
192                    .trim_matches(|c| c == '[' || c == ']')
193                    .split(',')
194                    .filter_map(|entry_str_untrimmed| {
195                        let entry_str = entry_str_untrimmed.trim();
196                        if entry_str.is_empty() {
197                            return None; // Skip trailing commas
198                        }
199                        // Try to parse as worker ID
200                        if let Ok(id_val) = entry_str.parse::<u32>() {
201                            if worker_map.contains_left(&id_val) {
202                                Some(id_val)
203                            } else {
204                                panic!("Worker ID '{entry_str}' is not a known worker.");
205                            }
206                        } else if let Some(&found_id) = worker_map.get_by_right(entry_str) {
207                            // Try to find the hostname in the map
208                            Some(found_id)
209                        } else {
210                            panic!("'{entry_str}' is not a valid worker ID or known hostname.");
211                        }
212                    })
213                    .collect()
214            },
215        );
216
217        // Print selected workers
218        if !sender_ids.is_empty() {
219            println!("[CLI] Selective probing using the following workers:");
220            sender_ids.iter().for_each(|id| {
221                let hostname = worker_map.get_by_left(id).unwrap_or_else(|| {
222                    panic!("Worker ID {id} is not a connected worker!");
223                });
224                println!("[CLI]\t * ID: {id}, Hostname: {hostname}");
225            });
226        }
227
228        // Read the configuration file (unnecessary for unicast)
229        let configurations = if is_config {
230            let conf_file = matches.get_one::<String>("configuration").unwrap();
231            println!("[CLI] Using configuration file: {conf_file}");
232            let file = File::open(conf_file)
233                .unwrap_or_else(|_| panic!("Unable to open configuration file {conf_file}"));
234            let buf_reader = BufReader::new(file);
235            let mut origin_id = 0;
236            let mut is_ipv6: Option<bool> = None;
237            let configurations: Vec<Configuration> = buf_reader // Create a vector of addresses from the file
238                .lines()
239                .filter_map(|line| {
240                    let line = line.expect("Unable to read configuration line");
241                    let line = line.trim();
242                    if line.is_empty() || line.starts_with("#") {
243                        return None;
244                    } // Skip comments and empty lines
245
246                    let parts: Vec<&str> = line.splitn(2, " - ").map(|s| s.trim()).collect();
247                    if parts.len() != 2 {
248                        panic!("Invalid configuration format: {line}");
249                    }
250
251                    // Parse the worker ID
252                    let worker_id = if parts[0] == "ALL" {
253                        u32::MAX
254                    } else if let Ok(id_val) = parts[0].parse::<u32>() {
255                        if !worker_map.contains_left(&id_val) {
256                            panic!("Worker ID {id_val} is not a known worker.");
257                        }
258                        id_val
259                    } else if let Some(&found_id) = worker_map.get_by_right(parts[0]) {
260                        // Try to find the hostname in the map
261                        found_id
262                    } else {
263                        panic!("'{}' is not a valid worker ID or known hostname.", parts[0]);
264                    };
265
266                    let addr_ports: Vec<&str> = parts[1].split(',').map(|s| s.trim()).collect();
267                    if addr_ports.len() != 3 {
268                        panic!("Invalid configuration format: {line}");
269                    }
270                    let src = Address::from(addr_ports[0]);
271
272                    if let Some(v6) = is_ipv6 {
273                        if v6 != src.is_v6() {
274                            panic!("Configuration file contains mixed IPv4 and IPv6 addresses!");
275                        }
276                    } else {
277                        is_ipv6 = Some(src.is_v6());
278                    }
279
280                    // Parse to u16 first, must fit in header
281                    let sport =
282                        u16::from_str(addr_ports[1]).expect("Unable to parse source port") as u32;
283                    let dport = u16::from_str(addr_ports[2])
284                        .expect("Unable to parse destination port")
285                        as u32;
286                    origin_id += 1;
287
288                    Some(Configuration {
289                        worker_id,
290                        origin: Some(Origin {
291                            src: Some(src),
292                            sport,
293                            dport,
294                            origin_id,
295                        }),
296                    })
297                })
298                .collect();
299            if configurations.is_empty() {
300                panic!("No valid configurations found in file {conf_file}");
301            }
302
303            configurations
304        } else {
305            // Obtain port values (read as u16 as is the port header size)
306            let sport: u32 = *matches.get_one::<u16>("source port").unwrap() as u32;
307            // Default destination port is 53 for DNS, 63853 for all other measurements
308            let dport = matches
309                .get_one::<u16>("destination port")
310                .map(|&port| port as u32)
311                .unwrap_or_else(|| {
312                    if m_type == A_ID || m_type == CHAOS_ID {
313                        53
314                    } else {
315                        63853
316                    }
317                });
318
319            if sender_ids.is_empty() {
320                // All workers
321                vec![Configuration {
322                    worker_id: u32::MAX, // All clients
323                    origin: Some(Origin {
324                        src,
325                        sport,
326                        dport,
327                        origin_id: 0, // Only one origin
328                    }),
329                }]
330            } else {
331                // list of worker IDs defined
332                sender_ids
333                    .iter()
334                    .map(|&worker_id| Configuration {
335                        worker_id,
336                        origin: Some(Origin {
337                            src,
338                            sport,
339                            dport,
340                            origin_id: 0, // Only one origin
341                        }),
342                    })
343                    .collect()
344            }
345        };
346
347        // There must be a defined anycast source address, configuration, or unicast flag
348        if src.is_none() && !is_config && !is_unicast {
349            panic!("No source address or configuration file provided!");
350        }
351
352        // TODO implement feed of addresses instead of a hitlist file
353        // format: address,tx -> tx optional to specify from which site to probe
354        // protocol and ports used are pre-configured when starting a live measurement at the CLI
355
356        // Get the target IP addresses
357        let hitlist_path = matches.get_one::<String>("IP_FILE").unwrap();
358        let is_shuffle = matches.get_flag("shuffle");
359
360        let (ips, is_ipv6) = get_hitlist(hitlist_path, &configurations, is_unicast, is_shuffle);
361
362        // CHAOS value to send in the DNS query
363        let dns_record = if m_type == CHAOS_ID {
364            // get CHAOS query
365            matches
366                .get_one::<String>("query")
367                .map_or("hostname.bind", |q| q.as_str())
368        } else if m_type == A_ID || m_type == ALL_ID {
369            matches
370                .get_one::<String>("query")
371                .map_or("example.org", |q| q.as_str())
372        } else {
373            ""
374        };
375
376        // Check for command-line option that determines whether to stream to CLI
377        let is_cli = matches.get_flag("stream");
378
379        // Check for command-line option that determines whether to write results in Parquet format
380        let is_parquet = matches.get_flag("parquet");
381
382        if is_cli && is_parquet {
383            panic!("Cannot stream results to CLI and write in Parquet format at the same time!");
384        }
385
386        // --latency and --divide send single probes to each address, so no worker interval is needed
387        let worker_interval = if is_latency || is_divide {
388            0
389        } else {
390            *matches.get_one::<u32>("worker_interval").unwrap()
391        };
392        let probe_interval = *matches.get_one::<u32>("probe_interval").unwrap();
393        let probing_rate = *matches.get_one::<u32>("rate").unwrap();
394        let number_of_probes = *matches.get_one::<u32>("number_of_probes").unwrap();
395        let t_type = match m_type {
396            ICMP_ID => "ICMP",
397            A_ID => "DNS/A",
398            TCP_ID => "TCP/SYN-ACK",
399            CHAOS_ID => "DNS/CHAOS",
400            ALL_ID => "All (ICMP,DNS/A,TCP)",
401            _ => "Unknown",
402        };
403        let hitlist_length = ips.len();
404
405        println!("[CLI] Performing {} measurement targeting {} addresses, with a rate of {}, and an interval of {}",
406                 t_type,
407                 hitlist_length.with_separator(),
408                 probing_rate.with_separator(),
409                 worker_interval
410        );
411
412        if is_responsive {
413            println!("[CLI] Responsive mode enabled");
414        }
415
416        if is_latency {
417            println!("[CLI] Latency mode enabled");
418        }
419
420        // Print the origins used
421        if is_unicast {
422            let unicast_origin = configurations.first().unwrap().origin.unwrap();
423            println!(
424                "[CLI] Unicast probing with src port {} and dst port {}",
425                unicast_origin.sport, unicast_origin.dport
426            );
427        } else if is_config {
428            println!("[CLI] Workers send probes using the following configurations:");
429            for configuration in configurations.iter() {
430                if let Some(origin) = &configuration.origin {
431                    if configuration.worker_id == u32::MAX {
432                        println!(
433                            "\t* All workers, source IP: {}, source port: {}, destination port: {}",
434                            origin.src.unwrap(),
435                            origin.sport,
436                            origin.dport
437                        );
438                    } else {
439                        let worker_hostname = worker_map
440                            .get_by_left(&configuration.worker_id)
441                            .expect("Worker ID not found");
442                        println!(
443                            "\t* worker {} (with ID: {:<2}), source IP: {}, source port: {}, destination port: {}",
444                            worker_hostname, configuration.worker_id, origin.src.unwrap(), origin.sport, origin.dport
445                        );
446                    }
447                }
448            }
449        } else {
450            let anycast_origin = configurations.first().unwrap().origin.unwrap();
451
452            println!(
453                "[CLI] Workers probe with source IP: {}, source port: {}, destination port: {}",
454                anycast_origin.src.unwrap(),
455                anycast_origin.sport,
456                anycast_origin.dport
457            );
458        }
459
460        // get optional path to write results to
461        let path = matches.get_one::<String>("out");
462        if let Some(value) = validate_path_perms(path) {
463            return value;
464        }
465
466        // Create the measurement definition and send it to the orchestrator
467        let m_definition = ScheduleMeasurement {
468            probing_rate,
469            configurations,
470            m_type: m_type as u32,
471            is_unicast,
472            is_ipv6,
473            is_divide,
474            worker_interval,
475            is_responsive,
476            is_latency,
477            targets: Some(Targets {
478                dst_list: ips,
479                is_discovery: None,
480            }),
481            record: dns_record.to_string(),
482            url,
483            probe_interval,
484            number_of_probes,
485        };
486
487        let args = MeasurementExecutionArgs {
488            is_cli,
489            is_parquet,
490            is_shuffle,
491            hitlist_path,
492            hitlist_length,
493            out_path: path,
494            is_config,
495            worker_map,
496        };
497
498        cli_client
499            .do_measurement_to_server(m_definition, args)
500            .await
501    } else {
502        panic!("Unrecognized command");
503    }
504}
505
506fn validate_path_perms(path: Option<&String>) -> Option<Result<(), Box<dyn Error>>> {
507    if let Some(path_str) = path {
508        let path = Path::new(path_str);
509
510        if !path_str.ends_with('/') {
511            // User provided a file
512            if path.exists() {
513                if path.is_dir() {
514                    println!("[CLI] Path is already a directory, exiting");
515                    return Some(Err("Path is already a directory".into()));
516                } else if fs::metadata(path)
517                    .expect("Unable to get path metadata")
518                    .permissions()
519                    .readonly()
520                {
521                    println!("[CLI] Lacking write permissions for file {path_str}");
522                    return Some(Err("Lacking write permissions".into()));
523                } else {
524                    println!("[CLI] Overwriting existing file {path_str} when measurement is done");
525                }
526            } else {
527                println!("[CLI] Writing results to new file {path_str}");
528
529                // File does not yet exist, create it to verify permissions
530                File::create(path)
531                    .expect("Unable to create output file")
532                    .sync_all()
533                    .expect("Unable to sync file");
534                fs::remove_file(path).expect("Unable to remove file");
535            }
536        } else {
537            // User provided a directory
538            if path.exists() {
539                if !path.is_dir() {
540                    println!("[CLI] Path is already a file, exiting");
541                    return Some(Err("Cannot make dir, file with name already exists.".into()));
542                } else if fs::metadata(path)
543                    .expect("Unable to get path metadata")
544                    .permissions()
545                    .readonly()
546                {
547                    println!("[CLI] Lacking write permissions for directory {path_str}");
548                    return Some(Err("Path is not writable".into()));
549                } else {
550                    println!("[CLI] Writing results to existing directory {path_str}");
551                }
552            } else {
553                println!("[CLI] Writing results to new directory {path_str}");
554
555                // Attempt creating path to verify permissions
556                fs::create_dir_all(path).expect("Unable to create output directory");
557            }
558        }
559    }
560    None
561}
562
563/// Get the hitlist from a file.
564///
565/// # Arguments
566///
567/// * 'hitlist_path' - path to the hitlist file
568///
569/// * 'configurations' - list of configurations to check the source address type
570///
571/// * 'is_unicast' - boolean whether the measurement is unicast or anycast
572///
573/// * 'is_shuffle' - boolean whether the hitlist should be shuffled or not
574///
575/// # Returns
576///
577/// * A tuple containing a vector of addresses and a boolean indicating whether the addresses are IPv6 or IPv4.
578///
579/// # Panics
580///
581/// * If the hitlist file cannot be opened.
582///
583/// * If the anycast source address type (v4 or v6) does not match the hitlist addresses.
584///
585/// * If the hitlist addresses are of mixed types (v4 and v6).
586fn get_hitlist(
587    hitlist_path: &String,
588    configurations: &[Configuration],
589    is_unicast: bool,
590    is_shuffle: bool,
591) -> (Vec<Address>, bool) {
592    let file =
593        File::open(hitlist_path).unwrap_or_else(|_| panic!("Unable to open file {hitlist_path}"));
594
595    // Create reader based on file extension
596    let reader: Box<dyn BufRead> = if hitlist_path.ends_with(".gz") {
597        let decoder = GzDecoder::new(file);
598        Box::new(BufReader::new(decoder))
599    } else {
600        Box::new(BufReader::new(file))
601    };
602
603    let mut ips: Vec<Address> = reader // Create a vector of addresses from the file
604        .lines()
605        .map_while(Result::ok) // Handle potential errors
606        .filter(|l| !l.trim().is_empty()) // Skip empty lines
607        .map(Address::from)
608        .collect();
609    let is_ipv6 = ips.first().unwrap().is_v6();
610
611    // Panic if the source IP is not the same type as the addresses
612    if !is_unicast
613        && configurations
614            .first()
615            .expect("Empty configuration list")
616            .origin
617            .expect("No origin found")
618            .src
619            .expect("No source address")
620            .is_v6()
621            != is_ipv6
622    {
623        panic!(
624            "Hitlist addresses are not the same type as the source addresses used! (IPv4 & IPv6)"
625        );
626    }
627    // Panic if the ips in the hitlist are not all the same type
628    if ips.iter().any(|ip| ip.is_v6() != is_ipv6) {
629        panic!("Hitlist addresses are not all of the same type! (mixed IPv4 & IPv6)");
630    }
631
632    // Shuffle the hitlist, if desired
633    if is_shuffle {
634        ips.as_mut_slice().shuffle(&mut rand::rng());
635    }
636    (ips, is_ipv6)
637}
638
639pub struct MeasurementExecutionArgs<'a> {
640    /// Determines whether results should be streamed to the command-line interface as they arrive.
641    pub is_cli: bool,
642
643    /// Indicates whether the results should be written in Parquet format (default: .csv.gz).
644    pub is_parquet: bool,
645
646    /// Specifies whether the list of targets should be shuffled before the measurement begins.
647    pub is_shuffle: bool,
648
649    /// The path to the file containing the list of measurement targets (the "hitlist").
650    pub hitlist_path: &'a str,
651
652    /// The total number of targets in the hitlist, used for estimating measurement duration.
653    pub hitlist_length: usize,
654
655    /// An optional path to a file where the final measurement results should be saved.
656    /// If `None`, results will be written to the current directory with a default naming convention.
657    pub out_path: Option<&'a String>,
658
659    /// Indicates whether the measurement is configuration-based (using a configuration file)
660    pub is_config: bool,
661
662    /// A bidirectional map used to resolve worker IDs to their corresponding hostnames.
663    pub worker_map: BiHashMap<u32, String>,
664}
665
666impl CliClient {
667    /// Perform a measurement at the orchestrator, await measurement results, and write them to a file.
668    ///
669    /// # Arguments
670    ///
671    /// * 'm_definition' - measurement definition created from the command-line arguments
672    ///
673    /// * 'is_cli' - boolean whether the results should be streamed to the CLI or not
674    ///
675    /// * 'is_shuffle' - boolean whether the hitlist should be shuffled or not
676    ///
677    /// * 'hitlist' - hitlist file path
678    ///
679    /// * 'hitlist_length' - length of hitlist (i.e., number of target addresses)
680    ///
681    /// * 'path' - optional path for output file (default is current directory)
682    ///
683    /// * 'is_config' - boolean whether the measurement is configuration-based or not
684    ///
685    /// * 'worker_map' - bidirectional map of worker IDs to hostnames
686    async fn do_measurement_to_server(
687        &mut self,
688        m_definition: ScheduleMeasurement,
689        args: MeasurementExecutionArgs<'_>,
690    ) -> Result<(), Box<dyn Error>> {
691        let is_divide = m_definition.is_divide;
692        let is_ipv6 = m_definition.is_ipv6;
693        let probing_rate = m_definition.probing_rate as f32;
694        let worker_interval = m_definition.worker_interval;
695        let m_type = m_definition.m_type;
696        let is_unicast = m_definition.is_unicast;
697        let is_latency = m_definition.is_latency;
698        let is_responsive = m_definition.is_responsive;
699        let origin_str = if is_unicast {
700            m_definition
701                .configurations
702                .first()
703                .and_then(|conf| conf.origin.as_ref())
704                .map(|origin| {
705                    format!(
706                        "Unicast (source port: {}, destination port: {})",
707                        origin.sport, origin.dport
708                    )
709                })
710                .expect("No unicast origin found")
711        } else if args.is_config {
712            "Anycast configuration-based".to_string()
713        } else {
714            m_definition
715                .configurations
716                .first()
717                .and_then(|conf| conf.origin.as_ref())
718                .map(|origin| {
719                    format!(
720                        "Anycast (source IP: {}, source port: {}, destination port: {})",
721                        origin.src.unwrap(),
722                        origin.sport,
723                        origin.dport
724                    )
725                })
726                .expect("No anycast origin found")
727        };
728
729        // List of Worker IDs that are sending out probes (empty means all)
730        let probing_workers: Vec<String> = if m_definition
731            .configurations
732            .iter()
733            .any(|config| config.worker_id == u32::MAX)
734        {
735            Vec::new() // all workers are probing
736        } else {
737            // Get list of unique worker hostnames that are probing
738            m_definition
739                .configurations
740                .iter()
741                .map(|config| {
742                    args.worker_map
743                        .get_by_left(&config.worker_id)
744                        .unwrap_or_else(|| {
745                            panic!("Worker ID {} is not a connected worker!", config.worker_id)
746                        })
747                        .clone()
748                })
749                .collect::<HashSet<String>>() // Use HashSet to ensure uniqueness
750                .into_iter()
751                .collect::<Vec<String>>()
752        };
753
754        let number_of_probers = if probing_workers.is_empty() {
755            args.worker_map.len() as f32
756        } else {
757            probing_workers.len() as f32
758        };
759
760        let m_time = if is_divide || is_latency {
761            ((args.hitlist_length as f32 / (probing_rate * number_of_probers)) + 1.0) / 60.0
762        } else {
763            (((number_of_probers - 1.0) * worker_interval as f32) // Last worker starts probing
764                + (args.hitlist_length as f32 / probing_rate) // Time to probe all addresses
765                + 1.0) // Time to wait for last replies
766                / 60.0 // Convert to minutes
767        };
768
769        if is_divide {
770            println!("[CLI] Divide-and-conquer enabled");
771        }
772        println!("[CLI] This measurement will take an estimated {m_time:.2} minutes");
773
774        let response = self
775            .grpc_client
776            .do_measurement(Request::new(m_definition.clone()))
777            .await;
778        if let Err(e) = response {
779            println!(
780                "[CLI] Orchestrator did not perform the measurement for reason: '{}'",
781                e.message()
782            );
783            return Err(Box::new(e));
784        }
785        let start = SystemTime::now()
786            .duration_since(UNIX_EPOCH)
787            .unwrap()
788            .as_nanos() as u64;
789        let timestamp_start = Local::now();
790        let timestamp_start_str = timestamp_start.format("%Y-%m-%dT%H_%M_%S").to_string();
791        println!(
792            "[CLI] Measurement started at {}",
793            timestamp_start.format("%H:%M:%S")
794        );
795
796        // Progress bar
797        let total_steps = (m_time * 60.0) as u64; // measurement_length in seconds
798        let pb = ProgressBar::new(total_steps);
799        pb.set_style(
800            ProgressStyle::with_template(
801                "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
802            )
803            .unwrap()
804            .progress_chars("#>-"),
805        );
806        let is_done = Arc::new(AtomicBool::new(false));
807        let is_done_clone = is_done.clone();
808
809        // Spawn a separate async task to update the progress bar
810        tokio::spawn(async move {
811            // If we are streaming to the CLI, we cannot use a progress bar
812            if !args.is_cli {
813                for _ in 0..total_steps {
814                    if is_done_clone.load(Ordering::Relaxed) {
815                        break;
816                    }
817                    pb.inc(1); // Increment the progress bar by one step
818                    tokio::time::sleep(Duration::from_secs(1)).await; // Simulate time taken for each step
819                }
820            }
821        });
822
823        let mut graceful = false; // Will be set to true if the stream closes gracefully
824                                  // Obtain the Stream from the orchestrator and read from it
825        let mut stream = response
826            .expect("Unable to obtain the orchestrator stream")
827            .into_inner();
828        // Channel for writing results to file
829        let (tx_r, rx_r) = unbounded_channel();
830
831        // Get measurement type
832        let type_str = match m_type as u8 {
833            ICMP_ID => "ICMP",
834            A_ID => "DNS",
835            TCP_ID => "TCP",
836            CHAOS_ID => "CHAOS",
837            _ => "ICMP",
838        };
839        let type_str = if is_ipv6 {
840            format!("{type_str}v6")
841        } else {
842            format!("{type_str}v4")
843        };
844
845        // Determine the type of measurement
846        let filetype = if is_unicast { "GCD_" } else { "MAnycast_" };
847
848        // Determine the file extension based on the output format
849        let mut is_parquet = args.is_parquet;
850
851        let extension = if is_parquet { ".parquet" } else { ".csv.gz" };
852
853        // Output file
854        let file_path = if let Some(path) = args.out_path {
855            if path.ends_with('/') {
856                // User provided a path, use default naming convention for file
857                format!("{path}{filetype}{type_str}{timestamp_start_str}{extension}")
858            } else {
859                // User provided a file (with possibly a path)
860
861                if path.ends_with(".parquet") {
862                    is_parquet = true; // If the file ends with .parquet, we will write in Parquet format
863                }
864                path.to_string()
865            }
866        } else {
867            // Write file to current directory using default naming convention
868            format!("./{filetype}{type_str}{timestamp_start_str}{extension}")
869        };
870
871        // Create the output file
872        let file = File::create(file_path).expect("Unable to create file");
873
874        let metadata_args = MetadataArgs {
875            is_divide,
876            origin_str,
877            hitlist: args.hitlist_path,
878            is_shuffle: args.is_shuffle,
879            m_type_str: type_str,
880            probing_rate: probing_rate as u32,
881            interval: worker_interval,
882            active_workers: probing_workers,
883            all_workers: &args.worker_map,
884            configurations: &m_definition.configurations,
885            is_config: args.is_config,
886            is_latency,
887            is_responsive,
888        };
889
890        let is_multi_origin = if is_unicast {
891            false
892        } else {
893            // Check if any configuration has origin_id that is not 0 or u32::MAX
894            m_definition.configurations.iter().any(|conf| {
895                conf.origin
896                    .as_ref()
897                    .is_some_and(|origin| origin.origin_id != 0 && origin.origin_id != u32::MAX)
898            })
899        };
900
901        let config = WriteConfig {
902            print_to_cli: args.is_cli,
903            output_file: file,
904            metadata_args,
905            m_type,
906            is_multi_origin,
907            is_symmetric: is_unicast || is_latency,
908            worker_map: args.worker_map.clone(),
909        };
910
911        // Start thread that writes results to file
912        if is_parquet {
913            write_results_parquet(rx_r, config);
914        } else {
915            write_results(rx_r, config);
916        }
917
918        let mut replies_count = 0;
919        'mloop: while let Some(task_result) = match stream.message().await {
920            Ok(Some(result)) => Some(result),
921            Ok(None) => {
922                eprintln!("Stream closed by orchestrator");
923                break 'mloop;
924            } // Stream is exhausted
925            Err(e) => {
926                eprintln!("Error receiving message: {e}");
927                break 'mloop;
928            }
929        } {
930            // A default result notifies the CLI that it should not expect any more results
931            if task_result == TaskResult::default() {
932                tx_r.send(task_result).unwrap(); // Let the results channel know that we are done
933                graceful = true;
934                break;
935            }
936
937            replies_count += task_result.result_list.len();
938            // Send the results to the file channel
939            tx_r.send(task_result).unwrap();
940        }
941
942        is_done.store(true, Ordering::Relaxed); // Signal the progress bar to stop
943
944        let end = SystemTime::now()
945            .duration_since(UNIX_EPOCH)
946            .unwrap()
947            .as_nanos() as u64;
948        let length = (end - start) as f64 / 1_000_000_000.0; // Measurement length in seconds
949        println!("[CLI] Waited {length:.6} seconds for results.");
950        println!(
951            "[CLI] Time of end measurement {}",
952            Local::now().format("%H:%M:%S")
953        );
954        println!(
955            "[CLI] Number of replies captured: {}",
956            replies_count.with_separator()
957        );
958
959        // If the stream closed during a measurement
960        if !graceful {
961            tx_r.send(TaskResult::default()).unwrap(); // Let the results channel know that we are done
962            println!("[CLI] Measurement ended prematurely!");
963        }
964
965        tx_r.closed().await; // Wait for all results to be written to file
966
967        Ok(())
968    }
969
970    /// Connect to the orchestrator
971    ///
972    /// # Arguments
973    ///
974    /// * 'address' - the address of the orchestrator (e.g., 10.10.10.10:50051)
975    ///
976    /// * 'fqdn' - an optional string that contains the FQDN of the orchestrator certificate (if TLS is enabled)
977    ///
978    /// # Returns
979    ///
980    /// A gRPC client that is connected to the orchestrator
981    ///
982    /// # Panics
983    ///
984    /// If the connection to the orchestrator fails
985    ///
986    /// # Remarks
987    ///
988    /// TLS enabled requires a certificate at ./tls/orchestrator.crt
989    async fn connect(
990        address: &str,
991        fqdn: Option<&String>,
992    ) -> Result<ControllerClient<Channel>, Box<dyn Error>> {
993        let channel = if let Some(fqdn) = fqdn {
994            // Secure connection
995            let addr = format!("https://{address}");
996
997            // Load the CA certificate used to authenticate the orchestrator
998            let pem = fs::read_to_string("tls/orchestrator.crt")
999                .expect("Unable to read CA certificate at ./tls/orchestrator.crt");
1000            let ca = Certificate::from_pem(pem);
1001
1002            let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(fqdn);
1003
1004            let builder = Channel::from_shared(addr.to_owned())?; // Use the address provided
1005            builder
1006                .tls_config(tls)
1007                .expect("Unable to set TLS configuration")
1008                .connect()
1009                .await
1010                .expect("Unable to connect to orchestrator")
1011        } else {
1012            // Unsecure connection
1013            let addr = format!("http://{address}");
1014
1015            Channel::from_shared(addr.to_owned())
1016                .expect("Unable to set address")
1017                .connect()
1018                .await
1019                .expect("Unable to connect to orchestrator")
1020        };
1021        // Create client with secret token that is used to authenticate client commands.
1022        let client = ControllerClient::new(channel);
1023
1024        Ok(client)
1025    }
1026}