net: Forgot to push Session

This commit is contained in:
Joshua Scott 2018-04-15 22:52:41 +01:00
parent bdededbb84
commit a58341b279
3 changed files with 197 additions and 98 deletions

View File

@ -1,76 +0,0 @@
#pragma once
#include <cstdint>
#include <sstream>
#define KI_DEFAULT_MAXIMUM_RECEIVE_SIZE 0x2000
#define KI_START_SIGNAL 0xF00D
namespace ki
{
namespace protocol
{
namespace net
{
enum class ReceiveState
{
// Waiting for the 0xF00D start signal.
WAITING_FOR_START_SIGNAL,
// Waiting for the 2-byte length.
WAITING_FOR_LENGTH,
// Waiting for the packet data.
WAITING_FOR_PACKET
};
enum class ParticipantType
{
SERVER,
CLIENT
};
/**
* This class implements the packet framing logic when
* sending and receiving data to/from an external source.
*/
class Participant
{
public:
Participant(ParticipantType type);
virtual ~Participant() = default;
ParticipantType get_type() const;
void set_type(ParticipantType type);
uint16_t get_maximum_packet_size() const;
void set_maximum_packet_size(uint16_t maximum_packet_size);
protected:
std::stringstream m_data_stream;
/**
* Frames raw data into a Packet, and transmits it.
*/
void send_data(const char *data, size_t size);
/**
* Process incoming raw data into Packets.
* Once a packet is read into the internal data
* stream, handle_packet_available is called.
*/
void process_data(const char *data, size_t size);
virtual void send_packet_data(const char *data, const size_t size) = 0;
virtual void on_packet_available() {};
virtual void close() = 0;
private:
ParticipantType m_type;
uint16_t m_maximum_packet_size;
ReceiveState m_receive_state;
uint16_t m_start_signal;
uint16_t m_incoming_packet_size;
uint8_t m_shift;
};
}
}
}

View File

@ -0,0 +1,122 @@
#pragma once
#include "PacketHeader.h"
#include "../control/Opcode.h"
#include "../../util/Serializable.h"
#include <cstdint>
#include <sstream>
#include <chrono>
#include <type_traits>
#define KI_DEFAULT_MAXIMUM_RECEIVE_SIZE 0x2000
#define KI_START_SIGNAL 0xF00D
#define KI_CONNECTION_TIMEOUT 3
namespace ki
{
namespace protocol
{
namespace net
{
enum class ReceiveState
{
// Waiting for the 0xF00D start signal.
WAITING_FOR_START_SIGNAL,
// Waiting for the 2-byte length.
WAITING_FOR_LENGTH,
// Waiting for the packet data.
WAITING_FOR_PACKET
};
/**
* This class implements session and packet framing logic
* when sending and receiving data to/from an external
* source.
*/
class Session
{
public:
explicit Session(uint16_t id = 0);
virtual ~Session() = default;
uint16_t get_maximum_packet_size() const;
void set_maximum_packet_size(uint16_t maximum_packet_size);
uint16_t get_id() const;
bool is_established() const;
uint8_t get_access_level() const;
void set_access_level(uint8_t access_level);
uint16_t get_latency() const;
virtual bool is_alive() const = 0;
void send_packet(bool is_control, uint8_t opcode,
const util::Serializable &data);
protected:
/* Higher-level session members */
uint16_t m_id;
bool m_established;
uint8_t m_access_level;
/* Timing members */
std::chrono::steady_clock::time_point m_creation_time;
std::chrono::steady_clock::time_point m_connection_time;
std::chrono::steady_clock::time_point m_establish_time;
std::chrono::steady_clock::time_point m_last_received_heartbeat_time;
std::chrono::steady_clock::time_point m_last_sent_heartbeat_time;
bool m_waiting_for_keep_alive_response;
uint16_t m_latency;
// The packet data stream
std::stringstream m_data_stream;
/**
* Reads a serializable structure from the data stream.
*/
template <typename DataT>
DataT read_data()
{
static_assert(std::is_base_of<util::Serializable, DataT>::value,
"DataT must inherit Serializable.");
DataT data = DataT();
data.read_from(m_data_stream);
return data;
}
/**
* Frames raw data into a Packet, and transmits it.
*/
void send_data(const char *data, size_t size);
/**
* Process incoming raw data into Packets.
* Once a packet is read into the internal data
* stream, handle_packet_available is called.
*/
void process_data(const char *data, size_t size);
/* Event handlers */
virtual void on_invalid_packet() {}
virtual void on_control_message(const PacketHeader &header) {}
virtual void on_application_message(const PacketHeader &header) {}
/* Low-level socket methods */
virtual void send_packet_data(const char *data, const size_t size) = 0;
virtual void close() = 0;
private:
/* Low-level networking members */
uint16_t m_maximum_packet_size;
ReceiveState m_receive_state;
uint16_t m_start_signal;
uint16_t m_incoming_packet_size;
uint8_t m_shift;
void on_packet_available();
};
}
}
}

