protocol: Combine Participant and Session

Also removes ParticipantType
This commit is contained in:
Joshua Scott 2018-04-15 22:52:13 +01:00
parent 38e0547e84
commit bdededbb84
16 changed files with 613 additions and 396 deletions

View File

@ -8,12 +8,12 @@ namespace protocol
{
namespace control
{
class Ping final : public util::Serializable
class ClientKeepAlive final : public util::Serializable
{
public:
Ping(uint16_t session_id = 0,
uint16_t milliseconds = 0, uint8_t minutes = 0);
virtual ~Ping() = default;
ClientKeepAlive(uint16_t session_id = 0,
uint16_t milliseconds = 0, uint16_t minutes = 0);
virtual ~ClientKeepAlive() = default;
uint16_t get_session_id() const;
void set_session_id(uint16_t session_id);
@ -21,8 +21,8 @@ namespace control
uint16_t get_milliseconds() const;
void set_milliseconds(uint16_t milliseconds);
uint8_t get_minutes() const;
void set_minutes(uint8_t minutes);
uint16_t get_minutes() const;
void set_minutes(uint16_t minutes);
void write_to(std::ostream &ostream) const override final;
void read_from(std::istream &istream) override final;
@ -30,7 +30,7 @@ namespace control
private:
uint16_t m_session_id;
uint16_t m_milliseconds;
uint8_t m_minutes;
uint16_t m_minutes;
};
}
}

View File

@ -12,8 +12,8 @@ namespace control
NONE = 0,
SESSION_OFFER = 0,
UDP_HELLO = 1,
PING = 3,
PING_RSP = 4,
KEEP_ALIVE = 3,
KEEP_ALIVE_RSP = 4,
SESSION_ACCEPT = 5
};
}

View File

