manycast/worker/
outbound.rs

1use std::num::NonZeroU32;
2use std::sync::atomic::AtomicBool;
3use std::sync::Arc;
4use std::thread;
5use std::thread::sleep;
6use std::time::Duration;
7
8use tokio::sync::mpsc::{error::TryRecvError, Receiver};
9
10use crate::custom_module;
11use custom_module::manycastr::task::Data::{End, Targets};
12use custom_module::manycastr::{task::Data, Origin};
13
14use pnet::datalink::DataLinkSender;
15
16use crate::custom_module::Separated;
17use crate::net::packet::{create_dns, create_icmp, create_tcp, get_ethernet_header};
18use ratelimit_meter::{DirectRateLimiter, LeakyBucket};
19
20const DISCOVERY_WORKER_ID_OFFSET: u32 = u16::MAX as u32;
21
22/// Configuration for an outbound packet sending worker.
23///
24/// This struct holds all the parameters needed to initialize and run a worker
25/// that generates and sends measurement probes (e.g., ICMP, DNS, TCP)
26/// at a specified rate.
27pub struct OutboundConfig {
28    /// The unique ID of this specific worker.
29    pub worker_id: u16,
30
31    /// A list of source addresses and port values (`Origin`) to send probes from.
32    pub tx_origins: Vec<Origin>,
33
34    /// A shared signal that can be used to forcefully shut down the worker.
35    ///
36    /// E.g., when the CLI has abruptly disconnected.
37    pub abort_s: Arc<AtomicBool>,
38
39    /// Specifies whether to send IPv6 packets (`true`) or IPv4 packets (`false`).
40    pub is_ipv6: bool,
41
42    /// Indicates if this is a latency measurement.
43    pub is_symmetric: bool,
44
45    /// The unique ID of the measurement.
46    pub m_id: u32,
47
48    /// The type of probe to send (e.g., 1 for ICMP, 2 for DNS/A, 3 for TCP).
49    pub m_type: u8,
50
51    /// The domain name to query in DNS measurement probes.
52    pub qname: String,
53
54    /// An informational URL to be embedded in the probe's payload (e.g., an opt-out link).
55    pub info_url: String,
56
57    /// The name of the network interface to send packets from (e.g., "eth0").
58    pub if_name: String,
59
60    /// The target rate for sending probes, measured in packets per second (pps).
61    pub probing_rate: u32,
62}
63
64/// Spawns thread that sends out ICMP, DNS, or TCP probes.
65///
66/// # Arguments
67///
68/// * 'config' - configuration for the outbound worker thread
69///
70/// * 'outbound_rx' - on this channel we receive future tasks that are part of the current measurement
71///
72/// * 'socket_tx' - the sender object to send packets
73pub fn outbound(
74    config: OutboundConfig,
75    mut outbound_rx: Receiver<Data>,
76    mut socket_tx: Box<dyn DataLinkSender>,
77) {
78    thread::Builder::new()
79        .name("outbound".to_string())
80        .spawn(move || {
81            let mut sent: u32 = 0;
82            let mut sent_discovery = 0;
83            let mut failed : u32= 0;
84            // Rate limit the number of packets sent per second, each origin has the same rate (i.e., sending with 2 origins will double the rate)
85            let mut limiter = DirectRateLimiter::<LeakyBucket>::per_second(NonZeroU32::new(config.probing_rate * config.tx_origins.len() as u32).unwrap());
86
87            let ethernet_header = get_ethernet_header(config.is_ipv6, config.if_name);
88            'outer: loop {
89                if config.abort_s.load(std::sync::atomic::Ordering::SeqCst) {
90                    // If the finish_rx is set to true, break the loop (abort)
91                    println!("[Worker outbound] ABORTING");
92                    break;
93                }
94                let task;
95                // Receive tasks from the outbound channel
96                loop {
97                    match outbound_rx.try_recv() {
98                        Ok(t) => {
99                            task = t;
100                            break;
101                        }
102                        Err(e) => {
103                            if e == TryRecvError::Disconnected {
104                                println!("[Worker outbound] Channel disconnected");
105                                break 'outer;
106                            }
107                            // wait some time and try again
108                            sleep(Duration::from_millis(100));
109                        }
110                    };
111                }
112                match task {
113                    End(_) => {
114                        // An End task means the measurement has finished
115                        break;
116                    }
117                    Targets(targets) => {
118                        let worker_id = if targets.is_discovery == Some(true) {
119                            sent_discovery += targets.dst_list.len();
120                            config.worker_id as u32 + DISCOVERY_WORKER_ID_OFFSET
121                        } else {
122                            config.worker_id as u32
123                        };
124                        for origin in &config.tx_origins {
125                            match config.m_type {
126                                1 => { // ICMP
127                                    for dst in &targets.dst_list {
128                                        let mut packet = ethernet_header.clone();
129                                        packet.extend_from_slice(&create_icmp(
130                                            origin,
131                                            dst,
132                                            worker_id,
133                                            config.m_id,
134                                            &config.info_url,
135                                        ));
136
137                                        while limiter.check().is_err() { // Rate limit to avoid bursts
138                                            sleep(Duration::from_millis(1));
139                                        }
140
141                                        match socket_tx.send_to(&packet, None) {
142                                            Some(Ok(())) => sent += 1,
143                                            Some(Err(e)) => {
144                                                eprintln!("[Worker outbound] Failed to send ICMP packet: {e}");
145                                                failed += 1;
146                                            },
147                                            None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
148                                        }
149                                    }
150                                }
151                                2 | 4 => { // DNS A record or CHAOS
152                                    for dst in &targets.dst_list {
153                                        let mut packet = ethernet_header.clone();
154                                        packet.extend_from_slice(&create_dns(
155                                            origin,
156                                            dst,
157                                            worker_id,
158                                            config.m_type,
159                                            &config.qname,
160                                        ));
161                                        while limiter.check().is_err() { // Rate limit to avoid bursts
162                                            sleep(Duration::from_millis(1));
163                                        }
164                                        match socket_tx.send_to(&packet, None) {
165                                            Some(Ok(())) => sent += 1,
166                                            Some(Err(e)) => {
167                                                eprintln!("[Worker outbound] Failed to send DNS packet: {e}");
168                                                failed += 1;
169                                            },
170                                            None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
171                                        }
172                                    }
173                                }
174                                3 => { // TCP
175                                    for dst in &targets.dst_list {
176                                        let mut packet = ethernet_header.clone();
177                                        packet.extend_from_slice(&create_tcp(
178                                            origin,
179                                            dst,
180                                            worker_id,
181                                            config.is_symmetric,
182                                            &config.info_url,
183                                        ));
184
185                                        while limiter.check().is_err() { // Rate limit to avoid bursts
186                                            sleep(Duration::from_millis(1));
187                                        }
188
189                                        match socket_tx.send_to(&packet, None) {
190                                            Some(Ok(())) => sent += 1,
191                                            Some(Err(e)) => {
192                                                eprintln!("[Worker outbound] Failed to send TCP packet: {e}");
193                                                failed += 1;
194                                            },
195                                            None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
196                                        }
197                                    }
198                                }
199                                255 => {
200                                    panic!("Invalid measurement type)") // TODO all
201                                }
202                                _ => panic!("Invalid measurement type"), // Invalid measurement
203                            }
204                        }
205                    }
206                    _ => continue, // Invalid measurement
207                };
208            }
209            println!("[Worker outbound] Outbound thread finished - packets sent : {} (including {} discovery probes), packets failed to send: {}", sent.with_separator(), sent_discovery.with_separator(), failed.with_separator());
210        })
211        .expect("Failed to spawn outbound thread");
212}