Compare commits

...

11 Commits

Author SHA1 Message Date
mirsal 732cc8fd62
Refactor peer discovery
* Move the discovery beacon codec to the discovery module
 * Move discovery::send and discovery::recv to static methods
 * Build the mock local Peer instance outside of discovery::send
2021-04-07 18:00:35 +00:00
mirsal 44f0d75d41 Merge pull request 'Add logging and filter out own beacons' (#16) from log into main
Reviewed-on: ryzokuken/cosmoline#16
2021-04-07 07:25:11 +00:00
Ujjwal Sharma ac40319249
src: filter out own beacons 2021-04-07 11:32:15 +05:30
Ujjwal Sharma ad7db7481d
src: add minimal logging 2021-04-07 11:32:14 +05:30
mirsal 59c4b4d37b Merge pull request 'Various networking improvements' (#10) from refactor-peer into main
Reviewed-on: ryzokuken/cosmoline#10
2021-04-07 03:58:09 +00:00
Ujjwal Sharma 9dbe2d0f24
src: remove raw logging 2021-04-04 09:16:08 +05:30
Ujjwal Sharma c80a600845
src: use channel for discovery, filter uniques 2021-04-04 09:15:48 +05:30
Ujjwal Sharma 808f979f06
src: break network into peer and discovery 2021-04-04 09:11:22 +05:30
Ujjwal Sharma 500198e593
src: create Peer before sending packet 2021-04-02 08:08:07 +05:30
Ujjwal Sharma 5ad54fd523
src: misc formatting fixes 2021-04-02 06:25:14 +05:30
Ujjwal Sharma f9fed6013d
src: update Peer to allow multiple addresses 2021-04-01 22:18:10 +05:30
6 changed files with 235 additions and 104 deletions

21
Cargo.lock generated
View File

@ -314,7 +314,9 @@ dependencies = [
"clap",
"dirs",
"ed25519-dalek",
"env_logger",
"json",
"log",
"rand",
"regex",
"toml",
@ -412,6 +414,19 @@ dependencies = [
"zeroize",
]
[[package]]
name = "env_logger"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "event-listener"
version = "2.5.1"
@ -521,6 +536,12 @@ dependencies = [
"libc",
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "indexmap"
version = "1.6.2"

View File

@ -16,6 +16,8 @@ json = "0.12.4"
base64 = "0.13.0"
regex = "1.4.5"
async-trait = "0.1.7"
log = "0.4.14"
env_logger = "0.8.3"
[dependencies.async-std]
version = "1.6.2"

124
src/discovery.rs Normal file
View File

@ -0,0 +1,124 @@
use async_std::net::{UdpSocket, IpAddr};
use async_std::sync::Arc;
use async_std::task;
use ed25519_dalek::PublicKey;
use std::collections::HashSet;
use crate::keypair::SSBPublicKey;
use crate::peer::{Peer, Address, Protocol, Handshake};
pub struct Server;
impl Server {
pub async fn recv(sender: async_std::channel::Sender<Peer>, pubkey: Arc<PublicKey>) {
let socket = UdpSocket::bind(":::8008").await.unwrap();
let mut buf = [0u8; 1024];
let mut old = HashSet::new();
loop {
let (amt, _) = socket.recv_from(&mut buf).await.unwrap();
let buf = &mut buf[..amt];
let packet = String::from_utf8(buf.to_vec()).unwrap();
log::debug!("Recieved discovery beacon: {}", packet);
let peer = Codec::decode(packet.as_str());
if peer.key.to_base64() != pubkey.to_base64() && !old.contains(&peer) {
old.insert(peer.clone());
sender.send(peer).await.unwrap();
}
}
}
pub async fn send(peer: Arc<Peer>) {
let socket = UdpSocket::bind(":::0").await.unwrap();
let msg = Codec::encode(Arc::try_unwrap(peer).unwrap());
let buf = msg.as_bytes();
socket.set_broadcast(true).unwrap();
loop {
log::debug!("Sending discovery beacon");
socket.send_to(&buf, "255.255.255.255:8008").await.unwrap();
socket.send_to(&buf, "[FF02::1]:8008").await.unwrap();
task::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
pub struct Codec;
impl Codec {
// TODO: do this properly
pub fn encode(peer: Peer) -> String {
peer.addresses
.iter()
.map(|address| {
let proto = match address.protocol {
Protocol::Net => "net",
Protocol::Ws => "ws",
Protocol::Wss => "wss",
};
let hs = match address.handshake {
Handshake::Shs => "shs",
Handshake::Shs2 => "shs2",
};
format!(
"{}:{}:{}~{}:{}",
proto,
address.host,
address.port,
hs,
peer.key.to_base64(),
)
})
.collect::<Vec<String>>()
.join(";")
}
// TODO: do this properly
pub fn decode(packet: &str) -> Peer {
let mut key = Option::None;
let addresses = packet
.split(';')
.map(|address| {
let mut address = address.splitn(2, '~');
let mut network = address.next().unwrap().splitn(3, ':');
let protocol = match network.next().unwrap() {
"net" => Protocol::Net,
"ws" => Protocol::Ws,
"wss" => Protocol::Wss,
_ => panic!("unknown protocol"),
};
let host = IpAddr::V4(network.next().unwrap().parse().unwrap());
let port = network.next().unwrap().parse().unwrap();
let mut info = address.next().unwrap().splitn(2, ':');
let handshake = match info.next().unwrap() {
"shs" => Handshake::Shs,
"shs2" => Handshake::Shs2,
_ => panic!("unknown handshake"),
};
let pubkey = SSBPublicKey::from_base64(info.next().unwrap());
if key == Option::None {
key = Some(pubkey);
} else if key.unwrap() != pubkey {
panic!("unexpected pubkey");
}
Address {
protocol,
host,
port,
handshake,
}
})
.collect();
Peer {
addresses,
key: key.unwrap(),
}
}
}

View File

@ -1,18 +1,19 @@
use async_std::{fs, task};
use async_std::path::PathBuf;
use async_std::sync::Arc;
use async_std::{fs, task};
use clap::{load_yaml, App};
use ed25519_dalek::Keypair;
use ed25519_dalek::{Keypair, PublicKey};
mod discovery;
mod keypair;
mod peer;
use peer::{Peer, Address, Protocol, Handshake};
use keypair::{SSBKeypair, SSBPublicKey};
mod network;
type Config = toml::map::Map<String, toml::Value>;
#[async_std::main]
async fn main() {
env_logger::init();
let options = load_yaml!("options.yaml");
let options = App::from(options).get_matches();
@ -26,7 +27,7 @@ async fn main() {
),
};
let config = fs::read_to_string(config_file).await.unwrap();
let config: Config = toml::from_str(config.as_str()).unwrap();
let config: toml::map::Map<String, toml::Value> = toml::from_str(config.as_str()).unwrap();
let path = match options.value_of("path") {
Some(path) => PathBuf::from(path),
@ -36,10 +37,21 @@ async fn main() {
},
};
let keypair = Keypair::read_or_generate(path.join("secret")).await;
println!("{}", keypair.to_json().pretty(2));
let peer = Peer::new(
Vec::from([Address::new(
Protocol::Net,
"1.2.3.4".parse().unwrap(),
8023,
Handshake::Shs,
)]),
PublicKey::from_base64(keypair.public.to_base64().as_str()),
);
let pubkey = keypair.public.to_base64();
let (psend, precv) = async_std::channel::unbounded();
task::spawn(discovery::Server::recv(psend, Arc::new(keypair.public)));
task::spawn(discovery::Server::send(Arc::new(peer)));
task::spawn(network::peer_discovery_recv());
task::spawn(network::peer_discovery_send(Arc::new(pubkey))).await;
while let Ok(peer) = precv.recv().await {
log::info!("New peer found: {}", discovery::Codec::encode(peer));
};
}

View File

@ -1,93 +0,0 @@
use async_std::task;
use async_std::net::{IpAddr, UdpSocket};
use async_std::sync::Arc;
use ed25519_dalek::PublicKey;
use crate::keypair::SSBPublicKey;
enum Protocol {
Net,
Ws,
Wss,
}
pub struct Peer {
protocol: Protocol,
host: IpAddr,
port: u16,
pubkey: PublicKey,
}
impl Peer {
pub fn to_discovery_packet(&self) -> String {
let proto = match self.protocol {
Protocol::Net => "net",
Protocol::Ws => "ws",
Protocol::Wss => "wss",
};
format!(
"{}:{}:{}~shs:{}",
proto,
self.host,
self.port,
self.pubkey.to_base64()
)
}
pub fn from_discovery_packet(packet: &str) -> Self {
let mut packet = packet.splitn(4, ':');
let protocol = match packet.next().unwrap() {
"net" => Protocol::Net,
"ws" => Protocol::Ws,
"wss" => Protocol::Wss,
_ => panic!("unknown protocol"),
};
let host = IpAddr::V4(packet.next().unwrap().parse().unwrap());
let port = packet
.next()
.unwrap()
.splitn(2, '~')
.next()
.unwrap()
.parse()
.unwrap();
let pubkey = SSBPublicKey::from_base64(packet.next().unwrap());
Peer {
protocol,
host,
port,
pubkey,
}
}
}
pub async fn peer_discovery_recv() {
let socket = UdpSocket::bind(":::8008").await.unwrap();
let mut buf = [0u8; 1024];
loop {
let (amt, peer) = socket.recv_from(&mut buf).await.unwrap();
let buf = &mut buf[..amt];
let packet = String::from_utf8(buf.to_vec()).unwrap();
println!(
"{} {}",
peer,
Peer::from_discovery_packet(&packet).to_discovery_packet()
);
}
}
pub async fn peer_discovery_send(pubkey: Arc<String>) {
let socket = UdpSocket::bind(":::0").await.unwrap();
let msg = format!("net:1.2.3.4:8023~shs:{}", &pubkey);
let buf = msg.as_bytes();
socket.set_broadcast(true).unwrap();
loop {
println!("Sending packet: {:?}", &msg);
socket.send_to(&buf, "255.255.255.255:8008").await.unwrap();
socket.send_to(&buf, "[FF02::1]:8008").await.unwrap();
task::sleep(std::time::Duration::from_secs(1)).await;
}
}

65
src/peer.rs Normal file
View File

@ -0,0 +1,65 @@
use async_std::net::IpAddr;
use ed25519_dalek::PublicKey;
use std::hash::{Hash, Hasher};
#[derive(Clone)]
#[derive(Debug)]
pub enum Protocol {
Net,
Ws,
Wss,
}
#[derive(Clone)]
#[derive(Debug)]
pub enum Handshake {
Shs,
Shs2,
}
#[derive(Clone)]
#[derive(Debug)]
pub struct Address {
pub protocol: Protocol,
pub host: IpAddr,
pub port: u16,
pub handshake: Handshake,
}
impl Address {
pub fn new(protocol: Protocol, host: IpAddr, port: u16, handshake: Handshake) -> Self {
Self {
protocol,
host,
port,
handshake,
}
}
}
#[derive(Clone)]
#[derive(Debug)]
pub struct Peer {
pub addresses: Vec<Address>,
pub key: PublicKey,
}
impl Peer {
pub fn new(addresses: Vec<Address>, key: PublicKey) -> Self {
Self { addresses, key }
}
}
impl Hash for Peer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.key.to_bytes().hash(state);
}
}
impl std::cmp::PartialEq for Peer {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}
impl std::cmp::Eq for Peer {}