@ -0,0 +1,28 @@
#pragma once
#include "../../util/Serializable.h"
#include <cstdint>
namespace ki
{
namespace protocol
{
namespace control
{
class ServerKeepAlive final : public util::Serializable
{
public:
ServerKeepAlive(uint32_t timestamp = 0);
virtual ~ServerKeepAlive() = default;
uint32_t get_timestamp() const;
void set_timestamp(uint32_t timestamp);
void write_to(std::ostream &ostream) const override final;
void read_from(std::istream &istream) override final;
size_t get_size() const override final;
private:
uint32_t m_timestamp;
};
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include "ClientSession.h"
#include "DMLSession.h"
// Disable inheritance via dominance warning
#if _MSC_VER
#pragma warning(disable: 4250)
#endif
namespace ki
{
namespace protocol
{
namespace net
{
class ClientDMLSession : public ClientSession, public DMLSession
{
// Explicitly specify that we are intentionally inheritting
// via dominance.
using DMLSession::on_application_message;
using ClientSession::on_control_message;
using ClientSession::is_alive;
public:
ClientDMLSession(const uint16_t id, const dml::MessageManager &manager)
: Session(id), ClientSession(id), DMLSession(id, manager) {}
virtual ~ClientDMLSession() = default;
};
}
}
}
// Re-enable inheritance via dominance warning
#if _MSC_VER
#pragma warning(default: 4250)
#endif

View File

@ -0,0 +1,34 @@
#pragma once
#include "Session.h"
#define KI_SERVER_HEARTBEAT 60
namespace ki
{
namespace protocol
{
namespace net
{
/**
* Implements client-sided session logic.
*/
class ClientSession : public virtual Session
{
public:
explicit ClientSession(uint16_t id);
virtual ~ClientSession() = default;
void send_keep_alive();
bool is_alive() const override;
protected:
void on_connected();
virtual void on_established() {}
void on_control_message(const PacketHeader& header) override;
private:
void on_session_offer();
void on_keep_alive();
void on_keep_alive_response();
};
}
}
}

View File

@ -8,12 +8,15 @@ namespace protocol
{
namespace net
{
class DMLSession : public Session
/**
* Implements an application protocol that uses the DML
* message system (as seen in Wizard101 and Pirate101).
*/
class DMLSession : public virtual Session
{
public:
DMLSession(ParticipantType type, uint16_t id,
const dml::MessageManager &manager);
~DMLSession() = default;
DMLSession(uint16_t id, const dml::MessageManager &manager);
virtual ~DMLSession() = default;
void send_message(const dml::Message &message);
protected:

View File

@ -0,0 +1,35 @@
#pragma once
#include "ServerSession.h"
#include "DMLSession.h"
// Disable inheritance via dominance warning
#if _MSC_VER
#pragma warning(disable: 4250)
#endif
namespace ki
{
namespace protocol
{
namespace net
{
class ServerDMLSession : public ServerSession, public DMLSession
{
// Explicitly specify that we are intentionally inheritting
// via dominance.
using DMLSession::on_application_message;
using ServerSession::on_control_message;
using ServerSession::is_alive;
public:
ServerDMLSession(const uint16_t id, const dml::MessageManager &manager)
: Session(id), ServerSession(id), DMLSession(id, manager) {}
virtual ~ServerDMLSession() = default;
};
}
}
}
// Re-enable inheritance via dominance warning
#if _MSC_VER
#pragma warning(default: 4250)
#endif

View File

@ -0,0 +1,34 @@
#pragma once
#include "Session.h"
#define KI_CLIENT_HEARTBEAT 10
namespace ki
{
namespace protocol
{
namespace net
{
/**
* Implements server-sided session logic.
*/
class ServerSession : public virtual Session
{
public:
explicit ServerSession(uint16_t id);
virtual ~ServerSession() = default;
void send_keep_alive(uint32_t milliseconds_since_startup);
bool is_alive() const override;
protected:
void on_connected();
virtual void on_established() {}
void on_control_message(const PacketHeader& header) override;
private:
void on_session_accept();
void on_keep_alive();
void on_keep_alive_response();
};
}
}
}

View File

@ -1,75 +0,0 @@
#pragma once
#include "Participant.h"
#include "PacketHeader.h"
#include "ki/protocol/control/Opcode.h"
#include "../../util/Serializable.h"
#include <cstdint>
#include <chrono>
#include <type_traits>
namespace ki
{
namespace protocol
{
namespace net
{
/**
* This class implements session logic on top of the
* low-level Participant class.
*/
class Session : public Participant
{
public:
Session(ParticipantType type, uint16_t id);
uint16_t get_id() const;
bool is_established() const;
uint8_t get_access_level() const;
void set_access_level(uint8_t access_level);
uint16_t get_latency() const;
bool is_alive() const;
void send_packet(bool is_control, control::Opcode opcode,
const util::Serializable &data);
protected:
template <typename DataT>
DataT read_data()
{
static_assert(std::is_base_of<util::Serializable, DataT>::value,
"DataT must inherit Serializable.");
DataT data = DataT();
data.read_from(m_data_stream);
return data;
}
void on_connected();
virtual void on_established() {};
virtual void on_application_message(const PacketHeader &header) {};
virtual void on_invalid_packet() {};
private:
uint16_t m_id;
bool m_established;
uint8_t m_access_level;
uint16_t m_latency;
std::chrono::steady_clock::time_point m_creation_time;
std::chrono::steady_clock::time_point m_establish_time;
std::chrono::steady_clock::time_point m_last_heartbeat;
void on_packet_available() override final;
void on_control_message(const PacketHeader &header);
void on_server_hello();
void on_client_hello();
void on_ping();
void on_ping_response();
void on_hello(uint16_t session_id, uint32_t timestamp,
uint16_t milliseconds);
};
}
}
}

View File

@ -1,6 +1,7 @@
target_sources(${PROJECT_NAME}
PRIVATE
${PROJECT_SOURCE_DIR}/src/protocol/control/Ping.cpp
${PROJECT_SOURCE_DIR}/src/protocol/control/ClientKeepAlive.cpp
${PROJECT_SOURCE_DIR}/src/protocol/control/ServerKeepAlive.cpp
${PROJECT_SOURCE_DIR}/src/protocol/control/SessionAccept.cpp
${PROJECT_SOURCE_DIR}/src/protocol/control/SessionOffer.cpp
${PROJECT_SOURCE_DIR}/src/protocol/dml/Message.cpp
@ -8,8 +9,9 @@ target_sources(${PROJECT_NAME}
${PROJECT_SOURCE_DIR}/src/protocol/dml/MessageManager.cpp
${PROJECT_SOURCE_DIR}/src/protocol/dml/MessageModule.cpp
${PROJECT_SOURCE_DIR}/src/protocol/dml/MessageTemplate.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/PacketHeader.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/Participant.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/Session.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/ClientSession.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/DMLSession.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/PacketHeader.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/ServerSession.cpp
${PROJECT_SOURCE_DIR}/src/protocol/net/Session.cpp
)

View File

@ -1,4 +1,4 @@
#include "ki/protocol/control/Ping.h"
#include "ki/protocol/control/ClientKeepAlive.h"
#include "ki/dml/Record.h"
#include "ki/protocol/exception.h"
@ -8,58 +8,59 @@ namespace protocol
{
namespace control
{
Ping::Ping(uint16_t session_id, uint16_t milliseconds, uint8_t minutes)
ClientKeepAlive::ClientKeepAlive(const uint16_t session_id, const uint16_t milliseconds,
const uint16_t minutes)
{
m_session_id = session_id;
m_milliseconds = milliseconds;
m_minutes = minutes;
}
uint16_t Ping::get_session_id() const
uint16_t ClientKeepAlive::get_session_id() const
{
return m_session_id;
}
void Ping::set_session_id(uint16_t session_id)
void ClientKeepAlive::set_session_id(const uint16_t session_id)
{
m_session_id = session_id;
}
uint16_t Ping::get_milliseconds() const
uint16_t ClientKeepAlive::get_milliseconds() const
{
return m_milliseconds;
}
void Ping::set_milliseconds(uint16_t milliseconds)
void ClientKeepAlive::set_milliseconds(const uint16_t milliseconds)
{
m_milliseconds = milliseconds;
}
uint8_t Ping::get_minutes() const
uint16_t ClientKeepAlive::get_minutes() const
{
return m_minutes;
}
void Ping::set_minutes(uint8_t minutes)
void ClientKeepAlive::set_minutes(const uint16_t minutes)
{
m_minutes = minutes;
}
void Ping::write_to(std::ostream &ostream) const
void ClientKeepAlive::write_to(std::ostream &ostream) const
{
dml::Record record;
record.add_field<dml::USHRT>("m_session_id")->set_value(m_session_id);
record.add_field<dml::USHRT>("m_milliseconds")->set_value(m_milliseconds);
record.add_field<dml::UBYT>("m_minutes")->set_value(m_minutes);
record.add_field<dml::USHRT>("m_minutes")->set_value(m_minutes);
record.write_to(ostream);
}
void Ping::read_from(std::istream &istream)
void ClientKeepAlive::read_from(std::istream &istream)
{
dml::Record record;
auto *session_id = record.add_field<dml::USHRT>("m_session_id");
auto *milliseconds = record.add_field<dml::USHRT>("m_milliseconds");
auto *minutes = record.add_field<dml::UBYT>("m_minutes");
auto *minutes = record.add_field<dml::USHRT>("m_minutes");
try
{
record.read_from(istream);
@ -67,7 +68,7 @@ namespace control
catch (dml::parse_error &e)
{
std::ostringstream oss;
oss << "Error reading Ping payload: " << e.what();
oss << "Error reading ClientKeepAlive payload: " << e.what();
throw parse_error(oss.str());
}
@ -76,10 +77,10 @@ namespace control
m_minutes = minutes->get_value();
}
size_t Ping::get_size() const
size_t ClientKeepAlive::get_size() const
{
return sizeof(dml::USHRT) + sizeof(dml::USHRT) +
sizeof(dml::UBYT);
sizeof(dml::USHRT);
}
}
}

View File

@ -0,0 +1,61 @@
#include "ki/protocol/control/ServerKeepAlive.h"
#include "ki/dml/Record.h"
#include "ki/protocol/exception.h"
#include <chrono>
namespace ki
{
namespace protocol
{
namespace control
{
ServerKeepAlive::ServerKeepAlive(const uint32_t timestamp)
{
m_timestamp = timestamp;
}
uint32_t ServerKeepAlive::get_timestamp() const
{
return m_timestamp;
}
void ServerKeepAlive::set_timestamp(const uint32_t timestamp)
{
m_timestamp = timestamp;
}
void ServerKeepAlive::write_to(std::ostream& ostream) const
{
dml::Record record;
record.add_field<dml::USHRT>("m_session_id");
record.add_field<dml::INT>("m_timestamp")->set_value(m_timestamp);
record.write_to(ostream);
}
void ServerKeepAlive::read_from(std::istream& istream)
{
dml::Record record;
record.add_field<dml::USHRT>("m_session_id");
auto *timestamp = record.add_field<dml::INT>("m_timestamp");
try
{
record.read_from(istream);
}
catch (dml::parse_error &e)
{
std::ostringstream oss;
oss << "Error reading ServerKeepAlive payload: " << e.what();
throw parse_error(oss.str());
}
m_timestamp = timestamp->get_value();
}
size_t ServerKeepAlive::get_size() const
{
return sizeof(dml::USHRT) + sizeof(dml::INT);
}
}
}
}

View File

@ -0,0 +1,173 @@
#include "ki/protocol/net/ClientSession.h"
#include "ki/protocol/control/SessionOffer.h"
#include "ki/protocol/control/SessionAccept.h"
#include "ki/protocol/control/ClientKeepAlive.h"
#include "ki/protocol/control/ServerKeepAlive.h"
#include "ki/protocol/exception.h"
namespace ki
{
namespace protocol
{
namespace net
{
ClientSession::ClientSession(const uint16_t id)
: Session(id) {}
void ClientSession::send_keep_alive()
{
// Don't send a keep alive if we're waiting for a response
if (m_waiting_for_keep_alive_response)
return;
m_waiting_for_keep_alive_response = true;
// Work out how many minutes have been since the establish time, and
// how many milliseconds we are in to the current minute.
const auto time_since_establish = std::chrono::steady_clock::now() - m_establish_time;
const auto minutes = std::chrono::duration_cast<std::chrono::minutes>(time_since_establish);
const auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
time_since_establish - minutes
).count();
// Send a KEEP_ALIVE packet
control::ClientKeepAlive keep_alive(m_id, milliseconds, minutes.count());
send_packet(true, (uint8_t)control::Opcode::KEEP_ALIVE, keep_alive);
m_last_sent_heartbeat_time = std::chrono::steady_clock::now();
}
bool ClientSession::is_alive() const
{
// If the session isn't established yet, use the time of
// creation to decide whether this session is alive.
if (!m_established)
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_creation_time
).count() <= KI_CONNECTION_TIMEOUT;
// Otherwise, use the last time we received a heartbeat.
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_last_received_heartbeat_time
).count() <= KI_SERVER_HEARTBEAT;
}
void ClientSession::on_connected()
{
m_connection_time = std::chrono::steady_clock::now();
}
void ClientSession::on_control_message(const PacketHeader& header)
{
switch ((control::Opcode)header.get_opcode())
{
case control::Opcode::SESSION_OFFER:
on_session_offer();
break;
case control::Opcode::KEEP_ALIVE:
on_keep_alive();
break;
case control::Opcode::KEEP_ALIVE_RSP:
on_keep_alive_response();
break;
default:
close();
break;
}
}
void ClientSession::on_session_offer()
{
// Read the payload data into a structure
control::SessionOffer offer;
try
{
offer = read_data<control::SessionOffer>();
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
return;
}
// Should this session have already timed out?
if (std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_connection_time
).count() > KI_CONNECTION_TIMEOUT)
{
close();
return;
}
// Work out the current timestamp and how many milliseconds
// have elapsed in the current second.
auto now = std::chrono::system_clock::now();
const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()
).count();
const auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()
).count() - (timestamp * 1000);
// Accept the session
m_id = offer.get_session_id();
control::SessionAccept accept(m_id, timestamp, milliseconds);
send_packet(true, (uint8_t)control::Opcode::SESSION_ACCEPT, accept);
// The session is successfully established
m_established = true;
m_establish_time = std::chrono::steady_clock::now();
m_last_received_heartbeat_time = m_establish_time;
on_established();
}
void ClientSession::on_keep_alive()
{
// Read the payload data into a structure
control::ServerKeepAlive keep_alive;
try
{
keep_alive = read_data<control::ServerKeepAlive>();
}
catch (parse_error &e)
{
// The KEEP_ALIVE wasn't valid...
// Close the session
close();
return;
}
// Send the response
m_last_received_heartbeat_time = std::chrono::steady_clock::now();
send_packet(true, (uint8_t)control::Opcode::KEEP_ALIVE_RSP, keep_alive);
}
void ClientSession::on_keep_alive_response()
{
// Read the payload data into a structure
try
{
// We don't actually need the data inside, but
// read it to check if the structure is right.
read_data<control::ClientKeepAlive>();
}
catch (parse_error &e)
{
// The KEEP_ALIVE_RSP wasn't valid...
// Close the session
close();
return;
}
// Calculate latency and allow for KEEP_ALIVE packets to be sent again
m_latency = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - m_last_sent_heartbeat_time
).count();
m_waiting_for_keep_alive_response = false;
}
}
}
}

View File

@ -6,13 +6,12 @@ namespace protocol
{
namespace net
{
DMLSession::DMLSession(const ParticipantType type, const uint16_t id,
const dml::MessageManager& manager)
: Session(type, id), m_manager(manager) {}
DMLSession::DMLSession(const uint16_t id, const dml::MessageManager& manager)
: Session(id), m_manager(manager) {}
void DMLSession::send_message(const dml::Message& message)
{
send_packet(false, control::Opcode::NONE, message);
send_packet(false, 0, message);
}
void DMLSession::on_application_message(const PacketHeader& header)

View File

@ -0,0 +1,171 @@
#include "ki/protocol/net/ServerSession.h"
#include "ki/protocol/control/SessionOffer.h"
#include "ki/protocol/control/SessionAccept.h"
#include "ki/protocol/control/ClientKeepAlive.h"
#include "ki/protocol/control/ServerKeepAlive.h"
#include "ki/protocol/exception.h"
namespace ki
{
namespace protocol
{
namespace net
{
ServerSession::ServerSession(const uint16_t id)
: Session(id) {}
void ServerSession::send_keep_alive(const uint32_t milliseconds_since_startup)
{
// Don't send a keep alive if we're waiting for a response
if (m_waiting_for_keep_alive_response)
return;
m_waiting_for_keep_alive_response = true;
// Send a KEEP_ALIVE packet
const control::ServerKeepAlive keep_alive(milliseconds_since_startup);
send_packet(true, (uint8_t)control::Opcode::KEEP_ALIVE, keep_alive);
m_last_sent_heartbeat_time = std::chrono::steady_clock::now();
}
bool ServerSession::is_alive() const
{
// If the session isn't established yet, use the time of
// creation to decide whether this session is alive.
if (!m_established)
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_creation_time
).count() <= KI_CONNECTION_TIMEOUT;
// Otherwise, use the last time we received a heartbeat.
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_last_received_heartbeat_time
).count() <= KI_CLIENT_HEARTBEAT;
}
void ServerSession::on_connected()
{
m_connection_time = std::chrono::steady_clock::now();
// Work out the current timestamp and how many milliseconds
// have elapsed in the current second.
auto now = std::chrono::system_clock::now();
const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()
).count();
const auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()
).count() - (timestamp * 1000);
// Send a SESSION_OFFER packet to the client
const control::SessionOffer offer(m_id, timestamp, milliseconds);
send_packet(true, (uint8_t)control::Opcode::SESSION_OFFER, offer);
}
void ServerSession::on_control_message(const PacketHeader& header)
{
switch ((control::Opcode)header.get_opcode())
{
case control::Opcode::SESSION_ACCEPT:
on_session_accept();
break;
case control::Opcode::KEEP_ALIVE:
on_keep_alive();
break;
case control::Opcode::KEEP_ALIVE_RSP:
on_keep_alive_response();
break;
default:
close();
break;
}
}
void ServerSession::on_session_accept()
{
// Read the payload data into a structure
control::SessionAccept accept;
try
{
accept = read_data<control::SessionAccept>();
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
return;
}
// Should this session have already timed out?
if (std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_connection_time
).count() > KI_CONNECTION_TIMEOUT)
{
close();
return;
}
// Make sure they're accepting this session
if (accept.get_session_id() != m_id)
{
close();
return;
}
// The session is successfully established
m_established = true;
m_establish_time = std::chrono::steady_clock::now();
m_last_received_heartbeat_time = m_establish_time;
on_established();
}
void ServerSession::on_keep_alive()
{
// Read the payload data into a structure
control::ClientKeepAlive keep_alive;
try
{
keep_alive = read_data<control::ClientKeepAlive>();
}
catch (parse_error &e)
{
// The KEEP_ALIVE wasn't valid...
// Close the session
close();
return;
}
// Send the response
m_last_received_heartbeat_time = std::chrono::steady_clock::now();
send_packet(true, (uint8_t)control::Opcode::KEEP_ALIVE_RSP, keep_alive);
}
void ServerSession::on_keep_alive_response()
{
// Read the payload data into a structure
try
{
// We don't actually need the data inside, but
// read it to check if the structure is right.
read_data<control::ServerKeepAlive>();
}
catch (parse_error &e)
{
// The KEEP_ALIVE_RSP wasn't valid...
// Close the session
close();
return;
}
// Calculate latency and allow for KEEP_ALIVE packets to be sent again
m_latency = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - m_last_sent_heartbeat_time
).count();
m_waiting_for_keep_alive_response = false;
}
}
}
}

