From f9fed6013dae3baa421889576c4dc56d5bac2bc0 Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Thu, 1 Apr 2021 22:18:10 +0530 Subject: [PATCH 1/6] src: update Peer to allow multiple addresses --- src/network.rs | 112 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 35 deletions(-) diff --git a/src/network.rs b/src/network.rs index 230c8e7..016d233 100644 --- a/src/network.rs +++ b/src/network.rs @@ -11,52 +11,94 @@ enum Protocol { Wss, } -pub struct Peer { +enum Handshake { + Shs, + Shs2, +} + +struct Address { protocol: Protocol, host: IpAddr, port: u16, - pubkey: PublicKey, + handshake: Handshake, +} + +pub struct Peer { + addresses: Vec
, + key: PublicKey, } impl Peer { + // TODO: do this properly pub fn to_discovery_packet(&self) -> String { - let proto = match self.protocol { - Protocol::Net => "net", - Protocol::Ws => "ws", - Protocol::Wss => "wss", - }; - format!( - "{}:{}:{}~shs:{}", - proto, - self.host, - self.port, - self.pubkey.to_base64() - ) + let parts: Vec = self + .addresses + .iter() + .map(|address| { + let proto = match address.protocol { + Protocol::Net => "net", + Protocol::Ws => "ws", + Protocol::Wss => "wss", + }; + let hs = match address.handshake { + Handshake::Shs => "shs", + Handshake::Shs2 => "shs2", + }; + format!( + "{}:{}:{}~{}:{}", + proto, + address.host, + address.port, + hs, + self.key.to_base64(), + ) + }) + .collect(); + parts.join(";") } + // TODO: do this properly pub fn from_discovery_packet(packet: &str) -> Self { - let mut packet = packet.splitn(4, ':'); - let protocol = match packet.next().unwrap() { - "net" => Protocol::Net, - "ws" => Protocol::Ws, - "wss" => Protocol::Wss, - _ => panic!("unknown protocol"), - }; - let host = IpAddr::V4(packet.next().unwrap().parse().unwrap()); - let port = packet - .next() - .unwrap() - .splitn(2, '~') - .next() - .unwrap() - .parse() - .unwrap(); - let pubkey = SSBPublicKey::from_base64(packet.next().unwrap()); + let mut key = Option::None; + let addresses = packet + .split(';') + .map(|address| { + let mut address = address.splitn(2, '~'); + + let mut network = address.next().unwrap().splitn(3, ':'); + let protocol = match network.next().unwrap() { + "net" => Protocol::Net, + "ws" => Protocol::Ws, + "wss" => Protocol::Wss, + _ => panic!("unknown protocol"), + }; + let host = IpAddr::V4(network.next().unwrap().parse().unwrap()); + let port = network.next().unwrap().parse().unwrap(); + + let mut info = address.next().unwrap().splitn(2, ':'); + let handshake = match info.next().unwrap() { + "shs" => Handshake::Shs, + "shs2" => Handshake::Shs2, + _ => panic!("unknown handshake"), + }; + let pubkey = SSBPublicKey::from_base64(info.next().unwrap()); + if key == Option::None { + key = Some(pubkey); + } else if key.unwrap() != pubkey { + panic!("unexpected pubkey"); + } + + Address { + protocol, + host, + port, + handshake, + } + }) + .collect(); Peer { - protocol, - host, - port, - pubkey, + addresses, + key: key.unwrap(), } } } -- 2.30.2 From 5ad54fd5230e506f310ec3113b3b68ab59b8c2fa Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Thu, 1 Apr 2021 22:19:03 +0530 Subject: [PATCH 2/6] src: misc formatting fixes --- src/main.rs | 6 ++---- src/network.rs | 9 ++++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index 499e929..2ba9dca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ -use async_std::{fs, task}; use async_std::path::PathBuf; use async_std::sync::Arc; +use async_std::{fs, task}; use clap::{load_yaml, App}; use ed25519_dalek::Keypair; @@ -9,8 +9,6 @@ use keypair::{SSBKeypair, SSBPublicKey}; mod network; -type Config = toml::map::Map; - #[async_std::main] async fn main() { let options = load_yaml!("options.yaml"); @@ -26,7 +24,7 @@ async fn main() { ), }; let config = fs::read_to_string(config_file).await.unwrap(); - let config: Config = toml::from_str(config.as_str()).unwrap(); + let config: toml::map::Map = toml::from_str(config.as_str()).unwrap(); let path = match options.value_of("path") { Some(path) => PathBuf::from(path), diff --git a/src/network.rs b/src/network.rs index 016d233..3d5b922 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,6 +1,6 @@ -use async_std::task; use async_std::net::{IpAddr, UdpSocket}; use async_std::sync::Arc; +use async_std::task; use ed25519_dalek::PublicKey; use crate::keypair::SSBPublicKey; @@ -31,8 +31,7 @@ pub struct Peer { impl Peer { // TODO: do this properly pub fn to_discovery_packet(&self) -> String { - let parts: Vec = self - .addresses + self.addresses .iter() .map(|address| { let proto = match address.protocol { @@ -53,8 +52,8 @@ impl Peer { self.key.to_base64(), ) }) - .collect(); - parts.join(";") + .collect::>() + .join(";") } // TODO: do this properly -- 2.30.2 From 500198e59333d7339671cec69357ee7cfa80478c Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Fri, 2 Apr 2021 08:08:07 +0530 Subject: [PATCH 3/6] src: create Peer before sending packet --- src/network.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/network.rs b/src/network.rs index 3d5b922..dbfdc67 100644 --- a/src/network.rs +++ b/src/network.rs @@ -23,12 +23,27 @@ struct Address { handshake: Handshake, } +impl Address { + fn new(protocol: Protocol, host: IpAddr, port: u16, handshake: Handshake) -> Self { + Self { + protocol, + host, + port, + handshake, + } + } +} + pub struct Peer { addresses: Vec
, key: PublicKey, } impl Peer { + fn new(addresses: Vec
, key: PublicKey) -> Self { + Self { addresses, key } + } + // TODO: do this properly pub fn to_discovery_packet(&self) -> String { self.addresses @@ -120,7 +135,16 @@ pub async fn peer_discovery_recv() { pub async fn peer_discovery_send(pubkey: Arc) { let socket = UdpSocket::bind(":::0").await.unwrap(); - let msg = format!("net:1.2.3.4:8023~shs:{}", &pubkey); + let msg = Peer::new( + Vec::from([Address::new( + Protocol::Net, + "1.2.3.4".parse().unwrap(), + 8023, + Handshake::Shs, + )]), + PublicKey::from_base64(pubkey.as_str()), + ) + .to_discovery_packet(); let buf = msg.as_bytes(); socket.set_broadcast(true).unwrap(); -- 2.30.2 From 808f979f06769f2457d8f724e4a375e83297589a Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Fri, 2 Apr 2021 08:18:06 +0530 Subject: [PATCH 4/6] src: break network into peer and discovery --- src/discovery.rs | 47 ++++++++++++++++++++++++++++++++ src/main.rs | 9 ++++--- src/{network.rs => peer.rs} | 54 +++++-------------------------------- 3 files changed, 58 insertions(+), 52 deletions(-) create mode 100644 src/discovery.rs rename src/{network.rs => peer.rs} (66%) diff --git a/src/discovery.rs b/src/discovery.rs new file mode 100644 index 0000000..884be2e --- /dev/null +++ b/src/discovery.rs @@ -0,0 +1,47 @@ +use async_std::net::UdpSocket; +use async_std::sync::Arc; +use async_std::task; +use ed25519_dalek::PublicKey; + +use crate::keypair::SSBPublicKey; +use crate::peer::{Address, Handshake, Peer, Protocol}; + +pub async fn recv() { + let socket = UdpSocket::bind(":::8008").await.unwrap(); + let mut buf = [0u8; 1024]; + + loop { + let (amt, peer) = socket.recv_from(&mut buf).await.unwrap(); + let buf = &mut buf[..amt]; + let packet = String::from_utf8(buf.to_vec()).unwrap(); + println!( + "{} {}", + peer, + Peer::from_discovery_packet(&packet).to_discovery_packet() + ); + } +} + +pub async fn send(pubkey: Arc) { + let socket = UdpSocket::bind(":::0").await.unwrap(); + let msg = Peer::new( + Vec::from([Address::new( + Protocol::Net, + "1.2.3.4".parse().unwrap(), + 8023, + Handshake::Shs, + )]), + PublicKey::from_base64(pubkey.as_str()), + ) + .to_discovery_packet(); + let buf = msg.as_bytes(); + + socket.set_broadcast(true).unwrap(); + + loop { + println!("Sending packet: {:?}", &msg); + socket.send_to(&buf, "255.255.255.255:8008").await.unwrap(); + socket.send_to(&buf, "[FF02::1]:8008").await.unwrap(); + task::sleep(std::time::Duration::from_secs(1)).await; + } +} diff --git a/src/main.rs b/src/main.rs index 2ba9dca..33d1924 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,11 @@ use async_std::{fs, task}; use clap::{load_yaml, App}; use ed25519_dalek::Keypair; +mod discovery; mod keypair; -use keypair::{SSBKeypair, SSBPublicKey}; +mod peer; -mod network; +use keypair::{SSBKeypair, SSBPublicKey}; #[async_std::main] async fn main() { @@ -38,6 +39,6 @@ async fn main() { let pubkey = keypair.public.to_base64(); - task::spawn(network::peer_discovery_recv()); - task::spawn(network::peer_discovery_send(Arc::new(pubkey))).await; + task::spawn(discovery::recv()); + task::spawn(discovery::send(Arc::new(pubkey))).await; } diff --git a/src/network.rs b/src/peer.rs similarity index 66% rename from src/network.rs rename to src/peer.rs index dbfdc67..b45f4be 100644 --- a/src/network.rs +++ b/src/peer.rs @@ -1,22 +1,20 @@ -use async_std::net::{IpAddr, UdpSocket}; -use async_std::sync::Arc; -use async_std::task; +use async_std::net::IpAddr; use ed25519_dalek::PublicKey; use crate::keypair::SSBPublicKey; -enum Protocol { +pub enum Protocol { Net, Ws, Wss, } -enum Handshake { +pub enum Handshake { Shs, Shs2, } -struct Address { +pub struct Address { protocol: Protocol, host: IpAddr, port: u16, @@ -24,7 +22,7 @@ struct Address { } impl Address { - fn new(protocol: Protocol, host: IpAddr, port: u16, handshake: Handshake) -> Self { + pub fn new(protocol: Protocol, host: IpAddr, port: u16, handshake: Handshake) -> Self { Self { protocol, host, @@ -40,7 +38,7 @@ pub struct Peer { } impl Peer { - fn new(addresses: Vec
, key: PublicKey) -> Self { + pub fn new(addresses: Vec
, key: PublicKey) -> Self { Self { addresses, key } } @@ -116,43 +114,3 @@ impl Peer { } } } - -pub async fn peer_discovery_recv() { - let socket = UdpSocket::bind(":::8008").await.unwrap(); - let mut buf = [0u8; 1024]; - - loop { - let (amt, peer) = socket.recv_from(&mut buf).await.unwrap(); - let buf = &mut buf[..amt]; - let packet = String::from_utf8(buf.to_vec()).unwrap(); - println!( - "{} {}", - peer, - Peer::from_discovery_packet(&packet).to_discovery_packet() - ); - } -} - -pub async fn peer_discovery_send(pubkey: Arc) { - let socket = UdpSocket::bind(":::0").await.unwrap(); - let msg = Peer::new( - Vec::from([Address::new( - Protocol::Net, - "1.2.3.4".parse().unwrap(), - 8023, - Handshake::Shs, - )]), - PublicKey::from_base64(pubkey.as_str()), - ) - .to_discovery_packet(); - let buf = msg.as_bytes(); - - socket.set_broadcast(true).unwrap(); - - loop { - println!("Sending packet: {:?}", &msg); - socket.send_to(&buf, "255.255.255.255:8008").await.unwrap(); - socket.send_to(&buf, "[FF02::1]:8008").await.unwrap(); - task::sleep(std::time::Duration::from_secs(1)).await; - } -} -- 2.30.2 From c80a600845c4af030c7f5bf62b0c22b9f65834d2 Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Sun, 4 Apr 2021 09:15:48 +0530 Subject: [PATCH 5/6] src: use channel for discovery, filter uniques --- src/discovery.rs | 10 +++++++++- src/main.rs | 9 ++++++--- src/peer.rs | 19 +++++++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/discovery.rs b/src/discovery.rs index 884be2e..928e790 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -3,12 +3,15 @@ use async_std::sync::Arc; use async_std::task; use ed25519_dalek::PublicKey; +use std::collections::HashSet; + use crate::keypair::SSBPublicKey; use crate::peer::{Address, Handshake, Peer, Protocol}; -pub async fn recv() { +pub async fn recv(sender: async_std::channel::Sender) { let socket = UdpSocket::bind(":::8008").await.unwrap(); let mut buf = [0u8; 1024]; + let mut old = HashSet::new(); loop { let (amt, peer) = socket.recv_from(&mut buf).await.unwrap(); @@ -19,6 +22,11 @@ pub async fn recv() { peer, Peer::from_discovery_packet(&packet).to_discovery_packet() ); + let peer = Peer::from_discovery_packet(packet.as_str()); + if !old.contains(&peer) { + old.insert(peer.clone()); + sender.send(peer).await.unwrap(); + } } } diff --git a/src/main.rs b/src/main.rs index 33d1924..66b5a55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,8 +37,11 @@ async fn main() { let keypair = Keypair::read_or_generate(path.join("secret")).await; println!("{}", keypair.to_json().pretty(2)); - let pubkey = keypair.public.to_base64(); + let (psend, precv) = async_std::channel::unbounded(); + task::spawn(async move { discovery::recv(psend).await }); + task::spawn(discovery::send(Arc::new(keypair.public.to_base64()))); - task::spawn(discovery::recv()); - task::spawn(discovery::send(Arc::new(pubkey))).await; + while let Ok(peer) = precv.recv().await { + println!("{}", peer.to_discovery_packet()); + }; } diff --git a/src/peer.rs b/src/peer.rs index b45f4be..621792e 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,19 +1,24 @@ use async_std::net::IpAddr; use ed25519_dalek::PublicKey; +use std::hash::{Hash, Hasher}; + use crate::keypair::SSBPublicKey; +#[derive(Clone)] pub enum Protocol { Net, Ws, Wss, } +#[derive(Clone)] pub enum Handshake { Shs, Shs2, } +#[derive(Clone)] pub struct Address { protocol: Protocol, host: IpAddr, @@ -32,6 +37,7 @@ impl Address { } } +#[derive(Clone)] pub struct Peer { addresses: Vec
, key: PublicKey, @@ -114,3 +120,16 @@ impl Peer { } } } + +impl Hash for Peer { + fn hash(&self, state: &mut H) { + self.key.to_bytes().hash(state); + } +} + +impl std::cmp::PartialEq for Peer { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} +impl std::cmp::Eq for Peer {} -- 2.30.2 From 9dbe2d0f24f53b5a4454543b04b1cd9df51ad9f4 Mon Sep 17 00:00:00 2001 From: Ujjwal Sharma Date: Sun, 4 Apr 2021 09:16:08 +0530 Subject: [PATCH 6/6] src: remove raw logging --- src/discovery.rs | 8 +------- src/main.rs | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/discovery.rs b/src/discovery.rs index 928e790..c39e73b 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -14,14 +14,9 @@ pub async fn recv(sender: async_std::channel::Sender) { let mut old = HashSet::new(); loop { - let (amt, peer) = socket.recv_from(&mut buf).await.unwrap(); + let (amt, _) = socket.recv_from(&mut buf).await.unwrap(); let buf = &mut buf[..amt]; let packet = String::from_utf8(buf.to_vec()).unwrap(); - println!( - "{} {}", - peer, - Peer::from_discovery_packet(&packet).to_discovery_packet() - ); let peer = Peer::from_discovery_packet(packet.as_str()); if !old.contains(&peer) { old.insert(peer.clone()); @@ -47,7 +42,6 @@ pub async fn send(pubkey: Arc) { socket.set_broadcast(true).unwrap(); loop { - println!("Sending packet: {:?}", &msg); socket.send_to(&buf, "255.255.255.255:8008").await.unwrap(); socket.send_to(&buf, "[FF02::1]:8008").await.unwrap(); task::sleep(std::time::Duration::from_secs(1)).await; diff --git a/src/main.rs b/src/main.rs index 66b5a55..8edb2aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,6 @@ async fn main() { }, }; let keypair = Keypair::read_or_generate(path.join("secret")).await; - println!("{}", keypair.to_json().pretty(2)); let (psend, precv) = async_std::channel::unbounded(); task::spawn(async move { discovery::recv(psend).await }); -- 2.30.2