manycast/worker/outbound.rs
1use std::num::NonZeroU32;
2use std::sync::atomic::AtomicBool;
3use std::sync::Arc;
4use std::thread;
5use std::thread::sleep;
6use std::time::Duration;
7
8use tokio::sync::mpsc::{error::TryRecvError, Receiver};
9
10use crate::custom_module;
11use custom_module::manycastr::task::Data::{End, Targets};
12use custom_module::manycastr::{task::Data, Origin};
13
14use pnet::datalink::DataLinkSender;
15
16use crate::custom_module::Separated;
17use crate::net::packet::{create_dns, create_icmp, create_tcp, get_ethernet_header};
18use ratelimit_meter::{DirectRateLimiter, LeakyBucket};
19
20const DISCOVERY_WORKER_ID_OFFSET: u32 = u16::MAX as u32;
21
22/// Configuration for an outbound packet sending worker.
23///
24/// This struct holds all the parameters needed to initialize and run a worker
25/// that generates and sends measurement probes (e.g., ICMP, DNS, TCP)
26/// at a specified rate.
27pub struct OutboundConfig {
28 /// The unique ID of this specific worker.
29 pub worker_id: u16,
30
31 /// A list of source addresses and port values (`Origin`) to send probes from.
32 pub tx_origins: Vec<Origin>,
33
34 /// A shared signal that can be used to forcefully shut down the worker.
35 ///
36 /// E.g., when the CLI has abruptly disconnected.
37 pub abort_s: Arc<AtomicBool>,
38
39 /// Specifies whether to send IPv6 packets (`true`) or IPv4 packets (`false`).
40 pub is_ipv6: bool,
41
42 /// Indicates if this is a latency measurement.
43 pub is_symmetric: bool,
44
45 /// The unique ID of the measurement.
46 pub m_id: u32,
47
48 /// The type of probe to send (e.g., 1 for ICMP, 2 for DNS/A, 3 for TCP).
49 pub m_type: u8,
50
51 /// The domain name to query in DNS measurement probes.
52 pub qname: String,
53
54 /// An informational URL to be embedded in the probe's payload (e.g., an opt-out link).
55 pub info_url: String,
56
57 /// The name of the network interface to send packets from (e.g., "eth0").
58 pub if_name: String,
59
60 /// The target rate for sending probes, measured in packets per second (pps).
61 pub probing_rate: u32,
62}
63
64/// Spawns thread that sends out ICMP, DNS, or TCP probes.
65///
66/// # Arguments
67///
68/// * 'config' - configuration for the outbound worker thread
69///
70/// * 'outbound_rx' - on this channel we receive future tasks that are part of the current measurement
71///
72/// * 'socket_tx' - the sender object to send packets
73pub fn outbound(
74 config: OutboundConfig,
75 mut outbound_rx: Receiver<Data>,
76 mut socket_tx: Box<dyn DataLinkSender>,
77) {
78 thread::Builder::new()
79 .name("outbound".to_string())
80 .spawn(move || {
81 let mut sent: u32 = 0;
82 let mut sent_discovery = 0;
83 let mut failed : u32= 0;
84 // Rate limit the number of packets sent per second, each origin has the same rate (i.e., sending with 2 origins will double the rate)
85 let mut limiter = DirectRateLimiter::<LeakyBucket>::per_second(NonZeroU32::new(config.probing_rate * config.tx_origins.len() as u32).unwrap());
86
87 let ethernet_header = get_ethernet_header(config.is_ipv6, config.if_name);
88 'outer: loop {
89 if config.abort_s.load(std::sync::atomic::Ordering::SeqCst) {
90 // If the finish_rx is set to true, break the loop (abort)
91 println!("[Worker outbound] ABORTING");
92 break;
93 }
94 let task;
95 // Receive tasks from the outbound channel
96 loop {
97 match outbound_rx.try_recv() {
98 Ok(t) => {
99 task = t;
100 break;
101 }
102 Err(e) => {
103 if e == TryRecvError::Disconnected {
104 println!("[Worker outbound] Channel disconnected");
105 break 'outer;
106 }
107 // wait some time and try again
108 sleep(Duration::from_millis(100));
109 }
110 };
111 }
112 match task {
113 End(_) => {
114 // An End task means the measurement has finished
115 break;
116 }
117 Targets(targets) => {
118 let worker_id = if targets.is_discovery == Some(true) {
119 sent_discovery += targets.dst_list.len();
120 config.worker_id as u32 + DISCOVERY_WORKER_ID_OFFSET
121 } else {
122 config.worker_id as u32
123 };
124 for origin in &config.tx_origins {
125 match config.m_type {
126 1 => { // ICMP
127 for dst in &targets.dst_list {
128 let mut packet = ethernet_header.clone();
129 packet.extend_from_slice(&create_icmp(
130 origin,
131 dst,
132 worker_id,
133 config.m_id,
134 &config.info_url,
135 ));
136
137 while limiter.check().is_err() { // Rate limit to avoid bursts
138 sleep(Duration::from_millis(1));
139 }
140
141 match socket_tx.send_to(&packet, None) {
142 Some(Ok(())) => sent += 1,
143 Some(Err(e)) => {
144 eprintln!("[Worker outbound] Failed to send ICMP packet: {e}");
145 failed += 1;
146 },
147 None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
148 }
149 }
150 }
151 2 | 4 => { // DNS A record or CHAOS
152 for dst in &targets.dst_list {
153 let mut packet = ethernet_header.clone();
154 packet.extend_from_slice(&create_dns(
155 origin,
156 dst,
157 worker_id,
158 config.m_type,
159 &config.qname,
160 ));
161 while limiter.check().is_err() { // Rate limit to avoid bursts
162 sleep(Duration::from_millis(1));
163 }
164 match socket_tx.send_to(&packet, None) {
165 Some(Ok(())) => sent += 1,
166 Some(Err(e)) => {
167 eprintln!("[Worker outbound] Failed to send DNS packet: {e}");
168 failed += 1;
169 },
170 None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
171 }
172 }
173 }
174 3 => { // TCP
175 for dst in &targets.dst_list {
176 let mut packet = ethernet_header.clone();
177 packet.extend_from_slice(&create_tcp(
178 origin,
179 dst,
180 worker_id,
181 config.is_symmetric,
182 &config.info_url,
183 ));
184
185 while limiter.check().is_err() { // Rate limit to avoid bursts
186 sleep(Duration::from_millis(1));
187 }
188
189 match socket_tx.send_to(&packet, None) {
190 Some(Ok(())) => sent += 1,
191 Some(Err(e)) => {
192 eprintln!("[Worker outbound] Failed to send TCP packet: {e}");
193 failed += 1;
194 },
195 None => eprintln!("[Worker outbound] Failed to send packet: No Tx interface"),
196 }
197 }
198 }
199 255 => {
200 panic!("Invalid measurement type)") // TODO all
201 }
202 _ => panic!("Invalid measurement type"), // Invalid measurement
203 }
204 }
205 }
206 _ => continue, // Invalid measurement
207 };
208 }
209 println!("[Worker outbound] Outbound thread finished - packets sent : {} (including {} discovery probes), packets failed to send: {}", sent.with_separator(), sent_discovery.with_separator(), failed.with_separator());
210 })
211 .expect("Failed to spawn outbound thread");
212}