diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index bae040c..7b002b9 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include @@ -48,7 +47,10 @@ class NonRealTime : public SCUnit public: using Cache = std::unordered_map; - static Cache mCache; + using RTCacheMirror = std::unordered_map; + + static Cache mCache; + static RTCacheMirror mRTCache; private: static bool isNull(WeakCacheEntryPointer const& weak) @@ -57,58 +59,74 @@ private: !WeakCacheEntryPointer{}.owner_before(weak); } - // https://rigtorp.se/spinlock/ - struct Spinlock + // shouldn't be called without at least *thinking* about getting spin lock + // first + static WeakCacheEntryPointer unsafeGet(index id) { - std::atomic lock_ = {0}; + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } + + + static WeakCacheEntryPointer rtget(index id) + { + auto lookup = mRTCache.find(id); + return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second; + } - void lock() noexcept + using RawCacheEntry = typename Cache::value_type; + + struct addToRTCache + { + void operator()(World* w, RawCacheEntry& r) { - for (;;) + FifoMsg msg; + auto add = [](FifoMsg* m) { + RawCacheEntry* r = static_cast(m->mData); + auto res = mRTCache.emplace(r->first, r->second); + if (!res.second) { std::cout << "ERROR: Could not add to RT cache"; } + }; + msg.Set(w, add, nullptr, &r); + auto ft = Wrapper::getInterfaceTable(); + if (!ft->fSendMsgToRT(w, msg)) { - // Optimistically assume the lock is free on the first try - if (!lock_.exchange(true, std::memory_order_acquire)) { return; } - // Wait for lock to be released without generating cache misses - while (lock_.load(std::memory_order_relaxed)) - { - // Issue X86 PAUSE or ARM YIELD instruction to reduce contention - // between hyper-threads - _mm_pause(); - } + std::cout << "ERROR: Message to RT failed"; } } + }; - bool try_lock() noexcept + struct removeFromRTCache + { + void operator()(World* world, index id) { - // First do a relaxed load to check if lock is free in order to prevent - // unnecessary cache misses if someone does while(!try_lock()) - return !lock_.load(std::memory_order_relaxed) && - !lock_.exchange(true, std::memory_order_acquire); - } + index* data = new index(); + *data = id; - void unlock() noexcept { lock_.store(false, std::memory_order_release); } - }; + FifoMsg msg; + + auto remove = [](FifoMsg* m) { + int* id = static_cast(m->mData); + mRTCache.erase(*id); + }; - static Spinlock mSpinlock; - using ScopedSpinLock = std::unique_lock; + auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; + + msg.Set(world, remove, cleanup, data); + } + }; - // shouldn't be called without at least *thinking* about getting spin lock - // first - static WeakCacheEntryPointer unsafeGet(index id) - { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; - } public: static WeakCacheEntryPointer get(index id) { return unsafeGet(id); } - static WeakCacheEntryPointer add(index id, const Params& params) + static WeakCacheEntryPointer add(World* world, index id, const Params& params) { if (isNull(get(id))) { auto result = mCache.emplace(id, std::make_shared(params)); + addToRTCache{}(world, *(result.first)); + return result.second ? (result.first)->second : WeakCacheEntryPointer(); // sob } @@ -120,7 +138,11 @@ public: } } - static void remove(index id) { mCache.erase(id); } + static void remove(World* world, index id) + { + mCache.erase(id); + removeFromRTCache{}(world, id); + } static void printNotFound(index id) { @@ -143,7 +165,6 @@ private: typename ClientParams::template Setter; - struct NRTCommand { NRTCommand(World*, sc_msg_iter* args, void* replyAddr, @@ -224,14 +245,11 @@ private: bool stage2(World* w) { - // auto entry = ; - ScopedSpinLock lock{mSpinlock}; - Result constraintsRes = validateParameters(mParams); if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes); - mResult = (!isNull(add(NRTCommand::mID, mParams))); + mResult = (!isNull(add(w, NRTCommand::mID, mParams))); // Sigh. The cache entry above has both the client instance and main // params instance. @@ -280,150 +298,21 @@ private: return cmd.c_str(); } - bool stage2(World*) + bool stage2(World* world) { - ScopedSpinLock lock(mSpinlock); cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); - remove(NRTCommand::mID); + remove(world, NRTCommand::mID); NRTCommand::sendReply(name(), true); return true; } }; - /// Not registered as a PlugInCmd. Triggered by worker thread callback - struct CommandAsyncComplete : public NRTCommand - { - CommandAsyncComplete(World*, index id, void* replyAddress) - { - NRTCommand::mID = id; - NRTCommand::mReplyAddress = replyAddress; - } - - static const char* name() { return CommandProcess::name(); } - - bool stage2(World* world) - { - - // std::cout << "In Async completion\n"; - ScopedSpinLock lock{mSpinlock}; - if (auto ptr = get(NRTCommand::mID).lock()) - { - Result r; - mRecord = ptr; - auto& client = ptr->mClient; - ProcessState s = client.checkProgress(r); - if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) - { - if (r.status() == Result::Status::kCancelled) - { - std::cout << Wrapper::getName() << ": Processing cancelled" - << std::endl; - ptr->mDone.store(true, std::memory_order_relaxed); - return false; - } - - client.checkProgress(r); - mSuccess = !(r.status() == Result::Status::kError); - if (!r.ok()) - { - Wrapper::printResult(world, r); - if (r.status() == Result::Status::kError) - { - ptr->mDone.store(true, std::memory_order_relaxed); - return false; - } - } - // if we're progressing to stage3, don't unlock the lock just yet - lock.release(); - return true; - } - } - return false; - } - - bool stage3(World* world) - { - ScopedSpinLock lock(mSpinlock, std::adopt_lock); - if (auto ptr = mRecord.lock()) - { - auto& params = ptr->mParams; - params.template forEachParamType(world); - return true; - } - return false; - } - - bool stage4(World*) // nrt - { - ScopedSpinLock lock(mSpinlock); - if (auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template forEachParamType(); - - if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) - { - NRTCommand::sendReply(name(), mSuccess); - } - ptr->mDone.store(true, std::memory_order_relaxed); // = true; - return true; - } - std::cout << "ERROR: Failed to lock\n"; - return false; - } - - bool mSuccess; - WeakCacheEntryPointer mRecord; - }; - - - static void doProcessCallback(World* world, index id, - size_t completionMsgSize, - char* completionMessage, void* replyAddress) - { - auto ft = getInterfaceTable(); - struct Context - { - World* mWorld; - index mID; - size_t mCompletionMsgSize; - char* mCompletionMessage; - void* mReplyAddress; - }; - - Context* c = new Context{world, id, completionMsgSize, completionMessage, - replyAddress}; - - auto launchCompletionFromNRT = [](FifoMsg* inmsg) { - auto runCompletion = [](FifoMsg* msg) { - Context* c = static_cast(msg->mData); - World* world = c->mWorld; - index id = c->mID; - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); - CommandAsyncComplete* cmd = - new (space) CommandAsyncComplete(world, id, c->mReplyAddress); - runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, - c->mCompletionMessage); - if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); - }; - - auto tidyup = [](FifoMsg* msg) { - Context* c = static_cast(msg->mData); - delete c; - }; - - auto ft = getInterfaceTable(); - FifoMsg fwd = *inmsg; - fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); - if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd); - }; - - FifoMsg msg; - msg.Set(world, launchCompletionFromNRT, nullptr, c); - - if (world->mRunning) ft->fSendMsgFromRT(world, msg); - } +// struct UnitInfo +// { +// int mSynthIndex; +// int mNodeID{0}; +// }; struct CommandProcess : public NRTCommand { @@ -431,8 +320,7 @@ private: : NRTCommand{world, args, replyAddr}, mParams{Client::getParameterDescriptors()} { - auto& ar = *args; - ScopedSpinLock lock(mSpinlock); + auto& ar = *args; if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mDone.store(false, std::memory_order_relaxed); @@ -453,7 +341,6 @@ private: } } - static const char* name() { static std::string cmd = std::string(Wrapper::getName()) + "/process"; @@ -462,7 +349,6 @@ private: bool stage2(World* world) { - ScopedSpinLock lock(mSpinlock); mRecord = get(NRTCommand::mID); if (auto ptr = mRecord.lock()) { @@ -480,7 +366,6 @@ private: size_t completionMsgSize = mCompletionMsgSize; char* completionMessage = mCompletionMessage; void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - auto callback = [world, id, completionMsgSize, completionMessage, replyAddress]() { doProcessCallback(world, id, completionMsgSize, completionMessage, @@ -491,7 +376,7 @@ private: : client.enqueue(params, callback); Wrapper::printResult(world, result); - if (result.ok()) + if (result.status() != Result::Status::kError) { ptr->mDone.store(false, std::memory_order_relaxed); mResult = client.process(); @@ -499,10 +384,10 @@ private: bool error = mResult.status() == Result::Status::kError; - if (error) ptr->mDone.store(true, std::memory_order_relaxed); + if (error) + ptr->mDone.store(true, + std::memory_order_relaxed); bool toStage3 = mSynchronous && !error; - - if (toStage3) lock.release(); return toStage3; } } @@ -519,7 +404,6 @@ private: // Only for blocking execution bool stage3(World* world) // rt { - ScopedSpinLock lock(mSpinlock, std::adopt_lock); if (auto ptr = mRecord.lock()) { ptr->mParams.template forEachParamType(world); @@ -529,22 +413,21 @@ private: } // Only for blocking execution - bool stage4(World*) // nrt + bool stage4(World* w) // nrt { - ScopedSpinLock lock(mSpinlock); if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mParams.template forEachParamType(); if (NRTCommand::mID >= 0 && mSynchronous) - NRTCommand::sendReply(name(), mResult.ok()); + NRTCommand::sendReply(name(), + mResult.status() != Result::Status::kError); ptr->mDone.store(true, std::memory_order_relaxed); return true; } return false; } - bool synchronous() { return mSynchronous; } void addCompletionMessage(size_t size, char* message) //, void* addr) @@ -553,7 +436,7 @@ private: mCompletionMessage = message; } - // private: + // private: Result mResult; bool mSynchronous; size_t mCompletionMsgSize{0}; @@ -561,8 +444,191 @@ private: Params mParams; bool mOverwriteParams{false}; WeakCacheEntryPointer mRecord; + + }; + + + /// Not registered as a PlugInCmd. Triggered by worker thread callback + struct CommandAsyncComplete : public NRTCommand + { + CommandAsyncComplete(World*, index id, void* replyAddress) + { + NRTCommand::mID = id; + NRTCommand::mReplyAddress = replyAddress; + } + + static const char* name() { return CommandProcess::name(); } + + bool stage2(World* world) + { + if (auto ptr = get(NRTCommand::mID).lock()) + { + Result r; + mRecord = ptr; + auto& client = ptr->mClient; + ProcessState s = client.checkProgress(r); + if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) + { + if (r.status() == Result::Status::kCancelled) + { + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + + client.checkProgress(r); + mSuccess = !(r.status() == Result::Status::kError); + Wrapper::printResult(world, r); + if (!mSuccess) + { + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + // if we're progressing to stage3, don't unlock the lock just yet + // lock.release(); + return true; + } + } + std::cout << "Error: Entered callback but thread not done!?\n"; + return false; + } + + bool stage3(World* world) + { + if (auto ptr = mRecord.lock()) + { + auto& params = ptr->mParams; + params.template forEachParamType(world); + } + return true; + } + + bool stage4(World* w) // nrt + { + if (auto ptr = get(NRTCommand::mID).lock()) + { + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) + { + NRTCommand::sendReply(name(), mSuccess); + } + + ptr->mDone.store(true, std::memory_order_relaxed); // = true; + return true; + } + std::cout << "ERROR: Failed to lock\n"; + return false; + } + + + // void notifyUnit(World* w) + // { + // if(mUnitInfo.mNodeID > 0) + // { + // auto ft = Wrapper::getInterfaceTable(); + // + // NRTDoneCount++; + // FifoMsg msg; + // + // auto updateUnitDone = [](FifoMsg* m) + // { + // UnitInfo* info = static_cast(m->mData); + // + // auto ft = Wrapper::getInterfaceTable(); + // Graph* g = ft->fGetGraph(m->mWorld,info->mNodeID); + // if(g) + // { + // Unit* u = g->mUnits[info->mSynthIndex]; + // if(u) + // { + // RTDoneCount++; + // u->mDone = true; + // } + // } + // }; + // + // msg.Set(w, updateUnitDone, nullptr, &mUnitInfo); + // ft->fSendMsgToRT(w,msg); + // } + // } + + + bool mSuccess; + WeakCacheEntryPointer mRecord; }; + + static void doProcessCallback(World* world, index id, + size_t completionMsgSize, + char* completionMessage, void* replyAddress) + { + auto ft = getInterfaceTable(); + struct Context + { + World* mWorld; + index mID; + size_t mCompletionMsgSize; + char* mCompletionMessage; + void* mReplyAddress; + }; + + Context* c = new Context{world, id, completionMsgSize, completionMessage, + replyAddress}; + + auto launchCompletionFromNRT = [](FifoMsg* inmsg) { + + auto runCompletion = [](FifoMsg* msg) { + + Context* c = static_cast(msg->mData); + World* world = c->mWorld; + index id = c->mID; + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); + CommandAsyncComplete* cmd = + new (space) CommandAsyncComplete(world, id, c->mReplyAddress); + if (runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, + c->mCompletionMessage) != 0) + { + std::cout << "ERROR: Async cmf failed in callback" << std::endl; + } + if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); + }; + + auto tidyup = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + delete c; + }; + + auto ft = getInterfaceTable(); + FifoMsg fwd = *inmsg; + fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); + if (inmsg->mWorld->mRunning) + if (!ft->fSendMsgToRT(inmsg->mWorld, fwd)) + { + std::cout << "ERROR: Failed to queue -> RT\n"; + } + }; + + FifoMsg msg; + msg.Set(world, launchCompletionFromNRT, nullptr, c); + + if (world->mRunning) + { + ft->fNRTLock(world); + msg.Perform(); + // if(!ft->fSendMsgFromRT(world, msg)) + // { + // std::cout << "ERROR: Failed to queue -> NRT\n"; + // } + ft->fNRTUnlock(world); + } + else + std::cout << "ERROR: World not running??"; + } + + struct CommandProcessNew : public NRTCommand { CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) @@ -572,10 +638,6 @@ private: mProcess.mReplyAddress = mNew.mReplyAddress; } - CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) - : mNew{id, world, args, x}, mProcess{id} - {} - static const char* name() { static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; @@ -630,13 +692,14 @@ private: bool stage2(World*) { - ScopedSpinLock lock(mSpinlock); if (auto ptr = get(NRTCommand::mID).lock()) { auto& client = ptr->mClient; if (!client.synchronous()) { client.cancel(); + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; return true; } } @@ -649,8 +712,7 @@ private: CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) : NRTCommand{world, args, replyAddr} { - auto& ar = *args; - ScopedSpinLock lock(mSpinlock); + auto& ar = *args; if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mParams.template setParameterValuesRT(nullptr, @@ -773,33 +835,31 @@ private: NRTProgressUnit() { mInterval = static_cast(0.02 / controlDur()); + mID = static_cast(mInBuf[0][0]); + std::cout << mID << std::endl; + mRecord = rtget(mID); set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } void next(int) { - if (0 == mCounter) - { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (!lock.owns_lock()) return; - index id = static_cast(mInBuf[0][0]); - auto record = get(id); - mCounter++; + if (isNull(mRecord)) { mRecord = rtget(mID); }; - if (auto ptr = record.lock()) + if (0 == mCounter++) + { + if (auto ptr = mRecord.lock()) { mInit = true; - if (ptr->mClient.done()) mDone = 1; + mDone = ptr->mDone.load(std::memory_order_relaxed); out0(0) = static_cast(ptr->mClient.progress()); } else { if (!mInit) std::cout << "WARNING: No " << Wrapper::getName() << " with ID " - << id << std::endl; + << mID << std::endl; else mDone = 1; } @@ -808,9 +868,11 @@ private: } private: - index mInterval; - index mCounter{0}; - bool mInit{false}; + index mID; + index mInterval; + index mCounter{0}; + bool mInit{false}; + WeakCacheEntryPointer mRecord; }; @@ -845,7 +907,11 @@ private: if (mID == -1) mID = count(); auto cmd = NonRealTime::rtalloc(mWorld, mID, mWorld, mControlsIterator, this); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in NRTTriggerUnit()" + << std::endl; + } set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } @@ -854,12 +920,17 @@ private: { set_calc_function(); auto cmd = NonRealTime::rtalloc(mWorld, mID); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in ~NRTTriggerUnit()" + << std::endl; + } } void clear(int) { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + Wrapper::getInterfaceTable()->fClearUnitOutputs( + this, static_cast(mNumOutputs)); } void next(int) @@ -879,18 +950,18 @@ private: bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = - rtalloc(mWorld, mID, blocking, &mParams); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + CommandProcess* cmd = rtalloc(mWorld, mID, blocking, + &mParams); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in NRTTriggerUnit::next()" + << std::endl; + } mRunCount++; } else { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (!lock.owns_lock()) return; - - auto record = get(mID); + auto record = rtget(mID); if (auto ptr = record.lock()) { mInit = true; @@ -902,7 +973,6 @@ private: mDone = mInit; } } - private: bool mPreviousTrigger{false}; bool mTrigger{false}; @@ -945,42 +1015,32 @@ private: void init() { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (lock.owns_lock()) + mInit = false; + mInst = rtget(mID); + if (auto ptr = mInst.lock()) { - mInit = false; - mInst = get(mID); - if (auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - mDelegate.init(*this, client, mControls); - mInit = true; - } + auto& client = ptr->mClient; + mDelegate.init(*this, client, mControls); + mInit = true; } } void next(int) { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, + asSigned(mNumOutputs)); index id = static_cast(in0(1)); if (mID != id) init(); if (!mInit) return; - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - if (lock.owns_lock()) - ; + if (auto ptr = mInst.lock()) { - if (auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - auto& params = ptr->mParams; - mControls.reset(mInBuf + ControlOffset()); - mDelegate.next(*this, client, params, mControls, - ptr.use_count() == 2); - } - else - printNotFound(id); + auto& client = ptr->mClient; + auto& params = ptr->mParams; + mControls.reset(mInBuf + ControlOffset()); + mDelegate.next(*this, client, params, mControls, ptr.use_count() == 2); } + else + printNotFound(id); } private: @@ -1012,7 +1072,6 @@ private: { void operator()() { - // std::cout << CommandType::name() << std::endl; defineNRTCommand(); } }; @@ -1032,13 +1091,11 @@ private: } }; - using IsRTQueryModel_t = typename Client::isRealTime; static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; static constexpr bool IsModel = Client::isModelObject::value; - - + public: static void setup(InterfaceTable* ft, const char*) { @@ -1113,9 +1170,8 @@ typename NonRealTime::Cache NonRealTime::mCache{}; template -typename NonRealTime::Spinlock - NonRealTime::mSpinlock{}; - +typename NonRealTime::RTCacheMirror + NonRealTime::mRTCache; } // namespace impl } // namespace client