manycast/worker/
inbound.rs

1use std::mem;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4use std::thread::{sleep, Builder};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use tokio::sync::mpsc::UnboundedSender;
7
8use crate::custom_module::manycastr::{Address, Origin, Reply, TaskResult};
9use crate::custom_module::Separated;
10use crate::net::{DNSAnswer, DNSRecord, IPv4Packet, IPv6Packet, PacketPayload, TXTRecord};
11use crate::{A_ID, CHAOS_ID, ICMP_ID, TCP_ID};
12use pnet::datalink::DataLinkReceiver;
13
/// Configuration for an inbound packet listening worker.
///
/// Bundles every parameter that `inbound` needs to start its listener thread and its
/// result-forwarding thread for a single measurement.
pub struct InboundConfig {
    /// The unique ID of the measurement. It is handed to the ICMP parsers, which drop
    /// replies whose embedded measurement ID differs.
    pub m_id: u32,

    /// The unique ID of this specific worker; reported as `worker_id` in every `TaskResult`.
    pub worker_id: u16,

    /// Specifies whether to listen for IPv6 packets (`true`) or IPv4 packets (`false`).
    pub is_ipv6: bool,

    /// The type of measurement being performed. It is compared against `ICMP_ID`, `A_ID`,
    /// `CHAOS_ID` and `TCP_ID` to select the packet parser.
    pub m_type: u8,

    /// The list of valid origins (source address and port values) that incoming replies are
    /// matched against.
    pub origin_map: Vec<Origin>,