View File

@ -1,6 +1,5 @@
#include "ki/protocol/net/Participant.h" #include "ki/protocol/net/Session.h"
#include "ki/protocol/exception.h" #include "ki/protocol/exception.h"
#include <cstring>
namespace ki namespace ki
{ {
@ -8,39 +7,70 @@ namespace protocol
{ {
namespace net namespace net
{ {
Participant::Participant(const ParticipantType type) Session::Session(const uint16_t id)
{ {
m_type = type; m_id = id;
m_maximum_packet_size = KI_DEFAULT_MAXIMUM_RECEIVE_SIZE; m_established = false;
m_access_level = 0;
m_latency = 0;
m_creation_time = std::chrono::steady_clock::now();
m_waiting_for_keep_alive_response = false;
m_maximum_packet_size = KI_DEFAULT_MAXIMUM_RECEIVE_SIZE;
m_receive_state = ReceiveState::WAITING_FOR_START_SIGNAL; m_receive_state = ReceiveState::WAITING_FOR_START_SIGNAL;
m_start_signal = 0; m_start_signal = 0;
m_incoming_packet_size = 0; m_incoming_packet_size = 0;
m_shift = 0; m_shift = 0;
} }
ParticipantType Participant::get_type() const uint16_t Session::get_maximum_packet_size() const
{
return m_type;
}
void Participant::set_type(const ParticipantType type)
{
m_type = type;
}
uint16_t Participant::get_maximum_packet_size() const
{ {
return m_maximum_packet_size; return m_maximum_packet_size;
} }
void Participant::set_maximum_packet_size(const uint16_t maximum_packet_size) void Session::set_maximum_packet_size(const uint16_t maximum_packet_size)
{ {
m_maximum_packet_size = maximum_packet_size; m_maximum_packet_size = maximum_packet_size;
} }
void Participant::send_data(const char* data, const size_t size) uint16_t Session::get_id() const
{
return m_id;
}
bool Session::is_established() const
{
return m_established;
}
uint8_t Session::get_access_level() const
{
return m_access_level;
}
void Session::set_access_level(const uint8_t access_level)
{
m_access_level = access_level;
}
uint16_t Session::get_latency() const
{
return m_latency;
}
void Session::send_packet(const bool is_control, const uint8_t opcode,
const util::Serializable& data)
{
std::ostringstream ss;
PacketHeader header(is_control, opcode);
header.write_to(ss);
data.write_to(ss);
const auto buffer = ss.str();
send_data(buffer.c_str(), buffer.length());
}
void Session::send_data(const char* data, const size_t size)
{ {
// Allocate the entire buffer // Allocate the entire buffer
char *packet_data = new char[size + 4]; char *packet_data = new char[size + 4];
@ -55,7 +85,7 @@ namespace net
delete[] packet_data; delete[] packet_data;
} }
void Participant::process_data(const char *data, const size_t size) void Session::process_data(const char *data, const size_t size)
{ {
size_t position = 0; size_t position = 0;
while (position < size) while (position < size)
@ -133,6 +163,29 @@ namespace net
} }
} }
void Session::on_packet_available()
{
// Read the packet header
PacketHeader header;
try
{
header.read_from(m_data_stream);
}
catch (parse_error &e)
{
on_invalid_packet();
return;
}
// Hand off to the right handler based on
// whether this is a control packet or not
if (header.is_control())
on_control_message(header);
else if (m_established)
on_application_message(header);
else
close();
}
} }
} }
} }