diff --git a/Cargo.lock b/Cargo.lock
index 660d6aa..1126871 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -779,6 +779,7 @@ dependencies = [
"lock_api",
"once_cell",
"parking_lot_core",
+ "serde",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 4db60e3..0b80726 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -67,7 +67,7 @@ data-encoding = "2.9.0"
postcard = "1.1.1"
bytes = "1.10.1"
ed25519-dalek = "2.1.1"
-dashmap = "6.1.0"
+dashmap = { version = "6.1.0", features = ["serde"] }
[dev-dependencies]
lipsum = "0.9.0"
@@ -77,3 +77,7 @@ sealed_test = "1.0.0"
codegen-units = 1
lto = true
strip = "symbols"
+
+[[bin]]
+name = "p2p_mon"
+path = "src/p2p_mon.rs"
diff --git a/index.html b/index.html
new file mode 100644
index 0000000..749624d
--- /dev/null
+++ b/index.html
@@ -0,0 +1,228 @@
+
+
+
+
+
+
+
+ Cluster Monitor
+
+
+
+
+
+
+
Node setup
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index f869d08..c2195e9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@
use cached::proc_macro::cached;
use clap::{Arg, ArgAction, Command};
-use redlib::p2p::ONLINE;
+use redlib::p2p::{map_json, ONLINE};
use std::str::FromStr;
use std::sync::atomic::Ordering;
@@ -239,6 +239,8 @@ async fn main() {
resource("", "text/plain", false).boxed()
});
+ app.at("/map.json").get(|_| async move { map_json().await }.boxed());
+
// Read static files
app.at("/style.css").get(|_| style().boxed());
app
diff --git a/src/p2p.rs b/src/p2p.rs
index 9708cb5..25469b9 100644
--- a/src/p2p.rs
+++ b/src/p2p.rs
@@ -7,6 +7,7 @@ use bytes::Bytes;
use dashmap::DashMap;
use ed25519_dalek::Signature;
use futures_lite::StreamExt;
+use hyper::{Body, Response};
use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey};
use iroh_gossip::{
net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender},
@@ -60,30 +61,29 @@ pub async fn main() -> Result<(), Box> {
println!("> trying to connect to {} peers...", peers.len());
// add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
for peer in peers.into_iter() {
- endpoint.add_node_addr(peer)?;
+ let result = endpoint.add_node_addr(peer);
+ if let Err(e) = result {
+ println!("> failed to add peer: {e}");
+ }
}
};
let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
println!("> connected!");
- let secret_key = endpoint.secret_key().clone();
-
- let message = Message {
- hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(),
- online: true,
- };
- let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message)?;
- sender.broadcast(encoded_message).await?;
-
- task::spawn(subscribe_loop(receiver));
-
- task::spawn(sender_loop(sender, secret_key));
+ let join_handle = task::spawn(subscribe_loop(receiver));
+ let sender_handle = task::spawn(sender_loop(sender, endpoint.clone()));
+ let ctrl_c_handle = task::spawn(async move {
+ tokio::signal::ctrl_c().await.unwrap();
+ println!("> received ctrl-c, exiting");
+ });
+ let _ = tokio::join!(join_handle, sender_handle, ctrl_c_handle);
Ok(())
}
async fn subscribe_loop(mut receiver: GossipReceiver) {
while let Ok(Some(event)) = receiver.try_next().await {
+ eprintln!("received event!: {event:?}");
if let Event::Gossip(GossipEvent::Received(msg)) = event {
let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) {
Ok(v) => v,
@@ -98,16 +98,17 @@ async fn subscribe_loop(mut receiver: GossipReceiver) {
}
}
-async fn sender_loop(sender: GossipSender, secret_key: SecretKey) {
+async fn sender_loop(sender: GossipSender, endpoint: Endpoint) {
loop {
let message = Message {
hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(),
online: ONLINE.load(Ordering::SeqCst),
};
- let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message).unwrap();
- let _ = sender.broadcast(encoded_message).await;
+ let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message).unwrap();
+ let message_delivery = sender.broadcast(encoded_message).await;
+ println!("> sent message: {message:?}: {message_delivery:?}");
- sleep(std::time::Duration::from_secs(10)).await;
+ sleep(std::time::Duration::from_secs(3)).await;
}
}
@@ -174,3 +175,15 @@ struct Message {
hostname: String,
online: bool,
}
+
+pub async fn map_json() -> Result, String> {
+ let map = &*DASHMAP;
+ let map = serde_json::to_string(map).unwrap();
+ Ok(
+ Response::builder()
+ .status(200)
+ .header("content-type", "application/json")
+ .body(map.into())
+ .unwrap_or_default(),
+ )
+}
diff --git a/src/p2p_mon.rs b/src/p2p_mon.rs
new file mode 100644
index 0000000..164dfae
--- /dev/null
+++ b/src/p2p_mon.rs
@@ -0,0 +1,137 @@
+use std::{str::FromStr, time::SystemTime};
+
+use bytes::Bytes;
+use ed25519_dalek::Signature;
+use futures_lite::StreamExt;
+use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey};
+use iroh_gossip::{
+ net::{Event, Gossip, GossipEvent},
+ proto::TopicId,
+ ALPN as GOSSIP_ALPN,
+};
+use serde::{Deserialize, Serialize};
+
+#[tokio::main]
+pub async fn main() -> Result<(), Box> {
+ let endpoint = Endpoint::builder().discovery_n0().bind().await?;
+ let builder = Router::builder(endpoint.clone());
+ let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?;
+ let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?;
+
+ let ticket_str = std::env::var("REDLIB_P2P_TICKET").expect("REDLIB_P2P_TICKET not set");
+ let Ticket { topic, peers } = Ticket::from_str(&ticket_str)?;
+
+ let ticket = {
+ let me = endpoint.node_addr().await?;
+ let peers = peers.iter().cloned().chain([me]).collect();
+ Ticket { topic, peers }
+ };
+ eprintln!("> ticket to join us: {ticket}");
+
+ let peer_ids = peers.iter().map(|p| p.node_id).collect();
+ if peers.is_empty() {
+ eprintln!("> waiting for peers to join us...");
+ } else {
+ eprintln!("> trying to connect to {} peers...", peers.len());
+ // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed
+ for peer in peers.into_iter() {
+ let result = endpoint.add_node_addr(peer);
+ if let Err(e) = result {
+ println!("> failed to add peer: {e}");
+ }
+ }
+ };
+ let (_sender, mut receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
+ eprintln!("> connected!");
+ loop {
+ match receiver.try_next().await {
+ Ok(Some(event)) => {
+ eprintln!("received event!: {event:?}");
+ if let Event::Gossip(GossipEvent::Received(msg)) = event {
+ let (_from, message) = match SignedMessage::verify_and_decode(&msg.content) {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("> failed to verify message: {}", e);
+ continue;
+ }
+ };
+ // Log the message log
+ let message_log: MessageLog = message.into();
+ println!("{}", serde_json::to_string(&message_log).unwrap());
+ }
+ }
+ Ok(None) => continue,
+ Err(e) => {
+ eprintln!("> failed to receive: {e}");
+ continue;
+ }
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct Ticket {
+ topic: TopicId,
+ peers: Vec,
+}
+
+impl Ticket {
+ /// Deserializes from bytes.
+ fn from_bytes(bytes: &[u8]) -> anyhow::Result {
+ postcard::from_bytes(bytes).map_err(Into::into)
+ }
+ /// Serializes to bytes.
+ pub fn to_bytes(&self) -> Vec {
+ postcard::to_stdvec(self).expect("postcard::to_stdvec is infallible")
+ }
+}
+
+impl FromStr for Ticket {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result {
+ let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?;
+ Self::from_bytes(&bytes)
+ }
+}
+impl std::fmt::Display for Ticket {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let mut text = data_encoding::BASE32_NOPAD.encode(&self.to_bytes()[..]);
+ text.make_ascii_lowercase();
+ write!(f, "{}", text)
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct SignedMessage {
+ from: PublicKey,
+ data: Bytes,
+ signature: Signature,
+}
+
+impl SignedMessage {
+ pub fn verify_and_decode(bytes: &[u8]) -> anyhow::Result<(PublicKey, Message)> {
+ let signed_message: Self = postcard::from_bytes(bytes)?;
+ let key: PublicKey = signed_message.from;
+ key.verify(&signed_message.data, &signed_message.signature)?;
+ let message: Message = postcard::from_bytes(&signed_message.data)?;
+ Ok((signed_message.from, message))
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct Message {
+ hostname: String,
+ online: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct MessageLog {
+ timestamp: u64,
+ message: Message,
+}
+impl From for MessageLog {
+ fn from(message: Message) -> Self {
+ let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
+ Self { timestamp, message }
+ }
+}