discord-rpc/src/discord-rpc.cpp

336 lines
9.5 KiB
C++
Raw Normal View History

2017-06-30 23:18:54 +00:00
#include "discord-rpc.h"
2017-07-18 18:10:39 +00:00
#include "backoff.h"
2017-09-14 15:59:32 +00:00
#include "discord_register.h"
#include "rpc_connection.h"
#include "serialization.h"
2017-06-30 23:18:54 +00:00
#include <atomic>
#include <chrono>
#include <mutex>
2017-07-18 18:10:39 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
#include <condition_variable>
#include <thread>
2017-07-18 18:10:39 +00:00
#endif
constexpr size_t MaxMessageSize{16 * 1024};
constexpr size_t MessageQueueSize{8};
2017-07-17 22:42:49 +00:00
struct QueuedMessage {
size_t length;
char buffer[MaxMessageSize];
void Copy(const QueuedMessage& other)
{
length = other.length;
if (length) {
memcpy(buffer, other.buffer, length);
}
}
2017-07-17 22:42:49 +00:00
};
static RpcConnection* Connection{nullptr};
2017-06-30 23:18:54 +00:00
static DiscordEventHandlers Handlers{};
static std::atomic_bool WasJustConnected{false};
static std::atomic_bool WasJustDisconnected{false};
2017-07-24 21:58:53 +00:00
static std::atomic_bool GotErrorMessage{false};
static std::atomic_bool WasJoinGame{false};
static std::atomic_bool WasSpectateGame{false};
static char JoinGameSecret[256];
static char SpectateGameSecret[256];
2017-07-17 22:42:49 +00:00
static int LastErrorCode{0};
2017-07-13 15:32:08 +00:00
static char LastErrorMessage[256];
2017-07-24 21:58:53 +00:00
static int LastDisconnectErrorCode{0};
static char LastDisconnectErrorMessage[256];
static std::mutex PresenceMutex;
static QueuedMessage QueuedPresence{};
2017-07-17 22:42:49 +00:00
static QueuedMessage SendQueue[MessageQueueSize]{};
static std::atomic_uint SendQueueNextAdd{0};
static std::atomic_uint SendQueueNextSend{0};
static std::atomic_uint SendQueuePendingSends{0};
2017-07-28 16:45:11 +00:00
// We want to auto connect, and retry on failure, but not as fast as possible. This does expoential
// backoff from 0.5 seconds to 1 minute
2017-07-18 18:10:39 +00:00
static Backoff ReconnectTimeMs(500, 60 * 1000);
static auto NextConnect{std::chrono::system_clock::now()};
static int Pid{0};
static int Nonce{1};
2017-07-17 22:42:49 +00:00
2017-07-18 16:47:33 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
static std::atomic_bool KeepRunning{true};
2017-07-18 16:47:33 +00:00
static std::mutex WaitForIOMutex;
static std::condition_variable WaitForIOActivity;
static std::thread IoThread;
#endif // DISCORD_DISABLE_IO_THREAD
2017-07-18 18:10:39 +00:00
static void UpdateReconnectTime()
{
2017-07-25 16:27:48 +00:00
NextConnect = std::chrono::system_clock::now() +
std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
2017-07-18 18:10:39 +00:00
}
2017-07-25 16:27:48 +00:00
static QueuedMessage* SendQueueGetNextAddMessage()
{
2017-07-28 16:45:53 +00:00
// if we are not connected, let's not batch up stale messages for later
if (!Connection || !Connection->IsOpen()) {
return nullptr;
}
2017-07-17 22:42:49 +00:00
// if we are falling behind, bail
if (SendQueuePendingSends.load() >= MessageQueueSize) {
return nullptr;
}
auto index = (SendQueueNextAdd++) % MessageQueueSize;
return &SendQueue[index];
}
2017-07-25 16:27:48 +00:00
static QueuedMessage* SendQueueGetNextSendMessage()
{
2017-07-17 22:42:49 +00:00
auto index = (SendQueueNextSend++) % MessageQueueSize;
return &SendQueue[index];
}
2017-07-25 16:27:48 +00:00
static void SendQueueCommitMessage()
{
2017-07-17 22:42:49 +00:00
SendQueuePendingSends++;
}
2017-09-14 15:59:32 +00:00
DISCORD_EXPORT void Discord_UpdateConnection()
{
2017-09-07 16:23:35 +00:00
if (!Connection) {
return;
}
if (!Connection->IsOpen()) {
2017-07-18 18:10:39 +00:00
if (std::chrono::system_clock::now() >= NextConnect) {
UpdateReconnectTime();
Connection->Open();
}
}
else {
// reads
for (;;) {
2017-07-25 16:06:48 +00:00
JsonDocument message;
if (!Connection->Read(message)) {
break;
}
const char* evtName = GetStrMember(&message, "evt");
const char* nonce = GetStrMember(&message, "nonce");
if (nonce) {
// in responses only -- should use to match up response when needed.
if (evtName && strcmp(evtName, "ERROR") == 0) {
auto data = GetObjMember(&message, "data");
LastErrorCode = GetIntMember(data, "code");
StringCopy(LastErrorMessage, GetStrMember(data, "message", ""));
GotErrorMessage.store(true);
}
}
else {
// should have evt == name of event, optional data
if (evtName == nullptr) {
continue;
}
if (strcmp(evtName, "GAME_JOIN") == 0) {
auto data = GetObjMember(&message, "data");
auto secret = GetStrMember(data, "secret");
if (secret) {
StringCopy(JoinGameSecret, secret);
WasJoinGame.store(true);
}
}
else if (strcmp(evtName, "GAME_SPECTATE") == 0) {
auto data = GetObjMember(&message, "data");
auto secret = GetStrMember(data, "secret");
if (secret) {
StringCopy(SpectateGameSecret, secret);
WasSpectateGame.store(true);
}
}
}
}
2017-07-17 22:42:49 +00:00
// writes
if (QueuedPresence.length) {
QueuedMessage local;
PresenceMutex.lock();
local.Copy(QueuedPresence);
QueuedPresence.length = 0;
PresenceMutex.unlock();
if (!Connection->Write(local.buffer, local.length)) {
// if we fail to send, requeue
PresenceMutex.lock();
QueuedPresence.Copy(local);
PresenceMutex.unlock();
}
}
2017-07-17 22:42:49 +00:00
while (SendQueuePendingSends.load()) {
auto qmessage = SendQueueGetNextSendMessage();
Connection->Write(qmessage->buffer, qmessage->length);
--SendQueuePendingSends;
}
}
}
2017-07-18 16:47:33 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
void DiscordRpcIo()
{
const std::chrono::duration<int64_t, std::milli> maxWait{500LL};
2017-07-25 16:27:48 +00:00
while (KeepRunning.load()) {
Discord_UpdateConnection();
std::unique_lock<std::mutex> lock(WaitForIOMutex);
WaitForIOActivity.wait_for(lock, maxWait);
}
}
2017-07-18 16:47:33 +00:00
#endif
void SignalIOActivity()
{
2017-07-18 16:47:33 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
WaitForIOActivity.notify_all();
2017-07-18 16:47:33 +00:00
#endif
}
2017-06-30 23:18:54 +00:00
bool RegisterForEvent(const char* evtName)
{
auto qmessage = SendQueueGetNextAddMessage();
if (qmessage) {
2017-07-25 16:27:48 +00:00
qmessage->length =
JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
SendQueueCommitMessage();
SignalIOActivity();
return true;
}
return false;
}
2017-09-14 15:59:32 +00:00
DISCORD_EXPORT void Discord_Initialize(const char* applicationId,
2017-07-27 20:52:37 +00:00
DiscordEventHandlers* handlers,
int autoRegister,
const char* optionalSteamId)
2017-06-30 23:18:54 +00:00
{
2017-07-27 20:29:24 +00:00
if (autoRegister) {
if (optionalSteamId && optionalSteamId[0]) {
Discord_RegisterSteamGame(applicationId, optionalSteamId);
}
else {
Discord_Register(applicationId, nullptr);
}
2017-07-27 20:29:24 +00:00
}
2017-07-20 21:59:32 +00:00
Pid = GetProcessId();
2017-06-30 23:18:54 +00:00
if (handlers) {
Handlers = *handlers;
}
else {
Handlers = {};
}
2017-08-03 17:47:27 +00:00
if (Connection) {
return;
}
Connection = RpcConnection::Create(applicationId);
Connection->onConnect = []() {
WasJustConnected.exchange(true);
2017-07-18 18:10:39 +00:00
ReconnectTimeMs.reset();
if (Handlers.joinGame) {
RegisterForEvent("GAME_JOIN");
}
if (Handlers.spectateGame) {
RegisterForEvent("GAME_SPECTATE");
}
};
Connection->onDisconnect = [](int err, const char* message) {
2017-07-24 21:58:53 +00:00
LastDisconnectErrorCode = err;
StringCopy(LastDisconnectErrorMessage, message);
WasJustDisconnected.exchange(true);
2017-07-18 18:10:39 +00:00
UpdateReconnectTime();
2017-07-17 16:28:54 +00:00
};
2017-07-18 16:47:33 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
2017-08-03 17:47:27 +00:00
KeepRunning.store(true);
IoThread = std::thread(DiscordRpcIo);
2017-07-18 16:47:33 +00:00
#endif
2017-06-30 23:18:54 +00:00
}
2017-09-14 15:59:32 +00:00
DISCORD_EXPORT void Discord_Shutdown()
2017-06-30 23:18:54 +00:00
{
2017-08-03 17:47:27 +00:00
if (!Connection) {
return;
}
Connection->onConnect = nullptr;
Connection->onDisconnect = nullptr;
2017-06-30 23:18:54 +00:00
Handlers = {};
2017-07-18 16:47:33 +00:00
#ifndef DISCORD_DISABLE_IO_THREAD
KeepRunning.exchange(false);
SignalIOActivity();
if (IoThread.joinable()) {
IoThread.join();
}
2017-07-18 16:47:33 +00:00
#endif
RpcConnection::Destroy(Connection);
2017-06-30 23:18:54 +00:00
}
2017-09-14 15:59:32 +00:00
DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence)
2017-06-30 23:18:54 +00:00
{
PresenceMutex.lock();
QueuedPresence.length = JsonWriteRichPresenceObj(
QueuedPresence.buffer, sizeof(QueuedPresence.buffer), Nonce++, Pid, presence);
PresenceMutex.unlock();
SignalIOActivity();
2017-06-30 23:18:54 +00:00
}
2017-07-07 16:41:20 +00:00
2017-09-14 15:59:32 +00:00
DISCORD_EXPORT void Discord_RunCallbacks()
2017-07-07 16:41:20 +00:00
{
// Note on some weirdness: internally we might connect, get other signals, disconnect any number
// of times inbetween calls here. Externally, we want the sequence to seem sane, so any other
// signals are book-ended by calls to ready and disconnect.
2017-07-24 21:58:53 +00:00
2017-09-07 16:23:35 +00:00
if (!Connection) {
return;
}
bool wasDisconnected = WasJustDisconnected.exchange(false);
bool isConnected = Connection->IsOpen();
if (isConnected) {
// if we are connected, disconnect cb first
if (wasDisconnected && Handlers.disconnected) {
Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
}
2017-07-07 16:41:20 +00:00
}
if (WasJustConnected.exchange(false) && Handlers.ready) {
2017-07-07 16:41:20 +00:00
Handlers.ready();
}
if (GotErrorMessage.exchange(false) && Handlers.errored) {
Handlers.errored(LastErrorCode, LastErrorMessage);
}
if (WasJoinGame.exchange(false) && Handlers.joinGame) {
Handlers.joinGame(JoinGameSecret);
}
if (WasSpectateGame.exchange(false) && Handlers.spectateGame) {
Handlers.spectateGame(SpectateGameSecret);
}
if (!isConnected) {
// if we are not connected, disconnect message last
if (wasDisconnected && Handlers.disconnected) {
Handlers.disconnected(LastDisconnectErrorCode, LastDisconnectErrorMessage);
}
}
2017-07-07 16:41:20 +00:00
}