From 34e2fd23ac469f6b19d41e61271f4d63c3379349 Mon Sep 17 00:00:00 2001 From: Matthew Esposito Date: Sat, 19 Apr 2025 22:30:59 -0400 Subject: [PATCH] feat: Add p2p monitoring and improve p2p node communication --- Cargo.lock | 1 + Cargo.toml | 6 +- index.html | 228 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 +- src/p2p.rs | 47 ++++++---- src/p2p_mon.rs | 137 +++++++++++++++++++++++++++++ 6 files changed, 404 insertions(+), 19 deletions(-) create mode 100644 index.html create mode 100644 src/p2p_mon.rs diff --git a/Cargo.lock b/Cargo.lock index 660d6aa..1126871 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4db60e3..0b80726 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ data-encoding = "2.9.0" postcard = "1.1.1" bytes = "1.10.1" ed25519-dalek = "2.1.1" -dashmap = "6.1.0" +dashmap = { version = "6.1.0", features = ["serde"] } [dev-dependencies] lipsum = "0.9.0" @@ -77,3 +77,7 @@ sealed_test = "1.0.0" codegen-units = 1 lto = true strip = "symbols" + +[[bin]] +name = "p2p_mon" +path = "src/p2p_mon.rs" diff --git a/index.html b/index.html new file mode 100644 index 0000000..749624d --- /dev/null +++ b/index.html @@ -0,0 +1,228 @@ + + + + + + + + Cluster Monitor + + + + +
+
+

Node setup

+ +
+ + +
+ +
+ +
+
+ +
+

Messages

