use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::sync::Arc;

use bimap::BiHashMap;
use csv::Writer;
use flate2::write::GzEncoder;
use flate2::Compression;
use tokio::sync::mpsc::UnboundedReceiver;

use parquet::basic::{Compression as ParquetCompression, LogicalType, Repetition};
use parquet::data_type::{ByteArray, DoubleType, Int32Type, Int64Type};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::{Type as SchemaType, TypePtr};

use custom_module::manycastr::{Configuration, Reply, TaskResult};
use custom_module::Separated;

use crate::{custom_module, CHAOS_ID, TCP_ID};

/// Settings that control how measurement results are written.
pub struct WriteConfig<'a> {
    /// Also echo result rows to stdout.
    pub print_to_cli: bool,
    /// Destination file for the results.
    pub output_file: File,
    /// Measurement parameters written as file metadata.
    pub metadata_args: MetadataArgs<'a>,
    /// Measurement type identifier (e.g. `TCP_ID`, `CHAOS_ID`).
    pub m_type: u32,
    /// Whether replies may carry per-origin information.
    pub is_multi_origin: bool,
    /// Whether the measurement is symmetric (an RTT can be computed).
    pub is_symmetric: bool,
    /// Maps worker IDs to hostnames.
    pub worker_map: BiHashMap<u32, String>,
}

/// Measurement parameters that end up in the output file's metadata.
pub struct MetadataArgs<'a> {
    /// Divide-and-conquer measurement style.
    pub is_divide: bool,
    /// Human-readable description of the origin(s) used.
    pub origin_str: String,
    /// Path of the hitlist that was probed.
    pub hitlist: &'a str,
    /// Whether the hitlist was shuffled.
    pub is_shuffle: bool,
    /// Human-readable measurement type name.
    pub m_type_str: String,
    /// Probing rate in packets per second.
    pub probing_rate: u32,
    /// Interval between workers, in milliseconds.
    pub interval: u32,
    /// Hostnames of workers selected to probe (empty means all).
    pub active_workers: Vec<String>,
    /// All connected workers (ID <-> hostname).
    pub all_workers: &'a BiHashMap<u32, String>,
    /// Per-worker probing configurations.
    pub configurations: &'a Vec<Configuration>,
    /// Whether the configurations should be included in the metadata.
    pub is_config: bool,
    /// Anycast-latency measurement style.
    pub is_latency: bool,
    /// Responsive-mode measurement style.
    pub is_responsive: bool,
}

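/// Spawns a task that drains `rx` and streams result rows as gzip-compressed
/// CSV, optionally echoing them to stdout. A default [`TaskResult`] acts as
/// the end-of-measurement sentinel.
///
/// A minimal usage sketch (the channel wiring is assumed, not taken from this
/// crate):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// write_results(rx, config);
/// // ... send TaskResults from workers ...
/// tx.send(TaskResult::default()).unwrap(); // sentinel stops the writer task
/// ```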
pub fn write_results(mut rx: UnboundedReceiver<TaskResult>, config: WriteConfig) {
    let mut wtr_cli = if config.print_to_cli {
        Some(Writer::from_writer(io::stdout()))
    } else {
        None
    };

    let buffered_file_writer = BufWriter::new(config.output_file);
    let mut gz_encoder = GzEncoder::new(buffered_file_writer, Compression::default());

    // Write the '#'-prefixed metadata lines before handing the stream to the CSV writer.
    let md_lines = get_csv_metadata(config.metadata_args, &config.worker_map);
    for line in md_lines {
        if let Err(e) = writeln!(gz_encoder, "{line}") {
            eprintln!("Failed to write metadata line to Gzip stream: {e}");
        }
    }

    let mut wtr_file = Writer::from_writer(gz_encoder);

    let header = get_header(config.m_type, config.is_multi_origin, config.is_symmetric);
    if let Some(wtr) = wtr_cli.as_mut() {
        wtr.write_record(&header)
            .expect("Failed to write header to stdout");
    }
    wtr_file
        .write_record(header)
        .expect("Failed to write header to file");

    tokio::spawn(async move {
        while let Some(task_result) = rx.recv().await {
            // An empty TaskResult is the end-of-measurement sentinel.
            if task_result == TaskResult::default() {
                break;
            }
            let results: Vec<Reply> = task_result.result_list;
            for result in results {
                let row = get_row(
                    result,
                    task_result.worker_id,
                    config.m_type as u8,
                    config.is_symmetric,
                    &config.worker_map,
                );

                if let Some(ref mut wtr) = wtr_cli {
                    wtr.write_record(&row)
                        .expect("Failed to write payload to CLI");
                    wtr.flush().expect("Failed to flush stdout");
                }

                wtr_file
                    .write_record(row)
                    .expect("Failed to write payload to file");
            }
            wtr_file.flush().expect("Failed to flush file");
        }
        rx.close();
        // Flush the CSV writer and finish the gzip stream explicitly so the
        // trailer is written; relying on drop would silently swallow errors.
        let gz_encoder = wtr_file.into_inner().expect("Failed to flush CSV writer");
        gz_encoder.finish().expect("Failed to finish Gzip stream");
    });
}

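/// Builds the CSV header for the given measurement settings; the same column
/// order is reused for the Parquet schema.
///
/// A minimal sketch of the expected layout (the values of `TCP_ID` and
/// `CHAOS_ID` are crate-defined):
///
/// ```ignore
/// let header = get_header(CHAOS_ID as u32, true, false);
/// assert_eq!(
///     header,
///     ["rx", "rx_time", "addr", "ttl", "tx_time", "tx", "chaos_data", "origin_id"]
/// );
/// ```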
pub fn get_header(m_type: u32, is_multi_origin: bool, is_symmetric: bool) -> Vec<&'static str> {
    let mut header = if is_symmetric {
        vec!["rx", "addr", "ttl", "rtt"]
    } else if m_type == TCP_ID as u32 {
        // TCP rows omit the tx_time column.
        vec!["rx", "rx_time", "addr", "ttl", "tx"]
    } else {
        vec!["rx", "rx_time", "addr", "ttl", "tx_time", "tx"]
    };

    if m_type == CHAOS_ID as u32 {
        header.push("chaos_data");
    }

    if is_multi_origin {
        header.push("origin_id");
    }

    header
}

fn get_row(
    result: Reply,
    rx_worker_id: u32,
    m_type: u8,
    is_symmetric: bool,
    worker_map: &BiHashMap<u32, String>,
) -> Vec<String> {
    let origin_id = result.origin_id.to_string();
    // Origin IDs of 0 and u32::MAX mark replies without per-origin information.
    let is_multi_origin = result.origin_id != 0 && result.origin_id != u32::MAX;
    let rx_hostname = worker_map
        .get_by_left(&rx_worker_id)
        .cloned()
        .unwrap_or_else(|| String::from("Unknown"));
    let rx_time = result.rx_time.to_string();
    let tx_time = result.tx_time.to_string();
    let tx_id = result.tx_id;
    let ttl = result.ttl.to_string();
    let reply_src = result
        .src
        .expect("Reply is missing a source address")
        .to_string();

    let mut row = if is_symmetric {
        let rtt = format!(
            "{:.2}",
            calculate_rtt(result.rx_time, result.tx_time, m_type == TCP_ID)
        );
        vec![rx_hostname, reply_src, ttl, rtt]
    } else {
        let tx_hostname = worker_map
            .get_by_left(&tx_id)
            .cloned()
            .unwrap_or_else(|| String::from("Unknown"));

        if m_type == TCP_ID {
            vec![rx_hostname, rx_time, reply_src, ttl, tx_hostname]
        } else {
            vec![rx_hostname, rx_time, reply_src, ttl, tx_time, tx_hostname]
        }
    };

    if let Some(chaos) = result.chaos {
        row.push(chaos);
    }
    if is_multi_origin {
        row.push(origin_id);
    }

    row
}

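/// Computes the RTT in milliseconds from send/receive timestamps.
///
/// A minimal sketch of the non-TCP path, where both timestamps are in
/// microseconds:
///
/// ```ignore
/// // 2_500 µs between tx and rx -> 2.5 ms
/// assert_eq!(calculate_rtt(1_002_500, 1_000_000, false), 2.5);
/// ```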
pub fn calculate_rtt(rx_time: u64, tx_time: u64, is_tcp: bool) -> f64 {
    if is_tcp {
        // For TCP, rx_time is in microseconds while tx_time is assumed to
        // already be a millisecond value; the subtraction is done in u32
        // space to match the 32-bit timestamp carried in the reply.
        let rx_time_ms = rx_time / 1_000;
        let rx_time_adj = rx_time_ms as u32;

        (rx_time_adj - tx_time as u32) as f64
    } else {
        // Both timestamps are in microseconds; convert the delta to milliseconds.
        (rx_time - tx_time) as f64 / 1_000.0
    }
}

pub fn get_csv_metadata(
    args: MetadataArgs<'_>,
    worker_map: &BiHashMap<u32, String>,
) -> Vec<String> {
    let mut md_file = Vec::new();
    if args.is_divide {
        md_file.push("# Measurement style: Divide-and-conquer".to_string());
    } else if args.is_latency {
        md_file.push("# Measurement style: Anycast latency".to_string());
    } else if args.is_responsive {
        md_file.push("# Measurement style: Responsive-mode".to_string());
    }
    md_file.push(format!("# Origin used: {}", args.origin_str));
    md_file.push(format!(
        "# Hitlist{}: {}",
        if args.is_shuffle { " (shuffled)" } else { "" },
        args.hitlist
    ));
    md_file.push(format!("# Measurement type: {}", args.m_type_str));
    md_file.push(format!(
        "# Probing rate: {}",
        args.probing_rate.with_separator()
    ));
    md_file.push(format!("# Worker interval: {}", args.interval));
    if !args.active_workers.is_empty() {
        md_file.push(format!(
            "# Selective probing using the following workers: {:?}",
            args.active_workers
        ));
    }
    md_file.push(format!("# {} connected workers:", args.all_workers.len()));
    for hostname in args.all_workers.right_values() {
        md_file.push(format!("# * {hostname}"));
    }

    if args.is_config {
        md_file.push("# Configurations:".to_string());
        for configuration in args.configurations {
            let origin = configuration
                .origin
                .expect("Configuration is missing an origin");
            let src = origin.src.expect("Invalid source address");
            let hostname = if configuration.worker_id == u32::MAX {
                "ALL".to_string()
            } else {
                worker_map
                    .get_by_left(&configuration.worker_id)
                    .cloned()
                    .unwrap_or_else(|| String::from("Unknown"))
            };
            md_file.push(format!(
                "# * {:<2}, source IP: {}, source port: {}, destination port: {}",
                hostname, src, origin.sport, origin.dport
            ));
        }
    }

    md_file
}

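/// Builds `(key, value)` pairs that are embedded in the Parquet footer as
/// key-value metadata (the counterpart of [`get_csv_metadata`]).
///
/// A sketch of typical entries (the values shown depend on the run):
///
/// ```ignore
/// // ("probing_rate_pps", "10000")
/// // ("hitlist_shuffled", "true")
/// ```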
pub fn get_parquet_metadata(
    args: MetadataArgs<'_>,
    worker_map: &BiHashMap<u32, String>,
) -> Vec<(String, String)> {
    let mut md = Vec::new();

    if args.is_divide {
        md.push((
            "measurement_style".to_string(),
            "Divide-and-conquer".to_string(),
        ));
    }
    if args.is_latency {
        md.push((
            "measurement_style".to_string(),
            "Anycast-latency".to_string(),
        ));
    }
    if args.is_responsive {
        md.push((
            "measurement_style".to_string(),
            "Responsive-mode".to_string(),
        ));
    }

    md.push(("origin_used".to_string(), args.origin_str));
    md.push(("hitlist_path".to_string(), args.hitlist.to_string()));
    md.push(("hitlist_shuffled".to_string(), args.is_shuffle.to_string()));
    md.push(("measurement_type".to_string(), args.m_type_str));
    md.push((
        "probing_rate_pps".to_string(),
        args.probing_rate.to_string(),
    ));
    md.push(("worker_interval_ms".to_string(), args.interval.to_string()));

    if !args.active_workers.is_empty() {
        md.push((
            "selective_probing_workers".to_string(),
            serde_json::to_string(&args.active_workers).unwrap_or_default(),
        ));
    }

    let worker_hostnames: Vec<&String> = args.all_workers.right_values().collect();
    md.push((
        "connected_workers".to_string(),
        serde_json::to_string(&worker_hostnames).unwrap_or_default(),
    ));
    md.push((
        "connected_workers_count".to_string(),
        args.all_workers.len().to_string(),
    ));

    if args.is_config && !args.configurations.is_empty() {
        let config_str = args
            .configurations
            .iter()
            .map(|c| {
                format!(
                    "Worker: {}, SrcIP: {}, SrcPort: {}, DstPort: {}",
                    if c.worker_id == u32::MAX {
                        "ALL".to_string()
                    } else {
                        worker_map
                            .get_by_left(&c.worker_id)
                            .cloned()
                            .unwrap_or_else(|| String::from("Unknown"))
                    },
                    c.origin
                        .as_ref()
                        .and_then(|o| o.src)
                        .map_or("N/A".to_string(), |s| s.to_string()),
                    c.origin.as_ref().map_or(0, |o| o.sport),
                    c.origin.as_ref().map_or(0, |o| o.dport)
                )
            })
            .collect::<Vec<_>>();

        md.push((
            "configurations".to_string(),
            serde_json::to_string(&config_str).unwrap_or_default(),
        ));
    }

    md
}

/// Number of rows buffered before a row group is flushed to the Parquet file.
const BATCH_SIZE: usize = 1024;

/// One result row in the column layout produced by [`get_header`]; `None`
/// fields encode as Parquet nulls.
struct ParquetDataRow {
    /// Receiving worker's hostname.
    rx: Option<String>,
    /// Receive timestamp.
    rx_time: Option<u64>,
    /// Source address of the reply.
    addr: Option<String>,
    /// TTL of the reply.
    ttl: Option<u8>,
    /// Transmit timestamp.
    tx_time: Option<u64>,
    /// Transmitting worker's hostname.
    tx: Option<String>,
    /// Round-trip time in milliseconds (symmetric measurements only).
    rtt: Option<f64>,
    /// CHAOS measurement payload, if any.
    chaos_data: Option<String>,
    /// Origin identifier for multi-origin measurements.
    origin_id: Option<u8>,
}

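/// Spawns a task that drains `rx` and writes replies to a Snappy-compressed
/// Parquet file, flushing a row group every [`BATCH_SIZE`] rows. As in
/// [`write_results`], a default [`TaskResult`] acts as the end sentinel.
///
/// A sketch of reading the embedded metadata back (the path is hypothetical):
///
/// ```ignore
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let reader = SerializedFileReader::new(File::open("results.parquet")?)?;
/// for kv in reader.metadata().file_metadata().key_value_metadata().into_iter().flatten() {
///     println!("{}: {:?}", kv.key, kv.value);
/// }
/// ```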
pub fn write_results_parquet(mut rx: UnboundedReceiver<TaskResult>, config: WriteConfig) {
    let schema = build_parquet_schema(config.m_type, config.is_multi_origin, config.is_symmetric);

    let key_value_tuples = get_parquet_metadata(config.metadata_args, &config.worker_map);
    let key_value_metadata: Vec<parquet::file::metadata::KeyValue> = key_value_tuples
        .into_iter()
        .map(|(key, value)| parquet::file::metadata::KeyValue::new(key, value))
        .collect();

    let props = Arc::new(
        WriterProperties::builder()
            .set_compression(ParquetCompression::SNAPPY)
            .set_key_value_metadata(Some(key_value_metadata))
            .build(),
    );

    let mut writer = SerializedFileWriter::new(config.output_file, schema, props)
        .expect("Failed to create parquet writer");

    let headers = get_header(config.m_type, config.is_multi_origin, config.is_symmetric);

    tokio::spawn(async move {
        let mut row_buffer: Vec<ParquetDataRow> = Vec::with_capacity(BATCH_SIZE);

        while let Some(task_result) = rx.recv().await {
            // An empty TaskResult is the end-of-measurement sentinel.
            if task_result == TaskResult::default() {
                break;
            }

            let worker_id = task_result.worker_id;
            for reply in task_result.result_list {
                let parquet_row = reply_to_parquet_row(
                    reply,
                    worker_id,
                    config.m_type as u8,
                    config.is_symmetric,
                    &config.worker_map,
                );
                row_buffer.push(parquet_row);
            }

            if row_buffer.len() >= BATCH_SIZE {
                write_batch_to_parquet(&mut writer, &row_buffer, &headers)
                    .expect("Failed to write batch to Parquet file");
                row_buffer.clear();
            }
        }

        // Flush any remaining rows before closing.
        if !row_buffer.is_empty() {
            write_batch_to_parquet(&mut writer, &row_buffer, &headers)
                .expect("Failed to write final batch to Parquet file");
        }

        writer.close().expect("Failed to close Parquet writer");
        rx.close();
    });
}

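/// Maps the header produced by [`get_header`] onto a flat Parquet schema;
/// every column is OPTIONAL so missing fields encode as nulls.
///
/// Roughly, for a symmetric measurement the resulting message type is:
///
/// ```ignore
/// message schema {
///   OPTIONAL BYTE_ARRAY rx (STRING);
///   OPTIONAL BYTE_ARRAY addr (STRING);
///   OPTIONAL INT32 ttl (INTEGER(8,false));
///   OPTIONAL DOUBLE rtt;
/// }
/// ```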
fn build_parquet_schema(m_type: u32, is_multi_origin: bool, is_symmetric: bool) -> TypePtr {
    let headers = get_header(m_type, is_multi_origin, is_symmetric);
    let mut fields = Vec::new();

    for &header in &headers {
        let field = match header {
            "rx" | "addr" | "tx" | "chaos_data" => {
                SchemaType::primitive_type_builder(header, parquet::basic::Type::BYTE_ARRAY)
                    .with_repetition(Repetition::OPTIONAL)
                    .with_logical_type(Some(LogicalType::String))
                    .build()
                    .unwrap()
            }
            "rx_time" | "tx_time" => {
                SchemaType::primitive_type_builder(header, parquet::basic::Type::INT64)
                    .with_repetition(Repetition::OPTIONAL)
                    .with_logical_type(Some(LogicalType::Integer {
                        bit_width: 64,
                        is_signed: false,
                    }))
                    .build()
                    .unwrap()
            }
            "ttl" | "origin_id" => {
                SchemaType::primitive_type_builder(header, parquet::basic::Type::INT32)
                    .with_repetition(Repetition::OPTIONAL)
                    .with_logical_type(Some(LogicalType::Integer {
                        bit_width: 8,
                        is_signed: false,
                    }))
                    .build()
                    .unwrap()
            }
            "rtt" => SchemaType::primitive_type_builder(header, parquet::basic::Type::DOUBLE)
                .with_repetition(Repetition::OPTIONAL)
                .build()
                .unwrap(),
            _ => panic!("Unknown header column: {header}"),
        };
        fields.push(Arc::new(field));
    }

    Arc::new(
        SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    )
}

fn reply_to_parquet_row(
    result: Reply,
    rx_worker_id: u32,
    m_type: u8,
    is_symmetric: bool,
    worker_map: &BiHashMap<u32, String>,
) -> ParquetDataRow {
    let mut row = ParquetDataRow {
        rx: worker_map.get_by_left(&rx_worker_id).cloned(),
        rx_time: Some(result.rx_time),
        addr: result.src.map(|s| s.to_string()),
        ttl: Some(result.ttl as u8),
        tx_time: Some(result.tx_time),
        tx: None,
        rtt: None,
        chaos_data: result.chaos,
        // 0 and u32::MAX mark replies without per-origin information.
        origin_id: if result.origin_id != 0 && result.origin_id != u32::MAX {
            Some(result.origin_id as u8)
        } else {
            None
        },
    };

    if is_symmetric {
        // Symmetric rows carry an RTT instead of the raw timestamps.
        row.rtt = Some(calculate_rtt(
            result.rx_time,
            result.tx_time,
            m_type == TCP_ID,
        ));
        row.rx_time = None;
        row.tx_time = None;
    } else {
        row.tx = worker_map.get_by_left(&result.tx_id).cloned();
        if m_type == TCP_ID {
            // TCP rows have no tx_time column (cf. get_header).
            row.tx_time = None;
        }
    }

    row
}

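/// Writes one buffered batch as a Parquet row group, column by column, in the
/// order given by `headers` (which must match the schema).
///
/// For an OPTIONAL column the definition level is 1 when a value is present
/// and 0 for a null, and `values` holds only the present entries. A sketch:
///
/// ```ignore
/// // rows:       [Some("a"), None, Some("b")]
/// // def_levels: [1, 0, 1]
/// // values:     ["a", "b"]
/// ```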
fn write_batch_to_parquet(
    writer: &mut SerializedFileWriter<File>,
    batch: &[ParquetDataRow],
    headers: &[&str],
) -> Result<(), parquet::errors::ParquetError> {
    let mut row_group_writer = writer.next_row_group()?;

    for &header in headers {
        if let Some(mut col_writer) = row_group_writer.next_column()? {
            match header {
                "rx" | "addr" | "tx" | "chaos_data" => {
                    let mut values = Vec::new();
                    let def_levels: Vec<i16> = batch
                        .iter()
                        .map(|row| {
                            let opt_val = match header {
                                "rx" => row.rx.as_ref(),
                                "addr" => row.addr.as_ref(),
                                "tx" => row.tx.as_ref(),
                                "chaos_data" => row.chaos_data.as_ref(),
                                _ => None,
                            };
                            if let Some(val) = opt_val {
                                values.push(ByteArray::from(val.as_str()));
                                1
                            } else {
                                0
                            }
                        })
                        .collect();
                    col_writer
                        .typed::<parquet::data_type::ByteArrayType>()
                        .write_batch(&values, Some(&def_levels), None)?;
                }
                "rx_time" | "tx_time" => {
                    let mut values = Vec::new();
                    let def_levels: Vec<i16> = batch
                        .iter()
                        .map(|row| {
                            let opt_val = match header {
                                "rx_time" => row.rx_time,
                                "tx_time" => row.tx_time,
                                _ => None,
                            };
                            if let Some(val) = opt_val {
                                values.push(val as i64);
                                1
                            } else {
                                0
                            }
                        })
                        .collect();
                    col_writer.typed::<Int64Type>().write_batch(
                        &values,
                        Some(&def_levels),
                        None,
                    )?;
                }
                "ttl" | "origin_id" => {
                    let mut values = Vec::new();
                    let def_levels: Vec<i16> = batch
                        .iter()
                        .map(|row| {
                            let opt_val = match header {
                                "ttl" => row.ttl,
                                "origin_id" => row.origin_id,
                                _ => None,
                            };
                            if let Some(val) = opt_val {
                                values.push(val as i32);
                                1
                            } else {
                                0
                            }
                        })
                        .collect();
                    col_writer.typed::<Int32Type>().write_batch(
                        &values,
                        Some(&def_levels),
                        None,
                    )?;
                }
                "rtt" => {
                    let mut values = Vec::new();
                    let def_levels: Vec<i16> = batch
                        .iter()
                        .map(|row| {
                            if let Some(val) = row.rtt {
                                values.push(val);
                                1
                            } else {
                                0
                            }
                        })
                        .collect();
                    col_writer.typed::<DoubleType>().write_batch(
                        &values,
                        Some(&def_levels),
                        None,
                    )?;
                }
                _ => {}
            }
            col_writer.close()?;
        }
    }
    row_group_writer.close()?;
    Ok(())
}
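
// Minimal sanity tests; a sketch exercising the header/schema agreement and
// the microsecond -> millisecond RTT conversion documented above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn header_and_schema_column_counts_agree() {
        for &(m_type, multi, sym) in &[
            (TCP_ID as u32, false, false),
            (CHAOS_ID as u32, true, false),
        ] {
            let header = get_header(m_type, multi, sym);
            let schema = build_parquet_schema(m_type, multi, sym);
            assert_eq!(header.len(), schema.get_fields().len());
        }
    }

    #[test]
    fn rtt_is_reported_in_milliseconds() {
        // 2_500 µs between tx and rx -> 2.5 ms
        assert_eq!(calculate_rtt(1_002_500, 1_000_000, false), 2.5);
    }
}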