feat: Add p2p monitoring and improve p2p node communication

This commit is contained in:
Matthew Esposito 2025-04-19 22:30:59 -04:00
parent 3f0b27604b
commit 34e2fd23ac
6 changed files with 404 additions and 19 deletions

1
Cargo.lock generated
View file

@ -779,6 +779,7 @@ dependencies = [
"lock_api", "lock_api",
"once_cell", "once_cell",
"parking_lot_core", "parking_lot_core",
"serde",
] ]
[[package]] [[package]]

View file

@ -67,7 +67,7 @@ data-encoding = "2.9.0"
postcard = "1.1.1" postcard = "1.1.1"
bytes = "1.10.1" bytes = "1.10.1"
ed25519-dalek = "2.1.1" ed25519-dalek = "2.1.1"
dashmap = "6.1.0" dashmap = { version = "6.1.0", features = ["serde"] }
[dev-dependencies] [dev-dependencies]
lipsum = "0.9.0" lipsum = "0.9.0"
@ -77,3 +77,7 @@ sealed_test = "1.0.0"
codegen-units = 1 codegen-units = 1
lto = true lto = true
strip = "symbols" strip = "symbols"
[[bin]]
name = "p2p_mon"
path = "src/p2p_mon.rs"

228
index.html Normal file
View file