    /// A shared flag used to gracefully shut down the worker: the listener thread checks it
    /// on every loop iteration, the result-forwarding thread after every flush.
    pub abort_s: Arc<AtomicBool>,
}
37
38/// Listen for incoming packets
39/// Creates two threads, one that listens on the socket and another that forwards results to the orchestrator and shuts down the receiving socket when appropriate.
40/// Makes sure that the received packets are valid and belong to the current measurement.
41///
42/// # Arguments
43///
44/// * 'config' - configuration for the inbound worker thread
45///
46/// * 'tx' - sender to put task results in
47///
48/// * 'socket_rx' - the socket to listen on
49///
50/// # Panics
51///
52/// Panics if the measurement type is invalid
53pub fn inbound(
54    config: InboundConfig,
55    tx: UnboundedSender<TaskResult>,
56    mut socket_rx: Box<dyn DataLinkReceiver>,
57) {
58    println!("[Worker inbound] Started listener");
59    // Result queue to store incoming pings, and take them out when sending the TaskResults to the orchestrator
60    let rq = Arc::new(Mutex::new(Vec::new()));
61    let rq_c = rq.clone();
62    let rx_f_c = config.abort_s.clone();
63    Builder::new()
64        .name("listener_thread".to_string())
65        .spawn(move || {
66            // Listen for incoming packets
67            let mut received: u32 = 0;
68            loop {
69                // Check if we should exit
70                if rx_f_c.load(Ordering::Relaxed) {
71                    break;
72                }
73                let packet = match socket_rx.next() {
74                    // TODO blocking call
75                    Ok(packet) => packet,
76                    Err(_) => {
77                        sleep(Duration::from_millis(1)); // Sleep to free CPU, let buffer fill
78                        continue;
79                    }
80                };
81
82                let result = if config.m_type == ICMP_ID {
83                    // ICMP
84                    // Convert the bytes into an ICMP packet (first 13 bytes are the eth header, which we skip)
85                    if config.is_ipv6 {
86                        parse_icmpv6(&packet[14..], config.m_id, &config.origin_map)
87                    } else {
88                        parse_icmpv4(&packet[14..], config.m_id, &config.origin_map)
89                    }
90                } else if config.m_type == A_ID || config.m_type == CHAOS_ID {
91                    // DNS A
92                    if config.is_ipv6 {
93                        if packet[20] == 17 {
94                            // 17 is the protocol number for UDP
95                            parse_dnsv6(&packet[14..], config.m_type, &config.origin_map)
96                        } else {
97                            None
98                        }
99                    } else if packet[23] == 17 {
100                        // 17 is the protocol number for UDP
101                        parse_dnsv4(&packet[14..], config.m_type, &config.origin_map)
102                    } else {
103                        None
104                    }
105                } else if config.m_type == TCP_ID {
106                    // TCP
107                    if config.is_ipv6 {
108                        parse_tcpv6(&packet[14..], &config.origin_map)
109                    } else {
110                        parse_tcpv4(&packet[14..], &config.origin_map)
111                    }
112                } else {
113                    panic!("Invalid measurement type");
114                };
115
116                // Invalid packets have value None
117                if result.is_none() {
118                    continue;
119                }
120
121                // Put result in transmission queue
122                {
123                    received += 1;
124                    let mut buffer = rq_c.lock().unwrap();
125                    buffer.push(result.unwrap())
126                }
127            }
128
129            println!(
130                "[Worker inbound] Stopped pnet listener (received {} packets)",
131                received.with_separator()
132            );
133        })
134        .expect("Failed to spawn listener_thread");
135
136    // Thread for sending the received replies to the orchestrator as TaskResult
137    Builder::new()
138        .name("result_sender_thread".to_string())
139        .spawn(move || {
140            handle_results(&tx, config.abort_s, config.worker_id, rq);
141        })
142        .expect("Failed to spawn result_sender_thread");
143}
144
145/// Thread for handling the received replies, wrapping them in a TaskResult, and streaming them back to the main worker class.
146///
147/// # Arguments
148///
149/// * `tx` - sender to put task results in
150///
151/// * `rx_f` - channel that is used to signal the end of the measurement
152///
153/// * `worker_id` - the unique worker ID of this worker
154///
155/// * `rq_sender` - contains a vector of all received replies as Reply results
156fn handle_results(
157    tx: &UnboundedSender<TaskResult>,
158    rx_f: Arc<AtomicBool>,
159    worker_id: u16,
160    rq_sender: Arc<Mutex<Vec<(Reply, bool)>>>,
161) {
162    loop {
163        // Every second, forward the ping results to the orchestrator
164        sleep(Duration::from_secs(1));
165
166        // Get the current result queue, and replace it with an empty one
167        let rq = {
168            let mut guard = rq_sender.lock().unwrap();
169            mem::take(&mut *guard)
170        };
171        // Split on discovery and non-discovery replies
172        let (discovery_rq, follow_rq): (Vec<_>, Vec<_>) = rq.into_iter().partition(|&(_, b)| b);
173        let discovery_rq: Vec<Reply> = discovery_rq.into_iter().map(|(r, _)| r).collect();
174        let follow_rq: Vec<Reply> = follow_rq.into_iter().map(|(r, _)| r).collect();
175
176        // Send the result to the worker handler
177        if !discovery_rq.is_empty() {
178            tx.send(TaskResult {
179                worker_id: worker_id as u32,
180                result_list: discovery_rq,
181                is_discovery: true,
182            })
183            .expect("Failed to send TaskResult to worker handler");
184        }
185        if !follow_rq.is_empty() {
186            tx.send(TaskResult {
187                worker_id: worker_id as u32,
188                result_list: follow_rq,
189                is_discovery: false,
190            })
191            .expect("Failed to send TaskResult to worker handler");
192        }
193
194        // Exit the thread if worker sends us the signal it's finished
195        if rx_f.load(Ordering::SeqCst) {
196            // Send default value to let the orchestrator know we are finished
197            tx.send(TaskResult::default())
198                .expect("Failed to send 'finished' signal to orchestrator");
199            break;
200        }
201    }
202}
203
204/// Parse packet bytes into an IPv4 header, returns the IP result for this header and the payload.
205///
206/// # Arguments
207///
208/// * 'packet_bytes' - the bytes of the packet to parse
209///
210/// # Returns
211///
212/// * 'Option<(IpResult, PacketPayload, u32, u32)>' - the IP result, the payload, and both dst and src address of the packet
213///
214/// # Remarks
215///
216/// The function returns None if the packet is too short to contain an IPv4 header.
217fn parse_ipv4(packet_bytes: &[u8]) -> (Address, u32, PacketPayload, u32, u32) {
218    // Create IPv4Packet from the bytes in the buffer
219    let packet = IPv4Packet::from(packet_bytes);
220
221    // Create a Reply for the received ping reply
222    (
223        Address::from(packet.src),
224        packet.ttl as u32,
225        packet.payload,
226        packet.dst,
227        packet.src,
228    )
229}
230
231/// Parse packet bytes into an IPv6 header, returns the IP result for this header and the payload.
232///
233/// # Arguments
234///
235/// * 'packet_bytes' - the bytes of the packet to parse
236///
237/// # Returns
238///
239/// * 'Option<(IpResult, PacketPayload, u128, u128)>' - the IP result, the payload, and both dst and src address of the packet
240///
241/// # Remarks
242///
243/// The function returns None if the packet is too short to contain an IPv6 header.
244fn parse_ipv6(packet_bytes: &[u8]) -> (Address, u32, PacketPayload, u128, u128) {
245    // Create IPv6Packet from the bytes in the buffer
246    let packet = IPv6Packet::from(packet_bytes);
247
248    // Create a Reply for the received ping reply
249    (
250        Address::from(packet.src),
251        packet.hop_limit as u32,
252        packet.payload,
253        packet.dst,
254        packet.src,
255    )
256}
257
258/// Parse ICMPv4 packets (including v4 headers) into a Reply result.
259///
260/// Filters out spoofed packets and only parses ICMP echo replies valid for the current measurement.
261///
262/// # Arguments
263///
264/// * `packet_bytes` - the bytes of the packet to parse
265///
266/// * `measurement_id` - the ID of the current measurement
267///
268/// * `origin_map` - mapping of origin to origin ID
269///
270/// # Returns
271///
272/// * `Option<Reply>` - the received ping reply
273///
274/// # Remarks
275///
276/// The function returns None if the packet is not an ICMP echo reply or if the packet is too short to contain the necessary information.
277///
278/// The function also discards packets that do not belong to the current measurement.
279fn parse_icmpv4(
280    packet_bytes: &[u8],
281    measurement_id: u32,
282    origin_map: &Vec<Origin>,
283) -> Option<(Reply, bool)> {
284    // ICMPv4 52 length (IPv4 header (20) + ICMP header (8) + ICMP body 24 bytes) + check it is an ICMP Echo reply TODO match with exact length (include -u URl length)
285    if (packet_bytes.len() < 52) || (packet_bytes[20] != 0) {
286        return None;
287    }
288
289    let (src, ttl, payload, reply_dst, reply_src) = parse_ipv4(packet_bytes);
290
291    let PacketPayload::Icmp { value: icmp_packet } = payload else {
292        return None;
293    };
294
295    if icmp_packet.icmp_type != 0 {
296        return None;
297    } // Only parse ICMP echo replies
298
299    let pkt_measurement_id: [u8; 4] = icmp_packet.body[0..4].try_into().ok()?;
300    // Make sure that this packet belongs to this measurement
301    if u32::from_be_bytes(pkt_measurement_id) != measurement_id {
302        // If not, we discard it and await the next packet
303        return None;
304    }
305
306    let tx_time = u64::from_be_bytes(icmp_packet.body[4..12].try_into().unwrap());
307    let mut tx_id = u32::from_be_bytes(icmp_packet.body[12..16].try_into().unwrap());
308    let probe_src = u32::from_be_bytes(icmp_packet.body[16..20].try_into().unwrap());
309    let probe_dst = u32::from_be_bytes(icmp_packet.body[20..24].try_into().unwrap());
310
311    if (probe_src != reply_dst) | (probe_dst != reply_src) {
312        return None; // spoofed reply
313    }
314
315    let origin_id = get_origin_id_v4(reply_dst, 0, 0, origin_map)?;
316
317    let rx_time = SystemTime::now()
318        .duration_since(UNIX_EPOCH)
319        .unwrap()
320        .as_micros() as u64;
321
322    let is_discovery = if tx_id > u16::MAX as u32 {
323        tx_id -= u16::MAX as u32;
324        true
325    } else {
326        false
327    };
328
329    // Create a Reply for the received ping reply
330    Some((
331        Reply {
332            tx_time,
333            tx_id,
334            src: Some(src),
335            ttl,
336            rx_time,
337            origin_id,
338            chaos: None,
339        },
340        is_discovery,
341    ))
342}
343
344/// Parse ICMPv6 packets (including v6 headers) into a Reply result.
345///
346/// Filters out spoofed packets and only parses ICMP echo replies valid for the current measurement.
347///
348/// # Arguments
349///
350/// * `packet_bytes` - the bytes of the packet to parse
351///
352/// * `measurement_id` - the ID of the current measurement
353///
354/// * `origin_map` - mapping of origin to origin ID
355///
356/// # Returns
357///
358/// * `Option<Reply>` - the received ping reply
359///
360/// # Remarks
361///
362/// The function returns None if the packet is not an ICMP echo reply or if the packet is too short to contain the necessary information.
363///
364/// The function also discards packets that do not belong to the current measurement.
365fn parse_icmpv6(
366    packet_bytes: &[u8],
367    measurement_id: u32,
368    origin_map: &Vec<Origin>,
369) -> Option<(Reply, bool)> {
370    // ICMPv6 66 length (IPv6 header (40) + ICMP header (8) + ICMP body 48 bytes) + check it is an ICMP Echo reply TODO match with exact length (include -u URl length)
371    if (packet_bytes.len() < 66) || (packet_bytes[40] != 129) {
372        return None;
373    }
374    let (address, ttl, payload, reply_dst, reply_src) = parse_ipv6(packet_bytes);
375
376    // Parse the ICMP header
377    let PacketPayload::Icmp { value: icmp_packet } = payload else {
378        return None;
379    };
380
381    // Obtain the payload
382    if icmp_packet.icmp_type != 129 {
383        return None;
384    } // Only parse ICMP echo replies
385
386    let pkt_measurement_id: [u8; 4] = icmp_packet.body[0..4].try_into().ok()?;
387    // Make sure that this packet belongs to this measurement
388    if u32::from_be_bytes(pkt_measurement_id) != measurement_id {
389        return None;
390    }
391
392    let tx_time = u64::from_be_bytes(icmp_packet.body[4..12].try_into().unwrap());
393    let mut tx_id = u32::from_be_bytes(icmp_packet.body[12..16].try_into().unwrap());
394    let probe_src = u128::from_be_bytes(icmp_packet.body[16..32].try_into().unwrap());
395    let probe_dst = u128::from_be_bytes(icmp_packet.body[32..48].try_into().unwrap());
396
397    if (probe_src != reply_dst) | (probe_dst != reply_src) {
398        return None; // spoofed reply
399    }
400
401    let origin_id = get_origin_id_v6(reply_dst, 0, 0, origin_map)?;
402
403    let rx_time = SystemTime::now()
404        .duration_since(UNIX_EPOCH)
405        .unwrap()
406        .as_micros() as u64;
407
408    let is_discovery = if tx_id > u16::MAX as u32 {
409        tx_id -= u16::MAX as u32;
410        true
411    } else {
412        false
413    };
414
415    // Create a Reply for the received ping reply
416    Some((
417        Reply {
418            tx_time,
419            tx_id,
420            src: Some(address),
421            ttl,
422            rx_time,
423            origin_id,
424            chaos: None,
425        },
426        is_discovery,
427    ))
428}
429
430/// Parse DNSv4 packets (including v4 headers) into a Reply result.
431///
432/// Filters out spoofed packets and only parses DNS replies valid for the current measurement.
433///
434/// # Arguments
435///
436/// * 'packet_bytes' - the bytes of the packet to parse
437///
438/// * 'measurement_type' - the type of measurement being performed
439///
440/// * 'origin_map' - mapping of origin to origin ID
441///
442/// # Returns
443///
444/// * `Option<Reply>` - the received DNS reply
445///
446/// # Remarks
447///
448/// The function returns None if the packet is too short to contain a UDP header.
449fn parse_dnsv4(
450    packet_bytes: &[u8],
451    measurement_type: u8,
452    origin_map: &Vec<Origin>,
453) -> Option<(Reply, bool)> {
454    // DNSv4 28 minimum (IPv4 header (20) + UDP header (8)) + check next protocol is UDP TODO incorporate minimum payload size
455    if (packet_bytes.len() < 28) || (packet_bytes[9] != 17) {
456        return None;
457    }
458
459    let (src, ttl, payload, reply_dst, reply_src) = parse_ipv4(packet_bytes);
460
461    let PacketPayload::Udp { value: udp_packet } = payload else {
462        return None;
463    };
464
465    // The UDP responses will be from DNS services, with src port 53 and our possible src ports as dest port, furthermore the body length has to be large enough to contain a DNS A reply
466    // TODO body packet length is variable based on the domain name used in the measurement
467    if ((measurement_type == A_ID) & (udp_packet.body.len() < 66))
468        | ((measurement_type == CHAOS_ID) & (udp_packet.body.len() < 10))
469    {
470        return None;
471    }
472
473    let reply_sport = udp_packet.sport;
474    let reply_dport = udp_packet.dport;
475
476    let rx_time = SystemTime::now()
477        .duration_since(UNIX_EPOCH)
478        .unwrap()
479        .as_micros() as u64;
480    let (tx_time, tx_id, chaos, is_discovery) = if measurement_type == A_ID {
481        let dns_result = parse_dns_a_record_v4(udp_packet.body.as_slice())?;
482
483        if (dns_result.probe_sport != reply_dport)
484            | (dns_result.probe_src != reply_dst)
485            | (dns_result.probe_dst != reply_src)
486        {
487            return None; // spoofed reply
488        }
489
490        (
491            dns_result.tx_time,
492            dns_result.tx_id,
493            None,
494            dns_result.is_discovery,
495        )
496    } else if measurement_type == CHAOS_ID {
497        // TODO is_discovery for chaos
498        let (tx_time, tx_id, chaos) = parse_chaos(udp_packet.body.as_slice())?;
499
500        (tx_time, tx_id, Some(chaos), false)
501    } else {
502        panic!("Invalid measurement type");
503    };
504
505    let origin_id = get_origin_id_v4(reply_dst, reply_sport, reply_dport, origin_map)?;
506
507    // Create a Reply for the received DNS reply
508    Some((
509        Reply {
510            tx_time,
511            tx_id,
512            src: Some(src),
513            ttl,
514            rx_time,
515            origin_id,
516            chaos,
517        },
518        is_discovery,
519    ))
520}
521
522/// Parse DNSv6 packets (including v6 headers) into a Reply.
523///
524/// Filters out spoofed packets and only parses DNS replies valid for the current measurement.
525///
526/// # Arguments
527///
528/// * `packet_bytes` - the bytes of the packet to parse
529///
530/// * `measurement_type` - the type of measurement being performed
531///
532/// * `origin_map` - mapping of origin to origin ID
533///
534/// # Returns
535///
536/// * `Option<Reply>` - the received DNS reply
537///
538/// # Remarks
539///
540/// The function returns None if the packet is too short to contain a UDP header.
541fn parse_dnsv6(
542    packet_bytes: &[u8],
543    measurement_type: u8,
544    origin_map: &Vec<Origin>,
545) -> Option<(Reply, bool)> {
546    // DNSv6 48 length (IPv6 header (40) + UDP header (8)) + check next protocol is UDP TODO incorporate minimum payload size
547    if (packet_bytes.len() < 48) || (packet_bytes[6] != 17) {
548        return None;
549    }
550    let (src, ttl, payload, reply_dst, reply_src) = parse_ipv6(packet_bytes);
551
552    let PacketPayload::Udp { value: udp_packet } = payload else {
553        return None;
554    };
555
556    // The UDP responses will be from DNS services, with src port 53 and our possible src ports as dest port, furthermore the body length has to be large enough to contain a DNS A reply
557    // TODO use 'get_domain_length'
558    if ((measurement_type == A_ID) & (udp_packet.body.len() < 66))
559        | ((measurement_type == CHAOS_ID) & (udp_packet.body.len() < 10))
560    {
561        return None;
562    }
563
564    let reply_sport = udp_packet.sport;
565    let reply_dport = udp_packet.dport;
566
567    let rx_time = SystemTime::now()
568        .duration_since(UNIX_EPOCH)
569        .unwrap()
570        .as_micros() as u64;
571    let (tx_time, tx_id, chaos, is_discovery) = if measurement_type == A_ID {
572        let dns_result = parse_dns_a_record_v6(udp_packet.body.as_slice())?;
573
574        if (dns_result.probe_sport != reply_dport)
575            | (dns_result.probe_dst != reply_src)
576            | (dns_result.probe_src != reply_dst)
577        {
578            return None; // spoofed reply
579        }
580
581        (
582            dns_result.tx_time,
583            dns_result.tx_id,
584            None,
585            dns_result.is_discovery,
586        )
587    } else if measurement_type == CHAOS_ID {
588        // TODO is_discovery for CHAOS
589        let (tx_time, tx_worker_id, chaos) = parse_chaos(udp_packet.body.as_slice())?;
590        (tx_time, tx_worker_id, Some(chaos), false)
591    } else {
592        panic!("Invalid measurement type");
593    };
594
595    let origin_id = get_origin_id_v6(reply_dst, reply_sport, reply_dport, origin_map)?;
596
597    // Create a Reply for the received DNS reply
598    Some((
599        Reply {
600            tx_time,
601            tx_id,
602            src: Some(src),
603            ttl,
604            rx_time,
605            origin_id,
606            chaos,
607        },
608        is_discovery,
609    ))
610}
/// An IP address in numeric form, as embedded in the domain name of a DNS probe.
///
/// Comparing an `Addr` with a bare `u32` or `u128` only succeeds when the variant matches the
/// integer width and the values are equal; a cross-family comparison is always `false`.
enum Addr {
    /// IPv4 address as a 32-bit integer.
    V4(u32),
    /// IPv6 address as a 128-bit integer.
    V6(u128),
}

