UDP en tiempo real: Por qué recibir datos tarde es peor que perderlos
Inicia sesión para descargarConstruye paso a paso en Rust un sistema de streaming sobre UDP con un sender y un receiver capaces de detectar pérdida, desorden, jitter y paquetes tardíos. Un tutorial práctico para entender, …
Contenido del tutorial ⌄
- 1. La decisión arquitectónica y el workspace
- 2. common: donde el sistema deja de ser una idea y se vuelve contrato
- 3. El protocolo: diseñando Packet
- 4. La inteligencia secuencial: SequenceTracker
- 5. La configuración compartida: config.rs
- 6. frame.rs: el laboratorio controlado del flujo
- 7. stats.rs: donde el receptor convierte eventos en veredicto
- 8. sender: donde el flujo deja de ser teoría y empieza a moverse
- 9. receiver: donde la red deja de ser tránsito y se vuelve juicio
📡 Telecomunicaciones · Rust & UDP
UDP en tiempo real: por qué recibir datos tarde es peor que perderlos
En sistemas de audio y streaming interactivo, el tiempo forma parte integral de la información. Hoy en Tu Código Cotidiano vamos a programar desde cero un sistema que convierte esta paradoja de las telecomunicaciones en algo visible, medible y reproducible.
Imagina que estás en una videollamada y gritas “¡Cuidado!”. Si la red se asegura de entregar ese mensaje sin errores, pero tarda cuatro segundos en hacerlo porque hubo congestión y el router tuvo que retransmitirlo, la advertencia ya no sirve para nada. Aquí nace una de las verdades más crudas de la ingeniería de redes: en sistemas de tiempo real, recibir datos perfectos pero tarde arruina el sistema. A veces, es mejor perderlos.
Suena contraintuitivo, ¿verdad? Estamos acostumbrados a que un archivo a medias es un archivo roto. Pero en voz sobre IP, videojuegos en línea o streaming interactivo, la continuidad pesa más que la perfección absoluta. Y como esta idea se entiende mejor cuando se ve funcionar, no vamos a quedarnos en teoría: vamos a programarla en Rust línea por línea para verla con nuestros propios ojos.
En tiempo real, el receptor no solo decide si un paquete llegó: también decide si llegó a tiempo para seguir teniendo valor.
1. La decisión arquitectónica y el workspace
Si intentáramos construir este sistema con TCP —el protocolo detrás de HTTP— fracasaríamos desde el diseño. TCP es increíblemente confiable porque obliga a la retransmisión: si el paquete 4 se pierde, detiene la entrega de los paquetes 5, 6 y 7 hasta que el 4 sea reenviado y recibido. Eso asegura un orden perfecto, pero en tiempo real inyecta una latencia impredecible y, en el peor caso, destructiva. Ese fenómeno es el famoso Head-of-line blocking.
Por eso elegimos UDP. UDP te entrega datagramas simples sin prometerte absolutamente nada. No retransmite, no ordena y no avisa si algo se perdió. La red solo transporta el dato crudo, y es nuestra aplicación la que debe implementar la inteligencia para decidir qué hacer si un paquete falta, si se duplicó, si llegó fuera de orden o si aterrizó demasiado tarde para seguir siendo útil.
Para este tutorial, vamos a estructurar el proyecto como un workspace limpio en Rust con tres crates bien definidos:
common: las reglas del juego. Aquí vivirá nuestro protocolo, la lógica para medir tiempo y secuencia, y la configuración compartida.sender: nuestro inyector. Generará una señal determinista y la lanzará a la red por UDP.receiver: el juez. Escuchará la red, analizará cada paquete y dictaminará en tiempo real si el flujo es saludable o se está degradando.
Archivo
Cargo.toml · raíz del workspace
Este archivo define la estructura general del proyecto y deja explícito que trabajaremos con tres crates: common, sender y receiver.
# Cargo.toml (raíz del workspace)
[workspace]
members = [
"crates/common",
"crates/sender",
"crates/receiver",
]
resolver = "2"
[workspace.package]
edition = "2021"
license = "MIT"
authors = ["Tu Nombre"]
[workspace.dependencies]
common = { path = "crates/common" }
sender = { path = "crates/sender" }
receiver = { path = "crates/receiver" }
Con esta estructura, obligamos al código a ser modular desde el primer momento. No queremos un archivo gigantesco mezclando red, serialización, análisis temporal y estadísticas. Queremos una arquitectura que ya enseñe, por sí misma, cómo pensar el sistema.
Empecemos entonces construyendo el corazón matemático del proyecto: la capa common, que será la encargada de definir el protocolo del paquete, el análisis del flujo y la base sobre la que se apoyarán tanto el sender como el receiver.
2. common: donde el sistema deja de ser una idea y se vuelve contrato
Hasta aquí solo hemos definido la arquitectura general: un sender, un receiver y una intuición fuerte sobre tiempo real. Pero un sistema no empieza de verdad cuando abre sockets; empieza cuando define sus reglas. Por eso el siguiente paso natural es construir la capa common, que será el lenguaje compartido del proyecto.
Esta crate no envía paquetes ni escucha la red. Su trabajo es más profundo: establecer qué significa un paquete válido, qué información debe contener, cómo representamos la secuencia, cómo representamos el tiempo y qué errores consideramos aceptables o fatales. En otras palabras, common es el sitio donde convertimos un problema abstracto de telecomunicaciones en una interfaz concreta de Rust.
Archivo
crates/common/Cargo.toml
Este manifiesto declara la crate common como biblioteca reutilizable. Su propósito no es ejecutar nada por sí sola, sino exponer la base compartida sobre la que se apoyarán el emisor y el receptor.
[package]
name = "common"
version = "0.1.0"
edition = "2021"
[lib]
name = "common"
path = "src/lib.rs"
El punto importante aquí no es el tamaño del archivo, sino la intención arquitectónica. common nace como librería porque queremos que el protocolo, el análisis y las métricas sean una verdad compartida, no lógica duplicada a mano entre el sender y el receiver.
Archivo
crates/common/src/lib.rs
Esta raíz pública de la crate deja visibles los módulos que forman el contrato del sistema: protocolo, configuración, generación de frames, análisis y estadísticas.
//! Capa compartida del proyecto UDP audio MVP.
//!
//! Aquí vive la parte que conviene mantener estable y pedagógica:
//! - protocolo de paquete,
//! - configuración,
//! - generador de pseudoaudio,
//! - análisis de secuencia,
//! - análisis temporal,
//! - estadísticas acumuladas.
pub mod analysis;
pub mod config;
pub mod frame;
pub mod packet;
pub mod stats;
pub use analysis::{SequenceEvent, SequenceTracker, TimingObservation, TimingTracker};
pub use config::{ReceiverConfig, SenderConfig, Verbosity};
pub use frame::{build_pseudoaudio_frame, DelayPlan, ReorderBuffer, SimpleRng};
pub use packet::{Packet, PacketError, HEADER_LEN, MAGIC, PROTOCOL_VERSION};
pub use stats::{PacketObservation, ReceiverStats, RunSummary};
Esta raíz ya nos deja ver el mapa mental del proyecto: el sistema tendrá protocolo, configuración, generación de datos, análisis y métricas. Pero dentro de todo ese mapa hay una pieza que manda sobre las demás: el formato exacto del paquete que viajará por UDP.
Y ahí entra la tarea central de esta etapa: diseñar Packet. Antes de pensar en sockets, cadencia o jitter, necesitamos decidir qué significa exactamente un datagrama válido dentro de nuestro sistema.
3. El protocolo: diseñando Packet
UDP no sabe qué es una muestra de audio, un paquete tardío o una secuencia rota. Solo mueve datagramas. Por eso necesitamos una capa de aplicación que convierta un bloque de bytes en algo interpretable. Nuestro Packet será precisamente eso: una estructura que le da semántica a lo que la red transporta.
Aquí estamos resolviendo una transformación fundamental: pasar de la idea abstracta de “quiero enviar frames de pseudoaudio y medir su comportamiento” a una estructura concreta con campos, tamaños y reglas claras. En este punto del tutorial todavía no estamos serializando ni parseando bytes; primero necesitamos definir qué información debe existir para que el receptor pueda juzgar la calidad del flujo.
Archivo
crates/common/src/packet.rs
Aquí definimos el contrato del paquete: sus constantes de protocolo, la estructura Packet y el enum PacketError, que nos permitirá tratar errores de red como estados explícitos y no como caos implícito.
use std::fmt;
pub const MAGIC: [u8; 4] = *b"USTR";
pub const PROTOCOL_VERSION: u8 = 1;
pub const HEADER_LEN: usize = 24;
pub const MAX_PAYLOAD_LEN: usize = 65_507 - HEADER_LEN;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Packet {
pub flags: u8,
pub sequence_number: u32,
pub timestamp_ms: u64,
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PacketError {
BufferTooSmall { got: usize, min: usize },
InvalidMagic { got: [u8; 4] },
UnsupportedVersion { got: u8, expected: u8 },
InvalidHeaderLength { got: u16, expected: u16 },
PayloadTooLarge { got: usize, max: usize },
InconsistentPayloadLength { announced: u32, actual: usize },
}
impl fmt::Display for PacketError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::BufferTooSmall { got, min } => write!(f, "buffer demasiado pequeño: {got} < {min}"),
Self::InvalidMagic { got } => write!(f, "magic inválido: {got:?}"),
Self::UnsupportedVersion { got, expected } => {
write!(f, "versión no soportada: {got}, se esperaba {expected}")
}
Self::InvalidHeaderLength { got, expected } => {
write!(f, "header_len inválido: {got}, se esperaba {expected}")
}
Self::PayloadTooLarge { got, max } => write!(f, "payload demasiado grande: {got} > {max}"),
Self::InconsistentPayloadLength { announced, actual } => {
write!(f, "payload_len inconsistente: anunciado={announced}, actual={actual}")
}
}
}
}
impl std::error::Error for PacketError {}
impl Packet {
pub fn new(sequence_number: u32, timestamp_ms: u64, payload: Vec<u8>) -> Result<Self, PacketError> {
Self::with_flags(0, sequence_number, timestamp_ms, payload)
}
pub fn with_flags(
flags: u8,
sequence_number: u32,
timestamp_ms: u64,
payload: Vec<u8>,
) -> Result<Self, PacketError> {
if payload.len() > MAX_PAYLOAD_LEN {
return Err(PacketError::PayloadTooLarge {
got: payload.len(),
max: MAX_PAYLOAD_LEN,
});
}
Ok(Self {
flags,
sequence_number,
timestamp_ms,
payload,
})
}
pub fn to_bytes(&self) -> Result<Vec<u8>, PacketError> {
if self.payload.len() > MAX_PAYLOAD_LEN {
return Err(PacketError::PayloadTooLarge {
got: self.payload.len(),
max: MAX_PAYLOAD_LEN,
});
}
let mut bytes = Vec::with_capacity(HEADER_LEN + self.payload.len());
bytes.extend_from_slice(&MAGIC);
bytes.push(PROTOCOL_VERSION);
bytes.push(self.flags);
bytes.extend_from_slice(&(HEADER_LEN as u16).to_be_bytes());
bytes.extend_from_slice(&self.sequence_number.to_be_bytes());
bytes.extend_from_slice(&self.timestamp_ms.to_be_bytes());
bytes.extend_from_slice(&(self.payload.len() as u32).to_be_bytes());
bytes.extend_from_slice(&self.payload);
Ok(bytes)
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, PacketError> {
if bytes.len() < HEADER_LEN {
return Err(PacketError::BufferTooSmall {
got: bytes.len(),
min: HEADER_LEN,
});
}
let magic = [bytes[0], bytes[1], bytes[2], bytes[3]];
if magic != MAGIC {
return Err(PacketError::InvalidMagic { got: magic });
}
let version = bytes[4];
if version != PROTOCOL_VERSION {
return Err(PacketError::UnsupportedVersion {
got: version,
expected: PROTOCOL_VERSION,
});
}
let flags = bytes[5];
let header_len = u16::from_be_bytes([bytes[6], bytes[7]]);
if header_len != HEADER_LEN as u16 {
return Err(PacketError::InvalidHeaderLength {
got: header_len,
expected: HEADER_LEN as u16,
});
}
let sequence_number = u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]);
let timestamp_ms = u64::from_be_bytes([
bytes[12], bytes[13], bytes[14], bytes[15], bytes[16], bytes[17], bytes[18], bytes[19],
]);
let payload_len = u32::from_be_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]) as usize;
let actual_payload = bytes.len() - HEADER_LEN;
if payload_len != actual_payload {
return Err(PacketError::InconsistentPayloadLength {
announced: payload_len as u32,
actual: actual_payload,
});
}
Ok(Self {
flags,
sequence_number,
timestamp_ms,
payload: bytes[HEADER_LEN..].to_vec(),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn round_trip_preserves_fields() {
let packet = Packet::with_flags(7, 42, 1_711_111_111_000, vec![1, 2, 3, 4]).unwrap();
let bytes = packet.to_bytes().unwrap();
let decoded = Packet::from_bytes(&bytes).unwrap();
assert_eq!(decoded, packet);
}
#[test]
fn invalid_when_buffer_is_too_small() {
let err = Packet::from_bytes(&[0; 8]).unwrap_err();
assert!(matches!(err, PacketError::BufferTooSmall { .. }));
}
#[test]
fn invalid_magic_is_detected() {
let mut bytes = Packet::new(1, 2, vec![9, 9]).unwrap().to_bytes().unwrap();
bytes[0] = b'X';
let err = Packet::from_bytes(&bytes).unwrap_err();
assert!(matches!(err, PacketError::InvalidMagic { .. }));
}
#[test]
fn inconsistent_payload_length_is_detected() {
let mut bytes = Packet::new(1, 2, vec![9, 9]).unwrap().to_bytes().unwrap();
bytes[20..24].copy_from_slice(&99u32.to_be_bytes());
let err = Packet::from_bytes(&bytes).unwrap_err();
assert!(matches!(err, PacketError::InconsistentPayloadLength { .. }));
}
#[test]
fn sequence_and_timestamp_are_preserved() {
let packet = Packet::new(u32::MAX - 1, u64::MAX - 2, vec![1, 2]).unwrap();
let decoded = Packet::from_bytes(&packet.to_bytes().unwrap()).unwrap();
assert_eq!(decoded.sequence_number, u32::MAX - 1);
assert_eq!(decoded.timestamp_ms, u64::MAX - 2);
}
}
Lo primero que conviene notar es que este paquete no nace como “audio” en sentido estricto. Nace como una estructura de control. El campo payload contiene los bytes del frame sintético, sí, pero lo que realmente vuelve inteligente al sistema está en el encabezado que lo acompaña.
MAGIC nos permite reconocer si un bloque de bytes realmente pertenece a nuestro protocolo. PROTOCOL_VERSION deja preparada una evolución futura sin romper compatibilidad a ciegas. HEADER_LEN fija la forma del encabezado y MAX_PAYLOAD_LEN nos protege frente a tamaños absurdos o inválidos. Dicho de forma simple: antes de hablar de tiempo real, primero necesitamos saber que estamos leyendo algo que de verdad tiene sentido para nuestro sistema.
Ahora llegamos a los dos campos más importantes de todo el diseño: sequence_number y timestamp_ms. El primero responde a una pregunta secuencial: ¿este paquete llegó donde le correspondía en el flujo? Gracias a él, el receptor podrá detectar huecos, desorden y duplicados.
El segundo responde a una pregunta temporal: ¿este paquete llegó todavía a tiempo para servir? Sin timestamp_ms, el receptor solo podría decir “llegó” o “no llegó”; con él, ya puede medir retraso, estimar jitter y clasificar paquetes tardíos. En otras palabras, sequence_number le da ojos al sistema para ver el orden, y timestamp_ms le da reloj para juzgar utilidad.
El otro protagonista silencioso de este diseño es PacketError. En redes no conviene asumir que todo lo recibido será válido. Un buffer puede ser demasiado pequeño, el magic puede no coincidir, la versión puede ser incompatible o la longitud anunciada puede no corresponder con los bytes reales. En lugar de convertir esos casos en comportamientos ambiguos o pánicos del programa, los volvemos estados explícitos del sistema.
Y esa decisión también es parte del tutorial. No estamos programando una demo ingenua donde “todo sale bien”; estamos construyendo una base de red que entiende que los bytes que llegan desde un socket deben ser tratados con desconfianza hasta que el protocolo los valide.
Hasta aquí todavía no hemos convertido nada a bytes. Y eso es intencional. Primero definimos qué significa un paquete sano, qué información mínima debe cargar y qué errores estamos dispuestos a reconocer. Solo después de fijar ese contrato tiene sentido hablar de serialización y deserialización.
En la siguiente etapa vamos a tomar esta estructura y llevarla al terreno real de la red: transformar Packet en bytes con to_bytes(), reconstruirlo con from_bytes() y ver cómo Rust nos ayuda a hacerlo de manera segura, explícita y sin pánicos innecesarios.
4. La inteligencia secuencial: SequenceTracker
Ya sabemos construir paquetes, serializarlos y reconstruirlos desde bytes con validaciones explícitas. Pero todavía no sabemos leer el comportamiento del flujo. Un receptor de tiempo real no puede limitarse a decir “llegó un paquete válido”; necesita una segunda capa de inteligencia capaz de responder preguntas mucho más útiles: ¿era el paquete que esperábamos?, ¿faltó alguno en el camino?, ¿llegó fuera de orden?, ¿o estamos viendo un duplicado?
Ahí entra SequenceTracker. Esta estructura convierte una simple sucesión de números de secuencia en eventos semánticos. En lugar de tratar el flujo como una lista muda de enteros, lo interpreta como una historia: primer paquete, continuidad normal, hueco de pérdida, reordenamiento o duplicado. Y eso es justo lo que necesitamos para que nuestro receiver empiece a comportarse como un juez y no solo como un buzón.
Archivo
crates/common/src/analysis.rs · lógica secuencial del flujo
En este módulo vive la parte del sistema que interpreta el orden de llegada de los paquetes. Aquí definimos los eventos secuenciales posibles y la estructura que los detecta en tiempo real.
use crate::frame::SeenWindow;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceEvent {
FirstPacket,
InOrder,
Gap { missing: u32 },
OutOfOrder,
Duplicate,
}
#[derive(Debug)]
pub struct SequenceTracker {
expected_next: Option<u32>,
last_seen: Option<u32>,
seen: SeenWindow,
}
impl Default for SequenceTracker {
fn default() -> Self {
Self {
expected_next: None,
last_seen: None,
seen: SeenWindow::with_capacity(8192),
}
}
}
impl SequenceTracker {
pub fn observe(&mut self, sequence_number: u32) -> SequenceEvent {
if self.seen.contains(&sequence_number) {
return SequenceEvent::Duplicate;
}
self.seen.insert(sequence_number);
match self.expected_next {
None => {
self.expected_next = Some(sequence_number.saturating_add(1));
self.last_seen = Some(sequence_number);
SequenceEvent::FirstPacket
}
Some(expected) if sequence_number == expected => {
self.expected_next = Some(sequence_number.saturating_add(1));
self.last_seen = Some(sequence_number);
SequenceEvent::InOrder
}
Some(expected) if sequence_number > expected => {
let missing = sequence_number - expected;
self.expected_next = Some(sequence_number.saturating_add(1));
self.last_seen = Some(sequence_number);
SequenceEvent::Gap { missing }
}
Some(_) => SequenceEvent::OutOfOrder,
}
}
pub fn expected_next(&self) -> Option<u32> {
self.expected_next
}
pub fn last_seen(&self) -> Option<u32> {
self.last_seen
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TimingObservation {
pub delay_ms: u64,
pub is_late: bool,
pub jitter_ms: f64,
}
#[derive(Debug, Default)]
pub struct TimingTracker {
last_delay_ms: Option<u64>,
jitter_ms: f64,
}
impl TimingTracker {
pub fn observe(&mut self, sent_timestamp_ms: u64, arrival_timestamp_ms: u64, late_threshold_ms: u64) -> TimingObservation {
let delay_ms = arrival_timestamp_ms.saturating_sub(sent_timestamp_ms);
if let Some(previous_delay) = self.last_delay_ms {
let variation = previous_delay.abs_diff(delay_ms) as f64;
self.jitter_ms += (variation - self.jitter_ms) / 16.0;
}
self.last_delay_ms = Some(delay_ms);
TimingObservation {
delay_ms,
is_late: delay_ms > late_threshold_ms,
jitter_ms: self.jitter_ms,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detects_gaps_duplicates_and_out_of_order() {
let mut tracker = SequenceTracker::default();
assert_eq!(tracker.observe(10), SequenceEvent::FirstPacket);
assert_eq!(tracker.observe(11), SequenceEvent::InOrder);
assert_eq!(tracker.observe(14), SequenceEvent::Gap { missing: 2 });
assert_eq!(tracker.observe(13), SequenceEvent::OutOfOrder);
assert_eq!(tracker.observe(13), SequenceEvent::Duplicate);
}
#[test]
fn marks_packet_as_late_when_threshold_exceeded() {
let mut tracker = TimingTracker::default();
let obs = tracker.observe(1000, 1105, 80);
assert_eq!(obs.delay_ms, 105);
assert!(obs.is_late);
}
}
La primera decisión elegante de este diseño está en SequenceEvent. En lugar de devolver booleanos sueltos o códigos numéricos ambiguos, el tracker responde con un enum expresivo que ya encapsula el significado del estado del flujo.
FirstPacket: este fue el primer paquete observado; todavía no existe historial previo contra el cual compararlo.InOrder: llegó exactamente el número de secuencia que esperábamos.Gap { missing }: el flujo avanzó más de la cuenta y quedaron huecos sin recibir.OutOfOrder: llegó un paquete con secuencia menor a la esperada, pero no era duplicado.Duplicate: ya habíamos visto exactamente ese número de secuencia antes.
Esta forma de modelar el resultado es poderosa porque obliga al resto del sistema a pensar en eventos reales de red y no en comparaciones crudas. El receptor ya no pregunta “¿es mayor o menor?”; pregunta “¿qué clase de fenómeno acaba de ocurrir en el flujo?”.
La estructura SequenceTracker mantiene tres piezas de estado. expected_next representa el siguiente número de secuencia que debería llegar si el flujo siguiera sano. last_seen conserva la última secuencia efectivamente aceptada como referencia histórica. Y seen actúa como una memoria reciente del sistema para detectar si un valor ya pasó por aquí antes.
Lo más importante es que el tracker no necesita almacenar todos los paquetes del universo; solo necesita suficiente contexto para interpretar correctamente el presente. Esa es una idea muy de sistemas en tiempo real: no buscamos reconstruir toda la historia del flujo para siempre, sino mantener la mínima memoria operativa que permita detectar discontinuidades significativas.
Fíjate en la primera compuerta del método observe(): antes de entrar al match, el sistema revisa si ese número de secuencia ya estaba registrado dentro de seen. Si ya pasó por aquí, no tiene sentido seguir razonando sobre continuidad ni orden: estamos frente a un Duplicate.
Esto es una decisión muy buena porque separa dos problemas distintos. Una cosa es recibir algo más viejo que lo esperado; otra cosa es recibir exactamente el mismo número de secuencia dos veces. El primer caso puede ser reordenamiento; el segundo ya es duplicación confirmada.
Después del filtro de duplicados llega el verdadero corazón del algoritmo: match self.expected_next. Aquí el tracker compara el número de secuencia recién observado con el número que esperaba recibir si todo siguiera perfectamente alineado.
El primer brazo, None, representa el arranque del flujo. Como todavía no existe referencia previa, el sistema no puede decir ni “en orden” ni “desordenado”; solo puede afirmar que este fue el primer paquete y que, a partir de ahora, el siguiente esperado será sequence_number + 1.
El segundo brazo, Some(expected) if sequence_number == expected, es el caso ideal: continuidad perfecta. No hubo saltos, no hubo pérdida visible y el flujo está avanzando exactamente como queríamos. Por eso se devuelve InOrder.
El tercer brazo del match es donde aparece el diagnóstico realmente valioso: Some(expected) if sequence_number > expected. Aquí el paquete que llegó no es el que tocaba; saltó por encima del valor esperado. Matemáticamente, eso significa que entre lo esperado y lo recibido hay uno o más números ausentes.
Por eso se calcula missing = sequence_number - expected. Ese valor no es decorativo: cuantifica la magnitud del hueco. Si esperábamos el 12 y llegó el 14, entonces falta exactamente 2 valores de secuencia: 12 y 13 si estamos interpretando desde el punto esperado inmediato. En términos operativos, el flujo avanzó, pero dejó una cicatriz visible de pérdida.
Y lo mejor es que el sistema no se queda atascado lamentando el hueco. Actualiza expected_next para seguir a partir del paquete recién recibido. Es decir: registra la pérdida, pero continúa avanzando. Esa es una filosofía muy propia de tiempo real.
El último caso, Some(_) => SequenceEvent::OutOfOrder, captura otra clase de degradación: el paquete llegó con un número menor al esperado, pero no era duplicado. Eso significa que sí pertenece al flujo y sí es nuevo para nuestro historial reciente, pero apareció tarde respecto al orden ideal.
Este caso importa mucho porque no toda degradación secuencial es pérdida. A veces la red no pierde el datagrama: lo entrega, pero en el instante incorrecto dentro de la secuencia. Y eso también es una forma de corrupción del flujo desde la perspectiva de una aplicación de tiempo real.
Detrás de esta lógica hay una pieza auxiliar muy importante: SeenWindow. No aparece como protagonista del tutorial, pero cumple una función crítica. Es una ventana acotada de secuencias ya observadas que permite detectar duplicados sin tener que conservar para siempre todos los números que alguna vez cruzaron el sistema.
Esa decisión es elegantísima porque mantiene el análisis secuencial en tiempo constante razonable y evita que la memoria crezca sin control. Otra vez aparece la idea central del proyecto: no queremos un sistema “bonito” en abstracto, sino una base realista, medible y lista para evolucionar.
Con SequenceTracker ya conseguimos algo poderoso: el receptor puede observar el orden del flujo y convertirlo en eventos significativos. Ahora puede detectar continuidad, pérdida, reordenamiento y duplicación. Pero todavía nos falta una mitad entera de la historia.
Porque un paquete puede llegar en orden y seguir siendo inútil si aterriza demasiado tarde. Ese será el siguiente paso: construir TimingTracker y empezar a mirar el flujo ya no solo como una secuencia de números, sino como una secuencia de llegadas sometidas al reloj.
5. La configuración compartida: config.rs
A esta altura del tutorial ya sabemos algo muy importante: nuestro sistema no solo va a transportar bytes, también va a interpretar orden, atraso y degradación temporal. Pero todavía nos falta una pieza clave para que todo eso deje de ser una idea bonita y se convierta en un experimento realmente ejecutable: la configuración.
Un sistema de red no vive solo de structs elegantes. También necesita decidir con qué puerto escucha, a qué destino envía, qué tamaño tendrá cada frame, cada cuánto se transmitirá, qué umbral consideraremos “demasiado tarde” y qué fallos artificiales queremos inyectar para ponerlo a prueba. Todo eso vive en config.rs.
Dicho de otra forma: analysis.rs nos enseñó qué medir; config.rs define con qué parámetros va a vivir el sistema mientras lo ejecutamos desde consola.
Archivo
crates/common/src/config.rs · configuración y parseo de argumentos
Este módulo define las estructuras de configuración del emisor y del receptor, junto con sus valores por defecto, niveles de verbosidad y reglas de validación. Aquí también vive la lógica que transforma los argumentos de línea de comandos en configuraciones tipadas y seguras para ejecutar el experimento.
use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verbosity {
Quiet,
Summary,
Verbose,
}
impl Verbosity {
pub fn from_flag(is_verbose: bool) -> Self {
if is_verbose {
Self::Verbose
} else {
Self::Summary
}
}
}
#[derive(Debug, Clone)]
pub struct SenderConfig {
pub bind_addr: SocketAddr,
pub dest_addr: SocketAddr,
pub frame_size: usize,
pub interval_ms: u64,
pub count: u32,
pub drop_every: u32,
pub drop_chance: u8,
pub delay_every: u32,
pub delay_ms: u64,
pub reorder_every: u32,
pub summary_every: u32,
pub verbosity: Verbosity,
pub random_seed: u64,
}
#[derive(Debug, Clone)]
pub struct ReceiverConfig {
pub bind_addr: SocketAddr,
pub late_threshold_ms: u64,
pub summary_every: u64,
pub read_timeout_ms: u64,
pub stop_after: Option<u64>,
pub verbosity: Verbosity,
}
#[derive(Debug, Clone)]
pub struct ConfigError(pub String);
impl fmt::Display for ConfigError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
impl std::error::Error for ConfigError {}
impl Default for SenderConfig {
fn default() -> Self {
Self {
bind_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
dest_addr: SocketAddr::from_str("127.0.0.1:5000").unwrap(),
frame_size: 320,
interval_ms: 20,
count: 100,
drop_every: 0,
drop_chance: 0,
delay_every: 0,
delay_ms: 0,
reorder_every: 0,
summary_every: 10,
verbosity: Verbosity::Summary,
random_seed: 0xC0FFEE,
}
}
}
impl Default for ReceiverConfig {
fn default() -> Self {
Self {
bind_addr: SocketAddr::from_str("127.0.0.1:5000").unwrap(),
late_threshold_ms: 80,
summary_every: 10,
read_timeout_ms: 500,
stop_after: None,
verbosity: Verbosity::Summary,
}
}
}
fn parse_socket_addr(value: &str, flag: &str) -> Result<SocketAddr, ConfigError> {
value
.parse::<SocketAddr>()
.map_err(|e| ConfigError(format!("valor inválido para {flag}: {value} ({e})")))
}
fn parse_u64(value: &str, flag: &str) -> Result<u64, ConfigError> {
value
.parse::<u64>()
.map_err(|e| ConfigError(format!("valor inválido para {flag}: {value} ({e})")))
}
fn parse_u32(value: &str, flag: &str) -> Result<u32, ConfigError> {
value
.parse::<u32>()
.map_err(|e| ConfigError(format!("valor inválido para {flag}: {value} ({e})")))
}
fn parse_u8(value: &str, flag: &str) -> Result<u8, ConfigError> {
value
.parse::<u8>()
.map_err(|e| ConfigError(format!("valor inválido para {flag}: {value} ({e})")))
}
fn parse_usize(value: &str, flag: &str) -> Result<usize, ConfigError> {
value
.parse::<usize>()
.map_err(|e| ConfigError(format!("valor inválido para {flag}: {value} ({e})")))
}
impl SenderConfig {
pub fn from_args(args: &[String]) -> Result<Self, ConfigError> {
let mut cfg = Self::default();
let mut i = 0;
while i < args.len() {
match args[i].as_str() {
"--help" | "-h" => return Err(ConfigError(Self::help_text().to_string())),
"--bind" => {
i += 1;
cfg.bind_addr = parse_socket_addr(required(args, i, "--bind")?, "--bind")?;
}
"--dest" => {
i += 1;
cfg.dest_addr = parse_socket_addr(required(args, i, "--dest")?, "--dest")?;
}
"--frame-size" => {
i += 1;
cfg.frame_size = parse_usize(required(args, i, "--frame-size")?, "--frame-size")?;
}
"--interval-ms" => {
i += 1;
cfg.interval_ms = parse_u64(required(args, i, "--interval-ms")?, "--interval-ms")?;
}
"--count" => {
i += 1;
cfg.count = parse_u32(required(args, i, "--count")?, "--count")?;
}
"--drop-every" => {
i += 1;
cfg.drop_every = parse_u32(required(args, i, "--drop-every")?, "--drop-every")?;
}
"--drop-chance" => {
i += 1;
cfg.drop_chance = parse_u8(required(args, i, "--drop-chance")?, "--drop-chance")?;
}
"--delay-every" => {
i += 1;
cfg.delay_every = parse_u32(required(args, i, "--delay-every")?, "--delay-every")?;
}
"--delay-ms" => {
i += 1;
cfg.delay_ms = parse_u64(required(args, i, "--delay-ms")?, "--delay-ms")?;
}
"--reorder-every" => {
i += 1;
cfg.reorder_every = parse_u32(required(args, i, "--reorder-every")?, "--reorder-every")?;
}
"--summary-every" => {
i += 1;
cfg.summary_every = parse_u32(required(args, i, "--summary-every")?, "--summary-every")?;
}
"--seed" => {
i += 1;
cfg.random_seed = parse_u64(required(args, i, "--seed")?, "--seed")?;
}
"--verbose" => cfg.verbosity = Verbosity::Verbose,
"--quiet" => cfg.verbosity = Verbosity::Quiet,
unknown => return Err(ConfigError(format!("flag desconocida para sender: {unknown}"))),
}
i += 1;
}
if cfg.drop_chance > 100 {
return Err(ConfigError("--drop-chance debe estar entre 0 y 100".to_string()));
}
if cfg.frame_size == 0 {
return Err(ConfigError("--frame-size debe ser mayor que 0".to_string()));
}
if cfg.summary_every == 0 {
return Err(ConfigError("--summary-every debe ser mayor que 0".to_string()));
}
Ok(cfg)
}
pub fn help_text() -> &'static str {
"Uso sender: --dest HOST:PORT [--bind HOST:PORT] [--frame-size N] [--interval-ms N] [--count N]\n\
[--drop-every N] [--drop-chance 0..100] [--delay-every N] [--delay-ms N]\n\
[--reorder-every N] [--summary-every N] [--seed N] [--verbose|--quiet]"
}
}
impl ReceiverConfig {
pub fn from_args(args: &[String]) -> Result<Self, ConfigError> {
let mut cfg = Self::default();
let mut i = 0;
while i < args.len() {
match args[i].as_str() {
"--help" | "-h" => return Err(ConfigError(Self::help_text().to_string())),
"--bind" => {
i += 1;
cfg.bind_addr = parse_socket_addr(required(args, i, "--bind")?, "--bind")?;
}
"--late-threshold-ms" => {
i += 1;
cfg.late_threshold_ms = parse_u64(required(args, i, "--late-threshold-ms")?, "--late-threshold-ms")?;
}
"--summary-every" => {
i += 1;
cfg.summary_every = parse_u64(required(args, i, "--summary-every")?, "--summary-every")?;
}
"--read-timeout-ms" => {
i += 1;
cfg.read_timeout_ms = parse_u64(required(args, i, "--read-timeout-ms")?, "--read-timeout-ms")?;
}
"--stop-after" => {
i += 1;
cfg.stop_after = Some(parse_u64(required(args, i, "--stop-after")?, "--stop-after")?);
}
"--verbose" => cfg.verbosity = Verbosity::Verbose,
"--quiet" => cfg.verbosity = Verbosity::Quiet,
unknown => return Err(ConfigError(format!("flag desconocida para receiver: {unknown}"))),
}
i += 1;
}
if cfg.summary_every == 0 {
return Err(ConfigError("--summary-every debe ser mayor que 0".to_string()));
}
Ok(cfg)
}
pub fn help_text() -> &'static str {
"Uso receiver: --bind HOST:PORT [--late-threshold-ms N] [--summary-every N]\n\
[--read-timeout-ms N] [--stop-after N] [--verbose|--quiet]"
}
}
fn required<'a>(args: &'a [String], index: usize, flag: &str) -> Result<&'a str, ConfigError> {
args.get(index)
.map(|s| s.as_str())
.ok_or_else(|| ConfigError(format!("faltó valor para {flag}")))
}
Lo primero que conviene notar es que este archivo no modela paquetes ni analiza la red directamente. Su trabajo es otro: traducir intenciones humanas en un estado de ejecución coherente. Cuando lanzamos el programa desde terminal, no queremos tocar constantes dispersas por varios archivos; queremos pasar parámetros claros y dejar que el sistema los convierta en una configuración válida.
La enum Verbosity resuelve una necesidad muy concreta: no siempre queremos que el programa hable con la misma intensidad. A veces conviene silencio casi total, a veces un resumen periódico y, en momentos de depuración, una traza detallada por evento. Por eso el sistema distingue entre Quiet, Summary y Verbose. No es un detalle cosmético: es la diferencia entre una demo legible y una consola imposible de interpretar.
Después aparecen las dos estructuras grandes del módulo: SenderConfig y ReceiverConfig. Y esa separación también enseña arquitectura. El sender necesita saber a dónde enviar, qué tamaño tendrá cada frame, cada cuánto transmitirá y qué perturbaciones artificiales puede inyectar en el flujo. El receiver, en cambio, necesita saber dónde escuchar, cuánto retraso tolera antes de declarar un paquete tardío, cada cuánto imprimir resúmenes y cuándo detenerse si queremos una corrida finita. Son responsabilidades distintas, así que viven en estructuras distintas.
Los bloques Default merecen atención porque convierten al proyecto en un MVP razonable desde el primer arranque. El sender ya sabe que enviará a 127.0.0.1:5000, que su frame base tendrá 320 bytes y que la cadencia inicial será de 20 ms. El receiver ya sabe que escuchará en ese puerto local y que considerará tardío un paquete que supere los 80 ms. Es decir: antes de pasar una sola flag, el sistema ya tiene una personalidad operativa coherente.
Ahora bien, un valor por defecto útil no basta. También necesitamos una puerta de entrada que tome los argumentos crudos de consola y los convierta en tipos correctos. Esa puerta es from_args(). Aquí el programa recorre las flags, interpreta direcciones de socket, enteros, tamaños y umbrales, y construye una configuración final solo si todo tiene sentido. Si algo falla, no obtenemos un comportamiento extraño: obtenemos un ConfigError claro y explícito.
Esa decisión es importante porque en sistemas de red el error de configuración no debería degradarse en silencio. Un drop_chance de 180, un frame_size de 0 o un summary_every inválido no son “detalles menores”: son estados rotos del experimento. Por eso el módulo valida límites antes de que el programa empiece a abrir sockets o a emitir paquetes. Preferimos fallar temprano y con un mensaje preciso antes que ejecutar una corrida ambigua.
Un sistema serio no solo valida los bytes que recibe por la red; también valida los parámetros con los que fue lanzado.
Visto desde arriba, config.rs cumple una función silenciosa pero crítica: convierte la teoría del tutorial en una interfaz operable. Gracias a este módulo, el sistema ya no depende de editar el código fuente para cambiar umbrales, cadencias o escenarios de fallo. Ahora puede vivir desde consola, y eso lo vuelve mucho más reproducible, mucho más pedagógico y mucho más fácil de poner a prueba.
Con esto, la crate common ya no solo sabe qué es un paquete y cómo interpretar el flujo: también sabe con qué parámetros va a vivir cada corrida del sistema. Tenemos contrato, análisis y configuración.
El siguiente paso será construir el laboratorio que alimenta todo este experimento: la generación de pseudoaudio, la inyección controlada de atraso, el pequeño buffer para reordenamiento y las utilidades que nos permitirán forzar fallos sin depender todavía de audio real. En otras palabras: ahora toca entrar en frame.rs.
6. frame.rs: el laboratorio controlado del flujo
Hasta aquí ya construimos tres capas fundamentales del sistema: el contrato del paquete, la lectura semántica de la secuencia y la configuración compartida que vuelve ejecutable todo el experimento. Pero todavía nos falta algo decisivo: la materia prima que va a circular por la red y las herramientas que nos permitirán deformarla de forma controlada.
Ese papel lo cumple frame.rs. Este archivo no se dedica a abrir sockets ni a interpretar estadísticas; su misión es preparar el terreno experimental. Aquí generamos el pseudoaudio que usará el sender, modelamos el atraso artificial, provocamos pequeños reordenamientos, introducimos aleatoriedad reproducible y mantenemos la memoria reciente que luego aprovechará el análisis secuencial.
Dicho de forma simple: si packet.rs definía qué viaja y analysis.rs definía cómo interpretar lo que llega, frame.rs define cómo fabricamos y perturbamos el flujo antes de lanzarlo a la red.
Archivo
crates/common/src/frame.rs · payload sintético y utilidades de simulación
Este archivo concentra la lógica que fabrica frames de pseudoaudio a partir del número de secuencia y, además, define estructuras auxiliares para introducir perturbaciones reproducibles en el flujo: planes de retraso, reordenamiento de paquetes, generación pseudoaleatoria y seguimiento acotado de secuencias ya observadas.
use std::collections::{HashSet, VecDeque};
use std::f32::consts::PI;
use std::time::Duration;
use crate::Packet;
pub fn build_pseudoaudio_frame(sequence_number: u32, frame_size: usize) -> Vec<u8> {
let bytes = if frame_size % 2 == 0 { frame_size } else { frame_size - 1 };
let sample_count = bytes / 2;
let sample_rate = 48_000.0f32;
let tone_hz = 440.0f32;
let frame_phase = (sequence_number as f32) * 0.17;
let mut payload = Vec::with_capacity(bytes.max(2));
for i in 0..sample_count.max(1) {
let t = (i as f32) / sample_rate;
let envelope = 0.75 + ((sequence_number % 11) as f32) * 0.02;
let sample =
((2.0 * PI * tone_hz * t + frame_phase).sin() * envelope * i16::MAX as f32 * 0.4)
as i16;
payload.extend_from_slice(&sample.to_le_bytes());
}
if payload.is_empty() {
payload.extend_from_slice(&0i16.to_le_bytes());
}
payload
}
#[derive(Debug, Clone)]
pub struct DelayPlan {
pub every: u32,
pub delay: Duration,
}
impl DelayPlan {
pub fn maybe_delay_for(&self, sequence_number: u32) -> Option<Duration> {
if self.every > 0 && sequence_number > 0 && sequence_number % self.every == 0 {
Some(self.delay)
} else {
None
}
}
}
#[derive(Debug, Default)]
pub struct ReorderBuffer {
pending: Option<Packet>,
}
impl ReorderBuffer {
pub fn push_or_hold(&mut self, packet: Packet, reorder_every: u32) -> Vec<Packet> {
let mut ready = Vec::new();
if let Some(previous) = self.pending.take() {
ready.push(packet);
ready.push(previous);
return ready;
}
if reorder_every > 0
&& packet.sequence_number > 0
&& packet.sequence_number % reorder_every == 0
{
self.pending = Some(packet);
} else {
ready.push(packet);
}
ready
}
pub fn flush(&mut self) -> Option<Packet> {
self.pending.take()
}
}
#[derive(Debug, Clone)]
pub struct SimpleRng {
state: u64,
}
impl SimpleRng {
pub fn new(seed: u64) -> Self {
Self { state: seed }
}
pub fn next_u32(&mut self) -> u32 {
self.state = self
.state
.wrapping_mul(6364136223846793005)
.wrapping_add(1);
(self.state >> 32) as u32
}
pub fn percent_hit(&mut self, percent: u8) -> bool {
if percent == 0 {
return false;
}
let roll = self.next_u32() % 100;
roll < percent as u32
}
}
#[derive(Debug)]
pub struct SeenWindow {
capacity: usize,
queue: VecDeque<u32>,
set: HashSet<u32>,
}
impl SeenWindow {
pub fn with_capacity(capacity: usize) -> Self {
Self {
capacity,
queue: VecDeque::with_capacity(capacity),
set: HashSet::with_capacity(capacity),
}
}
pub fn contains(&self, value: &u32) -> bool {
self.set.contains(value)
}
pub fn insert(&mut self, value: u32) {
if self.set.insert(value) {
self.queue.push_back(value);
if self.queue.len() > self.capacity {
if let Some(oldest) = self.queue.pop_front() {
self.set.remove(&oldest);
}
}
}
}
}
La primera función del archivo, build_pseudoaudio_frame(), resuelve una decisión editorial y técnica muy inteligente del proyecto: no empezar con audio real. En vez de depender desde ya de micrófonos, drivers, buffers del sistema operativo o hardware externo, generamos una señal matemática reproducible. Eso aísla el experimento. Si algo se degrada, sabemos que la historia está ocurriendo en la red y no en la tarjeta de sonido.
La función toma dos entradas: sequence_number y frame_size. A partir de ahí fabrica un bloque de bytes que representa un pequeño fragmento PCM sintético. El detalle bonito es que la señal no es idéntica en todos los frames: depende de la secuencia. La fase cambia con frame_phase y la envolvente varía ligeramente con el número de paquete. Eso hace que cada frame tenga una firma propia sin dejar de ser completamente determinista.
Hay además dos precauciones importantes. La primera es que el tamaño del frame se fuerza a una cantidad par de bytes, porque estamos produciendo muestras de 16 bits y cada muestra ocupa exactamente dos bytes. La segunda es que, incluso en el caso extremo de un tamaño inválidamente pequeño, la función garantiza que el payload no quede vacío: si hiciera falta, inserta una muestra nula. En otras palabras, el generador no solo produce señal; también protege la forma mínima del experimento.
Lo verdaderamente valioso aquí no es “simular audio” por sí mismo, sino construir una carga útil estable, repetible y suficientemente rica como para viajar por la red sin introducir ruido accidental en el tutorial. El flujo ya no será un arreglo arbitrario de bytes: tendrá estructura y continuidad, pero sin obligarnos todavía a entrar en reproducción real.
Después del generador aparecen dos piezas que convierten al sistema en un verdadero laboratorio de fallos: DelayPlan y ReorderBuffer. La primera responde a una pregunta temporal muy simple: “¿cada cuántos paquetes quiero introducir atraso artificial, y cuánto debe durar?”. Su método maybe_delay_for() no complica más de la cuenta el diseño: recibe un número de secuencia y decide si en ese punto debe devolver una duración extra o no hacer nada.
Esta simplicidad es una virtud. No queremos un motor de simulación gigantesco; queremos una regla clara que nos permita forzar el fenómeno que queremos estudiar. Si configuramos every = 4 y un retraso de 60 ms, ya podemos fabricar un flujo donde algunos paquetes lleguen válidos pero fuera de ventana. Justamente ahí se vuelve visible la tesis central del tutorial: el dato puede llegar intacto y, aun así, dejar de servir.
ReorderBuffer cumple una función parecida, pero en el eje secuencial. En lugar de retrasar el flujo en bloque, mantiene temporalmente un paquete pendiente y lo libera en un orden distinto cuando llega el siguiente. Con eso basta para producir un reordenamiento leve y controlado, sin necesidad de modelar toda la complejidad de una red real.
No necesitamos una simulación gigantesca para enseñar una verdad de red. A veces basta con una pequeña perturbación bien diseñada para volver visible un fenómeno entero.
Lo elegante de push_or_hold() es que no intenta hacer demasiado. Si ya había un paquete pendiente, libera ambos en orden invertido. Si no lo había, decide si debe retener el actual según la regla reorder_every. Así, el sistema puede crear pequeños eventos de desorden sin destruir por completo la continuidad del flujo. Es justo el tipo de anomalía leve que luego nuestro análisis secuencial debería ser capaz de detectar.
Las dos últimas estructuras del archivo parecen utilidades secundarias, pero en realidad sostienen dos decisiones muy buenas del proyecto. La primera es SimpleRng, un generador pseudoaleatorio mínimo que permite introducir pérdida probabilística sin depender de capas externas. Gracias a su semilla explícita, la aleatoriedad no se vuelve caos irreproducible: el experimento puede seguir siendo repetible si reutilizamos el mismo seed.
Eso importa mucho pedagógicamente. Una demo de red que cambia de manera completamente impredecible en cada corrida puede ser realista, sí, pero también difícil de enseñar. Aquí buscamos un equilibrio mejor: suficiente variación para simular condiciones imperfectas, pero con la posibilidad de repetir escenarios y comparar resultados.
La otra pieza es SeenWindow, una ventana acotada de valores ya observados. Su idea central es muy de sistemas: queremos detectar duplicados recientes, pero no queremos conservar para siempre todos los números de secuencia que alguna vez cruzaron el programa. Para eso combinamos una VecDeque y un HashSet: una estructura conserva el orden de llegada reciente y la otra permite consultas rápidas de pertenencia.
Cuando insertamos un valor nuevo, se guarda tanto en la cola como en el conjunto. Y si la ventana supera su capacidad, el sistema elimina el valor más antiguo de ambas estructuras. Así mantenemos una memoria operativa suficiente para detectar repeticiones relevantes, sin convertir el análisis en una fuga de memoria silenciosa. Otra vez aparece la misma filosofía que atraviesa todo el tutorial: no se trata de construir lo más grande, sino lo más claro, medible y útil para el problema real.
Con frame.rs ya cerramos una parte muy importante de la crate common. El proyecto ahora sabe generar una carga útil determinista, introducir atraso controlado, producir desorden leve, lanzar pérdida probabilística reproducible y mantener una memoria reciente de valores observados.
Dicho de forma práctica: ya no tenemos solo un protocolo y un analizador; también tenemos un laboratorio capaz de fabricar el tipo de flujo imperfecto que necesitamos para poner a prueba nuestras ideas sobre telecomunicaciones y tiempo real.
El siguiente paso será unir esas dos dimensiones —la secuencial y la temporal— en una única capa de decisión acumulada. Porque el receptor real no piensa en eventos aislados: necesita observar paquetes uno tras otro, sumar evidencia y construir un veredicto global del estado del flujo. Ahí entra stats.rs.
7. stats.rs: donde el receptor convierte eventos en veredicto
Hasta aquí ya logramos dos cosas fundamentales: por un lado, el sistema sabe detectar qué ocurre en la secuencia de llegada; por el otro, sabe medir si cada paquete llegó a tiempo o demasiado tarde. Pero un receptor real no vive de observaciones aisladas. Necesita una capa superior capaz de acumular evidencia, resumir el estado del flujo y convertir una serie de eventos puntuales en un diagnóstico continuo.
Ese trabajo lo hace stats.rs. Este módulo toma la lectura secuencial de SequenceTracker y la lectura temporal de TimingTracker, las fusiona y produce una visión operativa del sistema: cuántos paquetes llegaron, cuántos parecen perdidos, cuántos llegaron fuera de orden, cuántos fueron tardíos y cómo se está comportando el retraso medio del flujo.
Dicho de otra manera: si analysis.rs nos daba los sentidos del receptor, stats.rs le da memoria, contexto y capacidad de emitir un resumen coherente de lo que está pasando.
Archivo
crates/common/src/stats.rs · acumulación y resumen del flujo
En este módulo reunimos la inteligencia secuencial y temporal del sistema para producir observaciones por paquete y resúmenes acumulados de la corrida completa.
use std::fmt;
use crate::{SequenceEvent, SequenceTracker, TimingObservation, TimingTracker};
#[derive(Debug, Clone, Copy)]
pub struct PacketObservation {
pub sequence_event: SequenceEvent,
pub timing: TimingObservation,
pub sequence_number: u32,
pub payload_len: usize,
}
#[derive(Debug, Clone)]
pub struct RunSummary {
pub received_total: u64,
pub estimated_lost: u64,
pub out_of_order: u64,
pub duplicates: u64,
pub late_packets: u64,
pub invalid_packets: u64,
pub min_delay_ms: u64,
pub max_delay_ms: u64,
pub avg_delay_ms: f64,
pub jitter_ms: f64,
pub last_sequence: Option<u32>,
}
impl fmt::Display for RunSummary {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"received={} lost_est={} out_of_order={} duplicates={} late={} invalid={} delay_ms[min={}; avg={:.2}; max={}] jitter_ms={:.2} last_seq={:?}",
self.received_total,
self.estimated_lost,
self.out_of_order,
self.duplicates,
self.late_packets,
self.invalid_packets,
self.min_delay_ms,
self.avg_delay_ms,
self.max_delay_ms,
self.jitter_ms,
self.last_sequence,
)
}
}
#[derive(Debug, Default)]
pub struct ReceiverStats {
sequence: SequenceTracker,
timing: TimingTracker,
received_total: u64,
estimated_lost: u64,
out_of_order: u64,
duplicates: u64,
late_packets: u64,
invalid_packets: u64,
last_sequence: Option<u32>,
min_delay_ms: Option<u64>,
max_delay_ms: Option<u64>,
delay_sum_ms: u128,
current_jitter_ms: f64,
}
impl ReceiverStats {
pub fn observe_packet(
&mut self,
sequence_number: u32,
sent_timestamp_ms: u64,
arrival_timestamp_ms: u64,
late_threshold_ms: u64,
payload_len: usize,
) -> PacketObservation {
let sequence_event = self.sequence.observe(sequence_number);
match sequence_event {
SequenceEvent::Gap { missing } => self.estimated_lost += missing as u64,
SequenceEvent::OutOfOrder => self.out_of_order += 1,
SequenceEvent::Duplicate => self.duplicates += 1,
SequenceEvent::FirstPacket | SequenceEvent::InOrder => {}
}
let timing = self
.timing
.observe(sent_timestamp_ms, arrival_timestamp_ms, late_threshold_ms);
if timing.is_late {
self.late_packets += 1;
}
self.received_total += 1;
self.last_sequence = Some(sequence_number);
self.min_delay_ms = Some(self.min_delay_ms.map_or(timing.delay_ms, |v| v.min(timing.delay_ms)));
self.max_delay_ms = Some(self.max_delay_ms.map_or(timing.delay_ms, |v| v.max(timing.delay_ms)));
self.delay_sum_ms += timing.delay_ms as u128;
self.current_jitter_ms = timing.jitter_ms;
PacketObservation {
sequence_event,
timing,
sequence_number,
payload_len,
}
}
pub fn observe_invalid_packet(&mut self) {
self.invalid_packets += 1;
}
pub fn snapshot(&self) -> RunSummary {
let avg_delay_ms = if self.received_total == 0 {
0.0
} else {
self.delay_sum_ms as f64 / self.received_total as f64
};
RunSummary {
received_total: self.received_total,
estimated_lost: self.estimated_lost,
out_of_order: self.out_of_order,
duplicates: self.duplicates,
late_packets: self.late_packets,
invalid_packets: self.invalid_packets,
min_delay_ms: self.min_delay_ms.unwrap_or(0),
max_delay_ms: self.max_delay_ms.unwrap_or(0),
avg_delay_ms,
jitter_ms: self.current_jitter_ms,
last_sequence: self.last_sequence,
}
}
pub fn received_total(&self) -> u64 {
self.received_total
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn accumulates_core_metrics() {
let mut stats = ReceiverStats::default();
let first = stats.observe_packet(1, 1000, 1010, 20, 320);
assert!(matches!(first.sequence_event, SequenceEvent::FirstPacket));
let second = stats.observe_packet(3, 1020, 1030, 20, 320);
assert!(matches!(second.sequence_event, SequenceEvent::Gap { missing: 1 }));
let third = stats.observe_packet(2, 1025, 1060, 20, 320);
assert!(matches!(third.sequence_event, SequenceEvent::OutOfOrder));
let snapshot = stats.snapshot();
assert_eq!(snapshot.received_total, 3);
assert_eq!(snapshot.estimated_lost, 1);
assert_eq!(snapshot.out_of_order, 1);
assert_eq!(snapshot.late_packets, 1);
}
}
Lo primero que conviene notar en este archivo es que el módulo trabaja en dos escalas distintas. La primera es la escala del paquete individual, representada por PacketObservation. La segunda es la escala de la corrida completa, representada por RunSummary.
PacketObservation es la forma más compacta de decir: “acabo de observar este paquete y esto fue lo que pasó”. Contiene el evento secuencial, la observación temporal, el número de secuencia y el tamaño del payload. Es decir, concentra en una sola estructura la lectura inmediata del paquete recién recibido.
RunSummary, en cambio, ya no habla de un paquete aislado, sino del estado acumulado del flujo. Aquí aparecen contadores y métricas que sí permiten responder preguntas operativas: cuántos paquetes válidos llegaron, cuántos parecen haberse perdido, cuántos fueron fuera de orden, cuántos se repitieron, cuántos resultaron tardíos y cómo se comportó el retraso a lo largo de toda la ejecución.
Esta separación es importante porque evita mezclar dos niveles de análisis distintos. Una cosa es diagnosticar el evento puntual que acaba de ocurrir. Otra muy distinta es resumir la historia de la corrida completa. El diseño de stats.rs conserva ambas capas sin confundirlas.
También vale la pena fijarse en la implementación de Display para RunSummary. No está ahí solo por comodidad estética. Sirve para convertir el estado interno del sistema en una línea de consola legible, compacta y útil para el tutorial. Gracias a eso, el resumen final no requiere que el receptor conozca todos los detalles internos del struct: basta con imprimirlo y el sistema ya produce una lectura clara del experimento.
El verdadero centro del archivo es ReceiverStats. Esta estructura actúa como la memoria viva del receptor. No solo guarda contadores; también encapsula internamente un SequenceTracker y un TimingTracker. Eso significa que el receptor ya no tiene que coordinar manualmente ambas inteligencias por separado: puede delegar esa responsabilidad en una sola capa acumuladora.
El método clave es observe_packet(). Cada vez que llega un paquete válido, el receptor le entrega a este método la secuencia, el timestamp de envío, el instante de llegada, el umbral de tardanza y el tamaño del payload. A partir de ahí, ReceiverStats hace todo el trabajo de integración.
Primero consulta el análisis secuencial con self.sequence.observe(sequence_number). El resultado puede ser continuidad perfecta, brecha, desorden o duplicado. Y según el caso, el sistema actualiza los contadores apropiados. Si hubo un Gap, aumenta la pérdida estimada. Si el paquete llegó fuera de orden, incrementa out_of_order. Si era repetido, incrementa duplicates.
Después llega la dimensión temporal. El método llama a self.timing.observe(...) y obtiene una observación con delay_ms, is_late y jitter_ms. Si el paquete superó el umbral configurado, el contador de tardíos también crece. Con eso, el sistema deja de mirar secuencia y tiempo como dos mundos separados: ambos ya están ocurriendo dentro del mismo punto de decisión.
Un receptor de tiempo real no juzga solo si un paquete llegó. Juzga qué lugar ocupa en la secuencia y qué valor conserva todavía en el tiempo.
Lo más interesante es que, además de clasificar el paquete, observe_packet() va actualizando el estado agregado de la corrida: total recibido, última secuencia vista, retraso mínimo, máximo, suma acumulada de delays y jitter actual. Es decir, cada paquete no solo deja una observación individual; también modifica el diagnóstico global del sistema.
Y al final, el método devuelve un PacketObservation. Esta decisión es muy buena porque permite que el receptor siga reaccionando en tiempo real a lo que acaba de ocurrir —por ejemplo, imprimir un gap o marcar un paquete como tardío— sin perder de vista que ese mismo evento ya quedó absorbido dentro de las estadísticas acumuladas.
Una vez que el sistema viene acumulando eventos, necesita una forma limpia de exponer su estado actual sin revelar todos los detalles internos de la estructura. Ese es el papel de snapshot(). Este método toma el estado interno de ReceiverStats y lo transforma en un RunSummary listo para imprimirse, inspeccionarse o compararse.
Aquí aparece un detalle importante: el promedio de retraso no se guarda directamente, sino que se calcula a partir de la suma acumulada y del total de paquetes recibidos. Eso evita pérdida innecesaria de precisión y mantiene el estado interno más simple. Si todavía no ha llegado ningún paquete, el promedio vale 0.0; si ya existe tráfico, entonces el sistema produce una media real del delay observado.
También conviene notar cómo se manejan los extremos mínimo y máximo. Internamente se guardan como Option<u64>, porque al inicio todavía no existe ningún delay observado. Solo cuando empiezan a llegar paquetes se convierten en valores reales. Esta clase de detalle mantiene el diseño honesto: el sistema no inventa métricas antes de tener evidencia para construirlas.
El pequeño método received_total() parece trivial, pero cumple una función práctica importante. Permite que otras partes del sistema consulten cuántos paquetes válidos van llegando sin exponer ni mutar el resto del estado interno. Otra vez aparece una idea de diseño limpio: publicar solo lo que hace falta.
Finalmente, el test accumulates_core_metrics() nos da una prueba muy valiosa de que esta capa realmente integra las dos dimensiones del problema. En una secuencia corta de eventos, el sistema detecta un primer paquete, una brecha, un caso fuera de orden y además registra un paquete tardío. Es una prueba pequeña, pero suficiente para demostrar que stats.rs no es un simple contenedor de contadores: es la estructura que convierte eventos de red en diagnóstico acumulado.
Con stats.rs ya cerramos de verdad el núcleo conceptual de la crate common. El sistema ahora sabe qué es un paquete válido, cómo convertirlo a bytes, cómo leer continuidad, cómo medir retraso, cómo generar flujo sintético, cómo perturbarlo y, finalmente, cómo resumir todo eso en un diagnóstico acumulado.
Dicho de forma más directa: ya construimos el lenguaje, la lógica y la memoria del experimento. La base compartida está lista.
A partir de aquí, el siguiente movimiento natural es salir por fin de la teoría controlada y entrar en la acción. Toca construir el sender: el proceso que va a fabricar frames, empaquetarlos, asignarles timestamps y lanzarlos por UDP con una cadencia real. En otras palabras: ahora sí vamos a empezar a mover el flujo.
8. sender: donde el flujo deja de ser teoría y empieza a moverse
Con la crate common ya cerramos el corazón conceptual del sistema. Definimos qué es un paquete válido, cómo se representa en bytes, cómo leer continuidad secuencial, cómo medir atraso, cómo configurar el experimento y cómo resumir el comportamiento del flujo. Pero hasta aquí todo seguía siendo potencial. Teníamos el lenguaje del sistema, pero todavía no teníamos a nadie hablándolo sobre la red.
Ese papel lo cumple sender. Esta crate es el inyector del experimento: genera frames sintéticos, los empaqueta con nuestro protocolo, les asigna un instante temporal y los lanza por UDP siguiendo una cadencia controlada. Además, aquí introducimos una parte crucial del tutorial: la capacidad de deformar el flujo de forma intencional para provocar pérdida, atraso y reordenamiento.
Dicho de forma simple: si common nos dio las reglas del juego, sender es quien empieza la partida.
Archivo
crates/sender/Cargo.toml
Este manifiesto declara la crate sender como una librería reutilizable y, al mismo tiempo, como un binario ejecutable desde consola. Su dependencia principal es common, porque toda la lógica de protocolo y análisis compartido vive allí.
[package]
name = "sender"
version = "0.1.0"
edition = "2021"
[dependencies]
common = { path = "../common" }
[lib]
name = "sender"
path = "src/lib.rs"
[[bin]]
name = "sender"
path = "src/main.rs"
Este manifiesto deja clara una decisión arquitectónica muy buena del proyecto: el sender no es solo un ejecutable improvisado, sino una pieza de software con dos caras. Como librería, expone una función run() reutilizable y testeable. Como binario, se puede lanzar desde terminal para ejecutar el experimento real.
Eso mantiene el diseño limpio. La lógica importante vive en src/lib.rs, mientras que src/main.rs se limita a parsear argumentos, manejar errores de arranque y delegar la ejecución. Es una separación pequeña, pero muy pedagógica: la lógica del sistema y la interfaz de consola no quedan mezcladas.
Archivo
crates/sender/src/lib.rs · generación, temporización y envío por UDP
En este módulo vive la lógica principal del emisor: apertura del socket, creación de paquetes, inyección de fallos controlados, cadencia temporal y resumen final de transmisión.
use std::fmt;
use std::net::UdpSocket;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use common::{
build_pseudoaudio_frame, DelayPlan, Packet, ReorderBuffer, SenderConfig, SimpleRng, Verbosity,
};
#[derive(Debug)]
pub struct SenderRunSummary {
pub generated: u32,
pub transmitted: u32,
pub dropped: u32,
pub reordered_flushes: u32,
}
#[derive(Debug)]
pub enum SenderError {
Io(std::io::Error),
Packet(common::PacketError),
}
impl fmt::Display for SenderError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(err) => write!(f, "error de IO en sender: {err}"),
Self::Packet(err) => write!(f, "error de protocolo en sender: {err}"),
}
}
}
impl std::error::Error for SenderError {}
impl From<std::io::Error> for SenderError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
impl From<common::PacketError> for SenderError {
fn from(value: common::PacketError) -> Self {
Self::Packet(value)
}
}
pub fn run(config: SenderConfig) -> Result<SenderRunSummary, SenderError> {
let socket = UdpSocket::bind(config.bind_addr)?;
socket.connect(config.dest_addr)?;
let mut rng = SimpleRng::new(config.random_seed);
let delay_plan = DelayPlan {
every: config.delay_every,
delay: Duration::from_millis(config.delay_ms),
};
let mut reorder_buffer = ReorderBuffer::default();
let mut summary = SenderRunSummary {
generated: 0,
transmitted: 0,
dropped: 0,
reordered_flushes: 0,
};
let interval = Duration::from_millis(config.interval_ms);
let mut next_tick = Instant::now();
for sequence_number in 0..config.count {
let payload = build_pseudoaudio_frame(sequence_number, config.frame_size);
let packet = Packet::new(sequence_number, unix_timestamp_ms(), payload)?;
summary.generated += 1;
let should_drop = (config.drop_every > 0 && sequence_number > 0 && sequence_number % config.drop_every == 0)
|| rng.percent_hit(config.drop_chance);
if should_drop {
summary.dropped += 1;
log_sender(
config.verbosity,
format_args!("DROP seq={} frame_size={}B", sequence_number, config.frame_size),
);
} else {
let ready_packets = reorder_buffer.push_or_hold(packet, config.reorder_every);
if ready_packets.len() > 1 {
summary.reordered_flushes += 1;
}
for pkt in ready_packets {
maybe_delay(&delay_plan, pkt.sequence_number, config.verbosity);
let sent = send_packet(&socket, &pkt)?;
summary.transmitted += 1;
log_sender(
config.verbosity,
format_args!(
"SEND seq={} payload={}B wire={}B",
pkt.sequence_number,
pkt.payload.len(),
sent
),
);
}
}
if (sequence_number + 1) % config.summary_every == 0 && config.verbosity != Verbosity::Quiet {
println!(
"[sender] progress generated={} transmitted={} dropped={} reorder_flushes={}",
summary.generated, summary.transmitted, summary.dropped, summary.reordered_flushes
);
}
next_tick += interval;
sleep_until(next_tick);
}
if let Some(pending) = reorder_buffer.flush() {
maybe_delay(&delay_plan, pending.sequence_number, config.verbosity);
let sent = send_packet(&socket, &pending)?;
summary.transmitted += 1;
summary.reordered_flushes += 1;
log_sender(
config.verbosity,
format_args!(
"FLUSH pending seq={} payload={}B wire={}B",
pending.sequence_number,
pending.payload.len(),
sent
),
);
}
if config.verbosity != Verbosity::Quiet {
println!(
"[sender] done generated={} transmitted={} dropped={} reorder_flushes={}",
summary.generated, summary.transmitted, summary.dropped, summary.reordered_flushes
);
}
Ok(summary)
}
fn send_packet(socket: &UdpSocket, packet: &Packet) -> Result<usize, SenderError> {
let bytes = packet.to_bytes()?;
let sent = socket.send(&bytes)?;
Ok(sent)
}
fn maybe_delay(plan: &DelayPlan, sequence_number: u32, verbosity: Verbosity) {
if let Some(delay) = plan.maybe_delay_for(sequence_number) {
log_sender(verbosity, format_args!("DELAY seq={} extra={}ms", sequence_number, delay.as_millis()));
thread::sleep(delay);
}
}
fn sleep_until(target: Instant) {
let now = Instant::now();
if target > now {
thread::sleep(target - now);
}
}
fn unix_timestamp_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_millis(0))
.as_millis() as u64
}
fn log_sender(verbosity: Verbosity, args: fmt::Arguments<'_>) {
if verbosity == Verbosity::Verbose {
println!("[sender] {args}");
}
}
Lo primero que conviene notar en este módulo es que el sender no se limita a “mandar bytes”. En realidad, modela una línea de producción completa. Genera un frame sintético, lo empaqueta con nuestro protocolo, decide si debe perderse, retrasarse o reordenarse, y finalmente lo inyecta sobre UDP respetando una cadencia temporal.
Esa idea importa mucho, porque convierte al emisor en algo más interesante que un simple cliente de red. Aquí no estamos construyendo una utilidad genérica de sockets: estamos construyendo un generador de fenómenos de tiempo real.
La pequeña estructura SenderRunSummary resume el resultado de la corrida del emisor. No describe lo que vio el receptor, sino lo que el propio sender produjo: cuántos frames generó, cuántos transmitió de verdad, cuántos descartó y cuántas veces vació una pareja reordenada. Es la vista local del lado emisor.
Algo parecido ocurre con SenderError. En vez de dejar que el código mezcle errores de socket con errores de protocolo, el módulo los distingue explícitamente. Así, el sender puede decir con claridad si falló al construir un paquete válido o si falló al interactuar con la red. Otra vez aparece una idea transversal del proyecto: errores distintos deben seguir siendo errores distintos.
La función run() es el verdadero corazón del emisor. Todo empieza con dos pasos de red muy concretos: UdpSocket::bind(config.bind_addr) y socket.connect(config.dest_addr). Aunque UDP es un protocolo sin conexión, este connect() sigue siendo útil porque fija un destino por defecto y simplifica el envío posterior. El sender ya no necesita repetir la dirección cada vez que manda un paquete.
Después aparece la preparación del laboratorio. Se crea un generador aleatorio con semilla fija, un DelayPlan para decidir dónde inyectar atraso, un ReorderBuffer para introducir desorden leve y una estructura summary que irá acumulando el resultado de toda la corrida.
También se prepara la cadencia temporal del flujo. Aquí el sistema no dispara paquetes tan rápido como pueda, porque eso no se parecería a un flujo de audio ni a una transmisión interactiva razonable. En cambio, calcula un interval a partir de la configuración y trabaja con un next_tick basado en Instant, que es justamente la clase de reloj monotónico que conviene usar para temporización local.
Esto ya nos deja ver una idea importante: el sender no genera un montón de datagramas desordenados, sino un flujo con ritmo. Y en sistemas de tiempo real, el ritmo es parte esencial del experimento.
El gran bucle del sender recorre 0..config.count. Cada iteración representa un nuevo frame dentro del flujo. A partir del número de secuencia, el sistema genera un payload sintético con build_pseudoaudio_frame() y lo envuelve en un Packet usando el timestamp actual en milisegundos.
Aquí hay un detalle muy interesante: el timestamp se toma en el momento de construcción del paquete, antes de aplicar las perturbaciones artificiales como atraso o reordenamiento. Y eso, lejos de ser un error, es exactamente lo que hace visible el fenómeno que queremos enseñar. Si después inyectamos 60 ms de retraso, ese tiempo extra quedará reflejado en la diferencia entre el instante de creación y el instante de llegada al receptor.
Es decir: el timestamp del sender no marca solo “cuándo existió el paquete”, sino el punto de partida desde el cual el sistema medirá toda la degradación introducida más adelante.
Después viene la primera decisión de perturbación: should_drop. Aquí el emisor puede descartar un paquete de dos maneras. La primera es determinista, usando drop_every. La segunda es probabilística, usando drop_chance y el generador pseudoaleatorio. Esto permite construir escenarios reproducibles o escenarios con variación controlada, según lo que necesitemos para el experimento.
Si el paquete se descarta, el flujo no se detiene ni intenta reparar nada. El sender simplemente lo registra como dropped y sigue adelante. Esa es una diferencia clave frente a TCP: aquí la pérdida no activa ninguna retransmisión automática. El sistema acepta el hueco y continúa produciendo tiempo real.
Si el paquete no se pierde, entra en la segunda etapa del laboratorio: el reordenamiento. Para eso se usa reorder_buffer.push_or_hold(packet, config.reorder_every). La idea es muy simple y muy efectiva. Algunos paquetes se retienen momentáneamente y se liberan junto con el siguiente, en orden invertido. Con esa pequeña maniobra basta para fabricar un evento de desorden real sin tener que simular toda la complejidad de Internet.
Luego, para cada paquete listo, el sender aplica una posible demora artificial con maybe_delay(). Si el plan de atraso lo indica, el hilo se duerme durante una duración adicional antes del envío. Y como el timestamp ya había sido fijado, ese retraso terminará apareciendo como un incremento observable del delay_ms en el receptor.
Solo después de eso ocurre el envío real con send_packet(). Este helper convierte el Packet a bytes mediante to_bytes() y luego llama a socket.send(&bytes). La separación en una función auxiliar también es una buena decisión: aísla la serialización y el envío en una unidad pequeña, legible y fácil de reutilizar.
En modo Verbose, el sender además escribe logs como SEND, DROP, DELAY o FLUSH. En modo Summary, solo imprime progreso acumulado cada cierto número de paquetes. Y en Quiet, reduce su voz al mínimo. Es exactamente el tipo de comportamiento que un tutorial agradece: mucha visibilidad cuando estás explorando, poco ruido cuando quieres observar resultados globales.
Una de las decisiones más elegantes del módulo está en la forma de mantener la cadencia. En lugar de dormir siempre una duración fija al final del bucle, el sender va acumulando un next_tick y luego llama a sleep_until(next_tick). Eso hace que el flujo se comporte más como una línea temporal continua y menos como una secuencia ingenua de “trabajo y luego pausa”.
Esta diferencia parece pequeña, pero es importante. Si una iteración tarda un poco más por culpa de un atraso o una operación adicional, el uso de Instant y next_tick mantiene una referencia temporal más estable para el resto del flujo. Es una manera simple y correcta de aproximar una producción periódica.
Al terminar el bucle principal, el sender todavía hace una cosa más: llama a reorder_buffer.flush(). Esto garantiza que, si quedó un paquete retenido al final de la corrida, no se pierda silenciosamente dentro del buffer de reordenamiento. En vez de dejar una anomalía oculta, el sistema decide vaciarla explícitamente y registrar ese envío final.
Los helpers del final completan el diseño. unix_timestamp_ms() obtiene una marca temporal robusta basada en UNIX epoch. sleep_until() encapsula la espera monotónica. Y log_sender() concentra la política de logging para que la lógica principal del emisor no quede contaminada con condiciones repetitivas de salida por consola.
Todo eso hace que sender/src/lib.rs se lea como una pieza de ingeniería bastante limpia: genera, perturba, temporiza, transmite y resume. Nada más, pero tampoco nada menos.
Archivo
crates/sender/src/main.rs
Este binario es la puerta de entrada del emisor desde consola. Su función es mínima: leer argumentos, construir la configuración y delegar la lógica real a la librería sender.
use common::SenderConfig;
fn main() {
let args: Vec<String> = std::env::args().skip(1).collect();
let config = match SenderConfig::from_args(&args) {
Ok(cfg) => cfg,
Err(err) => {
eprintln!("{err}");
if err.0.starts_with("Uso sender") {
std::process::exit(0);
}
std::process::exit(2);
}
};
if let Err(err) = sender::run(config) {
eprintln!("{err}");
std::process::exit(1);
}
}
El main.rs del sender confirma una vez más la limpieza de la arquitectura. Aquí no vive la lógica real del emisor; solo la interfaz mínima para arrancarlo desde terminal.
Primero recopila los argumentos con std::env::args(), luego delega el parseo a SenderConfig::from_args() y, si todo sale bien, llama a sender::run(config). Si hay un error de uso, imprime el mensaje correspondiente y sale con un código controlado. Si la ejecución falla, también informa el error y finaliza de forma explícita.
Esta decisión hace que el sender sea más fácil de probar, más fácil de leer y más fácil de explicar. La terminal arranca el proceso, pero la lógica de verdad sigue viviendo en la librería.
Con esto, el lado emisor del experimento queda completo. El sistema ya sabe fabricar un flujo sintético, empaquetarlo con nuestro protocolo, inyectar pérdida, atraso y desorden, y lanzarlo por UDP con una cadencia que se parece mucho más a un flujo real que a una simple ráfaga de bytes.
Dicho de otra forma: el laboratorio ya está produciendo señales y anomalías. El flujo ya existe.
Pero todavía falta el otro protagonista de la historia: el proceso que recibirá ese tráfico, reconstruirá los paquetes, medirá su comportamiento y convertirá cada llegada en una lectura útil del estado de la red. Ahora toca construir el receiver, que será el juez final del experimento.
9. receiver: donde la red deja de ser tránsito y se vuelve juicio
Con sender ya conseguimos algo fundamental: el flujo existe. El sistema puede fabricar frames sintéticos, empaquetarlos, asignarles un instante temporal, perturbarlos y lanzarlos por UDP con una cadencia realista. Pero todavía nos falta la mitad más importante del experimento: la parte que observa lo que llega y decide qué significa.
Ese papel lo cumple receiver. Esta crate no produce tráfico; lo escucha. No inventa anomalías; las detecta. No genera la historia del flujo; la interpreta. Y eso la convierte en la pieza que finalmente transforma una sucesión de datagramas en un diagnóstico operativo sobre pérdida, desorden, duplicación y tardanza.
Dicho de forma simple: si el sender es el inyector del experimento, el receiver es el juez que dicta el veredicto.
Archivo
crates/receiver/Cargo.toml · manifiesto del receptor
Este archivo define el paquete receiver, sus dependencias y su doble naturaleza como librería reutilizable y binario ejecutable. También declara una dependencia de desarrollo sobre sender, útil para pruebas de integración extremo a extremo.
[package]
name = "receiver"
version = "0.1.0"
edition = "2021"
[dependencies]
common = { path = "../common" }
[lib]
name = "receiver"
path = "src/lib.rs"
[[bin]]
name = "receiver"
path = "src/main.rs"
[dev-dependencies]
sender = { path = "../sender" }
Igual que en el caso del sender, este manifiesto deja clara una decisión arquitectónica muy acertada: el receiver no está pensado como un binario improvisado, sino como una pieza con lógica reutilizable. La funcionalidad importante vivirá en src/lib.rs, mientras que src/main.rs se limitará a arrancar el proceso desde consola.
El detalle adicional aquí es [dev-dependencies]. El hecho de que el receptor declare a sender como dependencia de desarrollo anticipa una idea muy buena del proyecto: más adelante podremos probar el sistema completo de extremo a extremo, generando tráfico real desde el emisor y verificando la reacción del receptor dentro del mismo workspace.
Comentarios y valoraciones
No hay comentarios aún. ¡Sé el primero en opinar!