@ -0,0 +1,228 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Cluster Monitor</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="p-6 font-sans select-none">
<!-- Two-column layout: node setup form on the left, live cluster view on the right. -->
<div class="flex">
<!-- Left panel: one editable row per node, filled in by renderNodesPanel(). -->
<div id="setupPanel" class="w-1/3 border-4 border-red-500 p-6">
<h2 class="text-3xl font-semibold mb-6">Node setup</h2>
<div id="nodesContainer" class="space-y-6"></div>
<button id="addNodeBtn" class="mt-4 text-blue-600 hover:underline">+ Add node</button>
</div>
<!-- Vertical divider between the two panels. -->
<div class="border-l-4 border-red-500 mx-6"></div>
<!-- Right panel: one status circle per node, filled in by renderCircles(). -->
<div class="flex-1 relative">
<div id="circlesContainer" class="flex justify-around flex-wrap gap-12 pt-4"></div>
<!-- Overlay listing status changes; entries are added by appendMessage(). -->
<div id="messagesBox"
class="border-4 border-red-500 p-4 w-80 absolute bottom-0 right-0 mb-4 mr-4 bg-white/80 backdrop-blur">
<h3 class="text-xl font-semibold mb-2">Messages</h3>
<div id="messagesList" class="text-red-600 font-mono space-y-1 text-sm max-h-60 overflow-y-auto"></div>
</div>
</div>
</div>
<script>
// ─────────────────────────────────────────────────────────────────────
// Application state
// ─────────────────────────────────────────────────────────────────────
// Configured nodes in display order. Each entry has:
//   name   – label shown in the setup panel and in the circle
//   host   – base URL used for the node's /map.json, /force_offline and /force_online requests
//   online – last status reported for this node in /log.json
//   view   – this node's own picture of its peers, rebuilt from its /map.json: {host: {online}}
const nodes = []; // [{name, host, online, view:{}}]
// Timestamp of the first log entry seen; message times are displayed relative to it.
let baseTimestamp = null; // first log timestamp → time origin
// Number of /log.json lines already processed, so each poll only handles new ones.
let processedLines = 0; // tracks #lines already handled in /log.json
// ─────────────────────────────────────────────────────────────────────
// Utility helpers
// ─────────────────────────────────────────────────────────────────────
// Formats a number of seconds as `m:ss`: whole minutes (unpadded), then the
// remaining seconds zero-padded to two digits.
const fmtTime = (seconds) => {
  const minutes = Math.floor(seconds / 60);
  const remainder = `${seconds % 60}`.padStart(2, '0');
  return [minutes, remainder].join(':');
};
// Returns the configured node whose `host` URL has the given `host[:port]`
// component, or undefined when none matches.
const byHostname = (hostname) => nodes.find((n) => {
  try {
    return new URL(n.host).host === hostname;
  } catch {
    // `host` is free text edited by the user; a value that is not a valid URL
    // simply does not match instead of throwing into the caller.
    return false;
  }
});
// Maps a node's online flag to the Tailwind border-colour class of its circle.
const colourForState = (online) => {
  if (online) {
    return 'border-green-400';
  }
  return 'border-red-500';
};
// ─────────────────────────────────────────────────────────────────────
// DOM creation helpers
// ─────────────────────────────────────────────────────────────────────
// Builds the setup-panel row for one node: its name, an editable host field and
// the two buttons that call the node's /force_offline and /force_online endpoints.
function buildNodeRow(node, idx) {
  // The two force buttons differ only in caption, styling and endpoint path.
  const makeForceButton = (caption, classes, path) => {
    const btn = document.createElement('button');
    btn.textContent = caption;
    btn.className = classes;
    // node.host is read at click time, so edits to the host field take effect.
    btn.onclick = () => fetch(node.host + path).catch(console.error);
    return btn;
  };

  const nameLabel = document.createElement('label');
  nameLabel.textContent = node.name;
  nameLabel.className = 'block font-medium';

  const hostField = document.createElement('input');
  hostField.type = 'text';
  hostField.value = node.host;
  hostField.className = 'w-full border px-2 py-1 rounded mt-1 text-sm';
  hostField.addEventListener('change', () => {
    node.host = hostField.value.trim();
  });

  const buttons = document.createElement('div');
  buttons.className = 'grid grid-cols-2 gap-1 mt-2';
  buttons.append(
    makeForceButton('Force Offline', 'w-full py-1 bg-red-600 text-white rounded', '/force_offline'),
    makeForceButton('Force Online', 'w-full py-1 bg-green-600 text-white rounded', '/force_online'),
  );

  const row = document.createElement('div');
  row.append(nameLabel, hostField, buttons);
  return row;
}
// Builds the status circle for node `idx`: a coloured ring (`circle-<idx>`)
// holding the node name and an initially empty peer list (`view-<idx>`).
function buildCircle(node, idx) {
  const heading = document.createElement('div');
  heading.textContent = node.name;
  heading.className = 'font-medium';

  const peerList = document.createElement('div');
  peerList.id = `view-${idx}`;
  peerList.className = 'text-xs mt-1 text-center whitespace-pre';

  const ring = document.createElement('div');
  ring.id = `circle-${idx}`;
  ring.className = `w-40 h-40 rounded-full flex flex-col items-center justify-center border-4 ${colourForState(node.online)} transition-colors`;
  ring.append(heading, peerList);
  return ring;
}
// ─────────────────────────────────────────────────────────────────────
// Rendering functions
// ─────────────────────────────────────────────────────────────────────
// Rebuilds the setup panel from scratch: one row per entry in `nodes`.
function renderNodesPanel() {
  const panel = document.getElementById('nodesContainer');
  panel.innerHTML = '';
  for (const [i, n] of nodes.entries()) {
    panel.appendChild(buildNodeRow(n, i));
  }
}
// Rebuilds the circle area from scratch: one status circle per entry in `nodes`.
function renderCircles() {
  const area = document.getElementById('circlesContainer');
  area.innerHTML = '';
  for (const [i, n] of nodes.entries()) {
    area.appendChild(buildCircle(n, i));
  }
}
// Updates the already-rendered circle of node `idx`: recolours its border from
// `node.online` and rewrites the peer list from `node.view`.
// Does nothing when the node or its circle element does not exist.
function refreshCircle(idx) {
  const node = nodes[idx];
  const circle = document.getElementById(`circle-${idx}`);
  if (!node || !circle) return;

  // Swap the state colour via classList. Rewriting `className` with a regex
  // replace plus string concatenation left one more stray space in the
  // attribute on every refresh.
  [...circle.classList]
    .filter((cls) => /^border-(green|red)-[0-9]+$/.test(cls))
    .forEach((cls) => circle.classList.remove(cls));
  circle.classList.add(colourForState(node.online));

  const view = document.getElementById(`view-${idx}`);
  if (!view) return;

  // Looks up what `node` reports about `peer`. The configured host string is
  // tried first; the URL's host[:port] form is the fallback.
  // NOTE(review): /map.json keys are assumed to be the hostnames peers
  // advertise (the form the log messages are matched on) — confirm.
  const peerEntry = (peer) => {
    if (node.view[peer.host]) return node.view[peer.host];
    try {
      return node.view[new URL(peer.host).host];
    } catch {
      return undefined; // host field is not a valid URL
    }
  };

  view.textContent = nodes
    .map((peer, j) => `${j + 1}: ${peerEntry(peer)?.online ? 'online' : 'offline'}`)
    .join('\n');
}
// Adds one line of text to the bottom of the messages box and scrolls it into
// view. `nodeIdx` is accepted but not used.
function appendMessage(line, nodeIdx) {
  const box = document.getElementById('messagesList');
  const entry = Object.assign(document.createElement('div'), { textContent: line });
  box.appendChild(entry);
  box.scrollTop = box.scrollHeight;
}
// ─────────────────────────────────────────────────────────────────────
// Polling: /log.json
// ─────────────────────────────────────────────────────────────────────
// Fetches /log.json (one JSON object per line, shaped like
// {timestamp, message: {hostname, online}}) and applies every line not seen
// before: updates the matching node's status, recolours its circle and appends
// a line to the messages box.
//
// `processedLines` advances line by line, so a failure part-way through never
// causes lines that were already shown to be replayed on the next poll.
async function pollLogs() {
  try {
    const res = await fetch('/log.json', { cache: 'no-store' });
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    const text = await res.text();
    // Blank lines are dropped so an empty log yields no entries at all.
    const lines = text.split('\n').map((l) => l.trim()).filter((l) => l !== '');

    // The log got shorter (rotated or restarted): read it again from the top.
    if (lines.length < processedLines) processedLines = 0;

    while (processedLines < lines.length) {
      const raw = lines[processedLines];
      let obj;
      try {
        obj = JSON.parse(raw);
      } catch (parseErr) {
        // The last line may still be in the middle of being written; leave it
        // for the next poll. Anything earlier is genuinely malformed: skip it.
        if (processedLines === lines.length - 1) break;
        console.error('skipping malformed log line', raw, parseErr);
        processedLines++;
        continue;
      }
      processedLines++;
      if (!obj || !obj.message) continue;

      if (baseTimestamp === null) baseTimestamp = obj.timestamp;
      const secondsSinceStart = obj.timestamp - baseTimestamp;
      const node = byHostname(obj.message.hostname);
      if (!node) continue; // message from a host that is not configured here
      node.online = obj.message.online;
      const idx = nodes.indexOf(node);
      refreshCircle(idx);
      const statusText = node.online ? 'online' : 'offline';
      appendMessage(`${fmtTime(secondsSinceStart)}: ${node.name} ${statusText}`, idx);
    }
  } catch (err) {
    console.error('log poll error', err);
  }
}
// ─────────────────────────────────────────────────────────────────────
// Poll each node's /map.json
// ─────────────────────────────────────────────────────────────────────
// Asks every configured node for its /map.json, replaces that node's `view`
// with the result and redraws its circle. Nodes are queried concurrently and a
// failure for one node is logged without affecting the others.
async function pollMaps() {
  const pollOne = async (node, idx) => {
    try {
      const response = await fetch(node.host + '/map.json', { cache: 'no-store' });
      const peers = await response.json(); // {hostname: bool}
      node.view = {};
      Object.entries(peers).forEach(([host, online]) => {
        node.view[host] = { online };
      });
      refreshCircle(idx);
    } catch (err) {
      console.error('map poll error', node.host, err);
    }
  };
  await Promise.all(nodes.map((node, idx) => pollOne(node, idx)));
}
// ─────────────────────────────────────────────────────────────────────
// Add-node button logic & initial population
// ─────────────────────────────────────────────────────────────────────
// "+ Add node": appends a node with the next default name and port, then
// redraws both panels.
document.getElementById('addNodeBtn').addEventListener('click', () => {
  const nextIdx = nodes.length + 1;
  // Compute the port numerically (8080, 8081, ...). Gluing the index onto
  // "808" produced the invalid port 80810 for the eleventh node.
  const defaultHost = `http://localhost:${8080 + nodes.length}`;
  nodes.push({ name: `Node ${nextIdx}`, host: defaultHost, online: false, view: {} });
  renderNodesPanel();
  renderCircles();
});
// bootstrap with 3 nodes
// Start with three nodes on localhost ports 8080-8082 and draw both panels once.
for (let i = 0; i < 3; i++) {
  const initial = { name: `Node ${i + 1}`, host: `http://localhost:808${i}`, online: false, view: {} };
  nodes.push(initial);
}
renderNodesPanel();
renderCircles();
// ─────────────────────────────────────────────────────────────────────
// Timers
// ─────────────────────────────────────────────────────────────────────
// Runs `task` repeatedly, starting the next run `delayMs` after the previous
// one has *finished*. A plain setInterval would start a new poll while a slow
// one (e.g. a request to an unreachable node) is still pending, letting polls
// pile up and overlap.
function pollEvery(task, delayMs) {
  const tick = async () => {
    try {
      await task();
    } catch (err) {
      console.error('poll error', err);
    }
    setTimeout(tick, delayMs);
  };
  setTimeout(tick, delayMs);
}
pollEvery(pollLogs, 2000); // 2s after the previous log poll completes
pollEvery(pollMaps, 3000); // 3s after the previous map poll completes
</script>
</body>
</html>