impl PartialEq<u32> for Addr {
    /// Equal only for a `V4` address holding the same value.
    fn eq(&self, rhs: &u32) -> bool {
        matches!(self, Addr::V4(value) if value == rhs)
    }
}

impl PartialEq<u128> for Addr {
    /// Equal only for a `V6` address holding the same value.
    fn eq(&self, rhs: &u128) -> bool {
        matches!(self, Addr::V6(value) if value == rhs)
    }
}
633
/// Probe details recovered from the domain name of a DNS A reply
/// (filled in by `parse_dns_a_record_v4` / `parse_dns_a_record_v6`).
struct DnsResult {
    /// Transmit time value parsed from the first domain label.
    tx_time: u64,
    /// ID parsed from the fourth domain label, with the `u16::MAX` discovery offset removed.
    tx_id: u32,
    /// Source port of the probe, parsed from the fifth domain label.
    probe_sport: u16,
    /// Source address of the probe, parsed from the second domain label.
    probe_src: Addr,
    /// Destination address of the probe, parsed from the third domain label.
    probe_dst: Addr,
    /// `true` when the ID in the domain name was larger than `u16::MAX`.
    is_discovery: bool,
}
642
643/// Attempts to parse the DNS A record from a DNS payload body.
644///
645/// # Arguments
646///
647/// * `packet_bytes` - the bytes of the packet to parse
648///
649/// # Returns
650///
651/// * `Option<UdpResult, u16, u128, u128>` - the UDP result containing the DNS A record with the source port and source and destination addresses and whether it is a discovery packet
652///
653/// # Remarks
654///
655/// The function returns None if the packet is too short to contain a DNS A record.
656fn parse_dns_a_record_v6(packet_bytes: &[u8]) -> Option<DnsResult> {
657    // TODO v6 and v4 can be merged into one function
658    let record = DNSRecord::from(packet_bytes);
659    let domain = record.domain; // example: '1679305276037913215.3226971181.16843009.0.4000.any.dnsjedi.org'
660                                // Get the information from the domain, continue to the next packet if it does not follow the format
661    let parts: Vec<&str> = domain.split('.').collect();
662    // Our domains have at least 5 parts
663    if parts.len() < 5 {
664        return None;
665    }
666
667    let tx_time = parts[0].parse::<u64>().ok()?;
668    let probe_src = Addr::V6(parts[1].parse::<u128>().ok()?);
669    let probe_dst = Addr::V6(parts[2].parse::<u128>().ok()?);
670    let mut tx_id = parts[3].parse::<u32>().ok()?;
671    let probe_sport = parts[4].parse::<u16>().ok()?;
672
673    let is_discovery = if tx_id > u16::MAX as u32 {
674        tx_id -= u16::MAX as u32;
675        true
676    } else {
677        false
678    };
679
680    Some(DnsResult {
681        tx_time,
682        tx_id,
683        probe_sport,
684        probe_src,
685        probe_dst,
686        is_discovery,
687    })
688}
689
690/// Attempts to parse the DNS A record from a UDP payload body.
691///
692/// # Arguments
693///
694/// * `packet_bytes` - the bytes of the packet to parse
695///
696/// # Returns
697///
698/// * `Option<UdpResult, u16, u128, u128, bool>` - the UDP result containing the DNS A record with the source port and source and destination addresses and whether it is a discovery packet TODO rustdoc
699///
700/// # Remarks
701///
702/// The function returns None if the packet is too short to contain a DNS A record.
703fn parse_dns_a_record_v4(packet_bytes: &[u8]) -> Option<DnsResult> {
704    let record = DNSRecord::from(packet_bytes);
705    let domain = record.domain; // example: '1679305276037913215.3226971181.16843009.0.4000.any.dnsjedi.org'
706                                // Get the information from the domain, continue to the next packet if it does not follow the format
707
708    let parts: Vec<&str> = domain.split('.').collect();
709    // Our domains have at least 5 parts
710    if parts.len() < 5 {
711        return None;
712    }
713
714    let tx_time = parts[0].parse::<u64>().ok()?;
715    let probe_src = Addr::V4(parts[1].parse::<u32>().ok()?);
716    let probe_dst = Addr::V4(parts[2].parse::<u32>().ok()?);
717    let mut tx_id = parts[3].parse::<u32>().ok()?;
718    let probe_sport = parts[4].parse::<u16>().ok()?;
719
720    let is_discovery = if tx_id > u16::MAX as u32 {
721        tx_id -= u16::MAX as u32;
722        true
723    } else {
724        false
725    };
726
727    Some(DnsResult {
728        tx_time,
729        tx_id,
730        probe_sport,
731        probe_src,
732        probe_dst,
733        is_discovery,
734    })
735}
736
737/// Attempts to parse the DNS Chaos record from a UDP payload body.
738///
739/// # Arguments
740///
741/// * `packet_bytes` - the bytes of the packet to parse
742///
743/// # Returns
744///
745/// * `Option<UdpPayload>` - the UDP payload containing the DNS Chaos record
746///
747/// # Remarks
748///
749/// The function returns None if the packet is too short to contain a DNS Chaos record.
750fn parse_chaos(packet_bytes: &[u8]) -> Option<(u64, u32, String)> {
751    let record = DNSRecord::from(packet_bytes);
752
753    // 8 right most bits are the sender worker_id
754    let tx_worker_id = ((record.transaction_id >> 8) & 0xFF) as u32;
755
756    if record.answer == 0 {
757        return Some((0u64, tx_worker_id, "Not implemented".to_string()));
758    }
759
760    let chaos_data = TXTRecord::from(DNSAnswer::from(record.body.as_slice()).data.as_slice()).txt;
761
762    Some((0u64, tx_worker_id, chaos_data))
763}
764
765/// Parse TCPv4 packets (including v4 headers) into a Reply result.
766///
767/// # Arguments
768///
769/// * `packet_bytes` - the bytes of the packet to parse
770///
771/// * `origin_map` - mapping of origin to origin ID
772///
773/// # Returns
774///
775/// * `Option<Reply>` - the received TCP reply
776///
777/// # Remarks
778///
779/// The function returns None if the packet is too short to contain a TCP header.
780fn parse_tcpv4(packet_bytes: &[u8], origin_map: &Vec<Origin>) -> Option<(Reply, bool)> {
781    // TCPv4 40 bytes (IPv4 header (20) + TCP header (20)) + check for RST flag
782    if (packet_bytes.len() < 40) || ((packet_bytes[33] & 0x04) == 0) {
783        return None;
784    }
785    let (src, ttl, payload, reply_dst, _reply_src) = parse_ipv4(packet_bytes);
786    // cannot filter out spoofed packets as the probe_dst is unknown
787
788    let PacketPayload::Tcp { value: tcp_packet } = payload else {
789        return None;
790    };
791
792    let rx_time = SystemTime::now()
793        .duration_since(UNIX_EPOCH)
794        .unwrap()
795        .as_micros() as u64;
796
797    let origin_id = get_origin_id_v4(reply_dst, tcp_packet.sport, tcp_packet.dport, origin_map)?;
798
799    // Discovery probes have bit 16 set and higher bits unset
800    let bit_16_mask = 1 << 16;
801    let higher_bits_mask = !0u32 << 17;
802
803    let (tx_id, is_discovery) =
804        if (tcp_packet.seq & bit_16_mask) != 0 && (tcp_packet.seq & higher_bits_mask) == 0 {
805            (tcp_packet.seq - u16::MAX as u32, true)
806        } else {
807            (tcp_packet.seq, false)
808        };
809
810    Some((
811        Reply {
812            tx_time: tx_id as u64,
813            tx_id,
814            src: Some(src),
815            ttl,
816            rx_time,
817            origin_id,
818            chaos: None,
819        },
820        is_discovery,
821    ))
822}
823
824/// Parse TCPv6 packets (including v6 headers) into a Reply result.
825///
826/// # Arguments
827///
828/// * `packet_bytes` - the bytes of the packet to parse
829///
830/// * `origin_map` - mapping of origin to origin ID
831///
832/// # Returns
833///
834/// * `Option<Reply>` - the received TCP reply
835///
836/// # Remarks
837///
838/// The function returns None if the packet is too short to contain a TCP header.
839fn parse_tcpv6(packet_bytes: &[u8], origin_map: &Vec<Origin>) -> Option<(Reply, bool)> {
840    // TCPv6 64 length (IPv6 header (40) + TCP header (20)) + check for RST flag
841    if (packet_bytes.len() < 60) || ((packet_bytes[53] & 0x04) == 0) {
842        return None;
843    }
844    let (src, ttl, payload, reply_dst, _reply_src) = parse_ipv6(packet_bytes);
845    // cannot filter out spoofed packets as the probe_dst is unknown
846
847    let PacketPayload::Tcp { value: tcp_packet } = payload else {
848        return None;
849    };
850
851    let rx_time = SystemTime::now()
852        .duration_since(UNIX_EPOCH)
853        .unwrap()
854        .as_micros() as u64;
855
856    let origin_id = get_origin_id_v6(reply_dst, tcp_packet.sport, tcp_packet.dport, origin_map)?;
857
858    // Discovery probes have bit 16 set and higher bits unset
859    let bit_16_mask = 1 << 16;
860    let higher_bits_mask = !0u32 << 17;
861
862    let (tx_id, is_discovery) =
863        if (tcp_packet.seq & bit_16_mask) != 0 && (tcp_packet.seq & higher_bits_mask) == 0 {
864            (tcp_packet.seq - u16::MAX as u32, true)
865        } else {
866            (tcp_packet.seq, false)
867        };
868
869    Some((
870        Reply {
871            tx_time: tx_id as u64,
872            tx_id,
873            src: Some(src),
874            ttl,
875            rx_time,
876            origin_id,
877            chaos: None,
878        },
879        is_discovery,
880    ))
881}
882
883/// Get the origin ID from the origin map based on the reply destination address and ports.
884///
885/// # Arguments
886///
887/// * `reply_dst` - the destination address of the reply
888///
889/// * `reply_sport` - the source port of the reply
890///
891/// * `reply_dport` - the destination port of the reply
892///
893/// * `origin_map` - the origin map to search in
894///
895/// # Returns
896///
897/// * `Option<u32>` - the origin ID if found, None otherwise
898fn get_origin_id_v4(
899    reply_dst: u32,
900    reply_sport: u16,
901    reply_dport: u16,
902    origin_map: &Vec<Origin>,
903) -> Option<u32> {
904    for origin in origin_map {
905        if origin.src.unwrap().get_v4() == reply_dst
906            && origin.sport as u16 == reply_dport
907            && origin.dport as u16 == reply_sport
908        {
909            return Some(origin.origin_id);
910        } else if origin.src.unwrap().get_v4() == reply_dst && 0 == reply_sport && 0 == reply_dport
911        {
912            // ICMP replies have no port numbers
913            return Some(origin.origin_id);
914        }
915    }
916    None
917}
918
919/// Get the origin ID from the origin map based on the reply destination address and ports.
920///
921/// # Arguments
922///
923/// * `reply_dst` - the destination address of the reply
924///
925/// * `reply_sport` - the source port of the reply
926///
927/// * `reply_dport` - the destination port of the reply
928///
929/// * `origin_map` - the origin map to search in
930///
931/// # Returns
932///
933/// * `Option<u32>` - the origin ID if found, None otherwise
934fn get_origin_id_v6(
935    reply_dst: u128,
936    reply_sport: u16,
937    reply_dport: u16,
938    origin_map: &Vec<Origin>,
939) -> Option<u32> {
940    for origin in origin_map {
941        if origin.src.unwrap().get_v6() == reply_dst
942            && origin.sport as u16 == reply_dport
943            && origin.dport as u16 == reply_sport
944        {
945            return Some(origin.origin_id);
946        } else if origin.src.unwrap().get_v6() == reply_dst && 0 == reply_sport && 0 == reply_dport
947        {
948            // ICMP replies have no port numbers
949            return Some(origin.origin_id);
950        }
951    }
952    None
953}