1mod writer;
2
3use std::collections::HashSet;
4use std::error::Error;
5use std::fs;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use std::str::FromStr;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use bimap::BiHashMap;
15use chrono::Local;
16use clap::ArgMatches;
17use flate2::read::GzDecoder;
18use indicatif::{ProgressBar, ProgressStyle};
19use prettytable::{color, format, row, Attr, Cell, Row, Table};
20use rand::seq::SliceRandom;
21use tokio::sync::mpsc::unbounded_channel;
22use tonic::codec::CompressionEncoding;
23use tonic::transport::{Certificate, Channel, ClientTlsConfig};
24use tonic::Request;
25
26use custom_module::manycastr::{
27 controller_client::ControllerClient, Address, Configuration, Empty, Origin,
28 ScheduleMeasurement, Targets, TaskResult,
29};
30use custom_module::Separated;
31
32use crate::cli::writer::write_results;
33use crate::cli::writer::write_results_parquet;
34use crate::cli::writer::{MetadataArgs, WriteConfig};
35use crate::{custom_module, ALL_ID, A_ID, CHAOS_ID, ICMP_ID, TCP_ID};
36
37pub struct CliClient {
39 grpc_client: ControllerClient<Channel>,
40}
41
42#[tokio::main]
48pub async fn execute(args: &ArgMatches) -> Result<(), Box<dyn Error>> {
49 let server_address = args.get_one::<String>("orchestrator").unwrap();
50 let fqdn = args.get_one::<String>("tls");
51
52 println!("[CLI] Connecting to orchestrator - {server_address}");
54 let mut grpc_client = CliClient::connect(server_address, fqdn)
55 .await
56 .expect("Unable to connect to orchestrator")
57 .send_compressed(CompressionEncoding::Zstd);
58
59 let response = grpc_client
61 .list_workers(Request::new(Empty::default()))
62 .await
63 .expect("Connection to orchestrator failed");
64
65 let mut cli_client = CliClient { grpc_client };
66
67 if args.subcommand_matches("worker-list").is_some() {
68 println!("[CLI] Requesting workers list from orchestrator");
70 let mut table = Table::new();
72 table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
73 table.add_row(Row::new(vec![
74 Cell::new("Hostname")
75 .with_style(Attr::Bold)
76 .with_style(Attr::ForegroundColor(color::GREEN)),
77 Cell::new("Worker ID")
78 .with_style(Attr::Bold)
79 .with_style(Attr::ForegroundColor(color::GREEN)),
80 Cell::new("Status")
81 .with_style(Attr::Bold)
82 .with_style(Attr::ForegroundColor(color::GREEN)),
83 Cell::new("Unicast IPv4")
84 .with_style(Attr::Bold)
85 .with_style(Attr::ForegroundColor(color::GREEN)),
86 Cell::new("Unicast IPv6")
87 .with_style(Attr::Bold)
88 .with_style(Attr::ForegroundColor(color::GREEN)),
89 ]));
90
91 let mut connected_workers = 0;
92 let mut workers = response.into_inner().workers;
93 workers.sort_by(|a, b| a.worker_id.cmp(&b.worker_id));
94
95 for worker in workers {
96 let unicast_v4 = if let Some(addr) = &worker.unicast_v4 {
97 addr.to_string()
98 } else {
99 "N/A".to_string()
100 };
101
102 let unicast_v6 = if let Some(addr) = &worker.unicast_v6 {
103 addr.to_string()
104 } else {
105 "N/A".to_string()
106 };
107
108 table.add_row(row![
109 worker.hostname,
110 worker.worker_id,
111 worker.status,
112 unicast_v4,
113 unicast_v6
114 ]);
115 if worker.status != "DISCONNECTED" {
116 connected_workers += 1;
117 }
118 }
119
120 table.printstd();
121 println!("[CLI] Total connected workers: {connected_workers}");
122
123 Ok(())
124 } else if let Some(matches) = args.subcommand_matches("start") {
125 let is_unicast = matches.get_flag("unicast");
127 let is_divide = matches.get_flag("divide");
128 let is_responsive = matches.get_flag("responsive");
129 let mut is_latency = matches.get_flag("latency");
130
131 if is_responsive && is_divide {
132 panic!("Incompatible flags: Responsive mode cannot be combined with divide-and-conquer measurements.");
133 } else if is_latency && (is_divide || is_responsive) {
134 panic!("Incompatible flags: Latency mode cannot be combined with divide-and-conquer or responsive measurements.");
135 } else if is_unicast && is_latency {
136 is_latency = false; }
138
139 let worker_map: BiHashMap<u32, String> = response
141 .into_inner()
142 .workers
143 .into_iter()
144 .map(|worker| (worker.worker_id, worker.hostname)) .collect();
146
147 let url = matches.get_one::<String>("url").unwrap().clone();
149
150 let src = matches.get_one::<String>("address").map(Address::from);
152
153 let m_type = match matches
155 .get_one::<String>("type")
156 .unwrap()
157 .to_lowercase()
158 .as_str()
159 {
160 "icmp" => ICMP_ID,
161 "dns" => A_ID,
162 "tcp" => TCP_ID,
163 "chaos" => CHAOS_ID,
164 "all" => ALL_ID,
165 _ => panic!("Invalid measurement type! (can be either ICMP, DNS, TCP, all, or CHAOS)"),
166 };
167
168 if is_latency && m_type == TCP_ID {
170 panic!("TCP measurements are not supported in latency mode!");
171 }
172
173 if is_responsive && is_unicast && m_type == TCP_ID {
175 panic!("Responsive mode not supported for unicast TCP measurements");
176 }
177
178 let is_config = matches.contains_id("configuration");
179
180 let sender_ids: Vec<u32> = matches.get_one::<String>("selective").map_or_else(
182 || {
183 println!(
184 "[CLI] Probes will be sent out from all ({}) workers",
185 worker_map.len()
186 );
187 Vec::new()
188 },
189 |worker_entries_str| {
190 println!("[CLI] Selective probing using specified workers...");
191 worker_entries_str
192 .trim_matches(|c| c == '[' || c == ']')
193 .split(',')
194 .filter_map(|entry_str_untrimmed| {
195 let entry_str = entry_str_untrimmed.trim();
196 if entry_str.is_empty() {
197 return None; }
199 if let Ok(id_val) = entry_str.parse::<u32>() {
201 if worker_map.contains_left(&id_val) {
202 Some(id_val)
203 } else {
204 panic!("Worker ID '{entry_str}' is not a known worker.");
205 }
206 } else if let Some(&found_id) = worker_map.get_by_right(entry_str) {
207 Some(found_id)
209 } else {
210 panic!("'{entry_str}' is not a valid worker ID or known hostname.");
211 }
212 })
213 .collect()
214 },
215 );
216
217 if !sender_ids.is_empty() {
219 println!("[CLI] Selective probing using the following workers:");
220 sender_ids.iter().for_each(|id| {
221 let hostname = worker_map.get_by_left(id).unwrap_or_else(|| {
222 panic!("Worker ID {id} is not a connected worker!");
223 });
224 println!("[CLI]\t * ID: {id}, Hostname: {hostname}");
225 });
226 }
227
228 let configurations = if is_config {
230 let conf_file = matches.get_one::<String>("configuration").unwrap();
231 println!("[CLI] Using configuration file: {conf_file}");
232 let file = File::open(conf_file)
233 .unwrap_or_else(|_| panic!("Unable to open configuration file {conf_file}"));
234 let buf_reader = BufReader::new(file);
235 let mut origin_id = 0;
236 let mut is_ipv6: Option<bool> = None;
237 let configurations: Vec<Configuration> = buf_reader .lines()
239 .filter_map(|line| {
240 let line = line.expect("Unable to read configuration line");
241 let line = line.trim();
242 if line.is_empty() || line.starts_with("#") {
243 return None;
244 } let parts: Vec<&str> = line.splitn(2, " - ").map(|s| s.trim()).collect();
247 if parts.len() != 2 {
248 panic!("Invalid configuration format: {line}");
249 }
250
251 let worker_id = if parts[0] == "ALL" {
253 u32::MAX
254 } else if let Ok(id_val) = parts[0].parse::<u32>() {
255 if !worker_map.contains_left(&id_val) {
256 panic!("Worker ID {id_val} is not a known worker.");
257 }
258 id_val
259 } else if let Some(&found_id) = worker_map.get_by_right(parts[0]) {
260 found_id
262 } else {
263 panic!("'{}' is not a valid worker ID or known hostname.", parts[0]);
264 };
265
266 let addr_ports: Vec<&str> = parts[1].split(',').map(|s| s.trim()).collect();
267 if addr_ports.len() != 3 {
268 panic!("Invalid configuration format: {line}");
269 }
270 let src = Address::from(addr_ports[0]);
271
272 if let Some(v6) = is_ipv6 {
273 if v6 != src.is_v6() {
274 panic!("Configuration file contains mixed IPv4 and IPv6 addresses!");
275 }
276 } else {
277 is_ipv6 = Some(src.is_v6());
278 }
279
280 let sport =
282 u16::from_str(addr_ports[1]).expect("Unable to parse source port") as u32;
283 let dport = u16::from_str(addr_ports[2])
284 .expect("Unable to parse destination port")
285 as u32;
286 origin_id += 1;
287
288 Some(Configuration {
289 worker_id,
290 origin: Some(Origin {
291 src: Some(src),
292 sport,
293 dport,
294 origin_id,
295 }),
296 })
297 })
298 .collect();
299 if configurations.is_empty() {
300 panic!("No valid configurations found in file {conf_file}");
301 }
302
303 configurations
304 } else {
305 let sport: u32 = *matches.get_one::<u16>("source port").unwrap() as u32;
307 let dport = matches
309 .get_one::<u16>("destination port")
310 .map(|&port| port as u32)
311 .unwrap_or_else(|| {
312 if m_type == A_ID || m_type == CHAOS_ID {
313 53
314 } else {
315 63853
316 }
317 });
318
319 if sender_ids.is_empty() {
320 vec![Configuration {
322 worker_id: u32::MAX, origin: Some(Origin {
324 src,
325 sport,
326 dport,
327 origin_id: 0, }),
329 }]
330 } else {
331 sender_ids
333 .iter()
334 .map(|&worker_id| Configuration {
335 worker_id,
336 origin: Some(Origin {
337 src,
338 sport,
339 dport,
340 origin_id: 0, }),
342 })
343 .collect()
344 }
345 };
346
347 if src.is_none() && !is_config && !is_unicast {
349 panic!("No source address or configuration file provided!");
350 }
351
352 let hitlist_path = matches.get_one::<String>("IP_FILE").unwrap();
358 let is_shuffle = matches.get_flag("shuffle");
359
360 let (ips, is_ipv6) = get_hitlist(hitlist_path, &configurations, is_unicast, is_shuffle);
361
362 let dns_record = if m_type == CHAOS_ID {
364 matches
366 .get_one::<String>("query")
367 .map_or("hostname.bind", |q| q.as_str())
368 } else if m_type == A_ID || m_type == ALL_ID {
369 matches
370 .get_one::<String>("query")
371 .map_or("example.org", |q| q.as_str())
372 } else {
373 ""
374 };
375
376 let is_cli = matches.get_flag("stream");
378
379 let is_parquet = matches.get_flag("parquet");
381
382 if is_cli && is_parquet {
383 panic!("Cannot stream results to CLI and write in Parquet format at the same time!");
384 }
385
386 let worker_interval = if is_latency || is_divide {
388 0
389 } else {
390 *matches.get_one::<u32>("worker_interval").unwrap()
391 };
392 let probe_interval = *matches.get_one::<u32>("probe_interval").unwrap();
393 let probing_rate = *matches.get_one::<u32>("rate").unwrap();
394 let number_of_probes = *matches.get_one::<u32>("number_of_probes").unwrap();
395 let t_type = match m_type {
396 ICMP_ID => "ICMP",
397 A_ID => "DNS/A",
398 TCP_ID => "TCP/SYN-ACK",
399 CHAOS_ID => "DNS/CHAOS",
400 ALL_ID => "All (ICMP,DNS/A,TCP)",
401 _ => "Unknown",
402 };
403 let hitlist_length = ips.len();
404
405 println!("[CLI] Performing {} measurement targeting {} addresses, with a rate of {}, and an interval of {}",
406 t_type,
407 hitlist_length.with_separator(),
408 probing_rate.with_separator(),
409 worker_interval
410 );
411
412 if is_responsive {
413 println!("[CLI] Responsive mode enabled");
414 }
415
416 if is_latency {
417 println!("[CLI] Latency mode enabled");
418 }
419
420 if is_unicast {
422 let unicast_origin = configurations.first().unwrap().origin.unwrap();
423 println!(
424 "[CLI] Unicast probing with src port {} and dst port {}",
425 unicast_origin.sport, unicast_origin.dport
426 );
427 } else if is_config {
428 println!("[CLI] Workers send probes using the following configurations:");
429 for configuration in configurations.iter() {
430 if let Some(origin) = &configuration.origin {
431 if configuration.worker_id == u32::MAX {
432 println!(
433 "\t* All workers, source IP: {}, source port: {}, destination port: {}",
434 origin.src.unwrap(),
435 origin.sport,
436 origin.dport
437 );
438 } else {
439 let worker_hostname = worker_map
440 .get_by_left(&configuration.worker_id)
441 .expect("Worker ID not found");
442 println!(
443 "\t* worker {} (with ID: {:<2}), source IP: {}, source port: {}, destination port: {}",
444 worker_hostname, configuration.worker_id, origin.src.unwrap(), origin.sport, origin.dport
445 );
446 }
447 }
448 }
449 } else {
450 let anycast_origin = configurations.first().unwrap().origin.unwrap();
451
452 println!(
453 "[CLI] Workers probe with source IP: {}, source port: {}, destination port: {}",
454 anycast_origin.src.unwrap(),
455 anycast_origin.sport,
456 anycast_origin.dport
457 );
458 }
459
460 let path = matches.get_one::<String>("out");
462 if let Some(value) = validate_path_perms(path) {
463 return value;
464 }
465
466 let m_definition = ScheduleMeasurement {
468 probing_rate,
469 configurations,
470 m_type: m_type as u32,
471 is_unicast,
472 is_ipv6,
473 is_divide,
474 worker_interval,
475 is_responsive,
476 is_latency,
477 targets: Some(Targets {
478 dst_list: ips,
479 is_discovery: None,
480 }),
481 record: dns_record.to_string(),
482 url,
483 probe_interval,
484 number_of_probes,
485 };
486
487 let args = MeasurementExecutionArgs {
488 is_cli,
489 is_parquet,
490 is_shuffle,
491 hitlist_path,
492 hitlist_length,
493 out_path: path,
494 is_config,
495 worker_map,
496 };
497
498 cli_client
499 .do_measurement_to_server(m_definition, args)
500 .await
501 } else {
502 panic!("Unrecognized command");
503 }
504}
505
/// Verifies that the requested output location is usable before a measurement is started.
///
/// A `path` ending in `/` is treated as a directory, any other value as a file:
/// * existing file: must not be read-only; it is overwritten once the measurement is done.
/// * missing file: created and removed again to prove that it can be written.
/// * existing directory: must not be read-only.
/// * missing directory: created, including missing parents.
///
/// # Arguments
/// * `path` - optional output path supplied by the user.
///
/// # Returns
/// `None` when no path was given or the location is usable; `Some(Err(_))` describing the
/// problem otherwise. Filesystem failures encountered while probing are reported through the
/// same `Some(Err(_))` value instead of aborting the process.
fn validate_path_perms(path: Option<&String>) -> Option<Result<(), Box<dyn Error>>> {
    // Logs a failed filesystem operation and turns it into the error handed to the caller.
    fn io_failure(what: &str, target: &str, err: std::io::Error) -> Box<dyn Error> {
        println!("[CLI] {what} {target}: {err}");
        format!("{what} {target}: {err}").into()
    }

    // Tells whether the existing filesystem entry at `path` is marked read-only.
    fn is_read_only(path: &Path, path_str: &str) -> Result<bool, Box<dyn Error>> {
        fs::metadata(path)
            .map(|meta| meta.permissions().readonly())
            .map_err(|e| io_failure("Unable to get path metadata for", path_str, e))
    }

    // Performs the actual checks; any `Err` is surfaced to the caller as `Some(Err(_))`.
    fn check(path_str: &str) -> Result<(), Box<dyn Error>> {
        let path = Path::new(path_str);

        if path_str.ends_with('/') {
            // Directory output.
            if !path.exists() {
                println!("[CLI] Writing results to new directory {path_str}");
                return fs::create_dir_all(path)
                    .map_err(|e| io_failure("Unable to create output directory", path_str, e));
            }
            if !path.is_dir() {
                println!("[CLI] Path is already a file, exiting");
                return Err("Cannot make dir, file with name already exists.".into());
            }
            if is_read_only(path, path_str)? {
                println!("[CLI] Lacking write permissions for directory {path_str}");
                return Err("Path is not writable".into());
            }
            println!("[CLI] Writing results to existing directory {path_str}");
            return Ok(());
        }

        // File output.
        if !path.exists() {
            println!("[CLI] Writing results to new file {path_str}");
            // Create the file once to prove it is writable, then remove the empty probe;
            // the real output file is created when the measurement starts.
            File::create(path)
                .map_err(|e| io_failure("Unable to create output file", path_str, e))?
                .sync_all()
                .map_err(|e| io_failure("Unable to sync file", path_str, e))?;
            fs::remove_file(path)
                .map_err(|e| io_failure("Unable to remove file", path_str, e))?;
            return Ok(());
        }
        if path.is_dir() {
            println!("[CLI] Path is already a directory, exiting");
            return Err("Path is already a directory".into());
        }
        if is_read_only(path, path_str)? {
            println!("[CLI] Lacking write permissions for file {path_str}");
            return Err("Lacking write permissions".into());
        }
        println!("[CLI] Overwriting existing file {path_str} when measurement is done");
        Ok(())
    }

    let path_str = path?;
    check(path_str).err().map(Err)
}
562
563fn get_hitlist(
587 hitlist_path: &String,
588 configurations: &[Configuration],
589 is_unicast: bool,
590 is_shuffle: bool,
591) -> (Vec<Address>, bool) {
592 let file =
593 File::open(hitlist_path).unwrap_or_else(|_| panic!("Unable to open file {hitlist_path}"));
594
595 let reader: Box<dyn BufRead> = if hitlist_path.ends_with(".gz") {
597 let decoder = GzDecoder::new(file);
598 Box::new(BufReader::new(decoder))
599 } else {
600 Box::new(BufReader::new(file))
601 };
602
603 let mut ips: Vec<Address> = reader .lines()
605 .map_while(Result::ok) .filter(|l| !l.trim().is_empty()) .map(Address::from)
608 .collect();
609 let is_ipv6 = ips.first().unwrap().is_v6();
610
611 if !is_unicast
613 && configurations
614 .first()
615 .expect("Empty configuration list")
616 .origin
617 .expect("No origin found")
618 .src
619 .expect("No source address")
620 .is_v6()
621 != is_ipv6
622 {
623 panic!(
624 "Hitlist addresses are not the same type as the source addresses used! (IPv4 & IPv6)"
625 );
626 }
627 if ips.iter().any(|ip| ip.is_v6() != is_ipv6) {
629 panic!("Hitlist addresses are not all of the same type! (mixed IPv4 & IPv6)");
630 }
631
632 if is_shuffle {
634 ips.as_mut_slice().shuffle(&mut rand::rng());
635 }
636 (ips, is_ipv6)
637}
638
639pub struct MeasurementExecutionArgs<'a> {
640 pub is_cli: bool,
642
643 pub is_parquet: bool,
645
646 pub is_shuffle: bool,
648
649 pub hitlist_path: &'a str,
651
652 pub hitlist_length: usize,
654
655 pub out_path: Option<&'a String>,
658
659 pub is_config: bool,
661
662 pub worker_map: BiHashMap<u32, String>,
664}
665
666impl CliClient {
667 async fn do_measurement_to_server(
687 &mut self,
688 m_definition: ScheduleMeasurement,
689 args: MeasurementExecutionArgs<'_>,
690 ) -> Result<(), Box<dyn Error>> {
691 let is_divide = m_definition.is_divide;
692 let is_ipv6 = m_definition.is_ipv6;
693 let probing_rate = m_definition.probing_rate as f32;
694 let worker_interval = m_definition.worker_interval;
695 let m_type = m_definition.m_type;
696 let is_unicast = m_definition.is_unicast;
697 let is_latency = m_definition.is_latency;
698 let is_responsive = m_definition.is_responsive;
699 let origin_str = if is_unicast {
700 m_definition
701 .configurations
702 .first()
703 .and_then(|conf| conf.origin.as_ref())
704 .map(|origin| {
705 format!(
706 "Unicast (source port: {}, destination port: {})",
707 origin.sport, origin.dport
708 )
709 })
710 .expect("No unicast origin found")
711 } else if args.is_config {
712 "Anycast configuration-based".to_string()
713 } else {
714 m_definition
715 .configurations
716 .first()
717 .and_then(|conf| conf.origin.as_ref())
718 .map(|origin| {
719 format!(
720 "Anycast (source IP: {}, source port: {}, destination port: {})",
721 origin.src.unwrap(),
722 origin.sport,
723 origin.dport
724 )
725 })
726 .expect("No anycast origin found")
727 };
728
729 let probing_workers: Vec<String> = if m_definition
731 .configurations
732 .iter()
733 .any(|config| config.worker_id == u32::MAX)
734 {
735 Vec::new() } else {
737 m_definition
739 .configurations
740 .iter()
741 .map(|config| {
742 args.worker_map
743 .get_by_left(&config.worker_id)
744 .unwrap_or_else(|| {
745 panic!("Worker ID {} is not a connected worker!", config.worker_id)
746 })
747 .clone()
748 })
749 .collect::<HashSet<String>>() .into_iter()
751 .collect::<Vec<String>>()
752 };
753
754 let number_of_probers = if probing_workers.is_empty() {
755 args.worker_map.len() as f32
756 } else {
757 probing_workers.len() as f32
758 };
759
760 let m_time = if is_divide || is_latency {
761 ((args.hitlist_length as f32 / (probing_rate * number_of_probers)) + 1.0) / 60.0
762 } else {
763 (((number_of_probers - 1.0) * worker_interval as f32) + (args.hitlist_length as f32 / probing_rate) + 1.0) / 60.0 };
768
769 if is_divide {
770 println!("[CLI] Divide-and-conquer enabled");
771 }
772 println!("[CLI] This measurement will take an estimated {m_time:.2} minutes");
773
774 let response = self
775 .grpc_client
776 .do_measurement(Request::new(m_definition.clone()))
777 .await;
778 if let Err(e) = response {
779 println!(
780 "[CLI] Orchestrator did not perform the measurement for reason: '{}'",
781 e.message()
782 );
783 return Err(Box::new(e));
784 }
785 let start = SystemTime::now()
786 .duration_since(UNIX_EPOCH)
787 .unwrap()
788 .as_nanos() as u64;
789 let timestamp_start = Local::now();
790 let timestamp_start_str = timestamp_start.format("%Y-%m-%dT%H_%M_%S").to_string();
791 println!(
792 "[CLI] Measurement started at {}",
793 timestamp_start.format("%H:%M:%S")
794 );
795
796 let total_steps = (m_time * 60.0) as u64; let pb = ProgressBar::new(total_steps);
799 pb.set_style(
800 ProgressStyle::with_template(
801 "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
802 )
803 .unwrap()
804 .progress_chars("#>-"),
805 );
806 let is_done = Arc::new(AtomicBool::new(false));
807 let is_done_clone = is_done.clone();
808
809 tokio::spawn(async move {
811 if !args.is_cli {
813 for _ in 0..total_steps {
814 if is_done_clone.load(Ordering::Relaxed) {
815 break;
816 }
817 pb.inc(1); tokio::time::sleep(Duration::from_secs(1)).await; }
820 }
821 });
822
823 let mut graceful = false; let mut stream = response
826 .expect("Unable to obtain the orchestrator stream")
827 .into_inner();
828 let (tx_r, rx_r) = unbounded_channel();
830
831 let type_str = match m_type as u8 {
833 ICMP_ID => "ICMP",
834 A_ID => "DNS",
835 TCP_ID => "TCP",
836 CHAOS_ID => "CHAOS",
837 _ => "ICMP",
838 };
839 let type_str = if is_ipv6 {
840 format!("{type_str}v6")
841 } else {
842 format!("{type_str}v4")
843 };
844
845 let filetype = if is_unicast { "GCD_" } else { "MAnycast_" };
847
848 let mut is_parquet = args.is_parquet;
850
851 let extension = if is_parquet { ".parquet" } else { ".csv.gz" };
852
853 let file_path = if let Some(path) = args.out_path {
855 if path.ends_with('/') {
856 format!("{path}{filetype}{type_str}{timestamp_start_str}{extension}")
858 } else {
859 if path.ends_with(".parquet") {
862 is_parquet = true; }
864 path.to_string()
865 }
866 } else {
867 format!("./{filetype}{type_str}{timestamp_start_str}{extension}")
869 };
870
871 let file = File::create(file_path).expect("Unable to create file");
873
874 let metadata_args = MetadataArgs {
875 is_divide,
876 origin_str,
877 hitlist: args.hitlist_path,
878 is_shuffle: args.is_shuffle,
879 m_type_str: type_str,
880 probing_rate: probing_rate as u32,
881 interval: worker_interval,
882 active_workers: probing_workers,
883 all_workers: &args.worker_map,
884 configurations: &m_definition.configurations,
885 is_config: args.is_config,
886 is_latency,
887 is_responsive,
888 };
889
890 let is_multi_origin = if is_unicast {
891 false
892 } else {
893 m_definition.configurations.iter().any(|conf| {
895 conf.origin
896 .as_ref()
897 .is_some_and(|origin| origin.origin_id != 0 && origin.origin_id != u32::MAX)
898 })
899 };
900
901 let config = WriteConfig {
902 print_to_cli: args.is_cli,
903 output_file: file,
904 metadata_args,
905 m_type,
906 is_multi_origin,
907 is_symmetric: is_unicast || is_latency,
908 worker_map: args.worker_map.clone(),
909 };
910
911 if is_parquet {
913 write_results_parquet(rx_r, config);
914 } else {
915 write_results(rx_r, config);
916 }
917
918 let mut replies_count = 0;
919 'mloop: while let Some(task_result) = match stream.message().await {
920 Ok(Some(result)) => Some(result),
921 Ok(None) => {
922 eprintln!("Stream closed by orchestrator");
923 break 'mloop;
924 } Err(e) => {
926 eprintln!("Error receiving message: {e}");
927 break 'mloop;
928 }
929 } {
930 if task_result == TaskResult::default() {
932 tx_r.send(task_result).unwrap(); graceful = true;
934 break;
935 }
936
937 replies_count += task_result.result_list.len();
938 tx_r.send(task_result).unwrap();
940 }
941
942 is_done.store(true, Ordering::Relaxed); let end = SystemTime::now()
945 .duration_since(UNIX_EPOCH)
946 .unwrap()
947 .as_nanos() as u64;
948 let length = (end - start) as f64 / 1_000_000_000.0; println!("[CLI] Waited {length:.6} seconds for results.");
950 println!(
951 "[CLI] Time of end measurement {}",
952 Local::now().format("%H:%M:%S")
953 );
954 println!(
955 "[CLI] Number of replies captured: {}",
956 replies_count.with_separator()
957 );
958
959 if !graceful {
961 tx_r.send(TaskResult::default()).unwrap(); println!("[CLI] Measurement ended prematurely!");
963 }
964
965 tx_r.closed().await; Ok(())
968 }
969
970 async fn connect(
990 address: &str,
991 fqdn: Option<&String>,
992 ) -> Result<ControllerClient<Channel>, Box<dyn Error>> {
993 let channel = if let Some(fqdn) = fqdn {
994 let addr = format!("https://{address}");
996
997 let pem = fs::read_to_string("tls/orchestrator.crt")
999 .expect("Unable to read CA certificate at ./tls/orchestrator.crt");
1000 let ca = Certificate::from_pem(pem);
1001
1002 let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(fqdn);
1003
1004 let builder = Channel::from_shared(addr.to_owned())?; builder
1006 .tls_config(tls)
1007 .expect("Unable to set TLS configuration")
1008 .connect()
1009 .await
1010 .expect("Unable to connect to orchestrator")
1011 } else {
1012 let addr = format!("http://{address}");
1014
1015 Channel::from_shared(addr.to_owned())
1016 .expect("Unable to set address")
1017 .connect()
1018 .await
1019 .expect("Unable to connect to orchestrator")
1020 };
1021 let client = ControllerClient::new(channel);
1023
1024 Ok(client)
1025 }
1026}