Merge pull request #2 from besterprotocol/feature/river_stream
Feature/river stream
This commit is contained in:
commit
5d9c978629
|
@ -3,7 +3,7 @@ bformat
|
|||
|
||||
[![D](https://github.com/besterprotocol/bformat/actions/workflows/d.yml/badge.svg)](https://github.com/besterprotocol/bformat/actions/workflows/d.yml)
|
||||
|
||||
A simple message format for automatically length-prefixing messages over any [`Socket`](https://dlang.org/phobos/std_socket.html#.Socket).
|
||||
A simple message format for automatically length-prefixing messages over any [`Socket`](https://dlang.org/phobos/std_socket.html#.Socket) or [River-based](https://github.com/deavmi/river) `Stream`.
|
||||
|
||||
## What is bformat?
|
||||
|
||||
|
|
7
dub.json
7
dub.json
|
@ -2,9 +2,12 @@
|
|||
"authors": [
|
||||
"Tristan B. Velloza Kildaire"
|
||||
],
|
||||
"homepage": "https://deavmi.assigned.network/projects/bformat/",
|
||||
"copyright": "Copyright © 2023, Tristan B. Kildaire",
|
||||
"description": "Bformat message encoder/decoder for streams and sockets",
|
||||
"dependencies": {
|
||||
"river": "~>0.3.6"
|
||||
},
|
||||
"description": "bformat socket writer and reader",
|
||||
"homepage": "https://deavmi.assigned.network/projects/bformat/",
|
||||
"license": "LGPL v3",
|
||||
"name": "bformat",
|
||||
"targetType": "library"
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
/**
|
||||
* Socket encoding/decoding functions
|
||||
*/
|
||||
module bformat.client;
|
||||
|
||||
import std.socket : Socket;
|
||||
import river.core;
|
||||
import river.impls.sock : SockStream;
|
||||
|
||||
/**
|
||||
* Bformat client to encode and decode via a
|
||||
* `Socket` or river-based `Stream`
|
||||
*/
|
||||
public class BClient
|
||||
{
|
||||
/**
|
||||
* Underlying stream
|
||||
*/
|
||||
private Stream stream;
|
||||
|
||||
/**
|
||||
* Constructs a new `BClient` for encoding and decoding
|
||||
* to and from the provided `Socket`
|
||||
*
|
||||
* Params:
|
||||
* socket = the `Socket` to use for writing and reading
|
||||
*/
|
||||
this(Socket socket)
|
||||
{
|
||||
this(new SockStream(socket));
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new `BClient` for encoding and decoding
|
||||
* to and from the provided river-based `Stream`
|
||||
*
|
||||
* Params:
|
||||
* stream = the `Stream` to use for writing and reading
|
||||
*/
|
||||
this(Stream stream)
|
||||
{
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a message from the provided socket
|
||||
* by decoding the streamed bytes into bformat
|
||||
* and finally placing the resulting payload in
|
||||
* the provided array
|
||||
*
|
||||
* Params:
|
||||
* originator = the socket to receive from
|
||||
* receiveMessage = the nbuffer to receive into
|
||||
*
|
||||
* Returns: true if the receive succeeded, false otheriwse
|
||||
*/
|
||||
public bool receiveMessage(ref byte[] receiveMessage)
|
||||
{
|
||||
/* Construct a buffer to receive into */
|
||||
byte[] receiveBuffer;
|
||||
|
||||
/* Get the length of the message */
|
||||
byte[4] messageLengthBytes;
|
||||
|
||||
try
|
||||
{
|
||||
stream.readFully(messageLengthBytes);
|
||||
}
|
||||
catch(StreamException streamErr)
|
||||
{
|
||||
/* If there was an error reading from the socket */
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/* Response message length */
|
||||
uint messageLength;
|
||||
|
||||
/* Little endian version you simply read if off the bone (it's already in the correct order) */
|
||||
version(LittleEndian)
|
||||
{
|
||||
messageLength = *cast(int*)messageLengthBytes.ptr;
|
||||
}
|
||||
|
||||
/* Big endian requires we byte-sapped the little-endian encoded number */
|
||||
version(BigEndian)
|
||||
{
|
||||
byte[] swappedLength;
|
||||
swappedLength.length = 4;
|
||||
|
||||
swappedLength[0] = messageLengthBytes[3];
|
||||
swappedLength[1] = messageLengthBytes[2];
|
||||
swappedLength[2] = messageLengthBytes[1];
|
||||
swappedLength[3] = messageLengthBytes[0];
|
||||
|
||||
messageLength = *cast(int*)swappedLength.ptr;
|
||||
}
|
||||
|
||||
|
||||
/* Read the full message */
|
||||
receiveBuffer.length = messageLength;
|
||||
try
|
||||
{
|
||||
stream.readFully(receiveBuffer);
|
||||
receiveMessage = receiveBuffer;
|
||||
|
||||
/* If there was no error receiving the message */
|
||||
return true;
|
||||
}
|
||||
catch(StreamException streamErr)
|
||||
{
|
||||
/* If there was an error reading from the socket */
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes the provided message into the bformat format
|
||||
* and sends it over the provided socket
|
||||
*
|
||||
* Params:
|
||||
* recipient = the socket to send over
|
||||
* message = the message to encode and send
|
||||
*
|
||||
* Returns: true if the send succeeded, false otherwise
|
||||
*/
|
||||
public bool sendMessage(byte[] message)
|
||||
{
|
||||
/* The message buffer */
|
||||
byte[] messageBuffer;
|
||||
|
||||
import bformat.marshall : encodeBformat;
|
||||
messageBuffer = encodeBformat(message);
|
||||
|
||||
try
|
||||
{
|
||||
/* Send the message */
|
||||
stream.writeFully(messageBuffer);
|
||||
|
||||
return true;
|
||||
}
|
||||
catch(StreamException streamError)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
version(unittest)
|
||||
{
|
||||
import std.socket;
|
||||
import core.thread;
|
||||
import std.stdio;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a server that encodes a message to the client
|
||||
* and then let the client decode it from us; both making
|
||||
* use of `BClient` to accomplish this
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
UnixAddress unixAddr = new UnixAddress("/tmp/bformatServer.sock");
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
import std.stdio;
|
||||
remove(cast(char*)unixAddr.path());
|
||||
}
|
||||
|
||||
Socket serverSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
|
||||
serverSocket.bind(unixAddr);
|
||||
serverSocket.listen(0);
|
||||
|
||||
class ServerThread : Thread
|
||||
{
|
||||
private Socket servSock;
|
||||
|
||||
this(Socket servSock)
|
||||
{
|
||||
this.servSock = servSock;
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
Socket clientSock = servSock.accept();
|
||||
|
||||
BClient bClient = new BClient(clientSock);
|
||||
|
||||
byte[] message = cast(byte[])"ABBA";
|
||||
bClient.sendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
Thread serverThread = new ServerThread(serverSocket);
|
||||
serverThread.start();
|
||||
|
||||
Socket client = new Socket(AddressFamily.UNIX, SocketType.STREAM);
|
||||
client.connect(unixAddr);
|
||||
BClient bClient = new BClient(client);
|
||||
|
||||
byte[] receivedMessage;
|
||||
bClient.receiveMessage(receivedMessage);
|
||||
assert(receivedMessage == "ABBA");
|
||||
writeln(receivedMessage);
|
||||
writeln(cast(string)receivedMessage);
|
||||
}
|
|
@ -4,18 +4,11 @@
|
|||
module bformat;
|
||||
|
||||
/**
|
||||
* Encodes the provided message into the bformat format
|
||||
* and sends it over the provided socket
|
||||
* Provides a client which consumes a stream
|
||||
* which can encode and decode messages to
|
||||
* and from it
|
||||
*/
|
||||
public import bformat.sockets : BClient;
|
||||
|
||||
/**
|
||||
* Receives a message from the provided socket
|
||||
* by decoding the streamed bytes into bformat
|
||||
* and finally placing the resulting payload in
|
||||
* the provided array
|
||||
*/
|
||||
// public import bformat.sockets : receiveMessage;
|
||||
public import bformat.client : BClient;
|
||||
|
||||
/**
|
||||
* Encodes the provided message into the bformat format
|
||||
|
|
|
@ -1,154 +0,0 @@
|
|||
/**
|
||||
* Socket encoding/decoding functions
|
||||
*/
|
||||
module bformat.sockets;
|
||||
|
||||
import std.socket : Socket, SocketFlags, MSG_WAITALL;
|
||||
|
||||
public class BClient
|
||||
{
|
||||
/**
|
||||
* Underlying socket
|
||||
*/
|
||||
private Socket socket;
|
||||
|
||||
// TODO: comment
|
||||
this(Socket socket)
|
||||
{
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a message from the provided socket
|
||||
* by decoding the streamed bytes into bformat
|
||||
* and finally placing the resulting payload in
|
||||
* the provided array
|
||||
*
|
||||
* Params:
|
||||
* originator = the socket to receive from
|
||||
* receiveMessage = the nbuffer to receive into
|
||||
*
|
||||
* Returns: true if the receive succeeded, false otheriwse
|
||||
*/
|
||||
public bool receiveMessage(ref byte[] receiveMessage)
|
||||
{
|
||||
/* Construct a buffer to receive into */
|
||||
byte[] receiveBuffer;
|
||||
|
||||
bool status = true;
|
||||
|
||||
|
||||
/* The amount of bytes received */
|
||||
long bytesReceived;
|
||||
|
||||
/* Get the length of the message */
|
||||
byte[4] messageLengthBytes;
|
||||
bytesReceived = socket.receive(messageLengthBytes, cast(SocketFlags)MSG_WAITALL);
|
||||
|
||||
/* If there was an error reading from the socket */
|
||||
if(!(bytesReceived > 0))
|
||||
{
|
||||
status = false;
|
||||
}
|
||||
/* If the receive was successful */
|
||||
else
|
||||
{
|
||||
/* Response message length */
|
||||
uint messageLength;
|
||||
|
||||
/* Little endian version you simply read if off the bone (it's already in the correct order) */
|
||||
version(LittleEndian)
|
||||
{
|
||||
messageLength = *cast(int*)messageLengthBytes.ptr;
|
||||
}
|
||||
|
||||
/* Big endian requires we byte-sapped the little-endian encoded number */
|
||||
version(BigEndian)
|
||||
{
|
||||
byte[] swappedLength;
|
||||
swappedLength.length = 4;
|
||||
|
||||
swappedLength[0] = messageLengthBytes[3];
|
||||
swappedLength[1] = messageLengthBytes[2];
|
||||
swappedLength[2] = messageLengthBytes[1];
|
||||
swappedLength[3] = messageLengthBytes[0];
|
||||
|
||||
messageLength = *cast(int*)swappedLength.ptr;
|
||||
}
|
||||
|
||||
|
||||
/* Read the full message */
|
||||
receiveBuffer.length = messageLength;
|
||||
bytesReceived = socket.receive(receiveBuffer, cast(SocketFlags)MSG_WAITALL);
|
||||
|
||||
/* If there was an error reading from the socket */
|
||||
if(!(bytesReceived > 0))
|
||||
{
|
||||
status = false;
|
||||
}
|
||||
/* If there was no error receiving the message */
|
||||
else
|
||||
{
|
||||
receiveMessage = receiveBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes the provided message into the bformat format
|
||||
* and sends it over the provided socket
|
||||
*
|
||||
* Params:
|
||||
* recipient = the socket to send over
|
||||
* message = the message to encode and send
|
||||
*
|
||||
* Returns: true if the send succeeded, false otherwise
|
||||
*/
|
||||
public bool sendMessage(byte[] message)
|
||||
{
|
||||
/* The message buffer */
|
||||
byte[] messageBuffer;
|
||||
|
||||
/* Encode the 4 byte message length header (little endian) */
|
||||
int payloadLength = cast(int)message.length;
|
||||
byte* lengthBytes = cast(byte*)&payloadLength;
|
||||
|
||||
/* On little endian simply get the bytes as is (it would be encoded as little endian) */
|
||||
version(LittleEndian)
|
||||
{
|
||||
messageBuffer ~= *(lengthBytes+0);
|
||||
messageBuffer ~= *(lengthBytes+1);
|
||||
messageBuffer ~= *(lengthBytes+2);
|
||||
messageBuffer ~= *(lengthBytes+3);
|
||||
}
|
||||
|
||||
/* On Big Endian you must swap the big-endian-encoded number to be in little endian ordering */
|
||||
version(BigEndian)
|
||||
{
|
||||
messageBuffer ~= *(lengthBytes+3);
|
||||
messageBuffer ~= *(lengthBytes+2);
|
||||
messageBuffer ~= *(lengthBytes+1);
|
||||
messageBuffer ~= *(lengthBytes+0);
|
||||
}
|
||||
|
||||
|
||||
/* Add the message to the buffer */
|
||||
messageBuffer ~= cast(byte[])message;
|
||||
|
||||
/* Send the message */
|
||||
long bytesSent = socket.send(messageBuffer);
|
||||
|
||||
/* TODO: Compact this */
|
||||
return bytesSent > 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue