Implemented sendMessage and receiveMessage
This commit is contained in:
parent
eb4626fd33
commit
73b892a072
|
@ -73,115 +73,14 @@ public final class BesterConnection : Thread
|
||||||
{
|
{
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
/* Receive buffer */
|
/* Received JSON message */
|
||||||
byte[] buffer;
|
JSONValue receivedMessage;
|
||||||
|
|
||||||
/* Byte counter for loop-consumer */
|
|
||||||
uint currentByte = 0;
|
|
||||||
|
|
||||||
/* Bytes received counter */
|
|
||||||
long bytesReceived;
|
|
||||||
|
|
||||||
/* TODO: Add fix here to loop for bytes */
|
/* Receive a message */
|
||||||
while(currentByte < 4)
|
receiveMessage(clientConnection, receivedMessage);
|
||||||
{
|
|
||||||
/* Size buffer */
|
|
||||||
byte[4] tempBuffer;
|
|
||||||
|
|
||||||
/* Read at most 4 bytes */
|
|
||||||
bytesReceived = clientConnection.receive(tempBuffer);
|
|
||||||
|
|
||||||
if(!(bytesReceived > 0))
|
|
||||||
{
|
|
||||||
/* TODO: Handle error here */
|
|
||||||
debugPrint("Error with receiving");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Read the bytes from the temp buffer (as many as was received)
|
|
||||||
* and append them to the *real* buffer.
|
|
||||||
*/
|
|
||||||
buffer ~= tempBuffer[0..bytesReceived];
|
|
||||||
|
|
||||||
/* Increment the byte counter */
|
|
||||||
currentByte += bytesReceived;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Get the message length */
|
|
||||||
int messageLength = *(cast(int*)buffer.ptr);
|
|
||||||
writeln("Message length: ", cast(uint)messageLength);
|
|
||||||
|
|
||||||
/* TODO: Testing locally ain't good as stuff arrives way too fast, although not as fast as I can type */
|
|
||||||
/* What must happen is a loop to loop and wait for data */
|
|
||||||
|
|
||||||
/* Full message buffer */
|
|
||||||
byte[] messageBuffer;
|
|
||||||
|
|
||||||
|
|
||||||
/* TODO: Add timeout if we haven't received a message in a certain amount of time */
|
|
||||||
|
|
||||||
/* Reset the current byte counter */
|
|
||||||
currentByte = 0;
|
|
||||||
|
|
||||||
while(currentByte < messageLength)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Receive 20 bytes (at most) at a time and don't dequeue from
|
|
||||||
* the kernel's TCP stack's buffer.
|
|
||||||
*/
|
|
||||||
byte[20] messageBufferPartial;
|
|
||||||
bytesReceived = clientConnection.receive(messageBufferPartial, SocketFlags.PEEK);
|
|
||||||
|
|
||||||
/* Check for receive error */
|
|
||||||
if(!(bytesReceived > 0))
|
|
||||||
{
|
|
||||||
debugPrint("Error receiving");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* TODO: Make sure we only take [0, messageLength) bytes */
|
|
||||||
if(cast(uint)bytesReceived+currentByte > messageLength)
|
|
||||||
{
|
|
||||||
byte[] remainingBytes;
|
|
||||||
remainingBytes.length = messageLength-currentByte;
|
|
||||||
|
|
||||||
clientConnection.receive(remainingBytes);
|
|
||||||
|
|
||||||
/* Increment counter of received bytes */
|
|
||||||
currentByte += remainingBytes.length;
|
|
||||||
|
|
||||||
/* Append the received bytes to the FULL message buffer */
|
|
||||||
messageBuffer ~= remainingBytes;
|
|
||||||
|
|
||||||
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* Increment counter of received bytes */
|
|
||||||
currentByte += bytesReceived;
|
|
||||||
|
|
||||||
|
|
||||||
/* Append the received bytes to the FULL message buffer */
|
|
||||||
messageBuffer ~= messageBufferPartial[0..bytesReceived];
|
|
||||||
|
|
||||||
/* TODO: Bug when over send, we must not allow this */
|
|
||||||
|
|
||||||
|
|
||||||
writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes");
|
|
||||||
|
|
||||||
clientConnection.receive(messageBufferPartial);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Process the message */
|
/* Process the message */
|
||||||
processMessage(messageBuffer);
|
processMessage(receivedMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -463,17 +362,13 @@ public final class BesterConnection : Thread
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process the received message */
|
/* Process the received message */
|
||||||
private void processMessage(byte[] messageBuffer)
|
private void processMessage(JSONValue jsonMessage)
|
||||||
{
|
{
|
||||||
/* The message as a JSONValue struct */
|
|
||||||
JSONValue jsonMessage;
|
|
||||||
|
|
||||||
|
|
||||||
/* Attempt to convert the message to JSON */
|
/* Attempt to convert the message to JSON */
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
/* Convert message to JSON */
|
/* Convert message to JSON */
|
||||||
jsonMessage = parseJSON(cast(string)messageBuffer);
|
|
||||||
debugPrint("<<< Received JSON >>>\n\n" ~ jsonMessage.toPrettyString());
|
debugPrint("<<< Received JSON >>>\n\n" ~ jsonMessage.toPrettyString());
|
||||||
|
|
||||||
/* TODO: Bounds checking, type checking */
|
/* TODO: Bounds checking, type checking */
|
||||||
|
|
Loading…
Reference in New Issue