diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 5df5f29..bae040c 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include "BufferFuncs.hpp" #include "CopyReplyAddress.hpp" @@ -9,1090 +9,1114 @@ #include #include #include +#include +#include #include #include namespace fluid { namespace client { -namespace impl { - - /// Non Real Time Processor - - template - class NonRealTime : public SCUnit +namespace impl { + +/// Non Real Time Processor + +template +class NonRealTime : public SCUnit +{ + using Params = typename Client::ParamSetType; + + template + static T* rtalloc(World* world, Args&&... args) + { + void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); + return new (space) T{std::forward(args)...}; + } + + /// Instance cache + struct CacheEntry { - using Params = typename Client::ParamSetType; - - template - static T* rtalloc(World* world,Args&&...args) + CacheEntry(const Params& p) : mParams{p}, mClient{mParams} {} + + Params mParams; + Client mClient; + std::atomic mDone{false}; + }; + + using CacheEntryPointer = std::shared_ptr; + using WeakCacheEntryPointer = + std::weak_ptr; // could use weak_type in 17 + +public: + using Cache = std::unordered_map; + static Cache mCache; + +private: + static bool isNull(WeakCacheEntryPointer const& weak) + { + return !weak.owner_before(WeakCacheEntryPointer{}) && + !WeakCacheEntryPointer{}.owner_before(weak); + } + + // https://rigtorp.se/spinlock/ + struct Spinlock + { + std::atomic lock_ = {0}; + + void lock() noexcept { - void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); - return new (space) T{std::forward(args)...}; + for (;;) + { + // Optimistically assume the lock is free on the first try + if (!lock_.exchange(true, std::memory_order_acquire)) { return; } + // Wait for lock to be released without generating cache misses + while (lock_.load(std::memory_order_relaxed)) + { + // Issue X86 PAUSE or ARM YIELD instruction to reduce contention + // between hyper-threads + _mm_pause(); + } + } } - - /// Instance cache - struct CacheEntry + + bool try_lock() noexcept { - - CacheEntry(const Params& p):mParams{p},mClient{mParams} - {} - - Params mParams; - Client mClient; - bool mDone{false}; - }; - - using CacheEntryPointer = std::shared_ptr; - using WeakCacheEntryPointer = std::weak_ptr; //could use weak_type in 17 - - public: - using Cache = std::unordered_map; - static Cache mCache; - private: - static bool isNull(WeakCacheEntryPointer const& weak) { - return !weak.owner_before(WeakCacheEntryPointer{}) && !WeakCacheEntryPointer{}.owner_before(weak); + // First do a relaxed load to check if lock is free in order to prevent + // unnecessary cache misses if someone does while(!try_lock()) + return !lock_.load(std::memory_order_relaxed) && + !lock_.exchange(true, std::memory_order_acquire); } - - // https://rigtorp.se/spinlock/ - struct Spinlock { - std::atomic lock_ = {0}; - - void lock() noexcept { - for (;;) { - // Optimistically assume the lock is free on the first try - if (!lock_.exchange(true, std::memory_order_acquire)) { - return; - } - // Wait for lock to be released without generating cache misses - while (lock_.load(std::memory_order_relaxed)) { - // Issue X86 PAUSE or ARM YIELD instruction to reduce contention between - // hyper-threads - //__builtin_ia32_pause(); - } - } - } - - bool tryLock() noexcept { - // First do a relaxed load to check if lock is free in order to prevent - // unnecessary cache misses if someone does while(!try_lock()) - return !lock_.load(std::memory_order_relaxed) && - !lock_.exchange(true, std::memory_order_acquire); - } - - void unlock() noexcept { - lock_.store(false, std::memory_order_release); - } - }; - - //RAII for above - struct ScopedSpinlock + + void unlock() noexcept { lock_.store(false, std::memory_order_release); } + }; + + static Spinlock mSpinlock; + using ScopedSpinLock = std::unique_lock; + + // shouldn't be called without at least *thinking* about getting spin lock + // first + static WeakCacheEntryPointer unsafeGet(index id) + { + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } + +public: + static WeakCacheEntryPointer get(index id) { return unsafeGet(id); } + + static WeakCacheEntryPointer add(index id, const Params& params) + { + if (isNull(get(id))) { - ScopedSpinlock(Spinlock& _l) noexcept: mLock{_l} - { - mLock.lock(); - } - ~ScopedSpinlock() { mLock.unlock(); } - private: - Spinlock& mLock; - }; - - static Spinlock mSpinlock; - - // shouldn't be called without at least *thinking* about getting spin lock first - static inline WeakCacheEntryPointer unsafeGet(index id) + auto result = mCache.emplace(id, std::make_shared(params)); + + return result.second ? (result.first)->second + : WeakCacheEntryPointer(); // sob + } + else // client has screwed up { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + std::cout << "ERROR: " << Wrapper::getName() << " ID " << id + << " already in use\n"; + return {}; } - - public: - static WeakCacheEntryPointer get(index id) + } + + static void remove(index id) { mCache.erase(id); } + + static void printNotFound(index id) + { + std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " + << id << std::endl; + } + +private: + static InterfaceTable* getInterfaceTable() + { + return Wrapper::getInterfaceTable(); + } + + template + using ParamsFromOSC = + typename ClientParams::template Setter; + + template + using ParamsFromSynth = + typename ClientParams::template Setter; + + + struct NRTCommand + { + NRTCommand(World*, sc_msg_iter* args, void* replyAddr, + bool consumeID = true) { - ScopedSpinlock{mSpinlock}; - return unsafeGet(id); + auto count = args->count; + auto pos = args->rdpos; + + mID = args->geti(); + + if (!consumeID) + { + args->count = count; + args->rdpos = pos; + } + + if (replyAddr) mReplyAddress = copyReplyAddress(replyAddr); } - static WeakCacheEntryPointer tryGet(index id) + ~NRTCommand() { - if(mSpinlock.tryLock()) - { - auto ret = unsafeGet(id); - mSpinlock.unlock(); - return ret; - } - return WeakCacheEntryPointer{}; + if (mReplyAddress) deleteReplyAddress(mReplyAddress); } + NRTCommand() {} + + explicit NRTCommand(index id) : mID{id} {} + + bool stage2(World*) { return true; } // nrt + bool stage3(World*) { return true; } // rt + bool stage4(World*) { return false; } // nrt + void cleanup(World*) {} // rt - static WeakCacheEntryPointer add(index id, const Params& params) + void sendReply(const char* name, bool success) { - ScopedSpinlock{mSpinlock}; - if(isNull(get(id))) + if (mReplyAddress) { - auto result = mCache.emplace(id, - std::make_shared(params)); - - return result.second ? (result.first)->second : WeakCacheEntryPointer(); //sob - } - else //client has screwed up - { - std::cout << "ERROR: " << Wrapper::getName() << " ID " << id << " already in use\n"; - return {}; + std::string slash{"/"}; + small_scpacket packet; + packet.adds((slash + name).c_str()); + packet.maketags(3); + packet.addtag(','); + packet.addtag('i'); + packet.addtag('i'); + packet.addi(success); + packet.addi(static_cast(mID)); + + SendReply(mReplyAddress, packet.data(), + static_cast(packet.size())); } } - - static void remove(index id) + // protected: + index mID; + void* mReplyAddress{nullptr}; + }; + + struct CommandNew : public NRTCommand + { + CommandNew(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr, !IsNamedShared_v}, + mParams{Client::getParameterDescriptors()} { - ScopedSpinlock{mSpinlock}; - mCache.erase(id); + mParams.template setParameterValuesRT(nullptr, world, + *args); } - - static void printNotFound(index id) + + CommandNew(index id, World*, FloatControlsIter& args, Unit* x) + : NRTCommand{id}, mParams{Client::getParameterDescriptors()} { - std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " << id << std::endl; + mParams.template setParameterValuesRT(nullptr, x, args); } - - private: - static InterfaceTable* getInterfaceTable() { return Wrapper::getInterfaceTable() ;} - - template - using ParamsFromOSC = typename ClientParams::template Setter; - - template - using ParamsFromSynth = typename ClientParams::template Setter; - - struct NRTCommand + static const char* name() { - NRTCommand(World*, sc_msg_iter* args, void* replyAddr, bool consumeID = true) - { - auto count = args->count; - auto pos = args->rdpos; - - mID = args->geti(); - - if(!consumeID) - { - args->count = count; - args->rdpos = pos; - } - - if(replyAddr) - mReplyAddress = copyReplyAddress(replyAddr); - } - - ~NRTCommand() - { - if(mReplyAddress) deleteReplyAddress(mReplyAddress); - } - - NRTCommand(){} - - explicit NRTCommand(index id):mID{id}{} - - bool stage2(World*) { return true; } //nrt - bool stage3(World*) { return true; } //rt - bool stage4(World*) { return false; } //nrt - void cleanup(World*) {} //rt - - void sendReply(const char* name,bool success) - { - if(mReplyAddress) - { - std::string slash{"/"}; - small_scpacket packet; - packet.adds((slash+name).c_str()); - packet.maketags(3); - packet.addtag(','); - packet.addtag('i'); - packet.addtag('i'); - packet.addi(success); - packet.addi(static_cast(mID)); - - SendReply(mReplyAddress,packet.data(), static_cast(packet.size())); - } - } -// protected: - index mID; - void* mReplyAddress{nullptr}; - }; - - struct CommandNew : public NRTCommand + static std::string cmd = std::string(Wrapper::getName()) + "/new"; + return cmd.c_str(); + } + + bool stage2(World* w) { - CommandNew(World* world, sc_msg_iter* args,void* replyAddr) - : NRTCommand{world,args, replyAddr, !IsNamedShared_v}, - mParams{Client::getParameterDescriptors()} - { - mParams.template setParameterValuesRT(nullptr, world, *args); - } + // auto entry = ; + ScopedSpinLock lock{mSpinlock}; - CommandNew(index id, World*, FloatControlsIter& args, Unit* x) - :NRTCommand{id}, - mParams{Client::getParameterDescriptors()} - { - mParams.template setParameterValuesRT(nullptr, x, args); - } + Result constraintsRes = validateParameters(mParams); + + if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes); - static const char* name() + mResult = (!isNull(add(NRTCommand::mID, mParams))); + + // Sigh. The cache entry above has both the client instance and main + // params instance. + // The client is linked to the params by reference; I've not got the + // in-place constrction working properly so that params are in their final + // resting place by the time we make the client so (for) now we need to + // manually repoint the client to the correct place. Or badness. + if (mResult) { - static std::string cmd = std::string(Wrapper::getName()) + "/new"; - return cmd.c_str(); + auto ptr = get(NRTCommand::mID).lock(); + ptr->mClient.setParams(ptr->mParams); } - bool stage2(World* w) - { -// auto entry = ; + NRTCommand::sendReply(name(), mResult); + return mResult; + } - Result constraintsRes = validateParameters(mParams); + private: + bool mResult; + Params mParams; + }; - if(!constraintsRes.ok()) Wrapper::printResult(w,constraintsRes); + struct CommandFree : public NRTCommand + { + using NRTCommand::NRTCommand; - mResult = (!isNull(add(NRTCommand::mID, mParams))); - - //Sigh. The cache entry above has both the client instance and main params instance. - // The client is linked to the params by reference; I've not got the in-place constrction - // working properly so that params are in their final resting place by the time we make the client - // so (for) now we need to manually repoint the client to the correct place. Or badness. - if(mResult) - { - auto ptr = get(NRTCommand::mID).lock(); - ptr->mClient.setParams(ptr->mParams); - } - - NRTCommand::sendReply(name(),mResult); - - return mResult; - } - - private: - bool mResult; - Params mParams; - }; - - struct CommandFree: public NRTCommand + void cancelCheck(std::false_type, index id) { - using NRTCommand::NRTCommand; - - void cancelCheck(std::false_type, index id) - { - if(auto ptr = get(id).lock()) - { - auto& client = ptr->mClient; - if(!client.synchronous() && client.state() == ProcessState::kProcessing) - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; - } - } - - void cancelCheck(std::true_type, index){} - - - static const char* name() + if (auto ptr = get(id).lock()) { - static std::string cmd = std::string(Wrapper::getName()) + "/free"; - return cmd.c_str(); - } - - bool stage2(World*) - { - cancelCheck(IsRTQueryModel_t(),NRTCommand::mID); - remove(NRTCommand::mID); - NRTCommand::sendReply(name(), true); - return true; + auto& client = ptr->mClient; + if (!client.synchronous() && + client.state() == ProcessState::kProcessing) + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; } + } - }; - - - /// Not registered as a PlugInCmd. Triggered by worker thread callback - struct CommandAsyncComplete: public NRTCommand + void cancelCheck(std::true_type, index) {} + + + static const char* name() { - CommandAsyncComplete(World*, index id, void* replyAddress) - { - NRTCommand::mID = id; - NRTCommand::mReplyAddress = replyAddress; - } - - static const char* name() { return CommandProcess::name(); } - - bool stage2(World* world) + static std::string cmd = std::string(Wrapper::getName()) + "/free"; + return cmd.c_str(); + } + + bool stage2(World*) + { + ScopedSpinLock lock(mSpinlock); + cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); + remove(NRTCommand::mID); + NRTCommand::sendReply(name(), true); + return true; + } + }; + + + /// Not registered as a PlugInCmd. Triggered by worker thread callback + struct CommandAsyncComplete : public NRTCommand + { + CommandAsyncComplete(World*, index id, void* replyAddress) + { + NRTCommand::mID = id; + NRTCommand::mReplyAddress = replyAddress; + } + + static const char* name() { return CommandProcess::name(); } + + bool stage2(World* world) + { + + // std::cout << "In Async completion\n"; + ScopedSpinLock lock{mSpinlock}; + if (auto ptr = get(NRTCommand::mID).lock()) { - - // std::cout << "In Async completion\n"; - if(auto ptr = get(NRTCommand::mID).lock()) + Result r; + mRecord = ptr; + auto& client = ptr->mClient; + ProcessState s = client.checkProgress(r); + if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) { - Result r; - mRecord = ptr; - auto& client = ptr->mClient; - ProcessState s = client.checkProgress(r); - if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) + if (r.status() == Result::Status::kCancelled) + { + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + ptr->mDone.store(true, std::memory_order_relaxed); + return false; + } + + client.checkProgress(r); + mSuccess = !(r.status() == Result::Status::kError); + if (!r.ok()) { - if (r.status() == Result::Status::kCancelled) + Wrapper::printResult(world, r); + if (r.status() == Result::Status::kError) { - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; - ptr->mDone = true; + ptr->mDone.store(true, std::memory_order_relaxed); return false; } - - client.checkProgress(r); - mSuccess = !(r.status() == Result::Status::kError); - if (!r.ok()) - { - Wrapper::printResult(world,r); - if(r.status() == Result::Status::kError) - { - ptr->mDone = true; - return false; - } - } - - return true; } + // if we're progressing to stage3, don't unlock the lock just yet + lock.release(); + return true; } - return false; } - - bool stage3(World* world) + return false; + } + + bool stage3(World* world) + { + ScopedSpinLock lock(mSpinlock, std::adopt_lock); + if (auto ptr = mRecord.lock()) { - if(auto ptr = mRecord.lock()) - { - auto& params = ptr->mParams; - params.template forEachParamType(world); - return true; - } - return false; + auto& params = ptr->mParams; + params.template forEachParamType(world); + return true; } - - bool stage4(World*) //nrt + return false; + } + + bool stage4(World*) // nrt + { + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - if(auto ptr = get(NRTCommand::mID).lock()) + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) - { - NRTCommand::sendReply(name(),mSuccess); - } - ptr->mDone = true; - return true; + NRTCommand::sendReply(name(), mSuccess); } - return false; + ptr->mDone.store(true, std::memory_order_relaxed); // = true; + return true; } - - bool mSuccess; - WeakCacheEntryPointer mRecord; - }; - - - static void doProcessCallback(World* world, index id,size_t completionMsgSize,char* completionMessage,void* replyAddress) + std::cout << "ERROR: Failed to lock\n"; + return false; + } + + bool mSuccess; + WeakCacheEntryPointer mRecord; + }; + + + static void doProcessCallback(World* world, index id, + size_t completionMsgSize, + char* completionMessage, void* replyAddress) + { + auto ft = getInterfaceTable(); + struct Context { - auto ft = getInterfaceTable(); - struct Context{ - World* mWorld; - index mID; - size_t mCompletionMsgSize; - char* mCompletionMessage; - void* mReplyAddress; + World* mWorld; + index mID; + size_t mCompletionMsgSize; + char* mCompletionMessage; + void* mReplyAddress; + }; + + Context* c = new Context{world, id, completionMsgSize, completionMessage, + replyAddress}; + + auto launchCompletionFromNRT = [](FifoMsg* inmsg) { + auto runCompletion = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + World* world = c->mWorld; + index id = c->mID; + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); + CommandAsyncComplete* cmd = + new (space) CommandAsyncComplete(world, id, c->mReplyAddress); + runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, + c->mCompletionMessage); + if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); }; - - Context* c = new Context{world,id,completionMsgSize,completionMessage,replyAddress}; - - auto launchCompletionFromNRT = [](FifoMsg* inmsg) - { - auto runCompletion = [](FifoMsg* msg){ - Context* c = static_cast(msg->mData); - World* world = c->mWorld; - index id = c->mID; - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(CommandAsyncComplete)); - CommandAsyncComplete* cmd = new (space) CommandAsyncComplete(world, id,c->mReplyAddress); - runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, c->mCompletionMessage); - if(c->mCompletionMsgSize) ft->fRTFree(world,c->mCompletionMessage); - }; - - auto tidyup = [](FifoMsg* msg) - { - Context* c = static_cast(msg->mData); - delete c; - }; - - auto ft = getInterfaceTable(); - FifoMsg fwd = *inmsg; - fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); - if(inmsg->mWorld->mRunning) - ft->fSendMsgToRT(inmsg->mWorld,fwd); + + auto tidyup = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + delete c; }; - - FifoMsg msg; - msg.Set(world, launchCompletionFromNRT, nullptr, c); - - if(world->mRunning) ft->fSendMsgFromRT(world,msg); - } - - struct CommandProcess: public NRTCommand + + auto ft = getInterfaceTable(); + FifoMsg fwd = *inmsg; + fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); + if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd); + }; + + FifoMsg msg; + msg.Set(world, launchCompletionFromNRT, nullptr, c); + + if (world->mRunning) ft->fSendMsgFromRT(world, msg); + } + + struct CommandProcess : public NRTCommand + { + CommandProcess(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr}, + mParams{Client::getParameterDescriptors()} { - CommandProcess(World* world, sc_msg_iter* args, void* replyAddr): NRTCommand{world, args, replyAddr},mParams{Client::getParameterDescriptors()} + auto& ar = *args; + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - auto& ar = *args; - - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mDone = false; - mParams.template setParameterValuesRT(nullptr, world, ar); - mSynchronous = static_cast(ar.geti()); - } //if this fails, we'll hear about it in stage2 anyway - } + ptr->mDone.store(false, std::memory_order_relaxed); + mParams.template setParameterValuesRT(nullptr, world, + ar); + mSynchronous = static_cast(ar.geti()); + } // if this fails, we'll hear about it in stage2 anyway + } - explicit CommandProcess(index id,bool synchronous,Params* params):NRTCommand{id},mSynchronous(synchronous), - mParams{Client::getParameterDescriptors()} - { - if(params) - { - mParams = *params; - mOverwriteParams = true; - } - } - - - static const char* name() + explicit CommandProcess(index id, bool synchronous, Params* params) + : NRTCommand{id}, + mSynchronous(synchronous), mParams{Client::getParameterDescriptors()} + { + if (params) { - static std::string cmd = std::string(Wrapper::getName()) + "/process"; - return cmd.c_str(); + mParams = *params; + mOverwriteParams = true; } - - bool stage2(World* world) + } + + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/process"; + return cmd.c_str(); + } + + bool stage2(World* world) + { + ScopedSpinLock lock(mSpinlock); + mRecord = get(NRTCommand::mID); + if (auto ptr = mRecord.lock()) { - mRecord = get(NRTCommand::mID); - if(auto ptr = mRecord.lock()) + + auto& params = ptr->mParams; + if (mOverwriteParams) params = mParams; + auto& client = ptr->mClient; + + Result result = validateParameters(params); + Wrapper::printResult(world, result); + if (result.status() != Result::Status::kError) { - - auto& params = ptr->mParams; - if(mOverwriteParams) params = mParams; - auto& client = ptr->mClient; - - - -// if(mOSCData) -// { -// params.template setParameterValuesRT(nullptr, world, *mOSCData); -// mSynchronous = static_cast(mOSCData->geti()); -// } - - Result result = validateParameters(params); + client.setSynchronous(mSynchronous); + index id = NRTCommand::mID; + size_t completionMsgSize = mCompletionMsgSize; + char* completionMessage = mCompletionMessage; + void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); + + auto callback = [world, id, completionMsgSize, completionMessage, + replyAddress]() { + doProcessCallback(world, id, completionMsgSize, completionMessage, + replyAddress); + }; + + result = mSynchronous ? client.enqueue(params) + : client.enqueue(params, callback); Wrapper::printResult(world, result); - if (result.status() != Result::Status::kError) + + if (result.ok()) { -// client.done() - client.setSynchronous(mSynchronous); - index id = NRTCommand::mID; - size_t completionMsgSize = mCompletionMsgSize; - char* completionMessage = mCompletionMessage; - void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - - auto callback = [world,id,completionMsgSize,completionMessage,replyAddress](){ - doProcessCallback(world,id,completionMsgSize,completionMessage,replyAddress); - }; - - result = mSynchronous ? client.enqueue(params) : client.enqueue(params,callback); - Wrapper::printResult(world, result); - - if(result.ok()) - { - ptr->mDone = false; - mResult = client.process(); - Wrapper::printResult(world,mResult); - - bool error =mResult.status() == Result::Status::kError; - - if(error) ptr->mDone = true; - return mSynchronous && !error; - } - } - } - else - { - mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), " with ID ", NRTCommand::mID}; - Wrapper::printResult(world,mResult); - } - return false; - } + ptr->mDone.store(false, std::memory_order_relaxed); + mResult = client.process(); + Wrapper::printResult(world, mResult); - //Only for blocking execution - bool stage3(World* world) //rt - { - if(auto ptr = mRecord.lock()) - { - ptr->mParams.template forEachParamType(world); -// NRTCommand::sendReply(world, name(), mResult.ok()); - return true; + bool error = mResult.status() == Result::Status::kError; + + if (error) ptr->mDone.store(true, std::memory_order_relaxed); + bool toStage3 = mSynchronous && !error; + + if (toStage3) lock.release(); + return toStage3; + } } - // std::cout << "Ohno\n"; - return false; } - - //Only for blocking execution - bool stage4(World*) //nrt + else { - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && mSynchronous) - NRTCommand::sendReply(name(), mResult.ok()); - ptr->mDone = true; - return true; - } - return false; + mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), + " with ID ", NRTCommand::mID}; + Wrapper::printResult(world, mResult); } - + return false; + } - bool synchronous() + // Only for blocking execution + bool stage3(World* world) // rt + { + ScopedSpinLock lock(mSpinlock, std::adopt_lock); + if (auto ptr = mRecord.lock()) { - return mSynchronous; + ptr->mParams.template forEachParamType(world); + return true; } + return false; + } - void addCompletionMessage(size_t size, char* message)//, void* addr) - { - mCompletionMsgSize = size; - mCompletionMessage = message; - } - -// private: - Result mResult; - bool mSynchronous; - size_t mCompletionMsgSize{0}; - char* mCompletionMessage{nullptr}; - Params mParams; - bool mOverwriteParams{false}; - WeakCacheEntryPointer mRecord; - }; - - struct CommandProcessNew: public NRTCommand + // Only for blocking execution + bool stage4(World*) // nrt { - CommandProcessNew(World* world, sc_msg_iter* args,void* replyAddr) - : mNew{world, args, replyAddr}, - mProcess{mNew.mID,false,nullptr} - { - mProcess.mSynchronous = args->geti(); - mProcess.mReplyAddress = mNew.mReplyAddress; - } - - CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) - : mNew{id, world, args, x}, - mProcess{id} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; - return cmd.c_str(); - } - - bool stage2(World* world) + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - return mNew.stage2(world) ? mProcess.stage2(world) : false; - } - - bool stage3(World* world) //rt - { - return mProcess.stage3(world); - } + ptr->mParams.template forEachParamType(); - bool stage4(World* world) //nrt - { - return mProcess.stage4(world); - } - - void cleanup(World* world) - { - mProcess.mReplyAddress = nullptr; - mProcess.cleanup(world); - } - - bool synchronous() - { - return mProcess.synchronous(); - } - - void addCompletionMessage(size_t size, char* message) - { - mProcess.addCompletionMessage(size, message); + if (NRTCommand::mID >= 0 && mSynchronous) + NRTCommand::sendReply(name(), mResult.ok()); + ptr->mDone.store(true, std::memory_order_relaxed); + return true; } + return false; + } - private: - CommandNew mNew; - CommandProcess mProcess; - }; - - - struct CommandCancel: public NRTCommand + + bool synchronous() { return mSynchronous; } + + void addCompletionMessage(size_t size, char* message) //, void* addr) { - CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) - : NRTCommand{world, args, replyAddr} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; - return cmd.c_str(); - } - - bool stage2(World*) - { - if(auto ptr = get(NRTCommand::mID).lock()) - { - auto& client = ptr->mClient; - if(!client.synchronous()) - { - client.cancel(); - return true; - } - } - return false; - } - }; - - struct CommandSetParams: public NRTCommand + mCompletionMsgSize = size; + mCompletionMessage = message; + } + + // private: + Result mResult; + bool mSynchronous; + size_t mCompletionMsgSize{0}; + char* mCompletionMessage{nullptr}; + Params mParams; + bool mOverwriteParams{false}; + WeakCacheEntryPointer mRecord; + }; + + struct CommandProcessNew : public NRTCommand + { + CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) + : mNew{world, args, replyAddr}, mProcess{mNew.mID, false, nullptr} { - CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) - : NRTCommand{world, args, replyAddr} - { - auto& ar = *args; - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template setParameterValuesRT(nullptr, world, ar); - Result result = validateParameters(ptr->mParams); - ptr->mClient.setParams(ptr->mParams); - } else printNotFound(NRTCommand::mID); - } - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; - return cmd.c_str(); - } - }; - - - template - static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + mProcess.mSynchronous = args->geti(); + mProcess.mReplyAddress = mNew.mReplyAddress; + } + + CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) + : mNew{id, world, args, x}, mProcess{id} + {} + + static const char* name() { - auto ft = getInterfaceTable(); + static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; + return cmd.c_str(); + } - return ft->fDoAsynchronousCommand(world, replyAddr,Command::name(),cmd, - [](World* w, void* d) { return static_cast(d)->stage2(w); }, - [](World* w, void* d) { return static_cast(d)->stage3(w); }, - [](World* w, void* d) { return static_cast(d)->stage4(w); }, - [](World* w, void* d) - { - auto cmd = static_cast(d); - cmd->cleanup(w); - cmd->~Command(); - getInterfaceTable()->fRTFree(w,d); - }, - static_cast(completionMsgSize), completionMsgData); + bool stage2(World* world) + { + return mNew.stage2(world) ? mProcess.stage2(world) : false; } - - - static auto runAsyncCommand(World* world, CommandProcess* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + + bool stage3(World* world) // rt { - if(!cmd->synchronous()) - { - - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + return mProcess.stage3(world); } - - static auto runAsyncCommand(World* world, CommandProcessNew* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + + bool stage4(World* world) // nrt { - if(!cmd->synchronous()) - { - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + return mProcess.stage4(world); } - - - template - static void defineNRTCommand() + + void cleanup(World* world) { - auto ft = getInterfaceTable(); - auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, void* replyAddr) - { - - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(Command)); - Command* cmd = new (space) Command(world, args, replyAddr); - //This is brittle, but can't think of something better offhand - //This is the only place we can check for a completion message at the end of the OSC packet - //beause it has to be passed on to DoAsynhronousCommand at this point. However, detecting correctly - //relies on the Command type having fully consumed arguments from the args iterator in the constructor for cmd - size_t completionMsgSize{args ? args->getbsize() : 0}; - assert(completionMsgSize <= std::numeric_limits::max()); - char* completionMsgData = nullptr; - - if (completionMsgSize) { - completionMsgData = (char*)ft->fRTAlloc(world, completionMsgSize); - args->getb(completionMsgData, completionMsgSize); - } - runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); - - if(completionMsgSize) ft->fRTFree(world, completionMsgData); - - }; - ft->fDefinePlugInCmd(Command::name(),commandRunner,nullptr); + mProcess.mReplyAddress = nullptr; + mProcess.cleanup(world); } - - - - struct NRTProgressUnit: SCUnit + + bool synchronous() { return mProcess.synchronous(); } + + void addCompletionMessage(size_t size, char* message) { - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Monitor"; - return n.c_str(); - } - - NRTProgressUnit() - { - mInterval = static_cast(0.02 / controlDur()); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - void next(int) + mProcess.addCompletionMessage(size, message); + } + + private: + CommandNew mNew; + CommandProcess mProcess; + }; + + + struct CommandCancel : public NRTCommand + { + CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr} + {} + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; + return cmd.c_str(); + } + + bool stage2(World*) + { + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - if (0 == mCounter++) + auto& client = ptr->mClient; + if (!client.synchronous()) { - index id = static_cast(mInBuf[0][0]); - if(auto ptr = tryGet(id).lock()) - { - mInit = true; - if(ptr->mClient.done()) mDone = 1; - out0(0) = static_cast(ptr->mClient.progress()); - } - else - { - if(!mInit) - std::cout << "WARNING: No " << Wrapper::getName() << " with ID " << id << std::endl; - else mDone = 1; - } + client.cancel(); + return true; } - mCounter %= mInterval; } - - private: - index mInterval; - index mCounter{0}; - bool mInit{false}; - }; - - - struct NRTTriggerUnit: SCUnit + return false; + } + }; + + struct CommandSetParams : public NRTCommand + { + CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr} { - - static index count(){ - static index counter = -1; - return counter--; - } - - index ControlOffset() { return mSpecialIndex + 1; } - - index ControlSize() + auto& ar = *args; + ScopedSpinLock lock(mSpinlock); + if (auto ptr = get(NRTCommand::mID).lock()) { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 3; //id + trig + blocking; + ptr->mParams.template setParameterValuesRT(nullptr, + world, ar); + Result result = validateParameters(ptr->mParams); + ptr->mClient.setParams(ptr->mParams); } - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Trigger"; - return n.c_str(); - } - - NRTTriggerUnit() - : mControlsIterator{mInBuf + ControlOffset(),ControlSize()},mParams{Client::getParameterDescriptors()} - { - mID = static_cast(mInBuf[0][0]); - if(mID == -1) mID = count(); - auto cmd = NonRealTime::rtalloc(mWorld,mID,mWorld, mControlsIterator, this); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// mInst = get(mID); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - ~NRTTriggerUnit() - { -// if(auto ptr = mInst.lock()) -// { - auto cmd = NonRealTime::rtalloc(mWorld,mID); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// } - } - - void next(int) + else + printNotFound(NRTCommand::mID); + } + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; + return cmd.c_str(); + } + }; + + + template + static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, + size_t completionMsgSize, char* completionMsgData) + { + auto ft = getInterfaceTable(); + + return ft->fDoAsynchronousCommand( + world, replyAddr, Command::name(), cmd, + [](World* w, void* d) { return static_cast(d)->stage2(w); }, + [](World* w, void* d) { return static_cast(d)->stage3(w); }, + [](World* w, void* d) { return static_cast(d)->stage4(w); }, + [](World* w, void* d) { + auto cmd = static_cast(d); + cmd->cleanup(w); + cmd->~Command(); + getInterfaceTable()->fRTFree(w, d); + }, + static_cast(completionMsgSize), completionMsgData); + } + + + static auto runAsyncCommand(World* world, CommandProcess* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) + { + + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); + } + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + static auto runAsyncCommand(World* world, CommandProcessNew* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) + { + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, + nullptr); + } + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + + template + static void defineNRTCommand() + { + auto ft = getInterfaceTable(); + auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, + void* replyAddr) { + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(Command)); + Command* cmd = new (space) Command(world, args, replyAddr); + // This is brittle, but can't think of something better offhand + // This is the only place we can check for a completion message at the end + // of the OSC packet beause it has to be passed on to DoAsynhronousCommand + // at this point. However, detecting correctly relies on the Command type + // having fully consumed arguments from the args iterator in the + // constructor for cmd + size_t completionMsgSize{args ? args->getbsize() : 0}; + assert(completionMsgSize <= std::numeric_limits::max()); + char* completionMsgData = nullptr; + + if (completionMsgSize) { - - - index triggerInput = static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); - mTrigger = mTrigger || triggerInput; - -// if(auto ptr = mInst->lock()) -// if(auto ptr = get(mID).lock()) -// { - bool trigger = (!mPreviousTrigger) && triggerInput;//mTrigger; - mPreviousTrigger = triggerInput; - mTrigger = 0; -// auto& client = ptr->mClient; - - if(trigger) - { - mControlsIterator.reset(1 + mInBuf); //add one for ID -// auto& params = ptr->mParams; - Wrapper::setParams(this,mParams,mControlsIterator,true,false); - bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = rtalloc(mWorld,mID,blocking,&mParams); - runAsyncCommand(mWorld,cmd, nullptr,0, nullptr); - mRunCount++; - } - else - { - if(auto ptr = tryGet(mID).lock()) - { - mInit = true; - auto& client = ptr->mClient; - mDone = ptr->mDone; - out0(0) = mDone ? 1 : static_cast(client.progress()); - } else mDone = mInit; - } -// } -// else printNotFound(id); + completionMsgData = (char*) ft->fRTAlloc(world, completionMsgSize); + args->getb(completionMsgData, completionMsgSize); } - - private: - bool mPreviousTrigger{false}; - bool mTrigger{false}; - Result mResult; - impl::FloatControlsIter mControlsIterator; - index mID; - index mRunCount{0}; - WeakCacheEntryPointer mInst; - Params mParams; - bool mInit{false}; + runAsyncCommand(world, cmd, replyAddr, completionMsgSize, + completionMsgData); + + if (completionMsgSize) ft->fRTFree(world, completionMsgData); }; - - struct NRTModelQueryUnit: SCUnit + ft->fDefinePlugInCmd(Command::name(), commandRunner, nullptr); + } + + + struct NRTProgressUnit : SCUnit + { + + static const char* name() { - using Delegate = impl::RealTimeBase; - - index ControlOffset() { return mSpecialIndex + 2; } - index ControlSize() - { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 2; // trig + id - } - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Query"; - return n.c_str(); - } - - NRTModelQueryUnit() - //Offset controls by 1 to account for ID - : mControls{mInBuf + ControlOffset(),ControlSize()} - { - mID = static_cast(in0(1)); - init(); -// mInst = get(id); -// if(auto ptr = mInst.lock()) -// { -// auto& client = ptr->mClient; -// mDelegate.init(*this,client,mControls); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); -// }else printNotFound(mID); - } - - void init() + static std::string n = std::string(Wrapper::getName()) + "Monitor"; + return n.c_str(); + } + + NRTProgressUnit() + { + mInterval = static_cast(0.02 / controlDur()); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } + + void next(int) + { + if (0 == mCounter) { - if(mSpinlock.tryLock()) + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (!lock.owns_lock()) return; + + index id = static_cast(mInBuf[0][0]); + auto record = get(id); + mCounter++; + + if (auto ptr = record.lock()) { - mInit = false; - mInst = unsafeGet(mID); - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - mDelegate.init(*this,client,mControls); - mInit = true; - }//else printNotFound(mID); - mSpinlock.unlock(); + mInit = true; + if (ptr->mClient.done()) mDone = 1; + out0(0) = static_cast(ptr->mClient.progress()); } - } - - void next(int) - { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); - index id = static_cast(in0(1)); - if(mID != id) init(); - if(!mInit) return; - if(mSpinlock.tryLock()) + else { - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - auto& params = ptr->mParams; - mControls.reset(mInBuf + ControlOffset()); - mDelegate.next(*this,client,params,mControls, ptr.use_count() == 2); - }else printNotFound(id); - mSpinlock.unlock(); + if (!mInit) + std::cout << "WARNING: No " << Wrapper::getName() << " with ID " + << id << std::endl; + else + mDone = 1; } - } - - private: - Delegate mDelegate; - FloatControlsIter mControls; - index mID; - WeakCacheEntryPointer mInst; - bool mInit{false}; - }; - - - using ParamSetType = typename Client::ParamSetType; - - template - using SetupMessageCmd = typename FluidSCMessaging::template SetupMessageCmd; - - - template - struct DefineCommandIf - { - void operator()() { } - }; + mCounter %= mInterval; + } + private: + index mInterval; + index mCounter{0}; + bool mInit{false}; + }; + + + struct NRTTriggerUnit : SCUnit + { - template - struct DefineCommandIf + static index count() { - void operator()() { - // std::cout << CommandType::name() << std::endl; - defineNRTCommand(); - } - }; - - template - struct RegisterUnitIf + static index counter = -1; + return counter--; + } + + index ControlOffset() { return mSpecialIndex + 1; } + + index ControlSize() { - void operator()(InterfaceTable*) {} - }; + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 3; // id + trig + blocking; + } - template - struct RegisterUnitIf + static const char* name() { - void operator()(InterfaceTable* ft) { registerUnit(ft,UnitType::name()); } - }; + static std::string n = std::string(Wrapper::getName()) + "Trigger"; + return n.c_str(); + } - - using IsRTQueryModel_t = typename Client::isRealTime; - static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; - - static constexpr bool IsModel = Client::isModelObject::value; - - - public: - static void setup(InterfaceTable* ft, const char*) + NRTTriggerUnit() + : mControlsIterator{mInBuf + ControlOffset(), ControlSize()}, + mParams{Client::getParameterDescriptors()} { - defineNRTCommand(); - DefineCommandIf()(); - DefineCommandIf()(); - DefineCommandIf()(); - - DefineCommandIf()(); - - defineNRTCommand(); - RegisterUnitIf()(ft); - RegisterUnitIf()(ft); - - RegisterUnitIf()(ft); - Client::getMessageDescriptors().template iterate(); - - - static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; - - ft->fDefinePlugInCmd(flushCmd.c_str(),[](World*, void*, struct sc_msg_iter*, void* ){ - mCache.clear(); - },nullptr); + mID = static_cast(mInBuf[0][0]); + if (mID == -1) mID = count(); + auto cmd = NonRealTime::rtalloc(mWorld, mID, mWorld, + mControlsIterator, this); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } + ~NRTTriggerUnit() + { + set_calc_function(); + auto cmd = NonRealTime::rtalloc(mWorld, mID); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + } - void init(){}; + void clear(int) + { + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + } - private: - static Result validateParameters(ParamSetType& p) + void next(int) { - auto results = p.constrainParameterValues(); - for (auto& r : results) + index triggerInput = + static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); + mTrigger = mTrigger || triggerInput; + + bool trigger = (!mPreviousTrigger) && triggerInput; // mTrigger; + mPreviousTrigger = triggerInput; + mTrigger = 0; + + if (trigger) + { + mControlsIterator.reset(1 + mInBuf); // add one for ID + Wrapper::setParams(this, mParams, mControlsIterator, true, false); + + bool blocking = mInBuf[mNumInputs - 1][0] > 0; + + CommandProcess* cmd = + rtalloc(mWorld, mID, blocking, &mParams); + runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); + mRunCount++; + } + else { - if (!r.ok()) return r; + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (!lock.owns_lock()) return; + + auto record = get(mID); + if (auto ptr = record.lock()) + { + mInit = true; + auto& client = ptr->mClient; + mDone = ptr->mDone.load(std::memory_order_relaxed); + out0(0) = mDone ? 1 : static_cast(client.progress()); + } + else + mDone = mInit; } - return {}; } - template - struct AssignBuffer + private: + bool mPreviousTrigger{false}; + bool mTrigger{false}; + bool mBeingFreed{false}; + Result mResult; + impl::FloatControlsIter mControlsIterator; + index mID; + index mRunCount{0}; + WeakCacheEntryPointer mInst; + Params mParams; + bool mInit{false}; + }; + + struct NRTModelQueryUnit : SCUnit + { + using Delegate = impl::RealTimeBase; + + index ControlOffset() { return mSpecialIndex + 2; } + index ControlSize() + { + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 2; // trig + id + } + + static const char* name() { - void operator()(const typename BufferT::type& p, World* w) + static std::string n = std::string(Wrapper::getName()) + "Query"; + return n.c_str(); + } + + NRTModelQueryUnit() + // Offset controls by 1 to account for ID + : mControls{mInBuf + ControlOffset(), ControlSize()} + { + mID = static_cast(in0(1)); + init(); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } + + void init() + { + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + + if (lock.owns_lock()) { - if (auto b = static_cast(p.get())) b->assignToRT(w); + mInit = false; + mInst = get(mID); + if (auto ptr = mInst.lock()) + { + auto& client = ptr->mClient; + mDelegate.init(*this, client, mControls); + mInit = true; + } } - }; + } - template - struct CleanUpBuffer + void next(int) { - void operator()(const typename BufferT::type& p) + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); + index id = static_cast(in0(1)); + if (mID != id) init(); + if (!mInit) return; + ScopedSpinLock lock(mSpinlock, std::try_to_lock); + if (lock.owns_lock()) + ; { - if (auto b = static_cast(p.get())) b->cleanUp(); + if (auto ptr = mInst.lock()) + { + auto& client = ptr->mClient; + auto& params = ptr->mParams; + mControls.reset(mInBuf + ControlOffset()); + mDelegate.next(*this, client, params, mControls, + ptr.use_count() == 2); + } + else + printNotFound(id); } - }; + } + + private: + Delegate mDelegate; + FloatControlsIter mControls; + index mID; + WeakCacheEntryPointer mInst; + bool mInit{false}; + }; + + + using ParamSetType = typename Client::ParamSetType; + + template + using SetupMessageCmd = + typename FluidSCMessaging::template SetupMessageCmd; + + + template + struct DefineCommandIf + { + void operator()() {} + }; - FifoMsg mFifoMsg; - char* mCompletionMessage = nullptr; - void* mReplyAddr = nullptr; - const char* mName = nullptr; - index checkThreadInterval; - index pollCounter{0}; - index mPreviousTrigger{0}; - bool mSynchronous{true}; - Result mResult; + + template + struct DefineCommandIf + { + void operator()() + { + // std::cout << CommandType::name() << std::endl; + defineNRTCommand(); + } }; - - template - typename NonRealTime::Cache NonRealTime::mCache{}; - template - typename NonRealTime::Spinlock NonRealTime::mSpinlock{}; + template + struct RegisterUnitIf + { + void operator()(InterfaceTable*) {} + }; + + template + struct RegisterUnitIf + { + void operator()(InterfaceTable* ft) + { + registerUnit(ft, UnitType::name()); + } + }; + + + using IsRTQueryModel_t = typename Client::isRealTime; + static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; + + static constexpr bool IsModel = Client::isModelObject::value; + + +public: + static void setup(InterfaceTable* ft, const char*) + { + defineNRTCommand(); + DefineCommandIf()(); + DefineCommandIf()(); + DefineCommandIf()(); + + DefineCommandIf()(); + + defineNRTCommand(); + RegisterUnitIf()(ft); + RegisterUnitIf()(ft); + + RegisterUnitIf()(ft); + Client::getMessageDescriptors().template iterate(); + + + static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; + + ft->fDefinePlugInCmd( + flushCmd.c_str(), + [](World*, void*, struct sc_msg_iter*, void*) { mCache.clear(); }, + nullptr); + } + + + void init(){}; + +private: + static Result validateParameters(ParamSetType& p) + { + auto results = p.constrainParameterValues(); + for (auto& r : results) + { + if (!r.ok()) return r; + } + return {}; + } + + template + struct AssignBuffer + { + void operator()(const typename BufferT::type& p, World* w) + { + if (auto b = static_cast(p.get())) b->assignToRT(w); + } + }; + + template + struct CleanUpBuffer + { + void operator()(const typename BufferT::type& p) + { + if (auto b = static_cast(p.get())) b->cleanUp(); + } + }; + + FifoMsg mFifoMsg; + char* mCompletionMessage = nullptr; + void* mReplyAddr = nullptr; + const char* mName = nullptr; + index checkThreadInterval; + index pollCounter{0}; + index mPreviousTrigger{0}; + bool mSynchronous{true}; + Result mResult; +}; + +template +typename NonRealTime::Cache + NonRealTime::mCache{}; + +template +typename NonRealTime::Spinlock + NonRealTime::mSpinlock{}; -} -} -} +} // namespace impl +} // namespace client +} // namespace fluid