+
+
+
+
+ + + + + \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index f869d08..c2195e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use cached::proc_macro::cached; use clap::{Arg, ArgAction, Command}; -use redlib::p2p::ONLINE; +use redlib::p2p::{map_json, ONLINE}; use std::str::FromStr; use std::sync::atomic::Ordering; @@ -239,6 +239,8 @@ async fn main() { resource("", "text/plain", false).boxed() }); + app.at("/map.json").get(|_| async move { map_json().await }.boxed()); + // Read static files app.at("/style.css").get(|_| style().boxed()); app diff --git a/src/p2p.rs b/src/p2p.rs index 9708cb5..25469b9 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -7,6 +7,7 @@ use bytes::Bytes; use dashmap::DashMap; use ed25519_dalek::Signature; use futures_lite::StreamExt; +use hyper::{Body, Response}; use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey}; use iroh_gossip::{ net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender}, @@ -60,30 +61,29 @@ pub async fn main() -> Result<(), Box> { println!("> trying to connect to {} peers...", peers.len()); // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed for peer in peers.into_iter() { - endpoint.add_node_addr(peer)?; + let result = endpoint.add_node_addr(peer); + if let Err(e) = result { + println!("> failed to add peer: {e}"); + } } }; let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split(); println!("> connected!"); - let secret_key = endpoint.secret_key().clone(); - - let message = Message { - hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), - online: true, - }; - let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message)?; - sender.broadcast(encoded_message).await?; - - task::spawn(subscribe_loop(receiver)); - - task::spawn(sender_loop(sender, secret_key)); + let join_handle = task::spawn(subscribe_loop(receiver)); + let sender_handle = task::spawn(sender_loop(sender, endpoint.clone())); + let ctrl_c_handle = task::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + println!("> received ctrl-c, exiting"); + }); + let _ = tokio::join!(join_handle, sender_handle, ctrl_c_handle); Ok(()) } async fn subscribe_loop(mut receiver: GossipReceiver) { while let Ok(Some(event)) = receiver.try_next().await { + eprintln!("received event!: {event:?}"); if let Event::Gossip(GossipEvent::Received(msg)) = event { let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) { Ok(v) => v, @@ -98,16 +98,17 @@ async fn subscribe_loop(mut receiver: GossipReceiver) { } } -async fn sender_loop(sender: GossipSender, secret_key: SecretKey) { +async fn sender_loop(sender: GossipSender, endpoint: Endpoint) { loop { let message = Message { hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), online: ONLINE.load(Ordering::SeqCst), }; - let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message).unwrap(); - let _ = sender.broadcast(encoded_message).await; + let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message).unwrap(); + let message_delivery = sender.broadcast(encoded_message).await; + println!("> sent message: {message:?}: {message_delivery:?}"); - sleep(std::time::Duration::from_secs(10)).await; + sleep(std::time::Duration::from_secs(3)).await; } } @@ -174,3 +175,15 @@ struct Message { hostname: String, online: bool, } + +pub async fn map_json() -> Result, String> { + let map = &*DASHMAP; + let map = serde_json::to_string(map).unwrap(); + Ok( + Response::builder() + .status(200) + .header("content-type", "application/json") + .body(map.into()) + .unwrap_or_default(), + ) +} diff --git a/src/p2p_mon.rs b/src/p2p_mon.rs new file mode 100644 index 0000000..164dfae --- /dev/null +++ b/src/p2p_mon.rs @@ -0,0 +1,137 @@ +use std::{str::FromStr, time::SystemTime}; + +use bytes::Bytes; +use ed25519_dalek::Signature; +use futures_lite::StreamExt; +use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey}; +use iroh_gossip::{ + net::{Event, Gossip, GossipEvent}, + proto::TopicId, + ALPN as GOSSIP_ALPN, +}; +use serde::{Deserialize, Serialize}; + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + let endpoint = Endpoint::builder().discovery_n0().bind().await?; + let builder = Router::builder(endpoint.clone()); + let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?; + let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?; + + let ticket_str = std::env::var("REDLIB_P2P_TICKET").expect("REDLIB_P2P_TICKET not set"); + let Ticket { topic, peers } = Ticket::from_str(&ticket_str)?; + + let ticket = { + let me = endpoint.node_addr().await?; + let peers = peers.iter().cloned().chain([me]).collect(); + Ticket { topic, peers } + }; + eprintln!("> ticket to join us: {ticket}"); + + let peer_ids = peers.iter().map(|p| p.node_id).collect(); + if peers.is_empty() { + eprintln!("> waiting for peers to join us..."); + } else { + eprintln!("> trying to connect to {} peers...", peers.len()); + // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed + for peer in peers.into_iter() { + let result = endpoint.add_node_addr(peer); + if let Err(e) = result { + println!("> failed to add peer: {e}"); + } + } + }; + let (_sender, mut receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split(); + eprintln!("> connected!"); + loop { + match receiver.try_next().await { + Ok(Some(event)) => { + eprintln!("received event!: {event:?}"); + if let Event::Gossip(GossipEvent::Received(msg)) = event { + let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) { + Ok(v) => v, + Err(e) => { + eprintln!("> failed to verify message: {}", e); + continue; + } + }; + // Log the message log + let message_log: MessageLog = message.into(); + println!("{}", serde_json::to_string(&message_log).unwrap()); + } + } + Ok(None) => continue, + Err(e) => { + eprintln!("> failed to receive: {e}"); + continue; + } + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct Ticket { + topic: TopicId, + peers: Vec, +} + +impl Ticket { + /// Deserializes from bytes. + fn from_bytes(bytes: &[u8]) -> anyhow::Result { + postcard::from_bytes(bytes).map_err(Into::into) + } + /// Serializes to bytes. + pub fn to_bytes(&self) -> Vec { + postcard::to_stdvec(self).expect("postcard::to_stdvec is infallible") + } +} + +impl FromStr for Ticket { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?; + Self::from_bytes(&bytes) + } +} +impl std::fmt::Display for Ticket { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut text = data_encoding::BASE32_NOPAD.encode(&self.to_bytes()[..]); + text.make_ascii_lowercase(); + write!(f, "{}", text) + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct SignedMessage { + from: PublicKey, + data: Bytes, + signature: Signature, +} + +impl SignedMessage { + pub fn verify_and_decode(bytes: &[u8]) -> anyhow::Result<(PublicKey, Message)> { + let signed_message: Self = postcard::from_bytes(bytes)?; + let key: PublicKey = signed_message.from; + key.verify(&signed_message.data, &signed_message.signature)?; + let message: Message = postcard::from_bytes(&signed_message.data)?; + Ok((signed_message.from, message)) + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct Message { + hostname: String, + online: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +struct MessageLog { + timestamp: u64, + message: Message, +} +impl From for MessageLog { + fn from(message: Message) -> Self { + let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + Self { timestamp, message } + } +}