manycast/worker/inbound.rs
1use std::mem;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4use std::thread::{sleep, Builder};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use tokio::sync::mpsc::UnboundedSender;
7
8use crate::custom_module::manycastr::{Address, Origin, Reply, TaskResult};
9use crate::custom_module::Separated;
10use crate::net::{DNSAnswer, DNSRecord, IPv4Packet, IPv6Packet, PacketPayload, TXTRecord};
11use crate::{A_ID, CHAOS_ID, ICMP_ID, TCP_ID};
12use pnet::datalink::DataLinkReceiver;
13
/// Configuration for an inbound packet listening worker.
///
/// This struct holds all the parameters that `inbound` needs to spawn its
/// listener and result-forwarding threads and to validate incoming
/// measurement packets.
pub struct InboundConfig {
    /// The unique ID of the measurement. ICMP replies whose payload carries a
    /// different ID are discarded.
    pub m_id: u32,

    /// The unique ID of this specific worker; attached to every forwarded `TaskResult`.
    pub worker_id: u16,

    /// Specifies whether to listen for IPv6 packets (`true`) or IPv4 packets (`false`).
    pub is_ipv6: bool,

    /// The type of measurement being performed; compared against `ICMP_ID`,
    /// `A_ID`, `CHAOS_ID` and `TCP_ID`.
    pub m_type: u8,

    /// The list of valid origins (source address and port values) that incoming
    /// packets are matched against.
    pub origin_map: Vec<Origin>,

