Fully implemented use of sendMessage and receiveMessage
This commit is contained in:
parent
73b892a072
commit
845dcbcc46
|
@ -11,7 +11,7 @@ import handlers.handler;
|
||||||
import listeners.listener;
|
import listeners.listener;
|
||||||
import server.server;
|
import server.server;
|
||||||
import handlers.response;
|
import handlers.response;
|
||||||
|
import connection.message;
|
||||||
|
|
||||||
public final class BesterConnection : Thread
|
public final class BesterConnection : Thread
|
||||||
{
|
{
|
||||||
|
@ -62,12 +62,6 @@ public final class BesterConnection : Thread
|
||||||
return [username, password];
|
return [username, password];
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a message to the user/server */
|
|
||||||
public void sendMessage(JSONValue replyMessage)
|
|
||||||
{
|
|
||||||
/* TODO: Implement me */
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Read/send loop */
|
/* Read/send loop */
|
||||||
private void run()
|
private void run()
|
||||||
{
|
{
|
||||||
|
@ -84,155 +78,6 @@ public final class BesterConnection : Thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Generalized socket send function which will send the JSON
|
|
||||||
* encoded message, `jsonMessage`, over to the client at the
|
|
||||||
* other end of the socket, `recipient`.
|
|
||||||
*
|
|
||||||
* It gets the length of `jsonMessage` and encodes a 4 byte
|
|
||||||
* message header in little-endian containing the message's
|
|
||||||
* length.
|
|
||||||
*/
|
|
||||||
public static void sendMessage(Socket recipient, JSONValue jsonMessage)
|
|
||||||
{
|
|
||||||
/* The message buffer */
|
|
||||||
byte[] messageBuffer;
|
|
||||||
|
|
||||||
/* Get the JSON as a string */
|
|
||||||
string message = toJSON(jsonMessage);
|
|
||||||
|
|
||||||
/* Encode the 4 byte message length header (little endian) */
|
|
||||||
int payloadLength = cast(int)message.length;
|
|
||||||
byte* lengthBytes = cast(byte*)&payloadLength;
|
|
||||||
messageBuffer ~= *(lengthBytes+0);
|
|
||||||
messageBuffer ~= *(lengthBytes+1);
|
|
||||||
messageBuffer ~= *(lengthBytes+2);
|
|
||||||
messageBuffer ~= *(lengthBytes+3);
|
|
||||||
|
|
||||||
/* Add the message to the buffer */
|
|
||||||
messageBuffer ~= cast(byte[])message;
|
|
||||||
|
|
||||||
/* Send the message */
|
|
||||||
recipient.send(messageBuffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* TODO: Implement me */
|
|
||||||
/**
|
|
||||||
* Generalized socket receive function which will read into the
|
|
||||||
* variable pointed to by `receiveMessage` by reading from the
|
|
||||||
* socket `originator`.
|
|
||||||
*/
|
|
||||||
public static void receiveMessage(Socket originator, ref JSONValue receiveMessage)
|
|
||||||
{
|
|
||||||
/* TODO: Implement me */
|
|
||||||
|
|
||||||
/* Construct a buffer to receive into */
|
|
||||||
byte[] receiveBuffer;
|
|
||||||
|
|
||||||
/* The current byte */
|
|
||||||
uint currentByte = 0;
|
|
||||||
|
|
||||||
/* The amount of bytes received */
|
|
||||||
long bytesReceived;
|
|
||||||
|
|
||||||
/* Loop consume the next 4 bytes */
|
|
||||||
while(currentByte < 4)
|
|
||||||
{
|
|
||||||
/* Temporary buffer */
|
|
||||||
byte[4] tempBuffer;
|
|
||||||
|
|
||||||
/* Read at-most 4 bytes */
|
|
||||||
bytesReceived = originator.receive(tempBuffer);
|
|
||||||
|
|
||||||
/* If there was an error reading from the socket */
|
|
||||||
if(!(bytesReceived > 0))
|
|
||||||
{
|
|
||||||
/* TODO: Error handling */
|
|
||||||
debugPrint("Error receiving from socket");
|
|
||||||
}
|
|
||||||
/* If there is no error reading from the socket */
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Add the read bytes to the *real* buffer */
|
|
||||||
receiveBuffer ~= tempBuffer[0..bytesReceived];
|
|
||||||
|
|
||||||
/* Increment the byte counter */
|
|
||||||
currentByte += bytesReceived;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Response message length */
|
|
||||||
int messageLength = *cast(int*)receiveBuffer.ptr;
|
|
||||||
writeln("Message length is: ", cast(uint)messageLength);
|
|
||||||
|
|
||||||
/* Response message buffer */
|
|
||||||
byte[] fullMessage;
|
|
||||||
|
|
||||||
/* Reset the byte counter */
|
|
||||||
currentByte = 0;
|
|
||||||
|
|
||||||
while(currentByte < messageLength)
|
|
||||||
{
|
|
||||||
debugPrint("dhjkh");
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Receive 20 bytes (at most) at a time and don't dequeue from
|
|
||||||
* the kernel's TCP stack's buffer.
|
|
||||||
*/
|
|
||||||
byte[20] tempBuffer;
|
|
||||||
bytesReceived = originator.receive(tempBuffer, SocketFlags.PEEK);
|
|
||||||
|
|
||||||
/* Check for an error whilst receiving */
|
|
||||||
if(!(bytesReceived > 0))
|
|
||||||
{
|
|
||||||
/* TODO: Error handling */
|
|
||||||
debugPrint("Error whilst receiving from socket");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* TODO: Make sure we only take [0, messageLength) bytes */
|
|
||||||
if(cast(uint)bytesReceived+currentByte > messageLength)
|
|
||||||
{
|
|
||||||
byte[] remainingBytes;
|
|
||||||
remainingBytes.length = messageLength-currentByte;
|
|
||||||
|
|
||||||
originator.receive(remainingBytes);
|
|
||||||
|
|
||||||
/* Increment counter of received bytes */
|
|
||||||
currentByte += remainingBytes.length;
|
|
||||||
|
|
||||||
/* Append the received bytes to the FULL message buffer */
|
|
||||||
fullMessage ~= remainingBytes;
|
|
||||||
|
|
||||||
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Increment counter of received bytes */
|
|
||||||
currentByte += bytesReceived;
|
|
||||||
|
|
||||||
|
|
||||||
/* Append the received bytes to the FULL message buffer */
|
|
||||||
fullMessage ~= tempBuffer[0..bytesReceived];
|
|
||||||
|
|
||||||
/* TODO: Bug when over send, we must not allow this */
|
|
||||||
|
|
||||||
|
|
||||||
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
|
||||||
|
|
||||||
originator.receive(tempBuffer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
writeln("Message ", fullMessage);
|
|
||||||
|
|
||||||
/* Set the message in `receiveMessage */
|
|
||||||
receiveMessage = parseJSON(cast(string)fullMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the payload to the designated message handler and gets
|
* Sends the payload to the designated message handler and gets
|
||||||
* the response message from the handler and returns it.
|
* the response message from the handler and returns it.
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
module connection.message;
|
||||||
|
|
||||||
|
import std.socket : Socket, SocketFlags;
|
||||||
|
import std.json : JSONValue, parseJSON, toJSON;
|
||||||
|
import utils.debugging : debugPrint;
|
||||||
|
import std.stdio : writeln;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generalized socket receive function which will read into the
|
||||||
|
* variable pointed to by `receiveMessage` by reading from the
|
||||||
|
* socket `originator`.
|
||||||
|
*/
|
||||||
|
public static void receiveMessage(Socket originator, ref JSONValue receiveMessage)
|
||||||
|
{
|
||||||
|
/* TODO: Implement me */
|
||||||
|
|
||||||
|
/* Construct a buffer to receive into */
|
||||||
|
byte[] receiveBuffer;
|
||||||
|
|
||||||
|
/* The current byte */
|
||||||
|
uint currentByte = 0;
|
||||||
|
|
||||||
|
/* The amount of bytes received */
|
||||||
|
long bytesReceived;
|
||||||
|
|
||||||
|
/* Loop consume the next 4 bytes */
|
||||||
|
while(currentByte < 4)
|
||||||
|
{
|
||||||
|
/* Temporary buffer */
|
||||||
|
byte[4] tempBuffer;
|
||||||
|
|
||||||
|
/* Read at-most 4 bytes */
|
||||||
|
bytesReceived = originator.receive(tempBuffer);
|
||||||
|
|
||||||
|
/* If there was an error reading from the socket */
|
||||||
|
if(!(bytesReceived > 0))
|
||||||
|
{
|
||||||
|
/* TODO: Error handling */
|
||||||
|
debugPrint("Error receiving from socket");
|
||||||
|
}
|
||||||
|
/* If there is no error reading from the socket */
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Add the read bytes to the *real* buffer */
|
||||||
|
receiveBuffer ~= tempBuffer[0..bytesReceived];
|
||||||
|
|
||||||
|
/* Increment the byte counter */
|
||||||
|
currentByte += bytesReceived;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Response message length */
|
||||||
|
int messageLength = *cast(int*)receiveBuffer.ptr;
|
||||||
|
writeln("Message length is: ", cast(uint)messageLength);
|
||||||
|
|
||||||
|
/* Response message buffer */
|
||||||
|
byte[] fullMessage;
|
||||||
|
|
||||||
|
/* Reset the byte counter */
|
||||||
|
currentByte = 0;
|
||||||
|
|
||||||
|
while(currentByte < messageLength)
|
||||||
|
{
|
||||||
|
debugPrint("dhjkh");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive 20 bytes (at most) at a time and don't dequeue from
|
||||||
|
* the kernel's TCP stack's buffer.
|
||||||
|
*/
|
||||||
|
byte[20] tempBuffer;
|
||||||
|
bytesReceived = originator.receive(tempBuffer, SocketFlags.PEEK);
|
||||||
|
|
||||||
|
/* Check for an error whilst receiving */
|
||||||
|
if(!(bytesReceived > 0))
|
||||||
|
{
|
||||||
|
/* TODO: Error handling */
|
||||||
|
debugPrint("Error whilst receiving from socket");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* TODO: Make sure we only take [0, messageLength) bytes */
|
||||||
|
if(cast(uint)bytesReceived+currentByte > messageLength)
|
||||||
|
{
|
||||||
|
byte[] remainingBytes;
|
||||||
|
remainingBytes.length = messageLength-currentByte;
|
||||||
|
|
||||||
|
originator.receive(remainingBytes);
|
||||||
|
|
||||||
|
/* Increment counter of received bytes */
|
||||||
|
currentByte += remainingBytes.length;
|
||||||
|
|
||||||
|
/* Append the received bytes to the FULL message buffer */
|
||||||
|
fullMessage ~= remainingBytes;
|
||||||
|
|
||||||
|
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Increment counter of received bytes */
|
||||||
|
currentByte += bytesReceived;
|
||||||
|
|
||||||
|
|
||||||
|
/* Append the received bytes to the FULL message buffer */
|
||||||
|
fullMessage ~= tempBuffer[0..bytesReceived];
|
||||||
|
|
||||||
|
/* TODO: Bug when over send, we must not allow this */
|
||||||
|
|
||||||
|
|
||||||
|
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
||||||
|
|
||||||
|
originator.receive(tempBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
writeln("Message ", fullMessage);
|
||||||
|
|
||||||
|
/* Set the message in `receiveMessage */
|
||||||
|
receiveMessage = parseJSON(cast(string)fullMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generalized socket send function which will send the JSON
|
||||||
|
* encoded message, `jsonMessage`, over to the client at the
|
||||||
|
* other end of the socket, `recipient`.
|
||||||
|
*
|
||||||
|
* It gets the length of `jsonMessage` and encodes a 4 byte
|
||||||
|
* message header in little-endian containing the message's
|
||||||
|
* length.
|
||||||
|
*/
|
||||||
|
public static void sendMessage(Socket recipient, JSONValue jsonMessage)
|
||||||
|
{
|
||||||
|
/* The message buffer */
|
||||||
|
byte[] messageBuffer;
|
||||||
|
|
||||||
|
/* Get the JSON as a string */
|
||||||
|
string message = toJSON(jsonMessage);
|
||||||
|
|
||||||
|
/* Encode the 4 byte message length header (little endian) */
|
||||||
|
int payloadLength = cast(int)message.length;
|
||||||
|
byte* lengthBytes = cast(byte*)&payloadLength;
|
||||||
|
messageBuffer ~= *(lengthBytes+0);
|
||||||
|
messageBuffer ~= *(lengthBytes+1);
|
||||||
|
messageBuffer ~= *(lengthBytes+2);
|
||||||
|
messageBuffer ~= *(lengthBytes+3);
|
||||||
|
|
||||||
|
/* Add the message to the buffer */
|
||||||
|
messageBuffer ~= cast(byte[])message;
|
||||||
|
|
||||||
|
/* Send the message */
|
||||||
|
recipient.send(messageBuffer);
|
||||||
|
}
|
Loading…
Reference in New Issue