Fix creating several heartbeat threads
This commit is contained in:
parent
77e5a74141
commit
44d52848aa
|
@ -37,7 +37,8 @@ You can view examples in the [examples](examples) directory.
|
|||
# Todo:
|
||||
- [x] Finish all REST API calls.
|
||||
- [x] Handle all gateway events.
|
||||
- [x] Reconnecting
|
||||
- [x] Reconnecting.
|
||||
- [ ] Configurable Logger.
|
||||
- [ ] Add library to official Nimble package repository.
|
||||
- [ ] Memory optimizations.
|
||||
- [ ] Member
|
||||
|
|
|
@ -120,70 +120,77 @@ proc handleGatewayDisconnect(shard: Shard, error: string) {.async.} =
|
|||
|
||||
#TODO: Reconnecting may be done, just needs testing.
|
||||
proc handleWebsocketPacket(shard: Shard) {.async.} =
|
||||
var hasStartedHeartbeatThread = false;
|
||||
while true:
|
||||
|
||||
var packet = await shard.ws.receiveStrPacket()
|
||||
shard.client.log.debug("[SHARD " & $shard.id & "] Received gateway payload: " & $packet)
|
||||
# Skip if the websocket isn't open
|
||||
if shard.ws.readyState == Open:
|
||||
var packet = await shard.ws.receiveStrPacket()
|
||||
shard.client.log.debug("[SHARD " & $shard.id & "] Received gateway payload: " & $packet)
|
||||
|
||||
#if packet == Opcode.Close:
|
||||
# await shard.handleGatewayDisconnect(packet)
|
||||
#if packet == Opcode.Close:
|
||||
# await shard.handleGatewayDisconnect(packet)
|
||||
|
||||
var json: JsonNode
|
||||
var json: JsonNode
|
||||
|
||||
# If we fail to parse the json just stop this loop
|
||||
try:
|
||||
json = parseJson(packet)
|
||||
except:
|
||||
shard.client.log.error("[SHARD " & $shard.id & "] Failed to parse websocket payload: " & $packet)
|
||||
continue
|
||||
# If we fail to parse the json just stop this loop
|
||||
try:
|
||||
json = parseJson(packet)
|
||||
except:
|
||||
shard.client.log.error("[SHARD " & $shard.id & "] Failed to parse websocket payload: " & $packet)
|
||||
continue
|
||||
|
||||
if json.contains("s"):
|
||||
shard.lastSequence = json["s"].getInt()
|
||||
if json.contains("s"):
|
||||
shard.lastSequence = json["s"].getInt()
|
||||
|
||||
case json["op"].getInt()
|
||||
of ord(DiscordOpCode.opHello):
|
||||
if shard.reconnecting:
|
||||
shard.client.log.info("[SHARD " & $shard.id & "] Reconnected!")
|
||||
shard.reconnecting = false
|
||||
case json["op"].getInt()
|
||||
of ord(DiscordOpCode.opHello):
|
||||
if shard.reconnecting:
|
||||
shard.client.log.info("[SHARD " & $shard.id & "] Reconnected!")
|
||||
shard.reconnecting = false
|
||||
|
||||
let resume = %* {
|
||||
"op": ord(opResume),
|
||||
"d": {
|
||||
"token": shard.client.token,
|
||||
let resume = %* {
|
||||
"op": ord(opResume),
|
||||
"d": {
|
||||
"token": shard.client.token,
|
||||
"session_id": shard.sessionID,
|
||||
"seq": shard.lastSequence
|
||||
}
|
||||
}
|
||||
|
||||
await shard.sendGatewayRequest(resume)
|
||||
else:
|
||||
shard.heartbeatInterval = json["d"]["heartbeat_interval"].getInt()
|
||||
await shard.sendGatewayRequest(shard.getIdentifyPacket())
|
||||
|
||||
# Don't start a new heartbeat thread if one is already started
|
||||
echo "About to start a heartbeat thread! shard.heartbeatAcked is ", shard.heartbeatAcked
|
||||
if not hasStartedHeartbeatThread:
|
||||
echo "Starting new heartbeat thread! shard.heartbeatAcked is ", shard.heartbeatAcked
|
||||
asyncCheck shard.handleHeartbeat()
|
||||
hasStartedHeartbeatThread = true
|
||||
else:
|
||||
echo "Not gonna start a new heartbeat thread since. shard.heartbeatAcked is ", shard.heartbeatAcked
|
||||
of ord(DiscordOpCode.opHeartbeatAck):
|
||||
shard.heartbeatAcked = true
|
||||
of ord(DiscordOpCode.opDispatch):
|
||||
asyncCheck handleDiscordEvent(shard, json["d"], json["t"].getStr())
|
||||
of ord(DiscordOpCode.opReconnect):
|
||||
asyncCheck shard.reconnectShard()
|
||||
of ord(DiscordOpCode.opInvalidSession):
|
||||
# If the json field `d` is true then the session may be resumable.
|
||||
if json["d"].getBool():
|
||||
let resume = %* {
|
||||
"op": ord(opResume),
|
||||
"session_id": shard.sessionID,
|
||||
"seq": shard.lastSequence
|
||||
}
|
||||
}
|
||||
|
||||
await shard.sendGatewayRequest(resume)
|
||||
await shard.sendGatewayRequest(resume)
|
||||
else:
|
||||
asyncCheck shard.reconnectShard()
|
||||
else:
|
||||
shard.heartbeatInterval = json["d"]["heartbeat_interval"].getInt()
|
||||
await shard.sendGatewayRequest(shard.getIdentifyPacket())
|
||||
|
||||
# Don't start a new
|
||||
if (not shard.isHandlingHeartbeat):
|
||||
asyncCheck shard.handleHeartbeat()
|
||||
shard.heartbeatAcked = true
|
||||
of ord(DiscordOpCode.opHeartbeatAck):
|
||||
shard.heartbeatAcked = true
|
||||
of ord(DiscordOpCode.opDispatch):
|
||||
asyncCheck handleDiscordEvent(shard, json["d"], json["t"].getStr())
|
||||
of ord(DiscordOpCode.opReconnect):
|
||||
asyncCheck shard.reconnectShard()
|
||||
of ord(DiscordOpCode.opInvalidSession):
|
||||
# If the json field `d` is true then the session may be resumable.
|
||||
if json["d"].getBool():
|
||||
let resume = %* {
|
||||
"op": ord(opResume),
|
||||
"session_id": shard.sessionID,
|
||||
"seq": shard.lastSequence
|
||||
}
|
||||
|
||||
await shard.sendGatewayRequest(resume)
|
||||
else:
|
||||
asyncCheck shard.reconnectShard()
|
||||
else:
|
||||
discard
|
||||
discard
|
||||
|
||||
proc newShard(shardID: int, client: DiscordClient): Shard =
|
||||
return Shard(id: shardID, client: client)
|
||||
|
@ -233,12 +240,15 @@ proc startConnection*(client: DiscordClient, shardAmount: int = 1) {.async.} =
|
|||
|
||||
shard.ws = await newWebSocket(shard.client.endpoint & "/v=6&encoding=json")
|
||||
|
||||
asyncCheck shard.handleWebsocketPacket()
|
||||
await shard.handleWebsocketPacket()
|
||||
|
||||
# Just wait. Don't poll while we're reconnecting
|
||||
while true:
|
||||
#[ while true:
|
||||
if not shard.reconnecting:
|
||||
poll()
|
||||
try:
|
||||
poll()
|
||||
except WebSocketError:
|
||||
echo "WebSocketError" ]#
|
||||
else:
|
||||
raise newException(IOError, "Failed to get gateway url, token may of been incorrect!")
|
||||
|
||||
|
|
Reference in New Issue