From dcdc7f8f31b4ec16b6cd77e886b0a3634d567abf Mon Sep 17 00:00:00 2001 From: Owen Green Date: Sun, 20 Jun 2021 11:29:33 +0100 Subject: [PATCH 1/8] Better safety, worse contention Hold on to locks more strictly. Improves safety under load, but is slower. --- include/wrapper/NonRealtime.hpp | 1912 ++++++++++++++++--------------- 1 file changed, 968 insertions(+), 944 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 5df5f29..bae040c 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include "BufferFuncs.hpp" #include "CopyReplyAddress.hpp" @@ -9,1090 +9,1114 @@ #include #include #include +#include +#include #include #include namespace fluid { namespace client { -namespace impl { - - /// Non Real Time Processor - - template - class NonRealTime : public SCUnit +namespace impl { + +/// Non Real Time Processor + +template +class NonRealTime : public SCUnit +{ + using Params = typename Client::ParamSetType; + + template + static T* rtalloc(World* world, Args&&... args) + { + void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); + return new (space) T{std::forward(args)...}; + } + + /// Instance cache + struct CacheEntry { - using Params = typename Client::ParamSetType; - - template - static T* rtalloc(World* world,Args&&...args) + CacheEntry(const Params& p) : mParams{p}, mClient{mParams} {} + + Params mParams; + Client mClient; + std::atomic mDone{false}; + }; + + using CacheEntryPointer = std::shared_ptr; + using WeakCacheEntryPointer = + std::weak_ptr; // could use weak_type in 17 + +public: + using Cache = std::unordered_map; + static Cache mCache; + +private: + static bool isNull(WeakCacheEntryPointer const& weak) + { + return !weak.owner_before(WeakCacheEntryPointer{}) && + !WeakCacheEntryPointer{}.owner_before(weak); + } + + // https://rigtorp.se/spinlock/ + struct Spinlock + { + std::atomic lock_ = {0}; + + void lock() noexcept { - void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); - return new (space) T{std::forward(args)...}; + for (;;) + { + // Optimistically assume the lock is free on the first try + if (!lock_.exchange(true, std::memory_order_acquire)) { return; } + // Wait for lock to be released without generating cache misses + while (lock_.load(std::memory_order_relaxed)) + { + // Issue X86 PAUSE or ARM YIELD instruction to reduce contention + // between hyper-threads + _mm_pause(); + } + } } - - /// Instance cache - struct CacheEntry + + bool try_lock() noexcept { - - CacheEntry(const Params& p):mParams{p},mClient{mParams} - {} - - Params mParams; - Client mClient; - bool mDone{false}; - }; - - using CacheEntryPointer = std::shared_ptr; - using WeakCacheEntryPointer = std::weak_ptr; //could use weak_type in 17 - - public: - using Cache = std::unordered_map; - static Cache mCache; - private: - static bool isNull(WeakCacheEntryPointer const& weak) { - return !weak.owner_before(WeakCacheEntryPointer{}) && !WeakCacheEntryPointer{}.owner_before(weak); + // First do a relaxed load to check if lock is free in order to prevent + // unnecessary cache misses if someone does while(!try_lock()) + return !lock_.load(std::memory_order_relaxed) && + !lock_.exchange(true, std::memory_order_acquire); } - - // https://rigtorp.se/spinlock/ - struct Spinlock { - std::atomic lock_ = {0}; - - void lock() noexcept { - for (;;) { - // Optimistically assume the lock is free on the first try - if (!lock_.exchange(true, std::memory_order_acquire)) { - return; - } - // Wait for lock to be released without generating cache misses - while (lock_.load(std::memory_order_relaxed)) { - // Issue X86 PAUSE or ARM YIELD instruction to reduce contention between - // hyper-threads - //__builtin_ia32_pause(); - } - } - } - - bool tryLock() noexcept { - // First do a relaxed load to check if lock is free in order to prevent - // unnecessary cache misses if someone does while(!try_lock()) - return !lock_.load(std::memory_order_relaxed) && - !lock_.exchange(true, std::memory_order_acquire); - } - - void unlock() noexcept { - lock_.store(false, std::memory_order_release); - } - }; - - //RAII for above - struct ScopedSpinlock + + void unlock() noexcept { lock_.store(false, std::memory_order_release); } + }; + + static Spinlock mSpinlock; + using ScopedSpinLock = std::unique_lock; + + // shouldn't be called without at least *thinking* about getting spin lock + // first + static WeakCacheEntryPointer unsafeGet(index id) + { + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } + +public: + static WeakCacheEntryPointer get(index id) { return unsafeGet(id); } + + static WeakCacheEntryPointer add(index id, const Params& params) + { + if (isNull(get(id))) { - ScopedSpinlock(Spinlock& _l) noexcept: mLock{_l} - { - mLock.lock(); - } - ~ScopedSpinlock() { mLock.unlock(); } - private: - Spinlock& mLock; - }; - - static Spinlock mSpinlock; - - // shouldn't be called without at least *thinking* about getting spin lock first - static inline WeakCacheEntryPointer unsafeGet(index id) + auto result = mCache.emplace(id, std::make_shared(params)); + + return result.second ? (result.first)->second + : WeakCacheEntryPointer(); // sob + } + else // client has screwed up { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + std::cout << "ERROR: " << Wrapper::getName() << " ID " << id + << " already in use\n"; + return {}; } - - public: - static WeakCacheEntryPointer get(index id) + } + + static void remove(index id) { mCache.erase(id); } + + static void printNotFound(index id) + { + std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " + << id << std::endl; + } + +private: + static InterfaceTable* getInterfaceTable() + { + return Wrapper::getInterfaceTable(); + } + + template + using ParamsFromOSC = + typename ClientParams::template Setter; + + template + using ParamsFromSynth = + typename ClientParams::template Setter; + + + struct NRTCommand + { + NRTCommand(World*, sc_msg_iter* args, void* replyAddr, + bool consumeID = true) { - ScopedSpinlock{mSpinlock}; - return unsafeGet(id); + auto count = args->count; + auto pos = args->rdpos; + + mID = args->geti(); + + if (!consumeID) + { + args->count = count; + args->rdpos = pos; + } + + if (replyAddr) mReplyAddress = copyReplyAddress(replyAddr); } - static WeakCacheEntryPointer tryGet(index id) + ~NRTCommand() { - if(mSpinlock.tryLock()) - { - auto ret = unsafeGet(id); - mSpinlock.unlock(); - return ret; - } - return WeakCacheEntryPointer{}; + if (mReplyAddress) deleteReplyAddress(mReplyAddress); } + NRTCommand() {} + + explicit NRTCommand(index id) : mID{id} {} + + bool stage2(World*) { return true; } // nrt + bool stage3(World*) { return true; } // rt + bool stage4(World*) { return false; } // nrt + void cleanup(World*) {} // rt - static WeakCacheEntryPointer add(index id, const Params& params) + void sendReply(const char* name, bool success) { - ScopedSpinlock{mSpinlock}; - if(isNull(get(id))) + if (mReplyAddress) { - auto result = mCache.emplace(id, - std::make_shared(params)); - - return result.second ? (result.first)->second : WeakCacheEntryPointer(); //sob - } - else //client has screwed up - { - std::cout << "ERROR: " << Wrapper::getName() << " ID " << id << " already in use\n"; - return {}; + std::string slash{"/"}; + small_scpacket packet; + packet.adds((slash + name).c_str()); + packet.maketags(3); + packet.addtag(','); + packet.addtag('i'); + packet.addtag('i'); + packet.addi(success); + packet.addi(static_cast(mID)); + + SendReply(mReplyAddress, packet.data(), + static_cast(packet.size())); } } - - static void remove(index id) + // protected: + index mID; + void* mReplyAddress{nullptr}; + }; + + struct CommandNew : public NRTCommand + { + CommandNew(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr, !IsNamedShared_v}, + mParams{Client::getParameterDescriptors()} { - ScopedSpinlock{mSpinlock}; - mCache.erase(id); + mParams.template setParameterValuesRT(nullptr, world, + *args); } - - static void printNotFound(index id) + + CommandNew(index id, World*, FloatControlsIter& args, Unit* x) + : NRTCommand{id}, mParams{Client::getParameterDescriptors()} { - std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " << id << std::endl; + mParams.template setParameterValuesRT(nullptr, x, args); } - - private: - static InterfaceTable* getInterfaceTable() { return Wrapper::getInterfaceTable() ;} - - template - using ParamsFromOSC = typename ClientParams::template Setter; - - template - using ParamsFromSynth = typename ClientParams::template Setter; - - struct NRTCommand + static const char* name() { - NRTCommand(World*, sc_msg_iter* args, void* replyAddr, bool consumeID = true) - { - auto count = args->count; - auto pos = args->rdpos; - - mID = args->geti(); - - if(!consumeID) - { - args->count = count; - args->rdpos = pos; - } - - if(replyAddr) - mReplyAddress = copyReplyAddress(replyAddr); - } - - ~NRTCommand() - { - if(mReplyAddress) deleteReplyAddress(mReplyAddress); - } - - NRTCommand(){} - - explicit NRTCommand(index id):mID{id}{} - - bool stage2(World*) { return true; } //nrt - bool stage3(World*) { return true; } //rt - bool stage4(World*) { return false; } //nrt - void cleanup(World*) {} //rt - - void sendReply(const char* name,bool success) - { - if(mReplyAddress) - { - std::string slash{"/"}; - small_scpacket packet; - packet.adds((slash+name).c_str()); - packet.maketags(3); - packet.addtag(','); - packet.addtag('i'); - packet.addtag('i'); - packet.addi(success); - packet.addi(static_cast(mID)); - - SendReply(mReplyAddress,packet.data(), static_cast(packet.size())); - } - } -// protected: - index mID; - void* mReplyAddress{nullptr}; - }; - - struct CommandNew : public NRTCommand + static std::string cmd = std::string(Wrapper::getName()) + "/new"; + return cmd.c_str(); + } + + bool stage2(World* w) { - CommandNew(World* world, sc_msg_iter* args,void* replyAddr) - : NRTCommand{world,args, replyAddr, !IsNamedShared_v}, - mParams{Client::getParameterDescriptors()} - { - mParams.template setParameterValuesRT(nullptr, world, *args); - } + // auto entry = ; + ScopedSpinLock lock{mSpinlock}; - CommandNew(index id, World*, FloatControlsIter& args, Unit* x) - :NRTCommand{id}, - mParams{Client::getParameterDescriptors()} - { - mParams.template setParameterValuesRT(nullptr, x, args); - } + Result constraintsRes = validateParameters(mParams); + + if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes); - static const char* name() + mResult = (!isNull(add(NRTCommand::mID, mParams))); + + // Sigh. The cache entry above has both the client instance and main + // params instance. + // The client is linked to the params by reference; I've not got the + // in-place constrction working properly so that params are in their final + // resting place by the time we make the client so (for) now we need to + // manually repoint the client to the correct place. Or badness. + if (mResult) { - static std::string cmd = std::string(Wrapper::getName()) + "/new"; - return cmd.c_str(); + auto ptr = get(NRTCommand::mID).lock(); + ptr->mClient.setParams(ptr->mParams); } - bool stage2(World* w) - { -// auto entry = ; + NRTCommand::sendReply(name(), mResult); + return mResult; + } - Result constraintsRes = validateParameters(mParams); + private: + bool mResult; + Params mParams; + }; - if(!constraintsRes.ok()) Wrapper::printResult(w,constraintsRes); + struct CommandFree : public NRTCommand + { + using NRTCommand::NRTCommand; - mResult = (!isNull(add(NRTCommand::mID, mParams))); - - //Sigh. The cache entry above has both the client instance and main params instance. - // The client is linked to the params by reference; I've not got the in-place constrction - // working properly so that params are in their final resting place by the time we make the client - // so (for) now we need to manually repoint the client to the correct place. Or badness. - if(mResult) - { - auto ptr = get(NRTCommand::mID).lock(); - ptr->mClient.setParams(ptr->mParams); - } - - NRTCommand::sendReply(name(),mResult); - - return mResult; - } - - private: - bool mResult; - Params mParams; - }; - - struct CommandFree: public NRTCommand + void cancelCheck(std::false_type, index id) { - using NRTCommand::NRTCommand; - - void cancelCheck(std::false_type, index id) - { - if(auto ptr = get(id).lock()) - { - auto& client = ptr->mClient; - if(!client.synchronous() && client.state() == ProcessState::kProcessing) - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; - } - } - - void cancelCheck(std::true_type, index){} - - - static const char* name() + if (auto ptr = get(id).lock()) { - static std::string cmd = std::string(Wrapper::getName()) + "/free"; - return cmd.c_str(); - } - - bool stage2(World*) - { - cancelCheck(IsRTQueryModel_t(),NRTCommand::mID); - remove(NRTCommand::mID); - NRTCommand::sendReply(name(), true); - return true; + auto& client = ptr->mClient; + if (!client.synchronous() && + client.state() == ProcessState::kProcessing) + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; } + } - }; - - - /// Not registered as a PlugInCmd. Triggered by worker thread callback - struct CommandAsyncComplete: public NRTCommand + void cancelCheck(std::true_type, index) {} + + + static const char* name() { - CommandAsyncComplete(World*, index id, void* replyAddress) - { - NRTCommand::mID = id; - NRTCommand::mReplyAddress = replyAddress; - } - - static const char* name() { return CommandProcess::name(); } - - bool stage2(World* world) + static std::string cmd = std::string(Wrapper::getName()) + "/free"; + return cmd.c_str(); + } + + bool stage2(World*) + { + ScopedSpinLock lock(mSpinlock); + cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); + remove(NRTCommand::mID); + NRTCommand::sendReply(name(), true); + return true; + } + }; + + + /// Not registered as a PlugInCmd. Triggered by worker thread callback + struct CommandAsyncComplete : public NRTCommand + { + CommandAsyncComplete(World*, index id, void* replyAddress) + { + NRTCommand::mID = id; + NRTCommand::mReplyAddress = replyAddress; + } + + static const char* name() { return CommandProcess::name(); } + + bool stage2(World* world) + { + + // std::cout << "In Async completion\n"; + ScopedSpinLock lock{mSpinlock}; + if (auto ptr = get(NRTCommand::mID).lock()) { - - // std::cout << "In Async completion\n"; - if(auto ptr = get(NRTCommand::mID).lock()) + Result r; + mRecord = ptr; + auto& client = ptr->mClient; + ProcessState s = client.checkProgress(r); + if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) { - Result r; - mRecord = ptr; - auto& client = ptr->mClient; - ProcessState s = client.checkProgress(r); - if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) + if (r.status() == Result::Status::kCancelled) + { + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + + client.checkProgress(r); + mSuccess = !(r.status() == Result::Status::kError); + if (!r.ok()) { - if (r.status() == Result::Status::kCancelled) + Wrapper::printResult(world, r); + if (r.status() == Result::Status::kError) { - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; - ptr->mDone = true; + ptr->mDone.store(true, std::memory_order_relaxed); return false; } - - client.checkProgress(r); - mSuccess = !(r.status() == Result::Status::kError); - if (!r.ok()) - { - Wrapper::printResult(world,r); - if(r.status() == Result::Status::kError) - { - ptr->mDone = true; - return false; - } - } - - return true; } + // if we're progressing to stage3, don't unlock the lock just yet + lock.release(); + return true; } - return false; } - - bool stage3(World* world) + return false; + } + + bool stage3(World* world) + { + ScopedSpinLock lock(mSpinlock, std::adopt_lock); + if (auto ptr = mRecord.lock()) { - if(auto ptr = mRecord.lock()) - { - auto& params = ptr->mParams; - params.template forEachParamType(world); - return true; - } - return false; + auto& params = ptr->mParams; + params.template forEachParamType(world); + return true; } - - bool stage4(World*) //nrt + return false; + } + + bool stage4(World*) // nrt + { + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - if(auto ptr = get(NRTCommand::mID).lock()) + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) - { - NRTCommand::sendReply(name(),mSuccess); - } - ptr->mDone = true; - return true; + NRTCommand::sendReply(name(), mSuccess); } - return false; + ptr->mDone.store(true, std::memory_order_relaxed); // = true; + return true; } - - bool mSuccess; - WeakCacheEntryPointer mRecord; - }; - - - static void doProcessCallback(World* world, index id,size_t completionMsgSize,char* completionMessage,void* replyAddress) + std::cout << "ERROR: Failed to lock\n"; + return false; + } + + bool mSuccess; + WeakCacheEntryPointer mRecord; + }; + + + static void doProcessCallback(World* world, index id, + size_t completionMsgSize, + char* completionMessage, void* replyAddress) + { + auto ft = getInterfaceTable(); + struct Context { - auto ft = getInterfaceTable(); - struct Context{ - World* mWorld; - index mID; - size_t mCompletionMsgSize; - char* mCompletionMessage; - void* mReplyAddress; + World* mWorld; + index mID; + size_t mCompletionMsgSize; + char* mCompletionMessage; + void* mReplyAddress; + }; + + Context* c = new Context{world, id, completionMsgSize, completionMessage, + replyAddress}; + + auto launchCompletionFromNRT = [](FifoMsg* inmsg) { + auto runCompletion = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + World* world = c->mWorld; + index id = c->mID; + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); + CommandAsyncComplete* cmd = + new (space) CommandAsyncComplete(world, id, c->mReplyAddress); + runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, + c->mCompletionMessage); + if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); }; - - Context* c = new Context{world,id,completionMsgSize,completionMessage,replyAddress}; - - auto launchCompletionFromNRT = [](FifoMsg* inmsg) - { - auto runCompletion = [](FifoMsg* msg){ - Context* c = static_cast(msg->mData); - World* world = c->mWorld; - index id = c->mID; - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(CommandAsyncComplete)); - CommandAsyncComplete* cmd = new (space) CommandAsyncComplete(world, id,c->mReplyAddress); - runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, c->mCompletionMessage); - if(c->mCompletionMsgSize) ft->fRTFree(world,c->mCompletionMessage); - }; - - auto tidyup = [](FifoMsg* msg) - { - Context* c = static_cast(msg->mData); - delete c; - }; - - auto ft = getInterfaceTable(); - FifoMsg fwd = *inmsg; - fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); - if(inmsg->mWorld->mRunning) - ft->fSendMsgToRT(inmsg->mWorld,fwd); + + auto tidyup = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + delete c; }; - - FifoMsg msg; - msg.Set(world, launchCompletionFromNRT, nullptr, c); - - if(world->mRunning) ft->fSendMsgFromRT(world,msg); - } - - struct CommandProcess: public NRTCommand + + auto ft = getInterfaceTable(); + FifoMsg fwd = *inmsg; + fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); + if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd); + }; + + FifoMsg msg; + msg.Set(world, launchCompletionFromNRT, nullptr, c); + + if (world->mRunning) ft->fSendMsgFromRT(world, msg); + } + + struct CommandProcess : public NRTCommand + { + CommandProcess(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr}, + mParams{Client::getParameterDescriptors()} { - CommandProcess(World* world, sc_msg_iter* args, void* replyAddr): NRTCommand{world, args, replyAddr},mParams{Client::getParameterDescriptors()} + auto& ar = *args; + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - auto& ar = *args; - - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mDone = false; - mParams.template setParameterValuesRT(nullptr, world, ar); - mSynchronous = static_cast(ar.geti()); - } //if this fails, we'll hear about it in stage2 anyway - } + ptr->mDone.store(false, std::memory_order_relaxed); + mParams.template setParameterValuesRT(nullptr, world, + ar); + mSynchronous = static_cast(ar.geti()); + } // if this fails, we'll hear about it in stage2 anyway + } - explicit CommandProcess(index id,bool synchronous,Params* params):NRTCommand{id},mSynchronous(synchronous), - mParams{Client::getParameterDescriptors()} - { - if(params) - { - mParams = *params; - mOverwriteParams = true; - } - } - - - static const char* name() + explicit CommandProcess(index id, bool synchronous, Params* params) + : NRTCommand{id}, + mSynchronous(synchronous), mParams{Client::getParameterDescriptors()} + { + if (params) { - static std::string cmd = std::string(Wrapper::getName()) + "/process"; - return cmd.c_str(); + mParams = *params; + mOverwriteParams = true; } - - bool stage2(World* world) + } + + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/process"; + return cmd.c_str(); + } + + bool stage2(World* world) + { + ScopedSpinLock lock(mSpinlock); + mRecord = get(NRTCommand::mID); + if (auto ptr = mRecord.lock()) { - mRecord = get(NRTCommand::mID); - if(auto ptr = mRecord.lock()) + + auto& params = ptr->mParams; + if (mOverwriteParams) params = mParams; + auto& client = ptr->mClient; + + Result result = validateParameters(params); + Wrapper::printResult(world, result); + if (result.status() != Result::Status::kError) { - - auto& params = ptr->mParams; - if(mOverwriteParams) params = mParams; - auto& client = ptr->mClient; - - - -// if(mOSCData) -// { -// params.template setParameterValuesRT(nullptr, world, *mOSCData); -// mSynchronous = static_cast(mOSCData->geti()); -// } - - Result result = validateParameters(params); + client.setSynchronous(mSynchronous); + index id = NRTCommand::mID; + size_t completionMsgSize = mCompletionMsgSize; + char* completionMessage = mCompletionMessage; + void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); + + auto callback = [world, id, completionMsgSize, completionMessage, + replyAddress]() { + doProcessCallback(world, id, completionMsgSize, completionMessage, + replyAddress); + }; + + result = mSynchronous ? client.enqueue(params) + : client.enqueue(params, callback); Wrapper::printResult(world, result); - if (result.status() != Result::Status::kError) + + if (result.ok()) { -// client.done() - client.setSynchronous(mSynchronous); - index id = NRTCommand::mID; - size_t completionMsgSize = mCompletionMsgSize; - char* completionMessage = mCompletionMessage; - void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - - auto callback = [world,id,completionMsgSize,completionMessage,replyAddress](){ - doProcessCallback(world,id,completionMsgSize,completionMessage,replyAddress); - }; - - result = mSynchronous ? client.enqueue(params) : client.enqueue(params,callback); - Wrapper::printResult(world, result); - - if(result.ok()) - { - ptr->mDone = false; - mResult = client.process(); - Wrapper::printResult(world,mResult); - - bool error =mResult.status() == Result::Status::kError; - - if(error) ptr->mDone = true; - return mSynchronous && !error; - } - } - } - else - { - mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), " with ID ", NRTCommand::mID}; - Wrapper::printResult(world,mResult); - } - return false; - } + ptr->mDone.store(false, std::memory_order_relaxed); + mResult = client.process(); + Wrapper::printResult(world, mResult); - //Only for blocking execution - bool stage3(World* world) //rt - { - if(auto ptr = mRecord.lock()) - { - ptr->mParams.template forEachParamType(world); -// NRTCommand::sendReply(world, name(), mResult.ok()); - return true; + bool error = mResult.status() == Result::Status::kError; + + if (error) ptr->mDone.store(true, std::memory_order_relaxed); + bool toStage3 = mSynchronous && !error; + + if (toStage3) lock.release(); + return toStage3; + } } - // std::cout << "Ohno\n"; - return false; } - - //Only for blocking execution - bool stage4(World*) //nrt + else { - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && mSynchronous) - NRTCommand::sendReply(name(), mResult.ok()); - ptr->mDone = true; - return true; - } - return false; + mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), + " with ID ", NRTCommand::mID}; + Wrapper::printResult(world, mResult); } - + return false; + } - bool synchronous() + // Only for blocking execution + bool stage3(World* world) // rt + { + ScopedSpinLock lock(mSpinlock, std::adopt_lock); + if (auto ptr = mRecord.lock()) { - return mSynchronous; + ptr->mParams.template forEachParamType(world); + return true; } + return false; + } - void addCompletionMessage(size_t size, char* message)//, void* addr) - { - mCompletionMsgSize = size; - mCompletionMessage = message; - } - -// private: - Result mResult; - bool mSynchronous; - size_t mCompletionMsgSize{0}; - char* mCompletionMessage{nullptr}; - Params mParams; - bool mOverwriteParams{false}; - WeakCacheEntryPointer mRecord; - }; - - struct CommandProcessNew: public NRTCommand + // Only for blocking execution + bool stage4(World*) // nrt { - CommandProcessNew(World* world, sc_msg_iter* args,void* replyAddr) - : mNew{world, args, replyAddr}, - mProcess{mNew.mID,false,nullptr} - { - mProcess.mSynchronous = args->geti(); - mProcess.mReplyAddress = mNew.mReplyAddress; - } - - CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) - : mNew{id, world, args, x}, - mProcess{id} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; - return cmd.c_str(); - } - - bool stage2(World* world) + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - return mNew.stage2(world) ? mProcess.stage2(world) : false; - } - - bool stage3(World* world) //rt - { - return mProcess.stage3(world); - } + ptr->mParams.template forEachParamType(); - bool stage4(World* world) //nrt - { - return mProcess.stage4(world); - } - - void cleanup(World* world) - { - mProcess.mReplyAddress = nullptr; - mProcess.cleanup(world); - } - - bool synchronous() - { - return mProcess.synchronous(); - } - - void addCompletionMessage(size_t size, char* message) - { - mProcess.addCompletionMessage(size, message); + if (NRTCommand::mID >= 0 && mSynchronous) + NRTCommand::sendReply(name(), mResult.ok()); + ptr->mDone.store(true, std::memory_order_relaxed); + return true; } + return false; + } - private: - CommandNew mNew; - CommandProcess mProcess; - }; - - - struct CommandCancel: public NRTCommand + + bool synchronous() { return mSynchronous; } + + void addCompletionMessage(size_t size, char* message) //, void* addr) { - CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) - : NRTCommand{world, args, replyAddr} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; - return cmd.c_str(); - } - - bool stage2(World*) - { - if(auto ptr = get(NRTCommand::mID).lock()) - { - auto& client = ptr->mClient; - if(!client.synchronous()) - { - client.cancel(); - return true; - } - } - return false; - } - }; - - struct CommandSetParams: public NRTCommand + mCompletionMsgSize = size; + mCompletionMessage = message; + } + + // private: + Result mResult; + bool mSynchronous; + size_t mCompletionMsgSize{0}; + char* mCompletionMessage{nullptr}; + Params mParams; + bool mOverwriteParams{false}; + WeakCacheEntryPointer mRecord; + }; + + struct CommandProcessNew : public NRTCommand + { + CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) + : mNew{world, args, replyAddr}, mProcess{mNew.mID, false, nullptr} { - CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) - : NRTCommand{world, args, replyAddr} - { - auto& ar = *args; - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template setParameterValuesRT(nullptr, world, ar); - Result result = validateParameters(ptr->mParams); - ptr->mClient.setParams(ptr->mParams); - } else printNotFound(NRTCommand::mID); - } - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; - return cmd.c_str(); - } - }; - - - template - static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + mProcess.mSynchronous = args->geti(); + mProcess.mReplyAddress = mNew.mReplyAddress; + } + + CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) + : mNew{id, world, args, x}, mProcess{id} + {} + + static const char* name() { - auto ft = getInterfaceTable(); + static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; + return cmd.c_str(); + } - return ft->fDoAsynchronousCommand(world, replyAddr,Command::name(),cmd, - [](World* w, void* d) { return static_cast(d)->stage2(w); }, - [](World* w, void* d) { return static_cast(d)->stage3(w); }, - [](World* w, void* d) { return static_cast(d)->stage4(w); }, - [](World* w, void* d) - { - auto cmd = static_cast(d); - cmd->cleanup(w); - cmd->~Command(); - getInterfaceTable()->fRTFree(w,d); - }, - static_cast(completionMsgSize), completionMsgData); + bool stage2(World* world) + { + return mNew.stage2(world) ? mProcess.stage2(world) : false; } - - - static auto runAsyncCommand(World* world, CommandProcess* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + + bool stage3(World* world) // rt { - if(!cmd->synchronous()) - { - - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + return mProcess.stage3(world); } - - static auto runAsyncCommand(World* world, CommandProcessNew* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + + bool stage4(World* world) // nrt { - if(!cmd->synchronous()) - { - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + return mProcess.stage4(world); } - - - template - static void defineNRTCommand() + + void cleanup(World* world) { - auto ft = getInterfaceTable(); - auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, void* replyAddr) - { - - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(Command)); - Command* cmd = new (space) Command(world, args, replyAddr); - //This is brittle, but can't think of something better offhand - //This is the only place we can check for a completion message at the end of the OSC packet - //beause it has to be passed on to DoAsynhronousCommand at this point. However, detecting correctly - //relies on the Command type having fully consumed arguments from the args iterator in the constructor for cmd - size_t completionMsgSize{args ? args->getbsize() : 0}; - assert(completionMsgSize <= std::numeric_limits::max()); - char* completionMsgData = nullptr; - - if (completionMsgSize) { - completionMsgData = (char*)ft->fRTAlloc(world, completionMsgSize); - args->getb(completionMsgData, completionMsgSize); - } - runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); - - if(completionMsgSize) ft->fRTFree(world, completionMsgData); - - }; - ft->fDefinePlugInCmd(Command::name(),commandRunner,nullptr); + mProcess.mReplyAddress = nullptr; + mProcess.cleanup(world); } - - - - struct NRTProgressUnit: SCUnit + + bool synchronous() { return mProcess.synchronous(); } + + void addCompletionMessage(size_t size, char* message) { - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Monitor"; - return n.c_str(); - } - - NRTProgressUnit() - { - mInterval = static_cast(0.02 / controlDur()); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - void next(int) + mProcess.addCompletionMessage(size, message); + } + + private: + CommandNew mNew; + CommandProcess mProcess; + }; + + + struct CommandCancel : public NRTCommand + { + CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr} + {} + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; + return cmd.c_str(); + } + + bool stage2(World*) + { + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - if (0 == mCounter++) + auto& client = ptr->mClient; + if (!client.synchronous()) { - index id = static_cast(mInBuf[0][0]); - if(auto ptr = tryGet(id).lock()) - { - mInit = true; - if(ptr->mClient.done()) mDone = 1; - out0(0) = static_cast(ptr->mClient.progress()); - } - else - { - if(!mInit) - std::cout << "WARNING: No " << Wrapper::getName() << " with ID " << id << std::endl; - else mDone = 1; - } + client.cancel(); + return true; } - mCounter %= mInterval; } - - private: - index mInterval; - index mCounter{0}; - bool mInit{false}; - }; - - - struct NRTTriggerUnit: SCUnit + return false; + } + }; + + struct CommandSetParams : public NRTCommand + { + CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr} { - - static index count(){ - static index counter = -1; - return counter--; - } - - index ControlOffset() { return mSpecialIndex + 1; } - - index ControlSize() + auto& ar = *args; + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 3; //id + trig + blocking; + ptr->mParams.template setParameterValuesRT(nullptr, + world, ar); + Result result = validateParameters(ptr->mParams); + ptr->mClient.setParams(ptr->mParams); } - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Trigger"; - return n.c_str(); - } - - NRTTriggerUnit() - : mControlsIterator{mInBuf + ControlOffset(),ControlSize()},mParams{Client::getParameterDescriptors()} - { - mID = static_cast(mInBuf[0][0]); - if(mID == -1) mID = count(); - auto cmd = NonRealTime::rtalloc(mWorld,mID,mWorld, mControlsIterator, this); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// mInst = get(mID); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - ~NRTTriggerUnit() - { -// if(auto ptr = mInst.lock()) -// { - auto cmd = NonRealTime::rtalloc(mWorld,mID); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// } - } - - void next(int) + else + printNotFound(NRTCommand::mID); + } + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; + return cmd.c_str(); + } + }; + + + template + static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, + size_t completionMsgSize, char* completionMsgData) + { + auto ft = getInterfaceTable(); + + return ft->fDoAsynchronousCommand( + world, replyAddr, Command::name(), cmd, + [](World* w, void* d) { return static_cast(d)->stage2(w); }, + [](World* w, void* d) { return static_cast(d)->stage3(w); }, + [](World* w, void* d) { return static_cast(d)->stage4(w); }, + [](World* w, void* d) { + auto cmd = static_cast(d); + cmd->cleanup(w); + cmd->~Command(); + getInterfaceTable()->fRTFree(w, d); + }, + static_cast(completionMsgSize), completionMsgData); + } + + + static auto runAsyncCommand(World* world, CommandProcess* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) + { + + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); + } + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + static auto runAsyncCommand(World* world, CommandProcessNew* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) + { + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, + nullptr); + } + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + + template + static void defineNRTCommand() + { + auto ft = getInterfaceTable(); + auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, + void* replyAddr) { + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(Command)); + Command* cmd = new (space) Command(world, args, replyAddr); + // This is brittle, but can't think of something better offhand + // This is the only place we can check for a completion message at the end + // of the OSC packet beause it has to be passed on to DoAsynhronousCommand + // at this point. However, detecting correctly relies on the Command type + // having fully consumed arguments from the args iterator in the + // constructor for cmd + size_t completionMsgSize{args ? args->getbsize() : 0}; + assert(completionMsgSize <= std::numeric_limits::max()); + char* completionMsgData = nullptr; + + if (completionMsgSize) { - - - index triggerInput = static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); - mTrigger = mTrigger || triggerInput; - -// if(auto ptr = mInst->lock()) -// if(auto ptr = get(mID).lock()) -// { - bool trigger = (!mPreviousTrigger) && triggerInput;//mTrigger; - mPreviousTrigger = triggerInput; - mTrigger = 0; -// auto& client = ptr->mClient; - - if(trigger) - { - mControlsIterator.reset(1 + mInBuf); //add one for ID -// auto& params = ptr->mParams; - Wrapper::setParams(this,mParams,mControlsIterator,true,false); - bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = rtalloc(mWorld,mID,blocking,&mParams); - runAsyncCommand(mWorld,cmd, nullptr,0, nullptr); - mRunCount++; - } - else - { - if(auto ptr = tryGet(mID).lock()) - { - mInit = true; - auto& client = ptr->mClient; - mDone = ptr->mDone; - out0(0) = mDone ? 1 : static_cast(client.progress()); - } else mDone = mInit; - } -// } -// else printNotFound(id); + completionMsgData = (char*) ft->fRTAlloc(world, completionMsgSize); + args->getb(completionMsgData, completionMsgSize); } - - private: - bool mPreviousTrigger{false}; - bool mTrigger{false}; - Result mResult; - impl::FloatControlsIter mControlsIterator; - index mID; - index mRunCount{0}; - WeakCacheEntryPointer mInst; - Params mParams; - bool mInit{false}; + runAsyncCommand(world, cmd, replyAddr, completionMsgSize, + completionMsgData); + + if (completionMsgSize) ft->fRTFree(world, completionMsgData); }; - - struct NRTModelQueryUnit: SCUnit + ft->fDefinePlugInCmd(Command::name(), commandRunner, nullptr); + } + + + struct NRTProgressUnit : SCUnit + { + + static const char* name() { - using Delegate = impl::RealTimeBase; - - index ControlOffset() { return mSpecialIndex + 2; } - index ControlSize() - { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 2; // trig + id - } - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Query"; - return n.c_str(); - } - - NRTModelQueryUnit() - //Offset controls by 1 to account for ID - : mControls{mInBuf + ControlOffset(),ControlSize()} - { - mID = static_cast(in0(1)); - init(); -// mInst = get(id); -// if(auto ptr = mInst.lock()) -// { -// auto& client = ptr->mClient; -// mDelegate.init(*this,client,mControls); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); -// }else printNotFound(mID); - } - - void init() + static std::string n = std::string(Wrapper::getName()) + "Monitor"; + return n.c_str(); + } + + NRTProgressUnit() + { + mInterval = static_cast(0.02 / controlDur()); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } + + void next(int) + { + if (0 == mCounter) { - if(mSpinlock.tryLock()) + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (!lock.owns_lock()) return; + + index id = static_cast(mInBuf[0][0]); + auto record = get(id); + mCounter++; + + if (auto ptr = record.lock()) { - mInit = false; - mInst = unsafeGet(mID); - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - mDelegate.init(*this,client,mControls); - mInit = true; - }//else printNotFound(mID); - mSpinlock.unlock(); + mInit = true; + if (ptr->mClient.done()) mDone = 1; + out0(0) = static_cast(ptr->mClient.progress()); } - } - - void next(int) - { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); - index id = static_cast(in0(1)); - if(mID != id) init(); - if(!mInit) return; - if(mSpinlock.tryLock()) + else { - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - auto& params = ptr->mParams; - mControls.reset(mInBuf + ControlOffset()); - mDelegate.next(*this,client,params,mControls, ptr.use_count() == 2); - }else printNotFound(id); - mSpinlock.unlock(); + if (!mInit) + std::cout << "WARNING: No " << Wrapper::getName() << " with ID " + << id << std::endl; + else + mDone = 1; } - } - - private: - Delegate mDelegate; - FloatControlsIter mControls; - index mID; - WeakCacheEntryPointer mInst; - bool mInit{false}; - }; - - - using ParamSetType = typename Client::ParamSetType; - - template - using SetupMessageCmd = typename FluidSCMessaging::template SetupMessageCmd; - - - template - struct DefineCommandIf - { - void operator()() { } - }; + mCounter %= mInterval; + } + private: + index mInterval; + index mCounter{0}; + bool mInit{false}; + }; + + + struct NRTTriggerUnit : SCUnit + { - template - struct DefineCommandIf + static index count() { - void operator()() { - // std::cout << CommandType::name() << std::endl; - defineNRTCommand(); - } - }; - - template - struct RegisterUnitIf + static index counter = -1; + return counter--; + } + + index ControlOffset() { return mSpecialIndex + 1; } + + index ControlSize() { - void operator()(InterfaceTable*) {} - }; + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 3; // id + trig + blocking; + } - template - struct RegisterUnitIf + static const char* name() { - void operator()(InterfaceTable* ft) { registerUnit(ft,UnitType::name()); } - }; + static std::string n = std::string(Wrapper::getName()) + "Trigger"; + return n.c_str(); + } - - using IsRTQueryModel_t = typename Client::isRealTime; - static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; - - static constexpr bool IsModel = Client::isModelObject::value; - - - public: - static void setup(InterfaceTable* ft, const char*) + NRTTriggerUnit() + : mControlsIterator{mInBuf + ControlOffset(), ControlSize()}, + mParams{Client::getParameterDescriptors()} { - defineNRTCommand(); - DefineCommandIf()(); - DefineCommandIf()(); - DefineCommandIf()(); - - DefineCommandIf()(); - - defineNRTCommand(); - RegisterUnitIf()(ft); - RegisterUnitIf()(ft); - - RegisterUnitIf()(ft); - Client::getMessageDescriptors().template iterate(); - - - static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; - - ft->fDefinePlugInCmd(flushCmd.c_str(),[](World*, void*, struct sc_msg_iter*, void* ){ - mCache.clear(); - },nullptr); + mID = static_cast(mInBuf[0][0]); + if (mID == -1) mID = count(); + auto cmd = NonRealTime::rtalloc(mWorld, mID, mWorld, + mControlsIterator, this); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } + ~NRTTriggerUnit() + { + set_calc_function(); + auto cmd = NonRealTime::rtalloc(mWorld, mID); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + } - void init(){}; + void clear(int) + { + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + } - private: - static Result validateParameters(ParamSetType& p) + void next(int) { - auto results = p.constrainParameterValues(); - for (auto& r : results) + index triggerInput = + static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); + mTrigger = mTrigger || triggerInput; + + bool trigger = (!mPreviousTrigger) && triggerInput; // mTrigger; + mPreviousTrigger = triggerInput; + mTrigger = 0; + + if (trigger) + { + mControlsIterator.reset(1 + mInBuf); // add one for ID + Wrapper::setParams(this, mParams, mControlsIterator, true, false); + + bool blocking = mInBuf[mNumInputs - 1][0] > 0; + + CommandProcess* cmd = + rtalloc(mWorld, mID, blocking, &mParams); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + mRunCount++; + } + else { - if (!r.ok()) return r; + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (!lock.owns_lock()) return; + + auto record = get(mID); + if (auto ptr = record.lock()) + { + mInit = true; + auto& client = ptr->mClient; + mDone = ptr->mDone.load(std::memory_order_relaxed); + out0(0) = mDone ? 1 : static_cast(client.progress()); + } + else + mDone = mInit; } - return {}; } - template - struct AssignBuffer + private: + bool mPreviousTrigger{false}; + bool mTrigger{false}; + bool mBeingFreed{false}; + Result mResult; + impl::FloatControlsIter mControlsIterator; + index mID; + index mRunCount{0}; + WeakCacheEntryPointer mInst; + Params mParams; + bool mInit{false}; + }; + + struct NRTModelQueryUnit : SCUnit + { + using Delegate = impl::RealTimeBase; + + index ControlOffset() { return mSpecialIndex + 2; } + index ControlSize() + { + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 2; // trig + id + } + + static const char* name() { - void operator()(const typename BufferT::type& p, World* w) + static std::string n = std::string(Wrapper::getName()) + "Query"; + return n.c_str(); + } + + NRTModelQueryUnit() + // Offset controls by 1 to account for ID + : mControls{mInBuf + ControlOffset(), ControlSize()} + { + mID = static_cast(in0(1)); + init(); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } + + void init() + { + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (lock.owns_lock()) { - if (auto b = static_cast(p.get())) b->assignToRT(w); + mInit = false; + mInst = get(mID); + if (auto ptr = mInst.lock()) + { + auto& client = ptr->mClient; + mDelegate.init(*this, client, mControls); + mInit = true; + } } - }; + } - template - struct CleanUpBuffer + void next(int) { - void operator()(const typename BufferT::type& p) + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + index id = static_cast(in0(1)); + if (mID != id) init(); + if (!mInit) return; + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + if (lock.owns_lock()) + ; { - if (auto b = static_cast(p.get())) b->cleanUp(); + if (auto ptr = mInst.lock()) + { + auto& client = ptr->mClient; + auto& params = ptr->mParams; + mControls.reset(mInBuf + ControlOffset()); + mDelegate.next(*this, client, params, mControls, + ptr.use_count() == 2); + } + else + printNotFound(id); } - }; + } + + private: + Delegate mDelegate; + FloatControlsIter mControls; + index mID; + WeakCacheEntryPointer mInst; + bool mInit{false}; + }; + + + using ParamSetType = typename Client::ParamSetType; + + template + using SetupMessageCmd = + typename FluidSCMessaging::template SetupMessageCmd; + + + template + struct DefineCommandIf + { + void operator()() {} + }; - FifoMsg mFifoMsg; - char* mCompletionMessage = nullptr; - void* mReplyAddr = nullptr; - const char* mName = nullptr; - index checkThreadInterval; - index pollCounter{0}; - index mPreviousTrigger{0}; - bool mSynchronous{true}; - Result mResult; + + template + struct DefineCommandIf + { + void operator()() + { + // std::cout << CommandType::name() << std::endl; + defineNRTCommand(); + } }; - - template - typename NonRealTime::Cache NonRealTime::mCache{}; - template - typename NonRealTime::Spinlock NonRealTime::mSpinlock{}; + template + struct RegisterUnitIf + { + void operator()(InterfaceTable*) {} + }; + + template + struct RegisterUnitIf + { + void operator()(InterfaceTable* ft) + { + registerUnit(ft, UnitType::name()); + } + }; + + + using IsRTQueryModel_t = typename Client::isRealTime; + static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; + + static constexpr bool IsModel = Client::isModelObject::value; + + +public: + static void setup(InterfaceTable* ft, const char*) + { + defineNRTCommand(); + DefineCommandIf()(); + DefineCommandIf()(); + DefineCommandIf()(); + + DefineCommandIf()(); + + defineNRTCommand(); + RegisterUnitIf()(ft); + RegisterUnitIf()(ft); + + RegisterUnitIf()(ft); + Client::getMessageDescriptors().template iterate(); + + + static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; + + ft->fDefinePlugInCmd( + flushCmd.c_str(), + [](World*, void*, struct sc_msg_iter*, void*) { mCache.clear(); }, + nullptr); + } + + + void init(){}; + +private: + static Result validateParameters(ParamSetType& p) + { + auto results = p.constrainParameterValues(); + for (auto& r : results) + { + if (!r.ok()) return r; + } + return {}; + } + + template + struct AssignBuffer + { + void operator()(const typename BufferT::type& p, World* w) + { + if (auto b = static_cast(p.get())) b->assignToRT(w); + } + }; + + template + struct CleanUpBuffer + { + void operator()(const typename BufferT::type& p) + { + if (auto b = static_cast(p.get())) b->cleanUp(); + } + }; + + FifoMsg mFifoMsg; + char* mCompletionMessage = nullptr; + void* mReplyAddr = nullptr; + const char* mName = nullptr; + index checkThreadInterval; + index pollCounter{0}; + index mPreviousTrigger{0}; + bool mSynchronous{true}; + Result mResult; +}; + +template +typename NonRealTime::Cache + NonRealTime::mCache{}; + +template +typename NonRealTime::Spinlock + NonRealTime::mSpinlock{}; -} -} -} +} // namespace impl +} // namespace client +} // namespace fluid From 5892a60391563072bb35cf7c1b3fde02b86c1041 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Tue, 22 Jun 2021 00:38:48 +0100 Subject: [PATCH 2/8] Better safety and better contention. Fixes sutble race condition in async callback (lock NRT on line 620). Uses mirror cache of weak_ptr for RT thread --- include/wrapper/NonRealtime.hpp | 568 ++++++++++++++++++-------------- 1 file changed, 312 insertions(+), 256 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index bae040c..7b002b9 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include @@ -48,7 +47,10 @@ class NonRealTime : public SCUnit public: using Cache = std::unordered_map; - static Cache mCache; + using RTCacheMirror = std::unordered_map; + + static Cache mCache; + static RTCacheMirror mRTCache; private: static bool isNull(WeakCacheEntryPointer const& weak) @@ -57,58 +59,74 @@ private: !WeakCacheEntryPointer{}.owner_before(weak); } - // https://rigtorp.se/spinlock/ - struct Spinlock + // shouldn't be called without at least *thinking* about getting spin lock + // first + static WeakCacheEntryPointer unsafeGet(index id) { - std::atomic lock_ = {0}; + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } + + + static WeakCacheEntryPointer rtget(index id) + { + auto lookup = mRTCache.find(id); + return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second; + } - void lock() noexcept + using RawCacheEntry = typename Cache::value_type; + + struct addToRTCache + { + void operator()(World* w, RawCacheEntry& r) { - for (;;) + FifoMsg msg; + auto add = [](FifoMsg* m) { + RawCacheEntry* r = static_cast(m->mData); + auto res = mRTCache.emplace(r->first, r->second); + if (!res.second) { std::cout << "ERROR: Could not add to RT cache"; } + }; + msg.Set(w, add, nullptr, &r); + auto ft = Wrapper::getInterfaceTable(); + if (!ft->fSendMsgToRT(w, msg)) { - // Optimistically assume the lock is free on the first try - if (!lock_.exchange(true, std::memory_order_acquire)) { return; } - // Wait for lock to be released without generating cache misses - while (lock_.load(std::memory_order_relaxed)) - { - // Issue X86 PAUSE or ARM YIELD instruction to reduce contention - // between hyper-threads - _mm_pause(); - } + std::cout << "ERROR: Message to RT failed"; } } + }; - bool try_lock() noexcept + struct removeFromRTCache + { + void operator()(World* world, index id) { - // First do a relaxed load to check if lock is free in order to prevent - // unnecessary cache misses if someone does while(!try_lock()) - return !lock_.load(std::memory_order_relaxed) && - !lock_.exchange(true, std::memory_order_acquire); - } + index* data = new index(); + *data = id; - void unlock() noexcept { lock_.store(false, std::memory_order_release); } - }; + FifoMsg msg; + + auto remove = [](FifoMsg* m) { + int* id = static_cast(m->mData); + mRTCache.erase(*id); + }; - static Spinlock mSpinlock; - using ScopedSpinLock = std::unique_lock; + auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; + + msg.Set(world, remove, cleanup, data); + } + }; - // shouldn't be called without at least *thinking* about getting spin lock - // first - static WeakCacheEntryPointer unsafeGet(index id) - { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; - } public: static WeakCacheEntryPointer get(index id) { return unsafeGet(id); } - static WeakCacheEntryPointer add(index id, const Params& params) + static WeakCacheEntryPointer add(World* world, index id, const Params& params) { if (isNull(get(id))) { auto result = mCache.emplace(id, std::make_shared(params)); + addToRTCache{}(world, *(result.first)); + return result.second ? (result.first)->second : WeakCacheEntryPointer(); // sob } @@ -120,7 +138,11 @@ public: } } - static void remove(index id) { mCache.erase(id); } + static void remove(World* world, index id) + { + mCache.erase(id); + removeFromRTCache{}(world, id); + } static void printNotFound(index id) { @@ -143,7 +165,6 @@ private: typename ClientParams::template Setter; - struct NRTCommand { NRTCommand(World*, sc_msg_iter* args, void* replyAddr, @@ -224,14 +245,11 @@ private: bool stage2(World* w) { - // auto entry = ; - ScopedSpinLock lock{mSpinlock}; - Result constraintsRes = validateParameters(mParams); if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes); - mResult = (!isNull(add(NRTCommand::mID, mParams))); + mResult = (!isNull(add(w, NRTCommand::mID, mParams))); // Sigh. The cache entry above has both the client instance and main // params instance. @@ -280,150 +298,21 @@ private: return cmd.c_str(); } - bool stage2(World*) + bool stage2(World* world) { - ScopedSpinLock lock(mSpinlock); cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); - remove(NRTCommand::mID); + remove(world, NRTCommand::mID); NRTCommand::sendReply(name(), true); return true; } }; - /// Not registered as a PlugInCmd. Triggered by worker thread callback - struct CommandAsyncComplete : public NRTCommand - { - CommandAsyncComplete(World*, index id, void* replyAddress) - { - NRTCommand::mID = id; - NRTCommand::mReplyAddress = replyAddress; - } - - static const char* name() { return CommandProcess::name(); } - - bool stage2(World* world) - { - - // std::cout << "In Async completion\n"; - ScopedSpinLock lock{mSpinlock}; - if (auto ptr = get(NRTCommand::mID).lock()) - { - Result r; - mRecord = ptr; - auto& client = ptr->mClient; - ProcessState s = client.checkProgress(r); - if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) - { - if (r.status() == Result::Status::kCancelled) - { - std::cout << Wrapper::getName() << ": Processing cancelled" - << std::endl; - ptr->mDone.store(true, std::memory_order_relaxed); - return false; - } - - client.checkProgress(r); - mSuccess = !(r.status() == Result::Status::kError); - if (!r.ok()) - { - Wrapper::printResult(world, r); - if (r.status() == Result::Status::kError) - { - ptr->mDone.store(true, std::memory_order_relaxed); - return false; - } - } - // if we're progressing to stage3, don't unlock the lock just yet - lock.release(); - return true; - } - } - return false; - } - - bool stage3(World* world) - { - ScopedSpinLock lock(mSpinlock, std::adopt_lock); - if (auto ptr = mRecord.lock()) - { - auto& params = ptr->mParams; - params.template forEachParamType(world); - return true; - } - return false; - } - - bool stage4(World*) // nrt - { - ScopedSpinLock lock(mSpinlock); - if (auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template forEachParamType(); - - if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) - { - NRTCommand::sendReply(name(), mSuccess); - } - ptr->mDone.store(true, std::memory_order_relaxed); // = true; - return true; - } - std::cout << "ERROR: Failed to lock\n"; - return false; - } - - bool mSuccess; - WeakCacheEntryPointer mRecord; - }; - - - static void doProcessCallback(World* world, index id, - size_t completionMsgSize, - char* completionMessage, void* replyAddress) - { - auto ft = getInterfaceTable(); - struct Context - { - World* mWorld; - index mID; - size_t mCompletionMsgSize; - char* mCompletionMessage; - void* mReplyAddress; - }; - - Context* c = new Context{world, id, completionMsgSize, completionMessage, - replyAddress}; - - auto launchCompletionFromNRT = [](FifoMsg* inmsg) { - auto runCompletion = [](FifoMsg* msg) { - Context* c = static_cast(msg->mData); - World* world = c->mWorld; - index id = c->mID; - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); - CommandAsyncComplete* cmd = - new (space) CommandAsyncComplete(world, id, c->mReplyAddress); - runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, - c->mCompletionMessage); - if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); - }; - - auto tidyup = [](FifoMsg* msg) { - Context* c = static_cast(msg->mData); - delete c; - }; - - auto ft = getInterfaceTable(); - FifoMsg fwd = *inmsg; - fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); - if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd); - }; - - FifoMsg msg; - msg.Set(world, launchCompletionFromNRT, nullptr, c); - - if (world->mRunning) ft->fSendMsgFromRT(world, msg); - } +// struct UnitInfo +// { +// int mSynthIndex; +// int mNodeID{0}; +// }; struct CommandProcess : public NRTCommand { @@ -431,8 +320,7 @@ private: : NRTCommand{world, args, replyAddr}, mParams{Client::getParameterDescriptors()} { - auto& ar = *args; - ScopedSpinLock lock(mSpinlock); + auto& ar = *args; if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mDone.store(false, std::memory_order_relaxed); @@ -453,7 +341,6 @@ private: } } - static const char* name() { static std::string cmd = std::string(Wrapper::getName()) + "/process"; @@ -462,7 +349,6 @@ private: bool stage2(World* world) { - ScopedSpinLock lock(mSpinlock); mRecord = get(NRTCommand::mID); if (auto ptr = mRecord.lock()) { @@ -480,7 +366,6 @@ private: size_t completionMsgSize = mCompletionMsgSize; char* completionMessage = mCompletionMessage; void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - auto callback = [world, id, completionMsgSize, completionMessage, replyAddress]() { doProcessCallback(world, id, completionMsgSize, completionMessage, @@ -491,7 +376,7 @@ private: : client.enqueue(params, callback); Wrapper::printResult(world, result); - if (result.ok()) + if (result.status() != Result::Status::kError) { ptr->mDone.store(false, std::memory_order_relaxed); mResult = client.process(); @@ -499,10 +384,10 @@ private: bool error = mResult.status() == Result::Status::kError; - if (error) ptr->mDone.store(true, std::memory_order_relaxed); + if (error) + ptr->mDone.store(true, + std::memory_order_relaxed); bool toStage3 = mSynchronous && !error; - - if (toStage3) lock.release(); return toStage3; } } @@ -519,7 +404,6 @@ private: // Only for blocking execution bool stage3(World* world) // rt { - ScopedSpinLock lock(mSpinlock, std::adopt_lock); if (auto ptr = mRecord.lock()) { ptr->mParams.template forEachParamType(world); @@ -529,22 +413,21 @@ private: } // Only for blocking execution - bool stage4(World*) // nrt + bool stage4(World* w) // nrt { - ScopedSpinLock lock(mSpinlock); if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mParams.template forEachParamType(); if (NRTCommand::mID >= 0 && mSynchronous) - NRTCommand::sendReply(name(), mResult.ok()); + NRTCommand::sendReply(name(), + mResult.status() != Result::Status::kError); ptr->mDone.store(true, std::memory_order_relaxed); return true; } return false; } - bool synchronous() { return mSynchronous; } void addCompletionMessage(size_t size, char* message) //, void* addr) @@ -553,7 +436,7 @@ private: mCompletionMessage = message; } - // private: + // private: Result mResult; bool mSynchronous; size_t mCompletionMsgSize{0}; @@ -561,8 +444,191 @@ private: Params mParams; bool mOverwriteParams{false}; WeakCacheEntryPointer mRecord; + + }; + + + /// Not registered as a PlugInCmd. Triggered by worker thread callback + struct CommandAsyncComplete : public NRTCommand + { + CommandAsyncComplete(World*, index id, void* replyAddress) + { + NRTCommand::mID = id; + NRTCommand::mReplyAddress = replyAddress; + } + + static const char* name() { return CommandProcess::name(); } + + bool stage2(World* world) + { + if (auto ptr = get(NRTCommand::mID).lock()) + { + Result r; + mRecord = ptr; + auto& client = ptr->mClient; + ProcessState s = client.checkProgress(r); + if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) + { + if (r.status() == Result::Status::kCancelled) + { + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + + client.checkProgress(r); + mSuccess = !(r.status() == Result::Status::kError); + Wrapper::printResult(world, r); + if (!mSuccess) + { + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + // if we're progressing to stage3, don't unlock the lock just yet + // lock.release(); + return true; + } + } + std::cout << "Error: Entered callback but thread not done!?\n"; + return false; + } + + bool stage3(World* world) + { + if (auto ptr = mRecord.lock()) + { + auto& params = ptr->mParams; + params.template forEachParamType(world); + } + return true; + } + + bool stage4(World* w) // nrt + { + if (auto ptr = get(NRTCommand::mID).lock()) + { + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) + { + NRTCommand::sendReply(name(), mSuccess); + } + + ptr->mDone.store(true, std::memory_order_relaxed); // = true; + return true; + } + std::cout << "ERROR: Failed to lock\n"; + return false; + } + + + // void notifyUnit(World* w) + // { + // if(mUnitInfo.mNodeID > 0) + // { + // auto ft = Wrapper::getInterfaceTable(); + // + // NRTDoneCount++; + // FifoMsg msg; + // + // auto updateUnitDone = [](FifoMsg* m) + // { + // UnitInfo* info = static_cast(m->mData); + // + // auto ft = Wrapper::getInterfaceTable(); + // Graph* g = ft->fGetGraph(m->mWorld,info->mNodeID); + // if(g) + // { + // Unit* u = g->mUnits[info->mSynthIndex]; + // if(u) + // { + // RTDoneCount++; + // u->mDone = true; + // } + // } + // }; + // + // msg.Set(w, updateUnitDone, nullptr, &mUnitInfo); + // ft->fSendMsgToRT(w,msg); + // } + // } + + + bool mSuccess; + WeakCacheEntryPointer mRecord; }; + + static void doProcessCallback(World* world, index id, + size_t completionMsgSize, + char* completionMessage, void* replyAddress) + { + auto ft = getInterfaceTable(); + struct Context + { + World* mWorld; + index mID; + size_t mCompletionMsgSize; + char* mCompletionMessage; + void* mReplyAddress; + }; + + Context* c = new Context{world, id, completionMsgSize, completionMessage, + replyAddress}; + + auto launchCompletionFromNRT = [](FifoMsg* inmsg) { + + auto runCompletion = [](FifoMsg* msg) { + + Context* c = static_cast(msg->mData); + World* world = c->mWorld; + index id = c->mID; + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); + CommandAsyncComplete* cmd = + new (space) CommandAsyncComplete(world, id, c->mReplyAddress); + if (runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, + c->mCompletionMessage) != 0) + { + std::cout << "ERROR: Async cmf failed in callback" << std::endl; + } + if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); + }; + + auto tidyup = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + delete c; + }; + + auto ft = getInterfaceTable(); + FifoMsg fwd = *inmsg; + fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); + if (inmsg->mWorld->mRunning) + if (!ft->fSendMsgToRT(inmsg->mWorld, fwd)) + { + std::cout << "ERROR: Failed to queue -> RT\n"; + } + }; + + FifoMsg msg; + msg.Set(world, launchCompletionFromNRT, nullptr, c); + + if (world->mRunning) + { + ft->fNRTLock(world); + msg.Perform(); + // if(!ft->fSendMsgFromRT(world, msg)) + // { + // std::cout << "ERROR: Failed to queue -> NRT\n"; + // } + ft->fNRTUnlock(world); + } + else + std::cout << "ERROR: World not running??"; + } + + struct CommandProcessNew : public NRTCommand { CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) @@ -572,10 +638,6 @@ private: mProcess.mReplyAddress = mNew.mReplyAddress; } - CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) - : mNew{id, world, args, x}, mProcess{id} - {} - static const char* name() { static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; @@ -630,13 +692,14 @@ private: bool stage2(World*) { - ScopedSpinLock lock(mSpinlock); if (auto ptr = get(NRTCommand::mID).lock()) { auto& client = ptr->mClient; if (!client.synchronous()) { client.cancel(); + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; return true; } } @@ -649,8 +712,7 @@ private: CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) : NRTCommand{world, args, replyAddr} { - auto& ar = *args; - ScopedSpinLock lock(mSpinlock); + auto& ar = *args; if (auto ptr = get(NRTCommand::mID).lock()) { ptr->mParams.template setParameterValuesRT(nullptr, @@ -773,33 +835,31 @@ private: NRTProgressUnit() { mInterval = static_cast(0.02 / controlDur()); + mID = static_cast(mInBuf[0][0]); + std::cout << mID << std::endl; + mRecord = rtget(mID); set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } void next(int) { - if (0 == mCounter) - { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (!lock.owns_lock()) return; - index id = static_cast(mInBuf[0][0]); - auto record = get(id); - mCounter++; + if (isNull(mRecord)) { mRecord = rtget(mID); }; - if (auto ptr = record.lock()) + if (0 == mCounter++) + { + if (auto ptr = mRecord.lock()) { mInit = true; - if (ptr->mClient.done()) mDone = 1; + mDone = ptr->mDone.load(std::memory_order_relaxed); out0(0) = static_cast(ptr->mClient.progress()); } else { if (!mInit) std::cout << "WARNING: No " << Wrapper::getName() << " with ID " - << id << std::endl; + << mID << std::endl; else mDone = 1; } @@ -808,9 +868,11 @@ private: } private: - index mInterval; - index mCounter{0}; - bool mInit{false}; + index mID; + index mInterval; + index mCounter{0}; + bool mInit{false}; + WeakCacheEntryPointer mRecord; }; @@ -845,7 +907,11 @@ private: if (mID == -1) mID = count(); auto cmd = NonRealTime::rtalloc(mWorld, mID, mWorld, mControlsIterator, this); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in NRTTriggerUnit()" + << std::endl; + } set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } @@ -854,12 +920,17 @@ private: { set_calc_function(); auto cmd = NonRealTime::rtalloc(mWorld, mID); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in ~NRTTriggerUnit()" + << std::endl; + } } void clear(int) { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + Wrapper::getInterfaceTable()->fClearUnitOutputs( + this, static_cast(mNumOutputs)); } void next(int) @@ -879,18 +950,18 @@ private: bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = - rtalloc(mWorld, mID, blocking, &mParams); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + CommandProcess* cmd = rtalloc(mWorld, mID, blocking, + &mParams); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) + { + std::cout << "ERROR: Async command failed in NRTTriggerUnit::next()" + << std::endl; + } mRunCount++; } else { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (!lock.owns_lock()) return; - - auto record = get(mID); + auto record = rtget(mID); if (auto ptr = record.lock()) { mInit = true; @@ -902,7 +973,6 @@ private: mDone = mInit; } } - private: bool mPreviousTrigger{false}; bool mTrigger{false}; @@ -945,42 +1015,32 @@ private: void init() { - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - - if (lock.owns_lock()) + mInit = false; + mInst = rtget(mID); + if (auto ptr = mInst.lock()) { - mInit = false; - mInst = get(mID); - if (auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - mDelegate.init(*this, client, mControls); - mInit = true; - } + auto& client = ptr->mClient; + mDelegate.init(*this, client, mControls); + mInit = true; } } void next(int) { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, + asSigned(mNumOutputs)); index id = static_cast(in0(1)); if (mID != id) init(); if (!mInit) return; - ScopedSpinLock lock(mSpinlock, std::try_to_lock); - if (lock.owns_lock()) - ; + if (auto ptr = mInst.lock()) { - if (auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - auto& params = ptr->mParams; - mControls.reset(mInBuf + ControlOffset()); - mDelegate.next(*this, client, params, mControls, - ptr.use_count() == 2); - } - else - printNotFound(id); + auto& client = ptr->mClient; + auto& params = ptr->mParams; + mControls.reset(mInBuf + ControlOffset()); + mDelegate.next(*this, client, params, mControls, ptr.use_count() == 2); } + else + printNotFound(id); } private: @@ -1012,7 +1072,6 @@ private: { void operator()() { - // std::cout << CommandType::name() << std::endl; defineNRTCommand(); } }; @@ -1032,13 +1091,11 @@ private: } }; - using IsRTQueryModel_t = typename Client::isRealTime; static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; static constexpr bool IsModel = Client::isModelObject::value; - - + public: static void setup(InterfaceTable* ft, const char*) { @@ -1113,9 +1170,8 @@ typename NonRealTime::Cache NonRealTime::mCache{}; template -typename NonRealTime::Spinlock - NonRealTime::mSpinlock{}; - +typename NonRealTime::RTCacheMirror + NonRealTime::mRTCache; } // namespace impl } // namespace client From 08be2a629f4779cf3770f9ce782fc68376605cfe Mon Sep 17 00:00:00 2001 From: Owen Green Date: Tue, 22 Jun 2021 18:20:33 +0100 Subject: [PATCH 3/8] Use SC RT memory pool for RT cache mirror --- include/wrapper/NonRealtime.hpp | 62 +++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 7b002b9..7a912f3 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -6,6 +6,7 @@ #include "Meta.hpp" #include "RealTimeBase.hpp" #include "SCBufferAdaptor.hpp" +#include "SCWorldAllocator.hpp" #include #include #include @@ -47,10 +48,16 @@ class NonRealTime : public SCUnit public: using Cache = std::unordered_map; - using RTCacheMirror = std::unordered_map; + using RTCacheAllocator = + SCWorldAllocator>; + using RTCacheMirror = + std::unordered_map, + std::equal_to, RTCacheAllocator>; + using RTCachePointer = + std::unique_ptr>; - static Cache mCache; - static RTCacheMirror mRTCache; + static Cache mCache; + static RTCachePointer mRTCache; private: static bool isNull(WeakCacheEntryPointer const& weak) @@ -70,8 +77,9 @@ private: static WeakCacheEntryPointer rtget(index id) { - auto lookup = mRTCache.find(id); - return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second; + if (!mRTCache) return {}; + auto lookup = mRTCache->find(id); + return lookup == mRTCache->end() ? WeakCacheEntryPointer() : lookup->second; } using RawCacheEntry = typename Cache::value_type; @@ -82,8 +90,18 @@ private: { FifoMsg msg; auto add = [](FifoMsg* m) { + if (!mRTCache) + { + RTCacheMirror* tmp = rtalloc( + m->mWorld, RTCacheAllocator{m->mWorld, getInterfaceTable()}); + World* w = m->mWorld; + mRTCache = RTCachePointer(tmp, [w](RTCacheMirror* x) { + if (w->mRunning) getInterfaceTable()->fRTFree(w, x); + }); + } + RawCacheEntry* r = static_cast(m->mData); - auto res = mRTCache.emplace(r->first, r->second); + auto res = mRTCache->emplace(r->first, r->second); if (!res.second) { std::cout << "ERROR: Could not add to RT cache"; } }; msg.Set(w, add, nullptr, &r); @@ -105,8 +123,9 @@ private: FifoMsg msg; auto remove = [](FifoMsg* m) { + if (!mRTCache) return; int* id = static_cast(m->mData); - mRTCache.erase(*id); + mRTCache->erase(*id); }; auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; @@ -366,7 +385,7 @@ private: size_t completionMsgSize = mCompletionMsgSize; char* completionMessage = mCompletionMessage; void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - auto callback = [world, id, completionMsgSize, completionMessage, + auto callback = [world, id, completionMsgSize, completionMessage, replyAddress]() { doProcessCallback(world, id, completionMsgSize, completionMessage, replyAddress); @@ -384,9 +403,7 @@ private: bool error = mResult.status() == Result::Status::kError; - if (error) - ptr->mDone.store(true, - std::memory_order_relaxed); + if (error) ptr->mDone.store(true, std::memory_order_relaxed); bool toStage3 = mSynchronous && !error; return toStage3; } @@ -436,7 +453,7 @@ private: mCompletionMessage = message; } - // private: + // private: Result mResult; bool mSynchronous; size_t mCompletionMsgSize{0}; @@ -845,6 +862,7 @@ private: void next(int) { + if (!mRTCache) return; if (isNull(mRecord)) { mRecord = rtget(mID); }; if (0 == mCounter++) @@ -950,8 +968,8 @@ private: bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = rtalloc(mWorld, mID, blocking, - &mParams); + CommandProcess* cmd = + rtalloc(mWorld, mID, blocking, &mParams); if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) { std::cout << "ERROR: Async command failed in NRTTriggerUnit::next()" @@ -1015,7 +1033,8 @@ private: void init() { - mInit = false; + mInit = false; + if (!mRTCache) return; mInst = rtget(mID); if (auto ptr = mInst.lock()) { @@ -1027,8 +1046,8 @@ private: void next(int) { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, - asSigned(mNumOutputs)); + Wrapper::getInterfaceTable()->fClearUnitOutputs( + this, static_cast(mNumOutputs)); index id = static_cast(in0(1)); if (mID != id) init(); if (!mInit) return; @@ -1070,10 +1089,7 @@ private: template struct DefineCommandIf { - void operator()() - { - defineNRTCommand(); - } + void operator()() { defineNRTCommand(); } }; template @@ -1170,8 +1186,8 @@ typename NonRealTime::Cache NonRealTime::mCache{}; template -typename NonRealTime::RTCacheMirror - NonRealTime::mRTCache; +typename NonRealTime::RTCachePointer + NonRealTime::mRTCache{}; } // namespace impl } // namespace client From 6b4f5f67ae40b042b7e18cee04fe2148fbe6a8a5 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Wed, 23 Jun 2021 12:06:38 +0100 Subject: [PATCH 4/8] Add fluid::SCWorldAllocator --- include/wrapper/SCWorldAllocator.hpp | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 include/wrapper/SCWorldAllocator.hpp diff --git a/include/wrapper/SCWorldAllocator.hpp b/include/wrapper/SCWorldAllocator.hpp new file mode 100644 index 0000000..a53d1fd --- /dev/null +++ b/include/wrapper/SCWorldAllocator.hpp @@ -0,0 +1,57 @@ +/* + Part of the Fluid Corpus Manipulation Project (http://www.flucoma.org/) + Copyright 2017-2019 University of Huddersfield. + Licensed under the BSD-3 License. + See license.md file in the project root for full license information. + This project has received funding from the European Research Council (ERC) + under the European Union’s Horizon 2020 research and innovation programme + (grant agreement No 725899). + */ +#pragma once + +#include +#include +#include +#include + +namespace fluid { + +template +class SCWorldAllocator +{ + World* mWorld{nullptr}; + InterfaceTable* mInterface{nullptr}; + +public: + using propagate_on_container_move_assignment = std::true_type; + using value_type = T; + + template + friend class SCWorldAllocator; + + SCWorldAllocator(World* w, InterfaceTable* ft) : mWorld{w}, mInterface{ft} {} + + template + SCWorldAllocator(const SCWorldAllocator& other) noexcept + : mWorld{other.mWorld}, mInterface{other.mInterface} + {} + + T* allocate(std::size_t n) + { + if (n > std::numeric_limits::max() / sizeof(T)) + throw std::bad_array_new_length(); + + if (mWorld && mInterface) + if (auto p = static_cast(mInterface->fRTAlloc(mWorld, n * sizeof(T)))) + return p; + + throw std::bad_alloc(); + } + + void deallocate(T* p, std::size_t n) noexcept + { + assert(mWorld && mInterface); + mInterface->fRTFree(mWorld, p); + } +}; +} // namespace fluid From 714c2062963ff4c81c05884ccd9509812509d24e Mon Sep 17 00:00:00 2001 From: Owen Green Date: Thu, 24 Jun 2021 08:42:34 +0100 Subject: [PATCH 5/8] Remove error messages that aren't errors --- include/wrapper/NonRealtime.hpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 7a912f3..f43ace6 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -102,7 +102,6 @@ private: RawCacheEntry* r = static_cast(m->mData); auto res = mRTCache->emplace(r->first, r->second); - if (!res.second) { std::cout << "ERROR: Could not add to RT cache"; } }; msg.Set(w, add, nullptr, &r); auto ft = Wrapper::getInterfaceTable(); @@ -507,7 +506,6 @@ private: return true; } } - std::cout << "Error: Entered callback but thread not done!?\n"; return false; } @@ -608,7 +606,7 @@ private: if (runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, c->mCompletionMessage) != 0) { - std::cout << "ERROR: Async cmf failed in callback" << std::endl; + std::cout << "ERROR: Async cmd failed in callback" << std::endl; } if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); }; @@ -635,14 +633,8 @@ private: { ft->fNRTLock(world); msg.Perform(); - // if(!ft->fSendMsgFromRT(world, msg)) - // { - // std::cout << "ERROR: Failed to queue -> NRT\n"; - // } ft->fNRTUnlock(world); } - else - std::cout << "ERROR: World not running??"; } From ac163c4a118cafebf39a32ea0ad8d8750e86c059 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Mon, 28 Jun 2021 09:00:37 +0100 Subject: [PATCH 6/8] Fix RT cache to avoid shutdown crashes --- include/wrapper/NonRealtime.hpp | 137 ++++++++++++--------------- include/wrapper/SCWorldAllocator.hpp | 28 +++--- 2 files changed, 73 insertions(+), 92 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index f43ace6..7f490fe 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -49,16 +49,20 @@ class NonRealTime : public SCUnit public: using Cache = std::unordered_map; using RTCacheAllocator = - SCWorldAllocator>; - using RTCacheMirror = - std::unordered_map, - std::equal_to, RTCacheAllocator>; - using RTCachePointer = - std::unique_ptr>; - + SCWorldAllocator,Wrapper>; + struct RTCacheMirror: public std::unordered_map, + std::equal_to, RTCacheAllocator> + { + + ~RTCacheMirror() + { + if(mWorld) mWorld = nullptr; //if we get as far as destroying this, then the world is gone and we should accept that and move on + } + + }; + static Cache mCache; - static RTCachePointer mRTCache; - + static RTCacheMirror mRTCache; private: static bool isNull(WeakCacheEntryPointer const& weak) { @@ -66,20 +70,10 @@ private: !WeakCacheEntryPointer{}.owner_before(weak); } - // shouldn't be called without at least *thinking* about getting spin lock - // first - static WeakCacheEntryPointer unsafeGet(index id) - { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; - } - - static WeakCacheEntryPointer rtget(index id) { - if (!mRTCache) return {}; - auto lookup = mRTCache->find(id); - return lookup == mRTCache->end() ? WeakCacheEntryPointer() : lookup->second; + auto lookup = mRTCache.find(id); + return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second; } using RawCacheEntry = typename Cache::value_type; @@ -90,18 +84,17 @@ private: { FifoMsg msg; auto add = [](FifoMsg* m) { - if (!mRTCache) + + if(!mWorld) mWorld = m->mWorld; + else if (mWorld != m->mWorld) //internal server has restarted { - RTCacheMirror* tmp = rtalloc( - m->mWorld, RTCacheAllocator{m->mWorld, getInterfaceTable()}); - World* w = m->mWorld; - mRTCache = RTCachePointer(tmp, [w](RTCacheMirror* x) { - if (w->mRunning) getInterfaceTable()->fRTFree(w, x); - }); + mWorld = nullptr; + mRTCache.clear(); + mWorld = m->mWorld; } - + RawCacheEntry* r = static_cast(m->mData); - auto res = mRTCache->emplace(r->first, r->second); + mRTCache.emplace(r->first, r->second); }; msg.Set(w, add, nullptr, &r); auto ft = Wrapper::getInterfaceTable(); @@ -122,20 +115,38 @@ private: FifoMsg msg; auto remove = [](FifoMsg* m) { - if (!mRTCache) return; + + if(!mWorld) + { + mRTCache.clear(); + mWorld = m->mWorld; // + } + else if (mWorld != m->mWorld) //internal server has restarted + { + mWorld = nullptr; + mRTCache.clear(); + mWorld = m->mWorld; + } + int* id = static_cast(m->mData); - mRTCache->erase(*id); + mRTCache.erase(*id); }; auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; msg.Set(world, remove, cleanup, data); + auto ft = Wrapper::getInterfaceTable(); + ft->fSendMsgToRT(world,msg); } }; public: - static WeakCacheEntryPointer get(index id) { return unsafeGet(id); } + static WeakCacheEntryPointer get(index id) + { + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } static WeakCacheEntryPointer add(World* world, index id, const Params& params) { @@ -325,13 +336,6 @@ private: } }; - -// struct UnitInfo -// { -// int mSynthIndex; -// int mNodeID{0}; -// }; - struct CommandProcess : public NRTCommand { CommandProcess(World* world, sc_msg_iter* args, void* replyAddr) @@ -429,7 +433,7 @@ private: } // Only for blocking execution - bool stage4(World* w) // nrt + bool stage4(World*) // nrt { if (auto ptr = get(NRTCommand::mID).lock()) { @@ -519,7 +523,7 @@ private: return true; } - bool stage4(World* w) // nrt + bool stage4(World*) // nrt { if (auto ptr = get(NRTCommand::mID).lock()) { @@ -537,39 +541,6 @@ private: return false; } - - // void notifyUnit(World* w) - // { - // if(mUnitInfo.mNodeID > 0) - // { - // auto ft = Wrapper::getInterfaceTable(); - // - // NRTDoneCount++; - // FifoMsg msg; - // - // auto updateUnitDone = [](FifoMsg* m) - // { - // UnitInfo* info = static_cast(m->mData); - // - // auto ft = Wrapper::getInterfaceTable(); - // Graph* g = ft->fGetGraph(m->mWorld,info->mNodeID); - // if(g) - // { - // Unit* u = g->mUnits[info->mSynthIndex]; - // if(u) - // { - // RTDoneCount++; - // u->mDone = true; - // } - // } - // }; - // - // msg.Set(w, updateUnitDone, nullptr, &mUnitInfo); - // ft->fSendMsgToRT(w,msg); - // } - // } - - bool mSuccess; WeakCacheEntryPointer mRecord; }; @@ -854,7 +825,6 @@ private: void next(int) { - if (!mRTCache) return; if (isNull(mRecord)) { mRecord = rtget(mID); }; if (0 == mCounter++) @@ -1026,7 +996,6 @@ private: void init() { mInit = false; - if (!mRTCache) return; mInst = rtget(mID); if (auto ptr = mInst.lock()) { @@ -1132,8 +1101,17 @@ public: void init(){}; + + static World* getWorld() + { + return mWorld; + } private: + + + static World* mWorld; + static Result validateParameters(ParamSetType& p) { auto results = p.constrainParameterValues(); @@ -1173,12 +1151,15 @@ private: Result mResult; }; +template +World* NonRealTime::mWorld{nullptr}; + template typename NonRealTime::Cache NonRealTime::mCache{}; template -typename NonRealTime::RTCachePointer +typename NonRealTime::RTCacheMirror NonRealTime::mRTCache{}; } // namespace impl diff --git a/include/wrapper/SCWorldAllocator.hpp b/include/wrapper/SCWorldAllocator.hpp index a53d1fd..d7310cf 100644 --- a/include/wrapper/SCWorldAllocator.hpp +++ b/include/wrapper/SCWorldAllocator.hpp @@ -16,24 +16,20 @@ namespace fluid { -template +template class SCWorldAllocator { - World* mWorld{nullptr}; - InterfaceTable* mInterface{nullptr}; - public: using propagate_on_container_move_assignment = std::true_type; using value_type = T; - template + template friend class SCWorldAllocator; - SCWorldAllocator(World* w, InterfaceTable* ft) : mWorld{w}, mInterface{ft} {} + SCWorldAllocator() = default; - template - SCWorldAllocator(const SCWorldAllocator& other) noexcept - : mWorld{other.mWorld}, mInterface{other.mInterface} + template + SCWorldAllocator(const SCWorldAllocator&) noexcept {} T* allocate(std::size_t n) @@ -41,17 +37,21 @@ public: if (n > std::numeric_limits::max() / sizeof(T)) throw std::bad_array_new_length(); - if (mWorld && mInterface) - if (auto p = static_cast(mInterface->fRTAlloc(mWorld, n * sizeof(T)))) + World* world = Wrapper::getWorld(); + InterfaceTable* interface = Wrapper::getInterfaceTable(); + + if (world && interface) + if (auto p = static_cast(interface->fRTAlloc(world, n * sizeof(T)))) return p; throw std::bad_alloc(); } - void deallocate(T* p, std::size_t n) noexcept + void deallocate(T* p, std::size_t /*n*/) noexcept { - assert(mWorld && mInterface); - mInterface->fRTFree(mWorld, p); + World* world = Wrapper::getWorld(); + InterfaceTable* interface = Wrapper::getInterfaceTable(); + if(world && interface) interface->fRTFree(world, p); } }; } // namespace fluid From 52bb79a581a1d0c4f01f499b006e14e47473a719 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Tue, 29 Jun 2021 00:30:09 +0100 Subject: [PATCH 7/8] Use a thread_local to manage rt cache lifetime. Rollback to immutable allocator --- include/wrapper/NonRealtime.hpp | 103 ++++++++++++--------------- include/wrapper/SCWorldAllocator.hpp | 27 +++---- 2 files changed, 59 insertions(+), 71 deletions(-) diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 7f490fe..607ab1b 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -49,20 +49,37 @@ class NonRealTime : public SCUnit public: using Cache = std::unordered_map; using RTCacheAllocator = - SCWorldAllocator,Wrapper>; - struct RTCacheMirror: public std::unordered_map, - std::equal_to, RTCacheAllocator> + SCWorldAllocator, Wrapper>; + + struct RTCacheMirror + : public std::unordered_map, std::equal_to, + RTCacheAllocator> { - + + RTCacheMirror(RTCacheAllocator&& alloc) + : std::unordered_map, + std::equal_to, RTCacheAllocator>{ + std::move(alloc)} + { + // std::cout << "Warning: And up...\n" << std::endl; + } + ~RTCacheMirror() { - if(mWorld) mWorld = nullptr; //if we get as far as destroying this, then the world is gone and we should accept that and move on + // std::cout << "Warning: And down...\n" << std::endl; } - }; - - static Cache mCache; - static RTCacheMirror mRTCache; + + static Cache mCache; + + static RTCacheMirror& rtCache(World* world) + { + thread_local static RTCacheMirror mRTCache( + RTCacheAllocator(world, Wrapper::getInterfaceTable())); + return mRTCache; + } + private: static bool isNull(WeakCacheEntryPointer const& weak) { @@ -70,10 +87,11 @@ private: !WeakCacheEntryPointer{}.owner_before(weak); } - static WeakCacheEntryPointer rtget(index id) + static WeakCacheEntryPointer rtget(World* world, index id) { - auto lookup = mRTCache.find(id); - return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second; + auto lookup = rtCache(world).find(id); + return lookup == rtCache(world).end() ? WeakCacheEntryPointer() + : lookup->second; } using RawCacheEntry = typename Cache::value_type; @@ -84,17 +102,8 @@ private: { FifoMsg msg; auto add = [](FifoMsg* m) { - - if(!mWorld) mWorld = m->mWorld; - else if (mWorld != m->mWorld) //internal server has restarted - { - mWorld = nullptr; - mRTCache.clear(); - mWorld = m->mWorld; - } - RawCacheEntry* r = static_cast(m->mData); - mRTCache.emplace(r->first, r->second); + rtCache(m->mWorld).emplace(r->first, r->second); }; msg.Set(w, add, nullptr, &r); auto ft = Wrapper::getInterfaceTable(); @@ -115,28 +124,15 @@ private: FifoMsg msg; auto remove = [](FifoMsg* m) { - - if(!mWorld) - { - mRTCache.clear(); - mWorld = m->mWorld; // - } - else if (mWorld != m->mWorld) //internal server has restarted - { - mWorld = nullptr; - mRTCache.clear(); - mWorld = m->mWorld; - } - int* id = static_cast(m->mData); - mRTCache.erase(*id); + rtCache(m->mWorld).erase(*id); }; auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; msg.Set(world, remove, cleanup, data); auto ft = Wrapper::getInterfaceTable(); - ft->fSendMsgToRT(world,msg); + ft->fSendMsgToRT(world, msg); } }; @@ -330,7 +326,7 @@ private: bool stage2(World* world) { cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); - remove(world, NRTCommand::mID); + remove(world, NRTCommand::mID); NRTCommand::sendReply(name(), true); return true; } @@ -464,7 +460,6 @@ private: Params mParams; bool mOverwriteParams{false}; WeakCacheEntryPointer mRecord; - }; @@ -559,14 +554,12 @@ private: char* mCompletionMessage; void* mReplyAddress; }; - + Context* c = new Context{world, id, completionMsgSize, completionMessage, replyAddress}; auto launchCompletionFromNRT = [](FifoMsg* inmsg) { - auto runCompletion = [](FifoMsg* msg) { - Context* c = static_cast(msg->mData); World* world = c->mWorld; index id = c->mID; @@ -817,7 +810,7 @@ private: mInterval = static_cast(0.02 / controlDur()); mID = static_cast(mInBuf[0][0]); std::cout << mID << std::endl; - mRecord = rtget(mID); + mRecord = rtget(mWorld, mID); set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } @@ -825,7 +818,7 @@ private: void next(int) { - if (isNull(mRecord)) { mRecord = rtget(mID); }; + if (isNull(mRecord)) { mRecord = rtget(mWorld, mID); }; if (0 == mCounter++) { @@ -941,7 +934,7 @@ private: } else { - auto record = rtget(mID); + auto record = rtget(mWorld, mID); if (auto ptr = record.lock()) { mInit = true; @@ -953,6 +946,7 @@ private: mDone = mInit; } } + private: bool mPreviousTrigger{false}; bool mTrigger{false}; @@ -996,7 +990,7 @@ private: void init() { mInit = false; - mInst = rtget(mID); + mInst = rtget(mWorld, mID); if (auto ptr = mInst.lock()) { auto& client = ptr->mClient; @@ -1072,7 +1066,7 @@ private: static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; static constexpr bool IsModel = Client::isModelObject::value; - + public: static void setup(InterfaceTable* ft, const char*) { @@ -1101,15 +1095,10 @@ public: void init(){}; - - static World* getWorld() - { - return mWorld; - } - -private: + static World* getWorld() { return mWorld; } +private: static World* mWorld; static Result validateParameters(ParamSetType& p) @@ -1152,16 +1141,12 @@ private: }; template -World* NonRealTime::mWorld{nullptr}; +World* NonRealTime::mWorld{nullptr}; template typename NonRealTime::Cache NonRealTime::mCache{}; -template -typename NonRealTime::RTCacheMirror - NonRealTime::mRTCache{}; - } // namespace impl } // namespace client } // namespace fluid diff --git a/include/wrapper/SCWorldAllocator.hpp b/include/wrapper/SCWorldAllocator.hpp index d7310cf..51c697f 100644 --- a/include/wrapper/SCWorldAllocator.hpp +++ b/include/wrapper/SCWorldAllocator.hpp @@ -19,6 +19,9 @@ namespace fluid { template class SCWorldAllocator { + World* mWorld; + InterfaceTable* mInterface; + public: using propagate_on_container_move_assignment = std::true_type; using value_type = T; @@ -26,22 +29,24 @@ public: template friend class SCWorldAllocator; - SCWorldAllocator() = default; - - template - SCWorldAllocator(const SCWorldAllocator&) noexcept + SCWorldAllocator(World* w, InterfaceTable* interface) + : mWorld{w}, mInterface{interface} {} + template + SCWorldAllocator(const SCWorldAllocator& other) noexcept + { + mWorld = other.mWorld; + mInterface = other.mInterface; + } + T* allocate(std::size_t n) { if (n > std::numeric_limits::max() / sizeof(T)) throw std::bad_array_new_length(); - World* world = Wrapper::getWorld(); - InterfaceTable* interface = Wrapper::getInterfaceTable(); - - if (world && interface) - if (auto p = static_cast(interface->fRTAlloc(world, n * sizeof(T)))) + if (mWorld && mInterface) + if (auto p = static_cast(mInterface->fRTAlloc(mWorld, n * sizeof(T)))) return p; throw std::bad_alloc(); @@ -49,9 +54,7 @@ public: void deallocate(T* p, std::size_t /*n*/) noexcept { - World* world = Wrapper::getWorld(); - InterfaceTable* interface = Wrapper::getInterfaceTable(); - if(world && interface) interface->fRTFree(world, p); + if (mWorld && mInterface) mInterface->fRTFree(mWorld, p); } }; } // namespace fluid From 32971c7cab141e6af91f2856c98bc18c427a23b4 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Tue, 29 Jun 2021 00:31:10 +0100 Subject: [PATCH 8/8] LibManipulation now needs /bigobj with MSVC --- src/FluidManipulation/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/FluidManipulation/CMakeLists.txt b/src/FluidManipulation/CMakeLists.txt index cd88c0a..66790e5 100644 --- a/src/FluidManipulation/CMakeLists.txt +++ b/src/FluidManipulation/CMakeLists.txt @@ -17,8 +17,8 @@ target_link_libraries( ${PLUGIN} PRIVATE FLUID_DECOMPOSITION ) -include(${CMAKE_CURRENT_LIST_DIR}/../../scripts/target_post.cmake) - -if(MSVC) - target_compile_options(${PLUGIN} PRIVATE /bigobj) +if(WIN32) + target_compile_options(${PLUGIN} PUBLIC /bigobj) endif() + +include(${CMAKE_CURRENT_LIST_DIR}/../../scripts/target_post.cmake)