View File

@ -1,284 +0,0 @@
#include "ki/protocol/net/Session.h"
#include "ki/protocol/exception.h"
#include "ki/protocol/control/SessionOffer.h"
#include "ki/protocol/control/SessionAccept.h"
#include "ki/protocol/control/Ping.h"
namespace ki
{
namespace protocol
{
namespace net
{
Session::Session(const ParticipantType type, const uint16_t id)
: Participant(type)
{
m_id = id;
m_established = false;
m_access_level = 0;
m_latency = 0;
m_creation_time = std::chrono::steady_clock::now();
}
uint16_t Session::get_id() const
{
return m_id;
}
bool Session::is_established() const
{
return m_established;
}
uint8_t Session::get_access_level() const
{
return m_access_level;
}
void Session::set_access_level(const uint8_t access_level)
{
m_access_level = access_level;
}
uint16_t Session::get_latency() const
{
return m_latency;
}
bool Session::is_alive() const
{
// If the session isn't established yet, use the time of
// creation to decide whether this session is alive.
if (!m_established)
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_creation_time
).count() <= 3;
// Otherwise, use the last time we received a heartbeat.
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - m_last_heartbeat
).count() <= 10;
}
void Session::send_packet(const bool is_control, const control::Opcode opcode,
const util::Serializable& data)
{
std::ostringstream ss;
PacketHeader header(is_control, (uint8_t)opcode);
header.write_to(ss);
data.write_to(ss);
const auto buffer = ss.str();
send_data(buffer.c_str(), buffer.length());
}
void Session::on_connected()
{
// If this is the server-side of a Session
// we need to send SESSION_OFFER first.
if (get_type() == ParticipantType::SERVER)
{
// Work out the current timestamp and how many milliseconds
// have elapsed in the current second.
auto now = std::chrono::system_clock::now();
const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()
).count();
const auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()
).count() - (timestamp * 1000);
// Send a SESSION_OFFER packet to the client
const control::SessionOffer hello(m_id, timestamp, milliseconds);
send_packet(true, control::Opcode::SESSION_OFFER, hello);
}
}
void Session::on_packet_available()
{
// Read the packet header
PacketHeader header;
try
{
header.read_from(m_data_stream);
}
catch (parse_error &e)
{
on_invalid_packet();
return;
}
// Hand off to the right handler based on
// whether this is a control packet or not
if (header.is_control())
on_control_message(header);
else if (m_established)
on_application_message(header);
else
close();
}
void Session::on_control_message(const PacketHeader& header)
{
switch ((control::Opcode)header.get_opcode())
{
case (control::Opcode::SESSION_OFFER):
on_server_hello();
break;
case (control::Opcode::SESSION_ACCEPT):
on_client_hello();
break;
case (control::Opcode::PING):
on_ping();
break;
case (control::Opcode::PING_RSP):
on_ping_response();
break;
default:
break;
}
}
void Session::on_server_hello()
{
// If this is the server-side of a Session
// we can't handle a SESSION_OFFER
if (get_type() != ParticipantType::CLIENT)
{
close();
return;
}
// Read the payload data into a structure
try
{
// We've been given our id from the server now
const auto server_hello = read_data<control::SessionOffer>();
m_id = server_hello.get_session_id();
on_hello(m_id,
server_hello.get_timestamp(),
server_hello.get_milliseconds());
// Work out the current timestamp and how many milliseconds
// have elapsed in the current second.
auto now = std::chrono::system_clock::now();
const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()
).count();
const auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()
).count() - (timestamp * 1000);
// Send a SESSION_ACCEPT packet to the server
const control::SessionAccept hello(m_id, timestamp, milliseconds);
send_packet(true, control::Opcode::SESSION_ACCEPT, hello);
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
}
}
void Session::on_client_hello()
{
// If this is the client-side of a Session
// we can't handle a SESSION_ACCEPT
if (get_type() != ParticipantType::SERVER)
{
close();
return;
}
// Read the payload data into a structure
try
{
// The session is now established!
const auto client_hello = read_data<control::SessionAccept>();
on_hello(client_hello.get_session_id(),
client_hello.get_timestamp(),
client_hello.get_milliseconds());
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
}
}
void Session::on_ping()
{
// Read the payload data into a structure
try
{
const auto ping = read_data<control::Ping>();
if (get_type() == ParticipantType::SERVER)
{
// Calculate latency
const auto send_time = m_establish_time +
std::chrono::milliseconds(ping.get_milliseconds()) +
std::chrono::minutes(ping.get_minutes());
m_latency = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - send_time
).count();
}
// Send the response
send_packet(true, control::Opcode::PING_RSP, ping);
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
}
}
void Session::on_ping_response()
{
// Read the payload data into a structure
try
{
const auto ping = read_data<control::Ping>();
}
catch (parse_error &e)
{
// The SESSION_ACCEPT wasn't valid...
// Close the session
close();
}
}
void Session::on_hello(const uint16_t session_id,
const uint32_t timestamp, const uint16_t milliseconds)
{
// Make sure they're accepting this session
if (session_id != m_id)
{
close();
return;
}
// Calculate initial latency
const std::chrono::system_clock::time_point epoch;
const auto send_time = epoch + (std::chrono::seconds(timestamp) +
std::chrono::milliseconds(milliseconds));
m_latency = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - send_time
).count();
// The session is successfully established
m_established = true;
m_establish_time = std::chrono::steady_clock::now();
m_last_heartbeat = m_establish_time;
on_established();
}
}
}
}