    /// A shared flag that, once set to `true`, tells the worker's threads to stop.
    pub abort_s: Arc<AtomicBool>,
}
37
38/// Listen for incoming packets
39/// Creates two threads, one that listens on the socket and another that forwards results to the orchestrator and shuts down the receiving socket when appropriate.
40/// Makes sure that the received packets are valid and belong to the current measurement.
41///
42/// # Arguments
43///
44/// * 'config' - configuration for the inbound worker thread
45///
46/// * 'tx' - sender to put task results in
47///
48/// * 'socket_rx' - the socket to listen on
49///
50/// # Panics
51///
52/// Panics if the measurement type is invalid
53pub fn inbound(
54 config: InboundConfig,
55 tx: UnboundedSender<TaskResult>,
56 mut socket_rx: Box<dyn DataLinkReceiver>,
57) {
58 println!("[Worker inbound] Started listener");
59 // Result queue to store incoming pings, and take them out when sending the TaskResults to the orchestrator
60 let rq = Arc::new(Mutex::new(Vec::new()));
61 let rq_c = rq.clone();
62 let rx_f_c = config.abort_s.clone();
63 Builder::new()
64 .name("listener_thread".to_string())
65 .spawn(move || {
66 // Listen for incoming packets
67 let mut received: u32 = 0;
68 loop {
69 // Check if we should exit
70 if rx_f_c.load(Ordering::Relaxed) {
71 break;
72 }
73 let packet = match socket_rx.next() {
74 // TODO blocking call
75 Ok(packet) => packet,
76 Err(_) => {
77 sleep(Duration::from_millis(1)); // Sleep to free CPU, let buffer fill
78 continue;
79 }
80 };
81
82 let result = if config.m_type == ICMP_ID {
83 // ICMP
84 // Convert the bytes into an ICMP packet (first 13 bytes are the eth header, which we skip)
85 if config.is_ipv6 {
86 parse_icmpv6(&packet[14..], config.m_id, &config.origin_map)
87 } else {
88 parse_icmpv4(&packet[14..], config.m_id, &config.origin_map)
89 }
90 } else if config.m_type == A_ID || config.m_type == CHAOS_ID {
91 // DNS A
92 if config.is_ipv6 {
93 if packet[20] == 17 {
94 // 17 is the protocol number for UDP
95 parse_dnsv6(&packet[14..], config.m_type, &config.origin_map)
96 } else {
97 None
98 }
99 } else if packet[23] == 17 {
100 // 17 is the protocol number for UDP
101 parse_dnsv4(&packet[14..], config.m_type, &config.origin_map)
102 } else {
103 None
104 }
105 } else if config.m_type == TCP_ID {
106 // TCP
107 if config.is_ipv6 {
108 parse_tcpv6(&packet[14..], &config.origin_map)
109 } else {
110 parse_tcpv4(&packet[14..], &config.origin_map)
111 }
112 } else {
113 panic!("Invalid measurement type");
114 };
115
116 // Invalid packets have value None
117 if result.is_none() {
118 continue;
119 }
120
121 // Put result in transmission queue
122 {
123 received += 1;
124 let mut buffer = rq_c.lock().unwrap();
125 buffer.push(result.unwrap())
126 }
127 }
128
129 println!(
130 "[Worker inbound] Stopped pnet listener (received {} packets)",
131 received.with_separator()
132 );
133 })
134 .expect("Failed to spawn listener_thread");
135
136 // Thread for sending the received replies to the orchestrator as TaskResult
137 Builder::new()
138 .name("result_sender_thread".to_string())
139 .spawn(move || {
140 handle_results(&tx, config.abort_s, config.worker_id, rq);
141 })
142 .expect("Failed to spawn result_sender_thread");
143}
144
145/// Thread for handling the received replies, wrapping them in a TaskResult, and streaming them back to the main worker class.
146///
147/// # Arguments
148///
149/// * `tx` - sender to put task results in
150///
151/// * `rx_f` - channel that is used to signal the end of the measurement
152///
153/// * `worker_id` - the unique worker ID of this worker
154///
155/// * `rq_sender` - contains a vector of all received replies as Reply results
156fn handle_results(
157 tx: &UnboundedSender<TaskResult>,
158 rx_f: Arc<AtomicBool>,
159 worker_id: u16,
160 rq_sender: Arc<Mutex<Vec<(Reply, bool)>>>,
161) {
162 loop {
163 // Every second, forward the ping results to the orchestrator
164 sleep(Duration::from_secs(1));
165
166 // Get the current result queue, and replace it with an empty one
167 let rq = {
168 let mut guard = rq_sender.lock().unwrap();
169 mem::take(&mut *guard)
170 };
171 // Split on discovery and non-discovery replies
172 let (discovery_rq, follow_rq): (Vec<_>, Vec<_>) = rq.into_iter().partition(|&(_, b)| b);
173 let discovery_rq: Vec<Reply> = discovery_rq.into_iter().map(|(r, _)| r).collect();
174 let follow_rq: Vec<Reply> = follow_rq.into_iter().map(|(r, _)| r).collect();
175
176 // Send the result to the worker handler
177 if !discovery_rq.is_empty() {
178 tx.send(TaskResult {
179 worker_id: worker_id as u32,
180 result_list: discovery_rq,
181 is_discovery: true,
182 })
183 .expect("Failed to send TaskResult to worker handler");
184 }
185 if !follow_rq.is_empty() {
186 tx.send(TaskResult {
187 worker_id: worker_id as u32,
188 result_list: follow_rq,
189 is_discovery: false,
190 })
191 .expect("Failed to send TaskResult to worker handler");
192 }
193
194 // Exit the thread if worker sends us the signal it's finished
195 if rx_f.load(Ordering::SeqCst) {
196 // Send default value to let the orchestrator know we are finished
197 tx.send(TaskResult::default())
198 .expect("Failed to send 'finished' signal to orchestrator");
199 break;
200 }
201 }
202}
203
204/// Parse packet bytes into an IPv4 header, returns the IP result for this header and the payload.
205///
206/// # Arguments
207///
208/// * 'packet_bytes' - the bytes of the packet to parse
209///
210/// # Returns
211///
212/// * 'Option<(IpResult, PacketPayload, u32, u32)>' - the IP result, the payload, and both dst and src address of the packet
213///
214/// # Remarks
215///
216/// The function returns None if the packet is too short to contain an IPv4 header.
217fn parse_ipv4(packet_bytes: &[u8]) -> (Address, u32, PacketPayload, u32, u32) {
218 // Create IPv4Packet from the bytes in the buffer
219 let packet = IPv4Packet::from(packet_bytes);
220
221 // Create a Reply for the received ping reply
222 (
223 Address::from(packet.src),
224 packet.ttl as u32,
225 packet.payload,
226 packet.dst,
227 packet.src,
228 )
229}
230
231/// Parse packet bytes into an IPv6 header, returns the IP result for this header and the payload.
232///
233/// # Arguments
234///
235/// * 'packet_bytes' - the bytes of the packet to parse
236///
237/// # Returns
238///
239/// * 'Option<(IpResult, PacketPayload, u128, u128)>' - the IP result, the payload, and both dst and src address of the packet
240///
241/// # Remarks
242///
243/// The function returns None if the packet is too short to contain an IPv6 header.
244fn parse_ipv6(packet_bytes: &[u8]) -> (Address, u32, PacketPayload, u128, u128) {
245 // Create IPv6Packet from the bytes in the buffer
246 let packet = IPv6Packet::from(packet_bytes);
247
248 // Create a Reply for the received ping reply
249 (
250 Address::from(packet.src),
251 packet.hop_limit as u32,
252 packet.payload,
253 packet.dst,
254 packet.src,
255 )
256}
257
258/// Parse ICMPv4 packets (including v4 headers) into a Reply result.
259///
260/// Filters out spoofed packets and only parses ICMP echo replies valid for the current measurement.
261///
262/// # Arguments
263///
264/// * `packet_bytes` - the bytes of the packet to parse
265///
266/// * `measurement_id` - the ID of the current measurement
267///
268/// * `origin_map` - mapping of origin to origin ID
269///
270/// # Returns
271///
272/// * `Option<Reply>` - the received ping reply
273///
274/// # Remarks
275///
276/// The function returns None if the packet is not an ICMP echo reply or if the packet is too short to contain the necessary information.
277///
278/// The function also discards packets that do not belong to the current measurement.
279fn parse_icmpv4(
280 packet_bytes: &[u8],
281 measurement_id: u32,
282 origin_map: &Vec<Origin>,
283) -> Option<(Reply, bool)> {
284 // ICMPv4 52 length (IPv4 header (20) + ICMP header (8) + ICMP body 24 bytes) + check it is an ICMP Echo reply TODO match with exact length (include -u URl length)
285 if (packet_bytes.len() < 52) || (packet_bytes[20] != 0) {
286 return None;
287 }
288
289 let (src, ttl, payload, reply_dst, reply_src) = parse_ipv4(packet_bytes);
290
291 let PacketPayload::Icmp { value: icmp_packet } = payload else {
292 return None;
293 };
294
295 if icmp_packet.icmp_type != 0 {
296 return None;
297 } // Only parse ICMP echo replies
298
299 let pkt_measurement_id: [u8; 4] = icmp_packet.body[0..4].try_into().ok()?;
300 // Make sure that this packet belongs to this measurement
301 if u32::from_be_bytes(pkt_measurement_id) != measurement_id {
302 // If not, we discard it and await the next packet
303 return None;
304 }
305
306 let tx_time = u64::from_be_bytes(icmp_packet.body[4..12].try_into().unwrap());
307 let mut tx_id = u32::from_be_bytes(icmp_packet.body[12..16].try_into().unwrap());
308 let probe_src = u32::from_be_bytes(icmp_packet.body[16..20].try_into().unwrap());
309 let probe_dst = u32::from_be_bytes(icmp_packet.body[20..24].try_into().unwrap());
310
311 if (probe_src != reply_dst) | (probe_dst != reply_src) {
312 return None; // spoofed reply
313 }
314
315 let origin_id = get_origin_id_v4(reply_dst, 0, 0, origin_map)?;
316
317 let rx_time = SystemTime::now()
318 .duration_since(UNIX_EPOCH)
319 .unwrap()
320 .as_micros() as u64;
321
322 let is_discovery = if tx_id > u16::MAX as u32 {
323 tx_id -= u16::MAX as u32;
324 true
325 } else {
326 false
327 };
328
329 // Create a Reply for the received ping reply
330 Some((
331 Reply {
332 tx_time,
333 tx_id,
334 src: Some(src),
335 ttl,
336 rx_time,
337 origin_id,
338 chaos: None,
339 },
340 is_discovery,
341 ))
342}
343
344/// Parse ICMPv6 packets (including v6 headers) into a Reply result.
345///
346/// Filters out spoofed packets and only parses ICMP echo replies valid for the current measurement.
347///
348/// # Arguments
349///
350/// * `packet_bytes` - the bytes of the packet to parse
351///
352/// * `measurement_id` - the ID of the current measurement
353///
354/// * `origin_map` - mapping of origin to origin ID
355///
356/// # Returns
357///
358/// * `Option<Reply>` - the received ping reply
359///
360/// # Remarks
361///
362/// The function returns None if the packet is not an ICMP echo reply or if the packet is too short to contain the necessary information.
363///
364/// The function also discards packets that do not belong to the current measurement.
365fn parse_icmpv6(
366 packet_bytes: &[u8],
367 measurement_id: u32,
368 origin_map: &Vec<Origin>,
369) -> Option<(Reply, bool)> {
370 // ICMPv6 66 length (IPv6 header (40) + ICMP header (8) + ICMP body 48 bytes) + check it is an ICMP Echo reply TODO match with exact length (include -u URl length)
371 if (packet_bytes.len() < 66) || (packet_bytes[40] != 129) {
372 return None;
373 }
374 let (address, ttl, payload, reply_dst, reply_src) = parse_ipv6(packet_bytes);
375
376 // Parse the ICMP header
377 let PacketPayload::Icmp { value: icmp_packet } = payload else {
378 return None;
379 };
380
381 // Obtain the payload
382 if icmp_packet.icmp_type != 129 {
383 return None;
384 } // Only parse ICMP echo replies
385
386 let pkt_measurement_id: [u8; 4] = icmp_packet.body[0..4].try_into().ok()?;
387 // Make sure that this packet belongs to this measurement
388 if u32::from_be_bytes(pkt_measurement_id) != measurement_id {
389 return None;
390 }
391
392 let tx_time = u64::from_be_bytes(icmp_packet.body[4..12].try_into().unwrap());
393 let mut tx_id = u32::from_be_bytes(icmp_packet.body[12..16].try_into().unwrap());
394 let probe_src = u128::from_be_bytes(icmp_packet.body[16..32].try_into().unwrap());
395 let probe_dst = u128::from_be_bytes(icmp_packet.body[32..48].try_into().unwrap());
396
397 if (probe_src != reply_dst) | (probe_dst != reply_src) {
398 return None; // spoofed reply
399 }
400
401 let origin_id = get_origin_id_v6(reply_dst, 0, 0, origin_map)?;
402
403 let rx_time = SystemTime::now()
404 .duration_since(UNIX_EPOCH)
405 .unwrap()
406 .as_micros() as u64;
407
408 let is_discovery = if tx_id > u16::MAX as u32 {
409 tx_id -= u16::MAX as u32;
410 true
411 } else {
412 false
413 };
414
415 // Create a Reply for the received ping reply
416 Some((
417 Reply {
418 tx_time,
419 tx_id,
420 src: Some(address),
421 ttl,
422 rx_time,
423 origin_id,
424 chaos: None,
425 },
426 is_discovery,
427 ))
428}
429
430/// Parse DNSv4 packets (including v4 headers) into a Reply result.
431///
432/// Filters out spoofed packets and only parses DNS replies valid for the current measurement.
433///
434/// # Arguments
435///
436/// * 'packet_bytes' - the bytes of the packet to parse
437///
438/// * 'measurement_type' - the type of measurement being performed
439///
440/// * 'origin_map' - mapping of origin to origin ID
441///
442/// # Returns
443///
444/// * `Option<Reply>` - the received DNS reply
445///
446/// # Remarks
447///
448/// The function returns None if the packet is too short to contain a UDP header.
449fn parse_dnsv4(
450 packet_bytes: &[u8],
451 measurement_type: u8,
452 origin_map: &Vec<Origin>,
453) -> Option<(Reply, bool)> {
454 // DNSv4 28 minimum (IPv4 header (20) + UDP header (8)) + check next protocol is UDP TODO incorporate minimum payload size
455 if (packet_bytes.len() < 28) || (packet_bytes[9] != 17) {
456 return None;
457 }
458
459 let (src, ttl, payload, reply_dst, reply_src) = parse_ipv4(packet_bytes);
460
461 let PacketPayload::Udp { value: udp_packet } = payload else {
462 return None;
463 };
464
465 // The UDP responses will be from DNS services, with src port 53 and our possible src ports as dest port, furthermore the body length has to be large enough to contain a DNS A reply
466 // TODO body packet length is variable based on the domain name used in the measurement
467 if ((measurement_type == A_ID) & (udp_packet.body.len() < 66))
468 | ((measurement_type == CHAOS_ID) & (udp_packet.body.len() < 10))
469 {
470 return None;
471 }
472
473 let reply_sport = udp_packet.sport;
474 let reply_dport = udp_packet.dport;
475
476 let rx_time = SystemTime::now()
477 .duration_since(UNIX_EPOCH)
478 .unwrap()
479 .as_micros() as u64;
480 let (tx_time, tx_id, chaos, is_discovery) = if measurement_type == A_ID {
481 let dns_result = parse_dns_a_record_v4(udp_packet.body.as_slice())?;
482
483 if (dns_result.probe_sport != reply_dport)
484 | (dns_result.probe_src != reply_dst)
485 | (dns_result.probe_dst != reply_src)
486 {
487 return None; // spoofed reply
488 }
489
490 (
491 dns_result.tx_time,
492 dns_result.tx_id,
493 None,
494 dns_result.is_discovery,
495 )
496 } else if measurement_type == CHAOS_ID {
497 // TODO is_discovery for chaos
498 let (tx_time, tx_id, chaos) = parse_chaos(udp_packet.body.as_slice())?;
499
500 (tx_time, tx_id, Some(chaos), false)
501 } else {
502 panic!("Invalid measurement type");
503 };
504
505 let origin_id = get_origin_id_v4(reply_dst, reply_sport, reply_dport, origin_map)?;
506
507 // Create a Reply for the received DNS reply
508 Some((
509 Reply {
510 tx_time,
511 tx_id,
512 src: Some(src),
513 ttl,
514 rx_time,
515 origin_id,
516 chaos,
517 },
518 is_discovery,
519 ))
520}
521
522/// Parse DNSv6 packets (including v6 headers) into a Reply.
523///
524/// Filters out spoofed packets and only parses DNS replies valid for the current measurement.
525///
526/// # Arguments
527///
528/// * `packet_bytes` - the bytes of the packet to parse
529///
530/// * `measurement_type` - the type of measurement being performed
531///
532/// * `origin_map` - mapping of origin to origin ID
533///
534/// # Returns
535///
536/// * `Option<Reply>` - the received DNS reply
537///
538/// # Remarks
539///
540/// The function returns None if the packet is too short to contain a UDP header.
541fn parse_dnsv6(
542 packet_bytes: &[u8],
543 measurement_type: u8,
544 origin_map: &Vec<Origin>,
545) -> Option<(Reply, bool)> {
546 // DNSv6 48 length (IPv6 header (40) + UDP header (8)) + check next protocol is UDP TODO incorporate minimum payload size
547 if (packet_bytes.len() < 48) || (packet_bytes[6] != 17) {
548 return None;
549 }
550 let (src, ttl, payload, reply_dst, reply_src) = parse_ipv6(packet_bytes);
551
552 let PacketPayload::Udp { value: udp_packet } = payload else {
553 return None;
554 };
555
556 // The UDP responses will be from DNS services, with src port 53 and our possible src ports as dest port, furthermore the body length has to be large enough to contain a DNS A reply
557 // TODO use 'get_domain_length'
558 if ((measurement_type == A_ID) & (udp_packet.body.len() < 66))
559 | ((measurement_type == CHAOS_ID) & (udp_packet.body.len() < 10))
560 {
561 return None;
562 }
563
564 let reply_sport = udp_packet.sport;
565 let reply_dport = udp_packet.dport;
566
567 let rx_time = SystemTime::now()
568 .duration_since(UNIX_EPOCH)
569 .unwrap()
570 .as_micros() as u64;
571 let (tx_time, tx_id, chaos, is_discovery) = if measurement_type == A_ID {
572 let dns_result = parse_dns_a_record_v6(udp_packet.body.as_slice())?;
573
574 if (dns_result.probe_sport != reply_dport)
575 | (dns_result.probe_dst != reply_src)
576 | (dns_result.probe_src != reply_dst)
577 {
578 return None; // spoofed reply
579 }
580
581 (
582 dns_result.tx_time,
583 dns_result.tx_id,
584 None,
585 dns_result.is_discovery,
586 )
587 } else if measurement_type == CHAOS_ID {
588 // TODO is_discovery for CHAOS
589 let (tx_time, tx_worker_id, chaos) = parse_chaos(udp_packet.body.as_slice())?;
590 (tx_time, tx_worker_id, Some(chaos), false)
591 } else {
592 panic!("Invalid measurement type");
593 };
594
595 let origin_id = get_origin_id_v6(reply_dst, reply_sport, reply_dport, origin_map)?;
596
597 // Create a Reply for the received DNS reply
598 Some((
599 Reply {
600 tx_time,
601 tx_id,
602 src: Some(src),
603 ttl,
604 rx_time,
605 origin_id,
606 chaos,
607 },
608 is_discovery,
609 ))
610}
/// An IP address stored as its integer value, tagged with the address family.
enum Addr {
    /// IPv4 address as a 32-bit integer.
    V4(u32),
    /// IPv6 address as a 128-bit integer.
    V6(u128),
}

