feat: p2p monitoring

This commit is contained in:
Matthew Esposito 2025-04-17 18:44:27 -04:00
parent 051b63c6de
commit 3f0b27604b
4 changed files with 54 additions and 10 deletions

View file

@ -18,6 +18,7 @@ use std::{io, result::Result};
use crate::dbg_msg; use crate::dbg_msg;
use crate::oauth::{force_refresh_token, token_daemon, Oauth}; use crate::oauth::{force_refresh_token, token_daemon, Oauth};
use crate::p2p::ONLINE;
use crate::server::RequestExt; use crate::server::RequestExt;
use crate::utils::{format_url, Post}; use crate::utils::{format_url, Post};
@ -466,6 +467,7 @@ pub async fn json(path: String, quarantine: bool) -> Result<Value, String> {
if status.is_server_error() { if status.is_server_error() {
Err("Reddit is having issues, check if there's an outage".to_string()) Err("Reddit is having issues, check if there's an outage".to_string())
} else { } else {
ONLINE.store(false, Ordering::SeqCst);
err("Failed to parse page JSON data", e.to_string(), path) err("Failed to parse page JSON data", e.to_string(), path)
} }
} }

View file

@ -4,6 +4,7 @@ pub mod duplicates;
pub mod instance_info; pub mod instance_info;
pub mod oauth; pub mod oauth;
pub mod oauth_resources; pub mod oauth_resources;
pub mod p2p;
pub mod post; pub mod post;
pub mod search; pub mod search;
pub mod server; pub mod server;
@ -11,4 +12,3 @@ pub mod settings;
pub mod subreddit; pub mod subreddit;
pub mod user; pub mod user;
pub mod utils; pub mod utils;
pub mod p2p;

View file

@ -4,7 +4,9 @@
use cached::proc_macro::cached; use cached::proc_macro::cached;
use clap::{Arg, ArgAction, Command}; use clap::{Arg, ArgAction, Command};
use redlib::p2p::ONLINE;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::Ordering;
use futures_lite::FutureExt; use futures_lite::FutureExt;
use hyper::Uri; use hyper::Uri;
@ -227,6 +229,16 @@ async fn main() {
} }
} }
// Manual overrides for online value
app.at("/force_offline").get(|_| {
ONLINE.store(false, Ordering::SeqCst);
resource("", "text/plain", false).boxed()
});
app.at("/force_online").get(|_| {
ONLINE.store(true, Ordering::SeqCst);
resource("", "text/plain", false).boxed()
});
// Read static files // Read static files
app.at("/style.css").get(|_| style().boxed()); app.at("/style.css").get(|_| style().boxed());
app app

View file

@ -1,4 +1,7 @@
use std::str::FromStr; use std::{
str::FromStr,
sync::atomic::{AtomicBool, Ordering},
};
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
@ -6,19 +9,18 @@ use ed25519_dalek::Signature;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey}; use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey};
use iroh_gossip::{ use iroh_gossip::{
net::{Event, Gossip, GossipEvent, GossipReceiver}, net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender},
proto::TopicId, proto::TopicId,
ALPN as GOSSIP_ALPN, ALPN as GOSSIP_ALPN,
}; };
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::task; use tokio::{task, time::sleep};
use crate::config; use crate::config;
static TICKET: &str = ""; pub static DASHMAP: Lazy<DashMap<String, bool>> = Lazy::new(DashMap::new);
pub static ONLINE: Lazy<AtomicBool> = Lazy::new(AtomicBool::default);
static DASHMAP: Lazy<DashMap<String, bool>> = Lazy::new(DashMap::new);
pub async fn main() -> Result<(), Box<dyn std::error::Error>> { pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let endpoint = Endpoint::builder().discovery_n0().bind().await?; let endpoint = Endpoint::builder().discovery_n0().bind().await?;
@ -27,8 +29,19 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?; let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?;
let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?; let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?;
let (topic, peers) = { // there are two ways to run the p2p chat
let Ticket { topic, peers } = Ticket::from_str(TICKET)?; // 1. "bootstrap" mode - this requires REDLIB_P2P_BOOTSTRAP=true and REDLIB_P2P_TOPIC set to the topic ID you want to use
// in this mode, the node will create the topic and start a chat room
// 2. "join" mode - this requires REDLIB_P2P_BOOTSTRAP=false (or unset) and REDLIB_P2P_TICKET set to a ticket.
// in this mode, the node will join the existing chat room with the given topic ID, and connect to the peers listed
// in the ticket
let (topic, peers) = if std::env::var("REDLIB_P2P_BOOTSTRAP").unwrap_or_default() == "true" {
let topic = std::env::var("REDLIB_P2P_TOPIC").map(|s| TopicId::from_str(&s).unwrap()).unwrap();
println!("> opening chat room for topic {topic}");
(topic, vec![])
} else {
let ticket_str = std::env::var("REDLIB_P2P_TICKET").expect("REDLIB_P2P_TICKET not set");
let Ticket { topic, peers } = Ticket::from_str(&ticket_str)?;
println!("> joining chat room for topic {topic}"); println!("> joining chat room for topic {topic}");
(topic, peers) (topic, peers)
}; };
@ -53,15 +66,19 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split(); let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
println!("> connected!"); println!("> connected!");
let secret_key = endpoint.secret_key().clone();
let message = Message { let message = Message {
hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(),
online: true, online: true,
}; };
let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message)?;
sender.broadcast(encoded_message).await?; sender.broadcast(encoded_message).await?;
task::spawn(subscribe_loop(receiver)); task::spawn(subscribe_loop(receiver));
task::spawn(sender_loop(sender, secret_key));
Ok(()) Ok(())
} }
@ -81,6 +98,19 @@ async fn subscribe_loop(mut receiver: GossipReceiver) {
} }
} }
async fn sender_loop(sender: GossipSender, secret_key: SecretKey) {
loop {
let message = Message {
hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(),
online: ONLINE.load(Ordering::SeqCst),
};
let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message).unwrap();
let _ = sender.broadcast(encoded_message).await;
sleep(std::time::Duration::from_secs(10)).await;
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Ticket { struct Ticket {
topic: TopicId, topic: TopicId,