View file

@ -4,7 +4,7 @@
use cached::proc_macro::cached; use cached::proc_macro::cached;
use clap::{Arg, ArgAction, Command}; use clap::{Arg, ArgAction, Command};
use redlib::p2p::ONLINE; use redlib::p2p::{map_json, ONLINE};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -239,6 +239,8 @@ async fn main() {
resource("", "text/plain", false).boxed() resource("", "text/plain", false).boxed()
}); });
app.at("/map.json").get(|_| async move { map_json().await }.boxed());
// Read static files // Read static files
app.at("/style.css").get(|_| style().boxed()); app.at("/style.css").get(|_| style().boxed());
app app

View file

@ -7,6 +7,7 @@ use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use ed25519_dalek::Signature; use ed25519_dalek::Signature;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use hyper::{Body, Response};
use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey}; use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey};
use iroh_gossip::{ use iroh_gossip::{
net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender}, net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender},
@ -60,30 +61,29 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("> trying to connect to {} peers...", peers.len()); println!("> trying to connect to {} peers...", peers.len());
// add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
for peer in peers.into_iter() { for peer in peers.into_iter() {
endpoint.add_node_addr(peer)?; let result = endpoint.add_node_addr(peer);
if let Err(e) = result {
println!("> failed to add peer: {e}");
}
} }
}; };
let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split(); let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
println!("> connected!"); println!("> connected!");
let secret_key = endpoint.secret_key().clone(); let join_handle = task::spawn(subscribe_loop(receiver));
let sender_handle = task::spawn(sender_loop(sender, endpoint.clone()));
let message = Message { let ctrl_c_handle = task::spawn(async move {
hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), tokio::signal::ctrl_c().await.unwrap();
online: true, println!("> received ctrl-c, exiting");
}; });
let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message)?; let _ = tokio::join!(join_handle, sender_handle, ctrl_c_handle);
sender.broadcast(encoded_message).await?;
task::spawn(subscribe_loop(receiver));
task::spawn(sender_loop(sender, secret_key));
Ok(()) Ok(())
} }
async fn subscribe_loop(mut receiver: GossipReceiver) { async fn subscribe_loop(mut receiver: GossipReceiver) {
while let Ok(Some(event)) = receiver.try_next().await { while let Ok(Some(event)) = receiver.try_next().await {
eprintln!("received event!: {event:?}");
if let Event::Gossip(GossipEvent::Received(msg)) = event { if let Event::Gossip(GossipEvent::Received(msg)) = event {
let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) { let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) {
Ok(v) => v, Ok(v) => v,
@ -98,16 +98,17 @@ async fn subscribe_loop(mut receiver: GossipReceiver) {
} }
} }
async fn sender_loop(sender: GossipSender, secret_key: SecretKey) { async fn sender_loop(sender: GossipSender, endpoint: Endpoint) {
loop { loop {
let message = Message { let message = Message {
hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(),
online: ONLINE.load(Ordering::SeqCst), online: ONLINE.load(Ordering::SeqCst),
}; };
let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message).unwrap(); let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message).unwrap();
let _ = sender.broadcast(encoded_message).await; let message_delivery = sender.broadcast(encoded_message).await;
println!("> sent message: {message:?}: {message_delivery:?}");
sleep(std::time::Duration::from_secs(10)).await; sleep(std::time::Duration::from_secs(3)).await;
} }
} }
@ -174,3 +175,15 @@ struct Message {
hostname: String, hostname: String,
online: bool, online: bool,
} }
/// Serializes the shared `DASHMAP` to JSON and returns it as an HTTP 200
/// response with `content-type: application/json`.
///
/// # Errors
/// Returns the error text if the map cannot be serialized or the response
/// cannot be built, instead of panicking inside the request handler.
pub async fn map_json() -> Result<Response<Body>, String> {
    let map = &*DASHMAP;
    let body = serde_json::to_string(map).map_err(|e| e.to_string())?;
    Response::builder()
        .status(200)
        .header("content-type", "application/json")
        .body(body.into())
        .map_err(|e| e.to_string())
}