/// Compares against a raw IPv4 value; a `V6` address never equals a `u32`.
impl PartialEq<u32> for Addr {
    fn eq(&self, other_u32: &u32) -> bool {
        matches!(self, Addr::V4(v4) if v4 == other_u32)
    }
}

/// Compares against a raw IPv6 value; a `V4` address never equals a `u128`.
impl PartialEq<u128> for Addr {
    fn eq(&self, other_u128: &u128) -> bool {
        matches!(self, Addr::V6(v6) if v6 == other_u128)
    }
}
633
/// Probe details recovered from the query name of a DNS A reply.
///
/// Built by `parse_dns_a_record_v4` / `parse_dns_a_record_v6` from the first five
/// dot-separated labels of the domain.
struct DnsResult {
    /// Transmit time, taken from the first label.
    tx_time: u64,
    /// Sender ID, taken from the fourth label with the discovery offset (`u16::MAX`) removed.
    tx_id: u32,
    /// Source port of the probe, taken from the fifth label.
    probe_sport: u16,
    /// Source address of the probe, taken from the second label.
    probe_src: Addr,
    /// Destination address of the probe, taken from the third label.
    probe_dst: Addr,
    /// `true` when the fourth label exceeded `u16::MAX`.
    is_discovery: bool,
}
642
643/// Attempts to parse the DNS A record from a DNS payload body.
644///
645/// # Arguments
646///
647/// * `packet_bytes` - the bytes of the packet to parse
648///
649/// # Returns
650///
651/// * `Option<UdpResult, u16, u128, u128>` - the UDP result containing the DNS A record with the source port and source and destination addresses and whether it is a discovery packet
652///
653/// # Remarks
654///
655/// The function returns None if the packet is too short to contain a DNS A record.
656fn parse_dns_a_record_v6(packet_bytes: &[u8]) -> Option<DnsResult> {
657 // TODO v6 and v4 can be merged into one function
658 let record = DNSRecord::from(packet_bytes);
659 let domain = record.domain; // example: '1679305276037913215.3226971181.16843009.0.4000.any.dnsjedi.org'
660 // Get the information from the domain, continue to the next packet if it does not follow the format
661 let parts: Vec<&str> = domain.split('.').collect();
662 // Our domains have at least 5 parts
663 if parts.len() < 5 {
664 return None;
665 }
666
667 let tx_time = parts[0].parse::<u64>().ok()?;
668 let probe_src = Addr::V6(parts[1].parse::<u128>().ok()?);
669 let probe_dst = Addr::V6(parts[2].parse::<u128>().ok()?);
670 let mut tx_id = parts[3].parse::<u32>().ok()?;
671 let probe_sport = parts[4].parse::<u16>().ok()?;
672
673 let is_discovery = if tx_id > u16::MAX as u32 {
674 tx_id -= u16::MAX as u32;
675 true
676 } else {
677 false
678 };
679
680 Some(DnsResult {
681 tx_time,
682 tx_id,
683 probe_sport,
684 probe_src,
685 probe_dst,
686 is_discovery,
687 })
688}
689
690/// Attempts to parse the DNS A record from a UDP payload body.
691///
692/// # Arguments
693///
694/// * `packet_bytes` - the bytes of the packet to parse
695///
696/// # Returns
697///
698/// * `Option<UdpResult, u16, u128, u128, bool>` - the UDP result containing the DNS A record with the source port and source and destination addresses and whether it is a discovery packet TODO rustdoc
699///
700/// # Remarks
701///
702/// The function returns None if the packet is too short to contain a DNS A record.
703fn parse_dns_a_record_v4(packet_bytes: &[u8]) -> Option<DnsResult> {
704 let record = DNSRecord::from(packet_bytes);
705 let domain = record.domain; // example: '1679305276037913215.3226971181.16843009.0.4000.any.dnsjedi.org'
706 // Get the information from the domain, continue to the next packet if it does not follow the format
707
708 let parts: Vec<&str> = domain.split('.').collect();
709 // Our domains have at least 5 parts
710 if parts.len() < 5 {
711 return None;
712 }
713
714 let tx_time = parts[0].parse::<u64>().ok()?;
715 let probe_src = Addr::V4(parts[1].parse::<u32>().ok()?);
716 let probe_dst = Addr::V4(parts[2].parse::<u32>().ok()?);
717 let mut tx_id = parts[3].parse::<u32>().ok()?;
718 let probe_sport = parts[4].parse::<u16>().ok()?;
719
720 let is_discovery = if tx_id > u16::MAX as u32 {
721 tx_id -= u16::MAX as u32;
722 true
723 } else {
724 false
725 };
726
727 Some(DnsResult {
728 tx_time,
729 tx_id,
730 probe_sport,
731 probe_src,
732 probe_dst,
733 is_discovery,
734 })
735}
736
737/// Attempts to parse the DNS Chaos record from a UDP payload body.
738///
739/// # Arguments
740///
741/// * `packet_bytes` - the bytes of the packet to parse
742///
743/// # Returns
744///
745/// * `Option<UdpPayload>` - the UDP payload containing the DNS Chaos record
746///
747/// # Remarks
748///
749/// The function returns None if the packet is too short to contain a DNS Chaos record.
750fn parse_chaos(packet_bytes: &[u8]) -> Option<(u64, u32, String)> {
751 let record = DNSRecord::from(packet_bytes);
752
753 // 8 right most bits are the sender worker_id
754 let tx_worker_id = ((record.transaction_id >> 8) & 0xFF) as u32;
755
756 if record.answer == 0 {
757 return Some((0u64, tx_worker_id, "Not implemented".to_string()));
758 }
759
760 let chaos_data = TXTRecord::from(DNSAnswer::from(record.body.as_slice()).data.as_slice()).txt;
761
762 Some((0u64, tx_worker_id, chaos_data))
763}
764
765/// Parse TCPv4 packets (including v4 headers) into a Reply result.
766///
767/// # Arguments
768///
769/// * `packet_bytes` - the bytes of the packet to parse
770///
771/// * `origin_map` - mapping of origin to origin ID
772///
773/// # Returns
774///
775/// * `Option<Reply>` - the received TCP reply
776///
777/// # Remarks
778///
779/// The function returns None if the packet is too short to contain a TCP header.
780fn parse_tcpv4(packet_bytes: &[u8], origin_map: &Vec<Origin>) -> Option<(Reply, bool)> {
781 // TCPv4 40 bytes (IPv4 header (20) + TCP header (20)) + check for RST flag
782 if (packet_bytes.len() < 40) || ((packet_bytes[33] & 0x04) == 0) {
783 return None;
784 }
785 let (src, ttl, payload, reply_dst, _reply_src) = parse_ipv4(packet_bytes);
786 // cannot filter out spoofed packets as the probe_dst is unknown
787
788 let PacketPayload::Tcp { value: tcp_packet } = payload else {
789 return None;
790 };
791
792 let rx_time = SystemTime::now()
793 .duration_since(UNIX_EPOCH)
794 .unwrap()
795 .as_micros() as u64;
796
797 let origin_id = get_origin_id_v4(reply_dst, tcp_packet.sport, tcp_packet.dport, origin_map)?;
798
799 // Discovery probes have bit 16 set and higher bits unset
800 let bit_16_mask = 1 << 16;
801 let higher_bits_mask = !0u32 << 17;
802
803 let (tx_id, is_discovery) =
804 if (tcp_packet.seq & bit_16_mask) != 0 && (tcp_packet.seq & higher_bits_mask) == 0 {
805 (tcp_packet.seq - u16::MAX as u32, true)
806 } else {
807 (tcp_packet.seq, false)
808 };
809
810 Some((
811 Reply {
812 tx_time: tx_id as u64,
813 tx_id,
814 src: Some(src),
815 ttl,
816 rx_time,
817 origin_id,
818 chaos: None,
819 },
820 is_discovery,
821 ))
822}
823
824/// Parse TCPv6 packets (including v6 headers) into a Reply result.
825///
826/// # Arguments
827///
828/// * `packet_bytes` - the bytes of the packet to parse
829///
830/// * `origin_map` - mapping of origin to origin ID
831///
832/// # Returns
833///
834/// * `Option<Reply>` - the received TCP reply
835///
836/// # Remarks
837///
838/// The function returns None if the packet is too short to contain a TCP header.
839fn parse_tcpv6(packet_bytes: &[u8], origin_map: &Vec<Origin>) -> Option<(Reply, bool)> {
840 // TCPv6 64 length (IPv6 header (40) + TCP header (20)) + check for RST flag
841 if (packet_bytes.len() < 60) || ((packet_bytes[53] & 0x04) == 0) {
842 return None;
843 }
844 let (src, ttl, payload, reply_dst, _reply_src) = parse_ipv6(packet_bytes);
845 // cannot filter out spoofed packets as the probe_dst is unknown
846
847 let PacketPayload::Tcp { value: tcp_packet } = payload else {
848 return None;
849 };
850
851 let rx_time = SystemTime::now()
852 .duration_since(UNIX_EPOCH)
853 .unwrap()
854 .as_micros() as u64;
855
856 let origin_id = get_origin_id_v6(reply_dst, tcp_packet.sport, tcp_packet.dport, origin_map)?;
857
858 // Discovery probes have bit 16 set and higher bits unset
859 let bit_16_mask = 1 << 16;
860 let higher_bits_mask = !0u32 << 17;
861
862 let (tx_id, is_discovery) =
863 if (tcp_packet.seq & bit_16_mask) != 0 && (tcp_packet.seq & higher_bits_mask) == 0 {
864 (tcp_packet.seq - u16::MAX as u32, true)
865 } else {
866 (tcp_packet.seq, false)
867 };
868
869 Some((
870 Reply {
871 tx_time: tx_id as u64,
872 tx_id,
873 src: Some(src),
874 ttl,
875 rx_time,
876 origin_id,
877 chaos: None,
878 },
879 is_discovery,
880 ))
881}
882
883/// Get the origin ID from the origin map based on the reply destination address and ports.
884///
885/// # Arguments
886///
887/// * `reply_dst` - the destination address of the reply
888///
889/// * `reply_sport` - the source port of the reply
890///
891/// * `reply_dport` - the destination port of the reply
892///
893/// * `origin_map` - the origin map to search in
894///
895/// # Returns
896///
897/// * `Option<u32>` - the origin ID if found, None otherwise
898fn get_origin_id_v4(
899 reply_dst: u32,
900 reply_sport: u16,
901 reply_dport: u16,
902 origin_map: &Vec<Origin>,
903) -> Option<u32> {
904 for origin in origin_map {
905 if origin.src.unwrap().get_v4() == reply_dst
906 && origin.sport as u16 == reply_dport
907 && origin.dport as u16 == reply_sport
908 {
909 return Some(origin.origin_id);
910 } else if origin.src.unwrap().get_v4() == reply_dst && 0 == reply_sport && 0 == reply_dport
911 {
912 // ICMP replies have no port numbers
913 return Some(origin.origin_id);
914 }
915 }
916 None
917}
918
919/// Get the origin ID from the origin map based on the reply destination address and ports.
920///
921/// # Arguments
922///
923/// * `reply_dst` - the destination address of the reply
924///
925/// * `reply_sport` - the source port of the reply
926///
927/// * `reply_dport` - the destination port of the reply
928///
929/// * `origin_map` - the origin map to search in
930///
931/// # Returns
932///
933/// * `Option<u32>` - the origin ID if found, None otherwise
934fn get_origin_id_v6(
935 reply_dst: u128,
936 reply_sport: u16,
937 reply_dport: u16,
938 origin_map: &Vec<Origin>,
939) -> Option<u32> {
940 for origin in origin_map {
941 if origin.src.unwrap().get_v6() == reply_dst
942 && origin.sport as u16 == reply_dport
943 && origin.dport as u16 == reply_sport
944 {
945 return Some(origin.origin_id);
946 } else if origin.src.unwrap().get_v6() == reply_dst && 0 == reply_sport && 0 == reply_dport
947 {
948 // ICMP replies have no port numbers
949 return Some(origin.origin_id);
950 }
951 }
952 None
953}