diff --git a/README.md b/README.md index e6532e7..14e023a 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ bformat [![D](https://github.com/besterprotocol/bformat/actions/workflows/d.yml/badge.svg)](https://github.com/besterprotocol/bformat/actions/workflows/d.yml) -A simple message format for automatically length-prefixing messages over any [`Socket`](https://dlang.org/phobos/std_socket.html#.Socket). +A simple message format for automatically length-prefixing messages over any [`Socket`](https://dlang.org/phobos/std_socket.html#.Socket) or [River-based](https://github.com/deavmi/river) `Stream`. ## What is bformat? diff --git a/dub.json b/dub.json index f160163..3d87244 100644 --- a/dub.json +++ b/dub.json @@ -2,9 +2,12 @@ "authors": [ "Tristan B. Velloza Kildaire" ], - "homepage": "https://deavmi.assigned.network/projects/bformat/", "copyright": "Copyright © 2023, Tristan B. Kildaire", - "description": "Bformat message encoder/decoder for streams and sockets", + "dependencies": { + "river": "~>0.3.6" + }, + "description": "bformat socket writer and reader", + "homepage": "https://deavmi.assigned.network/projects/bformat/", "license": "LGPL v3", "name": "bformat", "targetType": "library" diff --git a/source/bformat/client.d b/source/bformat/client.d new file mode 100644 index 0000000..01ae730 --- /dev/null +++ b/source/bformat/client.d @@ -0,0 +1,209 @@ +/** + * Socket encoding/decoding functions + */ +module bformat.client; + +import std.socket : Socket; +import river.core; +import river.impls.sock : SockStream; + +/** + * Bformat client to encode and decode via a + * `Socket` or river-based `Stream` + */ +public class BClient +{ + /** + * Underlying stream + */ + private Stream stream; + + /** + * Constructs a new `BClient` for encoding and decoding + * to and from the provided `Socket` + * + * Params: + * socket = the `Socket` to use for writing and reading + */ + this(Socket socket) + { + this(new SockStream(socket)); + } + + /** + * Constructs a new `BClient` for encoding and decoding + * to and from the provided river-based `Stream` + * + * Params: + * stream = the `Stream` to use for writing and reading + */ + this(Stream stream) + { + this.stream = stream; + } + + /** + * Receives a message from the provided socket + * by decoding the streamed bytes into bformat + * and finally placing the resulting payload in + * the provided array + * + * Params: + * originator = the socket to receive from + * receiveMessage = the nbuffer to receive into + * + * Returns: true if the receive succeeded, false otheriwse + */ + public bool receiveMessage(ref byte[] receiveMessage) + { + /* Construct a buffer to receive into */ + byte[] receiveBuffer; + + /* Get the length of the message */ + byte[4] messageLengthBytes; + + try + { + stream.readFully(messageLengthBytes); + } + catch(StreamException streamErr) + { + /* If there was an error reading from the socket */ + return false; + } + + + /* Response message length */ + uint messageLength; + + /* Little endian version you simply read if off the bone (it's already in the correct order) */ + version(LittleEndian) + { + messageLength = *cast(int*)messageLengthBytes.ptr; + } + + /* Big endian requires we byte-sapped the little-endian encoded number */ + version(BigEndian) + { + byte[] swappedLength; + swappedLength.length = 4; + + swappedLength[0] = messageLengthBytes[3]; + swappedLength[1] = messageLengthBytes[2]; + swappedLength[2] = messageLengthBytes[1]; + swappedLength[3] = messageLengthBytes[0]; + + messageLength = *cast(int*)swappedLength.ptr; + } + + + /* Read the full message */ + receiveBuffer.length = messageLength; + try + { + stream.readFully(receiveBuffer); + receiveMessage = receiveBuffer; + + /* If there was no error receiving the message */ + return true; + } + catch(StreamException streamErr) + { + /* If there was an error reading from the socket */ + return false; + } + } + + /** + * Encodes the provided message into the bformat format + * and sends it over the provided socket + * + * Params: + * recipient = the socket to send over + * message = the message to encode and send + * + * Returns: true if the send succeeded, false otherwise + */ + public bool sendMessage(byte[] message) + { + /* The message buffer */ + byte[] messageBuffer; + + import bformat.marshall : encodeBformat; + messageBuffer = encodeBformat(message); + + try + { + /* Send the message */ + stream.writeFully(messageBuffer); + + return true; + } + catch(StreamException streamError) + { + return false; + } + } + +} + +version(unittest) +{ + import std.socket; + import core.thread; + import std.stdio; +} + +/** + * Create a server that encodes a message to the client + * and then let the client decode it from us; both making + * use of `BClient` to accomplish this + */ +unittest +{ + UnixAddress unixAddr = new UnixAddress("/tmp/bformatServer.sock"); + + scope(exit) + { + import std.stdio; + remove(cast(char*)unixAddr.path()); + } + + Socket serverSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM); + serverSocket.bind(unixAddr); + serverSocket.listen(0); + + class ServerThread : Thread + { + private Socket servSock; + + this(Socket servSock) + { + this.servSock = servSock; + super(&worker); + } + + private void worker() + { + Socket clientSock = servSock.accept(); + + BClient bClient = new BClient(clientSock); + + byte[] message = cast(byte[])"ABBA"; + bClient.sendMessage(message); + } + } + + Thread serverThread = new ServerThread(serverSocket); + serverThread.start(); + + Socket client = new Socket(AddressFamily.UNIX, SocketType.STREAM); + client.connect(unixAddr); + BClient bClient = new BClient(client); + + byte[] receivedMessage; + bClient.receiveMessage(receivedMessage); + assert(receivedMessage == "ABBA"); + writeln(receivedMessage); + writeln(cast(string)receivedMessage); +} \ No newline at end of file diff --git a/source/bformat/package.d b/source/bformat/package.d index 3929555..924dd9c 100644 --- a/source/bformat/package.d +++ b/source/bformat/package.d @@ -4,18 +4,11 @@ module bformat; /** - * Encodes the provided message into the bformat format - * and sends it over the provided socket + * Provides a client which consumes a stream + * which can encode and decode messages to + * and from it */ -public import bformat.sockets : BClient; - -/** - * Receives a message from the provided socket - * by decoding the streamed bytes into bformat - * and finally placing the resulting payload in - * the provided array - */ -// public import bformat.sockets : receiveMessage; +public import bformat.client : BClient; /** * Encodes the provided message into the bformat format diff --git a/source/bformat/sockets.d b/source/bformat/sockets.d deleted file mode 100644 index 5bd95d7..0000000 --- a/source/bformat/sockets.d +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Socket encoding/decoding functions - */ -module bformat.sockets; - -import std.socket : Socket, SocketFlags, MSG_WAITALL; - -public class BClient -{ - /** - * Underlying socket - */ - private Socket socket; - - // TODO: comment - this(Socket socket) - { - this.socket = socket; - } - - /** - * Receives a message from the provided socket - * by decoding the streamed bytes into bformat - * and finally placing the resulting payload in - * the provided array - * - * Params: - * originator = the socket to receive from - * receiveMessage = the nbuffer to receive into - * - * Returns: true if the receive succeeded, false otheriwse - */ - public bool receiveMessage(ref byte[] receiveMessage) - { - /* Construct a buffer to receive into */ - byte[] receiveBuffer; - - bool status = true; - - - /* The amount of bytes received */ - long bytesReceived; - - /* Get the length of the message */ - byte[4] messageLengthBytes; - bytesReceived = socket.receive(messageLengthBytes, cast(SocketFlags)MSG_WAITALL); - - /* If there was an error reading from the socket */ - if(!(bytesReceived > 0)) - { - status = false; - } - /* If the receive was successful */ - else - { - /* Response message length */ - uint messageLength; - - /* Little endian version you simply read if off the bone (it's already in the correct order) */ - version(LittleEndian) - { - messageLength = *cast(int*)messageLengthBytes.ptr; - } - - /* Big endian requires we byte-sapped the little-endian encoded number */ - version(BigEndian) - { - byte[] swappedLength; - swappedLength.length = 4; - - swappedLength[0] = messageLengthBytes[3]; - swappedLength[1] = messageLengthBytes[2]; - swappedLength[2] = messageLengthBytes[1]; - swappedLength[3] = messageLengthBytes[0]; - - messageLength = *cast(int*)swappedLength.ptr; - } - - - /* Read the full message */ - receiveBuffer.length = messageLength; - bytesReceived = socket.receive(receiveBuffer, cast(SocketFlags)MSG_WAITALL); - - /* If there was an error reading from the socket */ - if(!(bytesReceived > 0)) - { - status = false; - } - /* If there was no error receiving the message */ - else - { - receiveMessage = receiveBuffer; - } - } - - return status; - } - - /** - * Encodes the provided message into the bformat format - * and sends it over the provided socket - * - * Params: - * recipient = the socket to send over - * message = the message to encode and send - * - * Returns: true if the send succeeded, false otherwise - */ - public bool sendMessage(byte[] message) - { - /* The message buffer */ - byte[] messageBuffer; - - /* Encode the 4 byte message length header (little endian) */ - int payloadLength = cast(int)message.length; - byte* lengthBytes = cast(byte*)&payloadLength; - - /* On little endian simply get the bytes as is (it would be encoded as little endian) */ - version(LittleEndian) - { - messageBuffer ~= *(lengthBytes+0); - messageBuffer ~= *(lengthBytes+1); - messageBuffer ~= *(lengthBytes+2); - messageBuffer ~= *(lengthBytes+3); - } - - /* On Big Endian you must swap the big-endian-encoded number to be in little endian ordering */ - version(BigEndian) - { - messageBuffer ~= *(lengthBytes+3); - messageBuffer ~= *(lengthBytes+2); - messageBuffer ~= *(lengthBytes+1); - messageBuffer ~= *(lengthBytes+0); - } - - - /* Add the message to the buffer */ - messageBuffer ~= cast(byte[])message; - - /* Send the message */ - long bytesSent = socket.send(messageBuffer); - - /* TODO: Compact this */ - return bytesSent > 0; - } - -} - - - - - - -