137
src/p2p_mon.rs Normal file
View file

@ -0,0 +1,137 @@
use std::{str::FromStr, time::SystemTime};
use bytes::Bytes;
use ed25519_dalek::Signature;
use futures_lite::StreamExt;
use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey};
use iroh_gossip::{
net::{Event, Gossip, GossipEvent},
proto::TopicId,
ALPN as GOSSIP_ALPN,
};
use serde::{Deserialize, Serialize};
/// Entry point of the `p2p_mon` binary.
///
/// Joins the gossip topic described by the ticket in `REDLIB_P2P_TICKET`,
/// then prints every verified message it receives to stdout as one JSON
/// object per line (a [`MessageLog`]). All diagnostics go to stderr so that
/// stdout carries nothing but those JSON lines.
///
/// # Errors
/// Returns an error if the endpoint, gossip or router cannot be started, if
/// `REDLIB_P2P_TICKET` is missing or cannot be parsed, or if joining the topic
/// fails.
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = Endpoint::builder().discovery_n0().bind().await?;
    let builder = Router::builder(endpoint.clone());
    let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?;
    let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?;

    // Report a missing ticket through the function's error return, not a panic.
    let ticket_str = std::env::var("REDLIB_P2P_TICKET").map_err(|e| format!("REDLIB_P2P_TICKET: {e}"))?;
    let Ticket { topic, peers } = Ticket::from_str(&ticket_str)?;

    // Print a ticket that also lists this node, so others can join through it.
    let ticket = {
        let me = endpoint.node_addr().await?;
        let peers = peers.iter().cloned().chain([me]).collect();
        Ticket { topic, peers }
    };
    eprintln!("> ticket to join us: {ticket}");

    let peer_ids = peers.iter().map(|p| p.node_id).collect();
    if peers.is_empty() {
        eprintln!("> waiting for peers to join us...");
    } else {
        eprintln!("> trying to connect to {} peers...", peers.len());
        // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
        for peer in peers.into_iter() {
            if let Err(e) = endpoint.add_node_addr(peer) {
                // stderr, not stdout: stdout is reserved for the JSON lines below.
                eprintln!("> failed to add peer: {e}");
            }
        }
    };

    let (_sender, mut receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
    eprintln!("> connected!");

    loop {
        match receiver.try_next().await {
            Ok(Some(event)) => {
                eprintln!("received event!: {event:?}");
                if let Event::Gossip(GossipEvent::Received(msg)) = event {
                    let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) {
                        Ok(v) => v,
                        Err(e) => {
                            eprintln!("> failed to verify message: {}", e);
                            continue;
                        }
                    };
                    // Stamp the message with the receive time and emit it as one JSON line.
                    let message_log: MessageLog = message.into();
                    match serde_json::to_string(&message_log) {
                        Ok(line) => println!("{line}"),
                        Err(e) => eprintln!("> failed to serialize message log: {e}"),
                    }
                }
            }
            // `None` means the stream has ended; polling it again would only
            // spin in a busy loop, so stop instead.
            Ok(None) => {
                eprintln!("> gossip stream closed, exiting");
                break;
            }
            Err(e) => {
                eprintln!("> failed to receive: {e}");
                continue;
            }
        }
    }
    Ok(())
}
/// Invitation to a gossip topic: the topic id plus the addresses of peers to
/// contact. Its text form (see the `FromStr` and `Display` impls) is the
/// postcard encoding of this struct in base32.
#[derive(Debug, Serialize, Deserialize)]
struct Ticket {
/// Gossip topic to subscribe to.
topic: TopicId,
/// Peers to add to the endpoint's address book and join through.
peers: Vec<NodeAddr>,
}
impl Ticket {
    /// Decodes a ticket from its postcard byte encoding.
    fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
        let ticket = postcard::from_bytes(bytes)?;
        Ok(ticket)
    }

    /// Encodes the ticket into its postcard byte encoding.
    pub fn to_bytes(&self) -> Vec<u8> {
        match postcard::to_stdvec(self) {
            Ok(encoded) => encoded,
            Err(err) => panic!("postcard::to_stdvec is infallible: {err:?}"),
        }
    }
}
impl FromStr for Ticket {
    type Err = anyhow::Error;

    /// Parses a ticket from its base32 text form; letter case is ignored.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // The alphabet used for decoding is upper-case, so normalise first.
        let upper = s.to_ascii_uppercase();
        let raw = data_encoding::BASE32_NOPAD.decode(upper.as_bytes())?;
        Self::from_bytes(&raw)
    }
}
impl std::fmt::Display for Ticket {
    /// Writes the ticket as lower-case, unpadded base32 of its byte encoding.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let encoded = data_encoding::BASE32_NOPAD
            .encode(&self.to_bytes())
            .to_ascii_lowercase();
        f.write_str(&encoded)
    }
}
/// Wire format of a gossip message: an encoded payload together with the
/// sender's public key and a signature over the payload bytes.
#[derive(Debug, Serialize, Deserialize)]
struct SignedMessage {
/// Public key the signature is checked against.
from: PublicKey,
/// Postcard-encoded [`Message`].
data: Bytes,
/// Signature over `data`.
signature: Signature,
}
impl SignedMessage {
pub fn verify_and_decode(bytes: &[u8]) -> anyhow::Result<(PublicKey, Message)> {
let signed_message: Self = postcard::from_bytes(bytes)?;
let key: PublicKey = signed_message.from;
key.verify(&signed_message.data, &signed_message.signature)?;
let message: Message = postcard::from_bytes(&signed_message.data)?;
Ok((signed_message.from, message))
}
}
/// Status announcement carried inside a [`SignedMessage`].
#[derive(Debug, Serialize, Deserialize)]
struct Message {
/// Hostname the sending node identifies itself with.
hostname: String,
/// Whether the sender reports itself as online.
online: bool,
}
/// One line of the monitor's JSON output: a received [`Message`] plus the time
/// it was logged.
#[derive(Debug, Serialize, Deserialize)]
struct MessageLog {
/// Seconds since the Unix epoch at the moment the entry was created.
timestamp: u64,
/// The message as received.
message: Message,
}
impl From<Message> for MessageLog {
fn from(message: Message) -> Self {
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
Self { timestamp, message }
}
}