From 44d52848aa89abdd64c01534878d41528a28cd34 Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Fri, 25 Sep 2020 16:21:35 -0500 Subject: [PATCH] Fix creating several heartbeat threads --- README.md | 3 +- src/nimcord/client.nim | 118 ++++++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index c81d6e2..5053f27 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,8 @@ You can view examples in the [examples](examples) directory. # Todo: - [x] Finish all REST API calls. - [x] Handle all gateway events. -- [x] Reconnecting +- [x] Reconnecting. +- [ ] Configurable Logger. - [ ] Add library to official Nimble package repository. - [ ] Memory optimizations. - [ ] Member diff --git a/src/nimcord/client.nim b/src/nimcord/client.nim index ad780ec..786284b 100644 --- a/src/nimcord/client.nim +++ b/src/nimcord/client.nim @@ -120,70 +120,77 @@ proc handleGatewayDisconnect(shard: Shard, error: string) {.async.} = #TODO: Reconnecting may be done, just needs testing. proc handleWebsocketPacket(shard: Shard) {.async.} = + var hasStartedHeartbeatThread = false; while true: - var packet = await shard.ws.receiveStrPacket() - shard.client.log.debug("[SHARD " & $shard.id & "] Received gateway payload: " & $packet) + # Skip if the websocket isn't open + if shard.ws.readyState == Open: + var packet = await shard.ws.receiveStrPacket() + shard.client.log.debug("[SHARD " & $shard.id & "] Received gateway payload: " & $packet) - #if packet == Opcode.Close: - # await shard.handleGatewayDisconnect(packet) + #if packet == Opcode.Close: + # await shard.handleGatewayDisconnect(packet) - var json: JsonNode + var json: JsonNode - # If we fail to parse the json just stop this loop - try: - json = parseJson(packet) - except: - shard.client.log.error("[SHARD " & $shard.id & "] Failed to parse websocket payload: " & $packet) - continue + # If we fail to parse the json just stop this loop + try: + json = parseJson(packet) + except: + shard.client.log.error("[SHARD " & $shard.id & "] Failed to parse websocket payload: " & $packet) + continue - if json.contains("s"): - shard.lastSequence = json["s"].getInt() + if json.contains("s"): + shard.lastSequence = json["s"].getInt() - case json["op"].getInt() - of ord(DiscordOpCode.opHello): - if shard.reconnecting: - shard.client.log.info("[SHARD " & $shard.id & "] Reconnected!") - shard.reconnecting = false + case json["op"].getInt() + of ord(DiscordOpCode.opHello): + if shard.reconnecting: + shard.client.log.info("[SHARD " & $shard.id & "] Reconnected!") + shard.reconnecting = false - let resume = %* { - "op": ord(opResume), - "d": { - "token": shard.client.token, + let resume = %* { + "op": ord(opResume), + "d": { + "token": shard.client.token, + "session_id": shard.sessionID, + "seq": shard.lastSequence + } + } + + await shard.sendGatewayRequest(resume) + else: + shard.heartbeatInterval = json["d"]["heartbeat_interval"].getInt() + await shard.sendGatewayRequest(shard.getIdentifyPacket()) + + # Don't start a new heartbeat thread if one is already started + echo "About to start a heartbeat thread! shard.heartbeatAcked is ", shard.heartbeatAcked + if not hasStartedHeartbeatThread: + echo "Starting new heartbeat thread! shard.heartbeatAcked is ", shard.heartbeatAcked + asyncCheck shard.handleHeartbeat() + hasStartedHeartbeatThread = true + else: + echo "Not gonna start a new heartbeat thread since. shard.heartbeatAcked is ", shard.heartbeatAcked + of ord(DiscordOpCode.opHeartbeatAck): + shard.heartbeatAcked = true + of ord(DiscordOpCode.opDispatch): + asyncCheck handleDiscordEvent(shard, json["d"], json["t"].getStr()) + of ord(DiscordOpCode.opReconnect): + asyncCheck shard.reconnectShard() + of ord(DiscordOpCode.opInvalidSession): + # If the json field `d` is true then the session may be resumable. + if json["d"].getBool(): + let resume = %* { + "op": ord(opResume), "session_id": shard.sessionID, "seq": shard.lastSequence } - } - await shard.sendGatewayRequest(resume) + await shard.sendGatewayRequest(resume) + else: + asyncCheck shard.reconnectShard() else: - shard.heartbeatInterval = json["d"]["heartbeat_interval"].getInt() - await shard.sendGatewayRequest(shard.getIdentifyPacket()) - - # Don't start a new - if (not shard.isHandlingHeartbeat): - asyncCheck shard.handleHeartbeat() - shard.heartbeatAcked = true - of ord(DiscordOpCode.opHeartbeatAck): - shard.heartbeatAcked = true - of ord(DiscordOpCode.opDispatch): - asyncCheck handleDiscordEvent(shard, json["d"], json["t"].getStr()) - of ord(DiscordOpCode.opReconnect): - asyncCheck shard.reconnectShard() - of ord(DiscordOpCode.opInvalidSession): - # If the json field `d` is true then the session may be resumable. - if json["d"].getBool(): - let resume = %* { - "op": ord(opResume), - "session_id": shard.sessionID, - "seq": shard.lastSequence - } - - await shard.sendGatewayRequest(resume) - else: - asyncCheck shard.reconnectShard() - else: - discard + discard proc newShard(shardID: int, client: DiscordClient): Shard = return Shard(id: shardID, client: client) @@ -233,12 +240,15 @@ proc startConnection*(client: DiscordClient, shardAmount: int = 1) {.async.} = shard.ws = await newWebSocket(shard.client.endpoint & "/v=6&encoding=json") - asyncCheck shard.handleWebsocketPacket() + await shard.handleWebsocketPacket() # Just wait. Don't poll while we're reconnecting - while true: + #[ while true: if not shard.reconnecting: - poll() + try: + poll() + except WebSocketError: + echo "WebSocketError" ]# else: raise newException(IOError, "Failed to get gateway url, token may of been incorrect!")