diff --git a/include/wrapper/NonRealtime.hpp b/include/wrapper/NonRealtime.hpp index 5df5f29..607ab1b 100644 --- a/include/wrapper/NonRealtime.hpp +++ b/include/wrapper/NonRealtime.hpp @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include "BufferFuncs.hpp" #include "CopyReplyAddress.hpp" @@ -6,1093 +6,1147 @@ #include "Meta.hpp" #include "RealTimeBase.hpp" #include "SCBufferAdaptor.hpp" +#include "SCWorldAllocator.hpp" #include #include #include +#include #include #include namespace fluid { namespace client { -namespace impl { - - /// Non Real Time Processor - - template - class NonRealTime : public SCUnit +namespace impl { + +/// Non Real Time Processor + +template +class NonRealTime : public SCUnit +{ + using Params = typename Client::ParamSetType; + + template + static T* rtalloc(World* world, Args&&... args) + { + void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); + return new (space) T{std::forward(args)...}; + } + + /// Instance cache + struct CacheEntry { - using Params = typename Client::ParamSetType; - - template - static T* rtalloc(World* world,Args&&...args) + CacheEntry(const Params& p) : mParams{p}, mClient{mParams} {} + + Params mParams; + Client mClient; + std::atomic mDone{false}; + }; + + using CacheEntryPointer = std::shared_ptr; + using WeakCacheEntryPointer = + std::weak_ptr; // could use weak_type in 17 + +public: + using Cache = std::unordered_map; + using RTCacheAllocator = + SCWorldAllocator, Wrapper>; + + struct RTCacheMirror + : public std::unordered_map, std::equal_to, + RTCacheAllocator> + { + + RTCacheMirror(RTCacheAllocator&& alloc) + : std::unordered_map, + std::equal_to, RTCacheAllocator>{ + std::move(alloc)} { - void* space = getInterfaceTable()->fRTAlloc(world, sizeof(T)); - return new (space) T{std::forward(args)...}; + // std::cout << "Warning: And up...\n" << std::endl; } - - /// Instance cache - struct CacheEntry + + ~RTCacheMirror() { - - CacheEntry(const Params& p):mParams{p},mClient{mParams} - {} - - Params mParams; - Client mClient; - bool mDone{false}; - }; - - using CacheEntryPointer = std::shared_ptr; - using WeakCacheEntryPointer = std::weak_ptr; //could use weak_type in 17 - - public: - using Cache = std::unordered_map; - static Cache mCache; - private: - static bool isNull(WeakCacheEntryPointer const& weak) { - return !weak.owner_before(WeakCacheEntryPointer{}) && !WeakCacheEntryPointer{}.owner_before(weak); + // std::cout << "Warning: And down...\n" << std::endl; } - - // https://rigtorp.se/spinlock/ - struct Spinlock { - std::atomic lock_ = {0}; - - void lock() noexcept { - for (;;) { - // Optimistically assume the lock is free on the first try - if (!lock_.exchange(true, std::memory_order_acquire)) { - return; - } - // Wait for lock to be released without generating cache misses - while (lock_.load(std::memory_order_relaxed)) { - // Issue X86 PAUSE or ARM YIELD instruction to reduce contention between - // hyper-threads - //__builtin_ia32_pause(); - } - } - } - - bool tryLock() noexcept { - // First do a relaxed load to check if lock is free in order to prevent - // unnecessary cache misses if someone does while(!try_lock()) - return !lock_.load(std::memory_order_relaxed) && - !lock_.exchange(true, std::memory_order_acquire); - } - - void unlock() noexcept { - lock_.store(false, std::memory_order_release); - } - }; - - //RAII for above - struct ScopedSpinlock + }; + + static Cache mCache; + + static RTCacheMirror& rtCache(World* world) + { + thread_local static RTCacheMirror mRTCache( + RTCacheAllocator(world, Wrapper::getInterfaceTable())); + return mRTCache; + } + +private: + static bool isNull(WeakCacheEntryPointer const& weak) + { + return !weak.owner_before(WeakCacheEntryPointer{}) && + !WeakCacheEntryPointer{}.owner_before(weak); + } + + static WeakCacheEntryPointer rtget(World* world, index id) + { + auto lookup = rtCache(world).find(id); + return lookup == rtCache(world).end() ? WeakCacheEntryPointer() + : lookup->second; + } + + using RawCacheEntry = typename Cache::value_type; + + struct addToRTCache + { + void operator()(World* w, RawCacheEntry& r) { - ScopedSpinlock(Spinlock& _l) noexcept: mLock{_l} + FifoMsg msg; + auto add = [](FifoMsg* m) { + RawCacheEntry* r = static_cast(m->mData); + rtCache(m->mWorld).emplace(r->first, r->second); + }; + msg.Set(w, add, nullptr, &r); + auto ft = Wrapper::getInterfaceTable(); + if (!ft->fSendMsgToRT(w, msg)) { - mLock.lock(); + std::cout << "ERROR: Message to RT failed"; } - ~ScopedSpinlock() { mLock.unlock(); } - private: - Spinlock& mLock; - }; - - static Spinlock mSpinlock; - - // shouldn't be called without at least *thinking* about getting spin lock first - static inline WeakCacheEntryPointer unsafeGet(index id) - { - auto lookup = mCache.find(id); - return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; } - - public: - static WeakCacheEntryPointer get(index id) + }; + + struct removeFromRTCache + { + void operator()(World* world, index id) { - ScopedSpinlock{mSpinlock}; - return unsafeGet(id); + index* data = new index(); + *data = id; + + FifoMsg msg; + + auto remove = [](FifoMsg* m) { + int* id = static_cast(m->mData); + rtCache(m->mWorld).erase(*id); + }; + + auto cleanup = [](FifoMsg* m) { delete static_cast(m->mData); }; + + msg.Set(world, remove, cleanup, data); + auto ft = Wrapper::getInterfaceTable(); + ft->fSendMsgToRT(world, msg); } + }; - static WeakCacheEntryPointer tryGet(index id) + +public: + static WeakCacheEntryPointer get(index id) + { + auto lookup = mCache.find(id); + return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; + } + + static WeakCacheEntryPointer add(World* world, index id, const Params& params) + { + if (isNull(get(id))) { - if(mSpinlock.tryLock()) - { - auto ret = unsafeGet(id); - mSpinlock.unlock(); - return ret; - } - return WeakCacheEntryPointer{}; + auto result = mCache.emplace(id, std::make_shared(params)); + + addToRTCache{}(world, *(result.first)); + + return result.second ? (result.first)->second + : WeakCacheEntryPointer(); // sob } + else // client has screwed up + { + std::cout << "ERROR: " << Wrapper::getName() << " ID " << id + << " already in use\n"; + return {}; + } + } + + static void remove(World* world, index id) + { + mCache.erase(id); + removeFromRTCache{}(world, id); + } + + static void printNotFound(index id) + { + std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " + << id << std::endl; + } + +private: + static InterfaceTable* getInterfaceTable() + { + return Wrapper::getInterfaceTable(); + } + + template + using ParamsFromOSC = + typename ClientParams::template Setter; + template + using ParamsFromSynth = + typename ClientParams::template Setter; - static WeakCacheEntryPointer add(index id, const Params& params) + struct NRTCommand + { + NRTCommand(World*, sc_msg_iter* args, void* replyAddr, + bool consumeID = true) { - ScopedSpinlock{mSpinlock}; - if(isNull(get(id))) + auto count = args->count; + auto pos = args->rdpos; + + mID = args->geti(); + + if (!consumeID) { - auto result = mCache.emplace(id, - std::make_shared(params)); - - return result.second ? (result.first)->second : WeakCacheEntryPointer(); //sob + args->count = count; + args->rdpos = pos; } - else //client has screwed up + + if (replyAddr) mReplyAddress = copyReplyAddress(replyAddr); + } + + ~NRTCommand() + { + if (mReplyAddress) deleteReplyAddress(mReplyAddress); + } + + NRTCommand() {} + + explicit NRTCommand(index id) : mID{id} {} + + bool stage2(World*) { return true; } // nrt + bool stage3(World*) { return true; } // rt + bool stage4(World*) { return false; } // nrt + void cleanup(World*) {} // rt + + void sendReply(const char* name, bool success) + { + if (mReplyAddress) { - std::cout << "ERROR: " << Wrapper::getName() << " ID " << id << " already in use\n"; - return {}; + std::string slash{"/"}; + small_scpacket packet; + packet.adds((slash + name).c_str()); + packet.maketags(3); + packet.addtag(','); + packet.addtag('i'); + packet.addtag('i'); + packet.addi(success); + packet.addi(static_cast(mID)); + + SendReply(mReplyAddress, packet.data(), + static_cast(packet.size())); } } - - static void remove(index id) + // protected: + index mID; + void* mReplyAddress{nullptr}; + }; + + struct CommandNew : public NRTCommand + { + CommandNew(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr, !IsNamedShared_v}, + mParams{Client::getParameterDescriptors()} { - ScopedSpinlock{mSpinlock}; - mCache.erase(id); + mParams.template setParameterValuesRT(nullptr, world, + *args); } - - static void printNotFound(index id) + + CommandNew(index id, World*, FloatControlsIter& args, Unit* x) + : NRTCommand{id}, mParams{Client::getParameterDescriptors()} { - std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " << id << std::endl; + mParams.template setParameterValuesRT(nullptr, x, args); } - - private: - static InterfaceTable* getInterfaceTable() { return Wrapper::getInterfaceTable() ;} - - template - using ParamsFromOSC = typename ClientParams::template Setter; - - template - using ParamsFromSynth = typename ClientParams::template Setter; + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/new"; + return cmd.c_str(); + } - struct NRTCommand + bool stage2(World* w) { - NRTCommand(World*, sc_msg_iter* args, void* replyAddr, bool consumeID = true) - { - auto count = args->count; - auto pos = args->rdpos; - - mID = args->geti(); - - if(!consumeID) - { - args->count = count; - args->rdpos = pos; - } - - if(replyAddr) - mReplyAddress = copyReplyAddress(replyAddr); - } - - ~NRTCommand() + Result constraintsRes = validateParameters(mParams); + + if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes); + + mResult = (!isNull(add(w, NRTCommand::mID, mParams))); + + // Sigh. The cache entry above has both the client instance and main + // params instance. + // The client is linked to the params by reference; I've not got the + // in-place constrction working properly so that params are in their final + // resting place by the time we make the client so (for) now we need to + // manually repoint the client to the correct place. Or badness. + if (mResult) { - if(mReplyAddress) deleteReplyAddress(mReplyAddress); + auto ptr = get(NRTCommand::mID).lock(); + ptr->mClient.setParams(ptr->mParams); } - - NRTCommand(){} - - explicit NRTCommand(index id):mID{id}{} - - bool stage2(World*) { return true; } //nrt - bool stage3(World*) { return true; } //rt - bool stage4(World*) { return false; } //nrt - void cleanup(World*) {} //rt - - void sendReply(const char* name,bool success) + + NRTCommand::sendReply(name(), mResult); + + return mResult; + } + + private: + bool mResult; + Params mParams; + }; + + struct CommandFree : public NRTCommand + { + using NRTCommand::NRTCommand; + + void cancelCheck(std::false_type, index id) + { + if (auto ptr = get(id).lock()) { - if(mReplyAddress) - { - std::string slash{"/"}; - small_scpacket packet; - packet.adds((slash+name).c_str()); - packet.maketags(3); - packet.addtag(','); - packet.addtag('i'); - packet.addtag('i'); - packet.addi(success); - packet.addi(static_cast(mID)); - - SendReply(mReplyAddress,packet.data(), static_cast(packet.size())); - } + auto& client = ptr->mClient; + if (!client.synchronous() && + client.state() == ProcessState::kProcessing) + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; } -// protected: - index mID; - void* mReplyAddress{nullptr}; - }; - - struct CommandNew : public NRTCommand + } + + void cancelCheck(std::true_type, index) {} + + + static const char* name() { - CommandNew(World* world, sc_msg_iter* args,void* replyAddr) - : NRTCommand{world,args, replyAddr, !IsNamedShared_v}, + static std::string cmd = std::string(Wrapper::getName()) + "/free"; + return cmd.c_str(); + } + + bool stage2(World* world) + { + cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); + remove(world, NRTCommand::mID); + NRTCommand::sendReply(name(), true); + return true; + } + }; + + struct CommandProcess : public NRTCommand + { + CommandProcess(World* world, sc_msg_iter* args, void* replyAddr) + : NRTCommand{world, args, replyAddr}, mParams{Client::getParameterDescriptors()} + { + auto& ar = *args; + if (auto ptr = get(NRTCommand::mID).lock()) { - mParams.template setParameterValuesRT(nullptr, world, *args); - } + ptr->mDone.store(false, std::memory_order_relaxed); + mParams.template setParameterValuesRT(nullptr, world, + ar); + mSynchronous = static_cast(ar.geti()); + } // if this fails, we'll hear about it in stage2 anyway + } - CommandNew(index id, World*, FloatControlsIter& args, Unit* x) - :NRTCommand{id}, - mParams{Client::getParameterDescriptors()} + explicit CommandProcess(index id, bool synchronous, Params* params) + : NRTCommand{id}, + mSynchronous(synchronous), mParams{Client::getParameterDescriptors()} + { + if (params) { - mParams.template setParameterValuesRT(nullptr, x, args); + mParams = *params; + mOverwriteParams = true; } + } - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/new"; - return cmd.c_str(); - } + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/process"; + return cmd.c_str(); + } - bool stage2(World* w) + bool stage2(World* world) + { + mRecord = get(NRTCommand::mID); + if (auto ptr = mRecord.lock()) { -// auto entry = ; + auto& params = ptr->mParams; + if (mOverwriteParams) params = mParams; + auto& client = ptr->mClient; - Result constraintsRes = validateParameters(mParams); + Result result = validateParameters(params); + Wrapper::printResult(world, result); + if (result.status() != Result::Status::kError) + { + client.setSynchronous(mSynchronous); + index id = NRTCommand::mID; + size_t completionMsgSize = mCompletionMsgSize; + char* completionMessage = mCompletionMessage; + void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); + auto callback = [world, id, completionMsgSize, completionMessage, + replyAddress]() { + doProcessCallback(world, id, completionMsgSize, completionMessage, + replyAddress); + }; - if(!constraintsRes.ok()) Wrapper::printResult(w,constraintsRes); + result = mSynchronous ? client.enqueue(params) + : client.enqueue(params, callback); + Wrapper::printResult(world, result); - mResult = (!isNull(add(NRTCommand::mID, mParams))); - - //Sigh. The cache entry above has both the client instance and main params instance. - // The client is linked to the params by reference; I've not got the in-place constrction - // working properly so that params are in their final resting place by the time we make the client - // so (for) now we need to manually repoint the client to the correct place. Or badness. - if(mResult) - { - auto ptr = get(NRTCommand::mID).lock(); - ptr->mClient.setParams(ptr->mParams); - } - - NRTCommand::sendReply(name(),mResult); - - return mResult; - } - - private: - bool mResult; - Params mParams; - }; - - struct CommandFree: public NRTCommand - { - using NRTCommand::NRTCommand; - - void cancelCheck(std::false_type, index id) - { - if(auto ptr = get(id).lock()) - { - auto& client = ptr->mClient; - if(!client.synchronous() && client.state() == ProcessState::kProcessing) - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; + if (result.status() != Result::Status::kError) + { + ptr->mDone.store(false, std::memory_order_relaxed); + mResult = client.process(); + Wrapper::printResult(world, mResult); + + bool error = mResult.status() == Result::Status::kError; + + if (error) ptr->mDone.store(true, std::memory_order_relaxed); + bool toStage3 = mSynchronous && !error; + return toStage3; + } } } - - void cancelCheck(std::true_type, index){} - - - static const char* name() + else { - static std::string cmd = std::string(Wrapper::getName()) + "/free"; - return cmd.c_str(); + mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), + " with ID ", NRTCommand::mID}; + Wrapper::printResult(world, mResult); } - - bool stage2(World*) + return false; + } + + // Only for blocking execution + bool stage3(World* world) // rt + { + if (auto ptr = mRecord.lock()) { - cancelCheck(IsRTQueryModel_t(),NRTCommand::mID); - remove(NRTCommand::mID); - NRTCommand::sendReply(name(), true); + ptr->mParams.template forEachParamType(world); return true; } + return false; + } - }; - - - /// Not registered as a PlugInCmd. Triggered by worker thread callback - struct CommandAsyncComplete: public NRTCommand + // Only for blocking execution + bool stage4(World*) // nrt { - CommandAsyncComplete(World*, index id, void* replyAddress) + if (auto ptr = get(NRTCommand::mID).lock()) { - NRTCommand::mID = id; - NRTCommand::mReplyAddress = replyAddress; + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && mSynchronous) + NRTCommand::sendReply(name(), + mResult.status() != Result::Status::kError); + ptr->mDone.store(true, std::memory_order_relaxed); + return true; } - - static const char* name() { return CommandProcess::name(); } - - bool stage2(World* world) + return false; + } + + bool synchronous() { return mSynchronous; } + + void addCompletionMessage(size_t size, char* message) //, void* addr) + { + mCompletionMsgSize = size; + mCompletionMessage = message; + } + + // private: + Result mResult; + bool mSynchronous; + size_t mCompletionMsgSize{0}; + char* mCompletionMessage{nullptr}; + Params mParams; + bool mOverwriteParams{false}; + WeakCacheEntryPointer mRecord; + }; + + + /// Not registered as a PlugInCmd. Triggered by worker thread callback + struct CommandAsyncComplete : public NRTCommand + { + CommandAsyncComplete(World*, index id, void* replyAddress) + { + NRTCommand::mID = id; + NRTCommand::mReplyAddress = replyAddress; + } + + static const char* name() { return CommandProcess::name(); } + + bool stage2(World* world) + { + if (auto ptr = get(NRTCommand::mID).lock()) { - - // std::cout << "In Async completion\n"; - if(auto ptr = get(NRTCommand::mID).lock()) + Result r; + mRecord = ptr; + auto& client = ptr->mClient; + ProcessState s = client.checkProgress(r); + if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) { - Result r; - mRecord = ptr; - auto& client = ptr->mClient; - ProcessState s = client.checkProgress(r); - if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing) + if (r.status() == Result::Status::kCancelled) { - if (r.status() == Result::Status::kCancelled) - { - std::cout << Wrapper::getName() - << ": Processing cancelled" - << std::endl; - ptr->mDone = true; - return false; - } - - client.checkProgress(r); - mSuccess = !(r.status() == Result::Status::kError); - if (!r.ok()) - { - Wrapper::printResult(world,r); - if(r.status() == Result::Status::kError) - { - ptr->mDone = true; - return false; - } - } - - return true; + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + ptr->mDone.store(true, std::memory_order_relaxed); + return false; } - } - return false; - } - - bool stage3(World* world) - { - if(auto ptr = mRecord.lock()) - { - auto& params = ptr->mParams; - params.template forEachParamType(world); - return true; - } - return false; - } - - bool stage4(World*) //nrt - { - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) + + client.checkProgress(r); + mSuccess = !(r.status() == Result::Status::kError); + Wrapper::printResult(world, r); + if (!mSuccess) { - NRTCommand::sendReply(name(),mSuccess); + ptr->mDone.store(true, std::memory_order_relaxed); + return false; } - ptr->mDone = true; + // if we're progressing to stage3, don't unlock the lock just yet + // lock.release(); return true; } - return false; } - - bool mSuccess; - WeakCacheEntryPointer mRecord; - }; - - - static void doProcessCallback(World* world, index id,size_t completionMsgSize,char* completionMessage,void* replyAddress) - { - auto ft = getInterfaceTable(); - struct Context{ - World* mWorld; - index mID; - size_t mCompletionMsgSize; - char* mCompletionMessage; - void* mReplyAddress; - }; - - Context* c = new Context{world,id,completionMsgSize,completionMessage,replyAddress}; - - auto launchCompletionFromNRT = [](FifoMsg* inmsg) - { - auto runCompletion = [](FifoMsg* msg){ - Context* c = static_cast(msg->mData); - World* world = c->mWorld; - index id = c->mID; - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(CommandAsyncComplete)); - CommandAsyncComplete* cmd = new (space) CommandAsyncComplete(world, id,c->mReplyAddress); - runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, c->mCompletionMessage); - if(c->mCompletionMsgSize) ft->fRTFree(world,c->mCompletionMessage); - }; - - auto tidyup = [](FifoMsg* msg) - { - Context* c = static_cast(msg->mData); - delete c; - }; - - auto ft = getInterfaceTable(); - FifoMsg fwd = *inmsg; - fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); - if(inmsg->mWorld->mRunning) - ft->fSendMsgToRT(inmsg->mWorld,fwd); - }; - - FifoMsg msg; - msg.Set(world, launchCompletionFromNRT, nullptr, c); - - if(world->mRunning) ft->fSendMsgFromRT(world,msg); + return false; } - - struct CommandProcess: public NRTCommand + + bool stage3(World* world) { - CommandProcess(World* world, sc_msg_iter* args, void* replyAddr): NRTCommand{world, args, replyAddr},mParams{Client::getParameterDescriptors()} + if (auto ptr = mRecord.lock()) { - auto& ar = *args; - - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mDone = false; - mParams.template setParameterValuesRT(nullptr, world, ar); - mSynchronous = static_cast(ar.geti()); - } //if this fails, we'll hear about it in stage2 anyway + auto& params = ptr->mParams; + params.template forEachParamType(world); } + return true; + } - explicit CommandProcess(index id,bool synchronous,Params* params):NRTCommand{id},mSynchronous(synchronous), - mParams{Client::getParameterDescriptors()} - { - if(params) - { - mParams = *params; - mOverwriteParams = true; - } - } - - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/process"; - return cmd.c_str(); - } - - bool stage2(World* world) + bool stage4(World*) // nrt + { + if (auto ptr = get(NRTCommand::mID).lock()) { - mRecord = get(NRTCommand::mID); - if(auto ptr = mRecord.lock()) - { - - auto& params = ptr->mParams; - if(mOverwriteParams) params = mParams; - auto& client = ptr->mClient; - - - -// if(mOSCData) -// { -// params.template setParameterValuesRT(nullptr, world, *mOSCData); -// mSynchronous = static_cast(mOSCData->geti()); -// } - - Result result = validateParameters(params); - Wrapper::printResult(world, result); - if (result.status() != Result::Status::kError) - { -// client.done() - client.setSynchronous(mSynchronous); - index id = NRTCommand::mID; - size_t completionMsgSize = mCompletionMsgSize; - char* completionMessage = mCompletionMessage; - void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); - - auto callback = [world,id,completionMsgSize,completionMessage,replyAddress](){ - doProcessCallback(world,id,completionMsgSize,completionMessage,replyAddress); - }; - - result = mSynchronous ? client.enqueue(params) : client.enqueue(params,callback); - Wrapper::printResult(world, result); - - if(result.ok()) - { - ptr->mDone = false; - mResult = client.process(); - Wrapper::printResult(world,mResult); - - bool error =mResult.status() == Result::Status::kError; - - if(error) ptr->mDone = true; - return mSynchronous && !error; - } - } - } - else + ptr->mParams.template forEachParamType(); + + if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress) { - mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), " with ID ", NRTCommand::mID}; - Wrapper::printResult(world,mResult); + NRTCommand::sendReply(name(), mSuccess); } - return false; + + ptr->mDone.store(true, std::memory_order_relaxed); // = true; + return true; } + std::cout << "ERROR: Failed to lock\n"; + return false; + } - //Only for blocking execution - bool stage3(World* world) //rt - { - if(auto ptr = mRecord.lock()) + bool mSuccess; + WeakCacheEntryPointer mRecord; + }; + + + static void doProcessCallback(World* world, index id, + size_t completionMsgSize, + char* completionMessage, void* replyAddress) + { + auto ft = getInterfaceTable(); + struct Context + { + World* mWorld; + index mID; + size_t mCompletionMsgSize; + char* mCompletionMessage; + void* mReplyAddress; + }; + + Context* c = new Context{world, id, completionMsgSize, completionMessage, + replyAddress}; + + auto launchCompletionFromNRT = [](FifoMsg* inmsg) { + auto runCompletion = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + World* world = c->mWorld; + index id = c->mID; + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); + CommandAsyncComplete* cmd = + new (space) CommandAsyncComplete(world, id, c->mReplyAddress); + if (runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, + c->mCompletionMessage) != 0) { - ptr->mParams.template forEachParamType(world); -// NRTCommand::sendReply(world, name(), mResult.ok()); - return true; + std::cout << "ERROR: Async cmd failed in callback" << std::endl; } - // std::cout << "Ohno\n"; - return false; - } - - //Only for blocking execution - bool stage4(World*) //nrt - { - if(auto ptr = get(NRTCommand::mID).lock()) + if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); + }; + + auto tidyup = [](FifoMsg* msg) { + Context* c = static_cast(msg->mData); + delete c; + }; + + auto ft = getInterfaceTable(); + FifoMsg fwd = *inmsg; + fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); + if (inmsg->mWorld->mRunning) + if (!ft->fSendMsgToRT(inmsg->mWorld, fwd)) { - ptr->mParams.template forEachParamType(); - - if(NRTCommand::mID >= 0 && mSynchronous) - NRTCommand::sendReply(name(), mResult.ok()); - ptr->mDone = true; - return true; + std::cout << "ERROR: Failed to queue -> RT\n"; } - return false; - } - + }; - bool synchronous() - { - return mSynchronous; - } + FifoMsg msg; + msg.Set(world, launchCompletionFromNRT, nullptr, c); - void addCompletionMessage(size_t size, char* message)//, void* addr) - { - mCompletionMsgSize = size; - mCompletionMessage = message; - } - -// private: - Result mResult; - bool mSynchronous; - size_t mCompletionMsgSize{0}; - char* mCompletionMessage{nullptr}; - Params mParams; - bool mOverwriteParams{false}; - WeakCacheEntryPointer mRecord; - }; - - struct CommandProcessNew: public NRTCommand + if (world->mRunning) { - CommandProcessNew(World* world, sc_msg_iter* args,void* replyAddr) - : mNew{world, args, replyAddr}, - mProcess{mNew.mID,false,nullptr} - { - mProcess.mSynchronous = args->geti(); - mProcess.mReplyAddress = mNew.mReplyAddress; - } - - CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) - : mNew{id, world, args, x}, - mProcess{id} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; - return cmd.c_str(); - } - - bool stage2(World* world) - { - return mNew.stage2(world) ? mProcess.stage2(world) : false; - } - - bool stage3(World* world) //rt - { - return mProcess.stage3(world); - } + ft->fNRTLock(world); + msg.Perform(); + ft->fNRTUnlock(world); + } + } - bool stage4(World* world) //nrt - { - return mProcess.stage4(world); - } - - void cleanup(World* world) - { - mProcess.mReplyAddress = nullptr; - mProcess.cleanup(world); - } - - bool synchronous() - { - return mProcess.synchronous(); - } - - void addCompletionMessage(size_t size, char* message) - { - mProcess.addCompletionMessage(size, message); - } - private: - CommandNew mNew; - CommandProcess mProcess; - }; - - - struct CommandCancel: public NRTCommand + struct CommandProcessNew : public NRTCommand + { + CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) + : mNew{world, args, replyAddr}, mProcess{mNew.mID, false, nullptr} + { + mProcess.mSynchronous = args->geti(); + mProcess.mReplyAddress = mNew.mReplyAddress; + } + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/processNew"; + return cmd.c_str(); + } + + bool stage2(World* world) + { + return mNew.stage2(world) ? mProcess.stage2(world) : false; + } + + bool stage3(World* world) // rt + { + return mProcess.stage3(world); + } + + bool stage4(World* world) // nrt + { + return mProcess.stage4(world); + } + + void cleanup(World* world) + { + mProcess.mReplyAddress = nullptr; + mProcess.cleanup(world); + } + + bool synchronous() { return mProcess.synchronous(); } + + void addCompletionMessage(size_t size, char* message) { - CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) + mProcess.addCompletionMessage(size, message); + } + + private: + CommandNew mNew; + CommandProcess mProcess; + }; + + + struct CommandCancel : public NRTCommand + { + CommandCancel(World* world, sc_msg_iter* args, void* replyAddr) : NRTCommand{world, args, replyAddr} - {} - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; - return cmd.c_str(); - } - - bool stage2(World*) + {} + + static const char* name() + { + static std::string cmd = std::string(Wrapper::getName()) + "/cancel"; + return cmd.c_str(); + } + + bool stage2(World*) + { + if (auto ptr = get(NRTCommand::mID).lock()) { - if(auto ptr = get(NRTCommand::mID).lock()) + auto& client = ptr->mClient; + if (!client.synchronous()) { - auto& client = ptr->mClient; - if(!client.synchronous()) - { - client.cancel(); - return true; - } + client.cancel(); + std::cout << Wrapper::getName() << ": Processing cancelled" + << std::endl; + return true; } - return false; } - }; - - struct CommandSetParams: public NRTCommand - { - CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) + return false; + } + }; + + struct CommandSetParams : public NRTCommand + { + CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr) : NRTCommand{world, args, replyAddr} + { + auto& ar = *args; + if (auto ptr = get(NRTCommand::mID).lock()) { - auto& ar = *args; - if(auto ptr = get(NRTCommand::mID).lock()) - { - ptr->mParams.template setParameterValuesRT(nullptr, world, ar); - Result result = validateParameters(ptr->mParams); - ptr->mClient.setParams(ptr->mParams); - } else printNotFound(NRTCommand::mID); - } - - static const char* name() - { - static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; - return cmd.c_str(); + ptr->mParams.template setParameterValuesRT(nullptr, + world, ar); + Result result = validateParameters(ptr->mParams); + ptr->mClient.setParams(ptr->mParams); } - }; - - - template - static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + else + printNotFound(NRTCommand::mID); + } + + static const char* name() { - auto ft = getInterfaceTable(); + static std::string cmd = std::string(Wrapper::getName()) + "/setParams"; + return cmd.c_str(); + } + }; - return ft->fDoAsynchronousCommand(world, replyAddr,Command::name(),cmd, - [](World* w, void* d) { return static_cast(d)->stage2(w); }, - [](World* w, void* d) { return static_cast(d)->stage3(w); }, - [](World* w, void* d) { return static_cast(d)->stage4(w); }, - [](World* w, void* d) - { - auto cmd = static_cast(d); - cmd->cleanup(w); - cmd->~Command(); - getInterfaceTable()->fRTFree(w,d); - }, - static_cast(completionMsgSize), completionMsgData); + + template + static auto runAsyncCommand(World* world, Command* cmd, void* replyAddr, + size_t completionMsgSize, char* completionMsgData) + { + auto ft = getInterfaceTable(); + + return ft->fDoAsynchronousCommand( + world, replyAddr, Command::name(), cmd, + [](World* w, void* d) { return static_cast(d)->stage2(w); }, + [](World* w, void* d) { return static_cast(d)->stage3(w); }, + [](World* w, void* d) { return static_cast(d)->stage4(w); }, + [](World* w, void* d) { + auto cmd = static_cast(d); + cmd->cleanup(w); + cmd->~Command(); + getInterfaceTable()->fRTFree(w, d); + }, + static_cast(completionMsgSize), completionMsgData); + } + + + static auto runAsyncCommand(World* world, CommandProcess* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) + { + + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); } - - - static auto runAsyncCommand(World* world, CommandProcess* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + static auto runAsyncCommand(World* world, CommandProcessNew* cmd, + void* replyAddr, size_t completionMsgSize, + char* completionMsgData) + { + if (!cmd->synchronous()) { - if(!cmd->synchronous()) - { - - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + auto msgcopy = + (char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize); + memcpy(msgcopy, completionMsgData, completionMsgSize); + cmd->addCompletionMessage(completionMsgSize, msgcopy); + return runAsyncCommand(world, cmd, replyAddr, 0, + nullptr); } - - static auto runAsyncCommand(World* world, CommandProcessNew* cmd, void* replyAddr, - size_t completionMsgSize, char* completionMsgData) + else + return runAsyncCommand( + world, cmd, replyAddr, completionMsgSize, completionMsgData); + } + + + template + static void defineNRTCommand() + { + auto ft = getInterfaceTable(); + auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, + void* replyAddr) { + auto ft = getInterfaceTable(); + void* space = ft->fRTAlloc(world, sizeof(Command)); + Command* cmd = new (space) Command(world, args, replyAddr); + // This is brittle, but can't think of something better offhand + // This is the only place we can check for a completion message at the end + // of the OSC packet beause it has to be passed on to DoAsynhronousCommand + // at this point. However, detecting correctly relies on the Command type + // having fully consumed arguments from the args iterator in the + // constructor for cmd + size_t completionMsgSize{args ? args->getbsize() : 0}; + assert(completionMsgSize <= std::numeric_limits::max()); + char* completionMsgData = nullptr; + + if (completionMsgSize) + { + completionMsgData = (char*) ft->fRTAlloc(world, completionMsgSize); + args->getb(completionMsgData, completionMsgSize); + } + runAsyncCommand(world, cmd, replyAddr, completionMsgSize, + completionMsgData); + + if (completionMsgSize) ft->fRTFree(world, completionMsgData); + }; + ft->fDefinePlugInCmd(Command::name(), commandRunner, nullptr); + } + + + struct NRTProgressUnit : SCUnit + { + + static const char* name() { - if(!cmd->synchronous()) - { - auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); - memcpy(msgcopy, completionMsgData, completionMsgSize); - cmd->addCompletionMessage(completionMsgSize,msgcopy); - return runAsyncCommand(world, cmd, replyAddr, 0, nullptr); - } - else return runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); + static std::string n = std::string(Wrapper::getName()) + "Monitor"; + return n.c_str(); } - - - template - static void defineNRTCommand() + + NRTProgressUnit() { - auto ft = getInterfaceTable(); - auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, void* replyAddr) - { - - auto ft = getInterfaceTable(); - void* space = ft->fRTAlloc(world,sizeof(Command)); - Command* cmd = new (space) Command(world, args, replyAddr); - //This is brittle, but can't think of something better offhand - //This is the only place we can check for a completion message at the end of the OSC packet - //beause it has to be passed on to DoAsynhronousCommand at this point. However, detecting correctly - //relies on the Command type having fully consumed arguments from the args iterator in the constructor for cmd - size_t completionMsgSize{args ? args->getbsize() : 0}; - assert(completionMsgSize <= std::numeric_limits::max()); - char* completionMsgData = nullptr; - - if (completionMsgSize) { - completionMsgData = (char*)ft->fRTAlloc(world, completionMsgSize); - args->getb(completionMsgData, completionMsgSize); - } - runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); - - if(completionMsgSize) ft->fRTFree(world, completionMsgData); - - }; - ft->fDefinePlugInCmd(Command::name(),commandRunner,nullptr); + mInterval = static_cast(0.02 / controlDur()); + mID = static_cast(mInBuf[0][0]); + std::cout << mID << std::endl; + mRecord = rtget(mWorld, mID); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); } - - - - struct NRTProgressUnit: SCUnit + + void next(int) { - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Monitor"; - return n.c_str(); - } - - NRTProgressUnit() - { - mInterval = static_cast(0.02 / controlDur()); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - void next(int) + + if (isNull(mRecord)) { mRecord = rtget(mWorld, mID); }; + + if (0 == mCounter++) { - if (0 == mCounter++) + if (auto ptr = mRecord.lock()) { - index id = static_cast(mInBuf[0][0]); - if(auto ptr = tryGet(id).lock()) - { - mInit = true; - if(ptr->mClient.done()) mDone = 1; - out0(0) = static_cast(ptr->mClient.progress()); - } + mInit = true; + mDone = ptr->mDone.load(std::memory_order_relaxed); + out0(0) = static_cast(ptr->mClient.progress()); + } + else + { + if (!mInit) + std::cout << "WARNING: No " << Wrapper::getName() << " with ID " + << mID << std::endl; else - { - if(!mInit) - std::cout << "WARNING: No " << Wrapper::getName() << " with ID " << id << std::endl; - else mDone = 1; - } + mDone = 1; } - mCounter %= mInterval; } - - private: - index mInterval; - index mCounter{0}; - bool mInit{false}; - }; - - - struct NRTTriggerUnit: SCUnit + mCounter %= mInterval; + } + + private: + index mID; + index mInterval; + index mCounter{0}; + bool mInit{false}; + WeakCacheEntryPointer mRecord; + }; + + + struct NRTTriggerUnit : SCUnit + { + + static index count() { - - static index count(){ - static index counter = -1; - return counter--; - } - - index ControlOffset() { return mSpecialIndex + 1; } - - index ControlSize() - { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 3; //id + trig + blocking; - } - - static const char* name() - { - static std::string n = std::string(Wrapper::getName()) + "Trigger"; - return n.c_str(); - } - - NRTTriggerUnit() - : mControlsIterator{mInBuf + ControlOffset(),ControlSize()},mParams{Client::getParameterDescriptors()} - { - mID = static_cast(mInBuf[0][0]); - if(mID == -1) mID = count(); - auto cmd = NonRealTime::rtalloc(mWorld,mID,mWorld, mControlsIterator, this); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// mInst = get(mID); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - } - - ~NRTTriggerUnit() - { -// if(auto ptr = mInst.lock()) -// { - auto cmd = NonRealTime::rtalloc(mWorld,mID); - runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); -// } - } - - void next(int) - { - - - index triggerInput = static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); - mTrigger = mTrigger || triggerInput; - -// if(auto ptr = mInst->lock()) -// if(auto ptr = get(mID).lock()) -// { - bool trigger = (!mPreviousTrigger) && triggerInput;//mTrigger; - mPreviousTrigger = triggerInput; - mTrigger = 0; -// auto& client = ptr->mClient; - - if(trigger) - { - mControlsIterator.reset(1 + mInBuf); //add one for ID -// auto& params = ptr->mParams; - Wrapper::setParams(this,mParams,mControlsIterator,true,false); - bool blocking = mInBuf[mNumInputs - 1][0] > 0; - CommandProcess* cmd = rtalloc(mWorld,mID,blocking,&mParams); - runAsyncCommand(mWorld,cmd, nullptr,0, nullptr); - mRunCount++; - } - else - { - if(auto ptr = tryGet(mID).lock()) - { - mInit = true; - auto& client = ptr->mClient; - mDone = ptr->mDone; - out0(0) = mDone ? 1 : static_cast(client.progress()); - } else mDone = mInit; - } -// } -// else printNotFound(id); - } - - private: - bool mPreviousTrigger{false}; - bool mTrigger{false}; - Result mResult; - impl::FloatControlsIter mControlsIterator; - index mID; - index mRunCount{0}; - WeakCacheEntryPointer mInst; - Params mParams; - bool mInit{false}; - }; - - struct NRTModelQueryUnit: SCUnit + static index counter = -1; + return counter--; + } + + index ControlOffset() { return mSpecialIndex + 1; } + + index ControlSize() { - using Delegate = impl::RealTimeBase; - - index ControlOffset() { return mSpecialIndex + 2; } - index ControlSize() - { - return index(mNumInputs) - - mSpecialIndex //used for oddball cases - - 2; // trig + id - } - - static const char* name() + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 3; // id + trig + blocking; + } + + static const char* name() + { + static std::string n = std::string(Wrapper::getName()) + "Trigger"; + return n.c_str(); + } + + NRTTriggerUnit() + : mControlsIterator{mInBuf + ControlOffset(), ControlSize()}, + mParams{Client::getParameterDescriptors()} + { + mID = static_cast(mInBuf[0][0]); + if (mID == -1) mID = count(); + auto cmd = NonRealTime::rtalloc(mWorld, mID, mWorld, + mControlsIterator, this); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) { - static std::string n = std::string(Wrapper::getName()) + "Query"; - return n.c_str(); + std::cout << "ERROR: Async command failed in NRTTriggerUnit()" + << std::endl; } - - NRTModelQueryUnit() - //Offset controls by 1 to account for ID - : mControls{mInBuf + ControlOffset(),ControlSize()} + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } + + ~NRTTriggerUnit() + { + set_calc_function(); + auto cmd = NonRealTime::rtalloc(mWorld, mID); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) { - mID = static_cast(in0(1)); - init(); -// mInst = get(id); -// if(auto ptr = mInst.lock()) -// { -// auto& client = ptr->mClient; -// mDelegate.init(*this,client,mControls); - set_calc_function(); - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); -// }else printNotFound(mID); + std::cout << "ERROR: Async command failed in ~NRTTriggerUnit()" + << std::endl; } - - void init() + } + + void clear(int) + { + Wrapper::getInterfaceTable()->fClearUnitOutputs( + this, static_cast(mNumOutputs)); + } + + void next(int) + { + index triggerInput = + static_cast(mInBuf[static_cast(mNumInputs) - 2][0]); + mTrigger = mTrigger || triggerInput; + + bool trigger = (!mPreviousTrigger) && triggerInput; // mTrigger; + mPreviousTrigger = triggerInput; + mTrigger = 0; + + if (trigger) { - if(mSpinlock.tryLock()) + mControlsIterator.reset(1 + mInBuf); // add one for ID + Wrapper::setParams(this, mParams, mControlsIterator, true, false); + + bool blocking = mInBuf[mNumInputs - 1][0] > 0; + + CommandProcess* cmd = + rtalloc(mWorld, mID, blocking, &mParams); + if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0) { - mInit = false; - mInst = unsafeGet(mID); - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - mDelegate.init(*this,client,mControls); - mInit = true; - }//else printNotFound(mID); - mSpinlock.unlock(); + std::cout << "ERROR: Async command failed in NRTTriggerUnit::next()" + << std::endl; } + mRunCount++; } - - void next(int) + else { - Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs); - index id = static_cast(in0(1)); - if(mID != id) init(); - if(!mInit) return; - if(mSpinlock.tryLock()) + auto record = rtget(mWorld, mID); + if (auto ptr = record.lock()) { - if(auto ptr = mInst.lock()) - { - auto& client = ptr->mClient; - auto& params = ptr->mParams; - mControls.reset(mInBuf + ControlOffset()); - mDelegate.next(*this,client,params,mControls, ptr.use_count() == 2); - }else printNotFound(id); - mSpinlock.unlock(); + mInit = true; + auto& client = ptr->mClient; + mDone = ptr->mDone.load(std::memory_order_relaxed); + out0(0) = mDone ? 1 : static_cast(client.progress()); } - + else + mDone = mInit; } - - private: - Delegate mDelegate; - FloatControlsIter mControls; - index mID; - WeakCacheEntryPointer mInst; - bool mInit{false}; - }; - - - using ParamSetType = typename Client::ParamSetType; - - template - using SetupMessageCmd = typename FluidSCMessaging::template SetupMessageCmd; - - - template - struct DefineCommandIf - { - void operator()() { } - }; + } + private: + bool mPreviousTrigger{false}; + bool mTrigger{false}; + bool mBeingFreed{false}; + Result mResult; + impl::FloatControlsIter mControlsIterator; + index mID; + index mRunCount{0}; + WeakCacheEntryPointer mInst; + Params mParams; + bool mInit{false}; + }; - template - struct DefineCommandIf - { - void operator()() { - // std::cout << CommandType::name() << std::endl; - defineNRTCommand(); - } - }; - - template - struct RegisterUnitIf - { - void operator()(InterfaceTable*) {} - }; + struct NRTModelQueryUnit : SCUnit + { + using Delegate = impl::RealTimeBase; - template - struct RegisterUnitIf + index ControlOffset() { return mSpecialIndex + 2; } + index ControlSize() { - void operator()(InterfaceTable* ft) { registerUnit(ft,UnitType::name()); } - }; + return index(mNumInputs) - mSpecialIndex // used for oddball cases + - 2; // trig + id + } - - using IsRTQueryModel_t = typename Client::isRealTime; - static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; - - static constexpr bool IsModel = Client::isModelObject::value; - - - public: - static void setup(InterfaceTable* ft, const char*) + static const char* name() { - defineNRTCommand(); - DefineCommandIf()(); - DefineCommandIf()(); - DefineCommandIf()(); - - DefineCommandIf()(); - - defineNRTCommand(); - RegisterUnitIf()(ft); - RegisterUnitIf()(ft); - - RegisterUnitIf()(ft); - Client::getMessageDescriptors().template iterate(); - - - static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; - - ft->fDefinePlugInCmd(flushCmd.c_str(),[](World*, void*, struct sc_msg_iter*, void* ){ - mCache.clear(); - },nullptr); + static std::string n = std::string(Wrapper::getName()) + "Query"; + return n.c_str(); } + NRTModelQueryUnit() + // Offset controls by 1 to account for ID + : mControls{mInBuf + ControlOffset(), ControlSize()} + { + mID = static_cast(in0(1)); + init(); + set_calc_function(); + Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); + } - void init(){}; - - private: - static Result validateParameters(ParamSetType& p) + void init() { - auto results = p.constrainParameterValues(); - for (auto& r : results) + mInit = false; + mInst = rtget(mWorld, mID); + if (auto ptr = mInst.lock()) { - if (!r.ok()) return r; + auto& client = ptr->mClient; + mDelegate.init(*this, client, mControls); + mInit = true; } - return {}; } - template - struct AssignBuffer + void next(int) { - void operator()(const typename BufferT::type& p, World* w) + Wrapper::getInterfaceTable()->fClearUnitOutputs( + this, static_cast(mNumOutputs)); + index id = static_cast(in0(1)); + if (mID != id) init(); + if (!mInit) return; + if (auto ptr = mInst.lock()) { - if (auto b = static_cast(p.get())) b->assignToRT(w); + auto& client = ptr->mClient; + auto& params = ptr->mParams; + mControls.reset(mInBuf + ControlOffset()); + mDelegate.next(*this, client, params, mControls, ptr.use_count() == 2); } - }; + else + printNotFound(id); + } + + private: + Delegate mDelegate; + FloatControlsIter mControls; + index mID; + WeakCacheEntryPointer mInst; + bool mInit{false}; + }; + + + using ParamSetType = typename Client::ParamSetType; + + template + using SetupMessageCmd = + typename FluidSCMessaging::template SetupMessageCmd; + - template - struct CleanUpBuffer + template + struct DefineCommandIf + { + void operator()() {} + }; + + + template + struct DefineCommandIf + { + void operator()() { defineNRTCommand(); } + }; + + template + struct RegisterUnitIf + { + void operator()(InterfaceTable*) {} + }; + + template + struct RegisterUnitIf + { + void operator()(InterfaceTable* ft) { - void operator()(const typename BufferT::type& p) - { - if (auto b = static_cast(p.get())) b->cleanUp(); - } - }; + registerUnit(ft, UnitType::name()); + } + }; + + using IsRTQueryModel_t = typename Client::isRealTime; + static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value; + + static constexpr bool IsModel = Client::isModelObject::value; + +public: + static void setup(InterfaceTable* ft, const char*) + { + defineNRTCommand(); + DefineCommandIf()(); + DefineCommandIf()(); + DefineCommandIf()(); + + DefineCommandIf()(); + + defineNRTCommand(); + RegisterUnitIf()(ft); + RegisterUnitIf()(ft); + + RegisterUnitIf()(ft); + Client::getMessageDescriptors().template iterate(); + + + static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; + + ft->fDefinePlugInCmd( + flushCmd.c_str(), + [](World*, void*, struct sc_msg_iter*, void*) { mCache.clear(); }, + nullptr); + } + + + void init(){}; + + static World* getWorld() { return mWorld; } - FifoMsg mFifoMsg; - char* mCompletionMessage = nullptr; - void* mReplyAddr = nullptr; - const char* mName = nullptr; - index checkThreadInterval; - index pollCounter{0}; - index mPreviousTrigger{0}; - bool mSynchronous{true}; - Result mResult; +private: + static World* mWorld; + + static Result validateParameters(ParamSetType& p) + { + auto results = p.constrainParameterValues(); + for (auto& r : results) + { + if (!r.ok()) return r; + } + return {}; + } + + template + struct AssignBuffer + { + void operator()(const typename BufferT::type& p, World* w) + { + if (auto b = static_cast(p.get())) b->assignToRT(w); + } }; - - template - typename NonRealTime::Cache NonRealTime::mCache{}; - template - typename NonRealTime::Spinlock NonRealTime::mSpinlock{}; + template + struct CleanUpBuffer + { + void operator()(const typename BufferT::type& p) + { + if (auto b = static_cast(p.get())) b->cleanUp(); + } + }; + + FifoMsg mFifoMsg; + char* mCompletionMessage = nullptr; + void* mReplyAddr = nullptr; + const char* mName = nullptr; + index checkThreadInterval; + index pollCounter{0}; + index mPreviousTrigger{0}; + bool mSynchronous{true}; + Result mResult; +}; + +template +World* NonRealTime::mWorld{nullptr}; +template +typename NonRealTime::Cache + NonRealTime::mCache{}; -} -} -} +} // namespace impl +} // namespace client +} // namespace fluid diff --git a/include/wrapper/SCWorldAllocator.hpp b/include/wrapper/SCWorldAllocator.hpp new file mode 100644 index 0000000..51c697f --- /dev/null +++ b/include/wrapper/SCWorldAllocator.hpp @@ -0,0 +1,60 @@ +/* + Part of the Fluid Corpus Manipulation Project (http://www.flucoma.org/) + Copyright 2017-2019 University of Huddersfield. + Licensed under the BSD-3 License. + See license.md file in the project root for full license information. + This project has received funding from the European Research Council (ERC) + under the European Union’s Horizon 2020 research and innovation programme + (grant agreement No 725899). + */ +#pragma once + +#include +#include +#include +#include + +namespace fluid { + +template +class SCWorldAllocator +{ + World* mWorld; + InterfaceTable* mInterface; + +public: + using propagate_on_container_move_assignment = std::true_type; + using value_type = T; + + template + friend class SCWorldAllocator; + + SCWorldAllocator(World* w, InterfaceTable* interface) + : mWorld{w}, mInterface{interface} + {} + + template + SCWorldAllocator(const SCWorldAllocator& other) noexcept + { + mWorld = other.mWorld; + mInterface = other.mInterface; + } + + T* allocate(std::size_t n) + { + if (n > std::numeric_limits::max() / sizeof(T)) + throw std::bad_array_new_length(); + + if (mWorld && mInterface) + if (auto p = static_cast(mInterface->fRTAlloc(mWorld, n * sizeof(T)))) + return p; + + throw std::bad_alloc(); + } + + void deallocate(T* p, std::size_t /*n*/) noexcept + { + if (mWorld && mInterface) mInterface->fRTFree(mWorld, p); + } +}; +} // namespace fluid diff --git a/src/FluidManipulation/CMakeLists.txt b/src/FluidManipulation/CMakeLists.txt index cd88c0a..66790e5 100644 --- a/src/FluidManipulation/CMakeLists.txt +++ b/src/FluidManipulation/CMakeLists.txt @@ -17,8 +17,8 @@ target_link_libraries( ${PLUGIN} PRIVATE FLUID_DECOMPOSITION ) -include(${CMAKE_CURRENT_LIST_DIR}/../../scripts/target_post.cmake) - -if(MSVC) - target_compile_options(${PLUGIN} PRIVATE /bigobj) +if(WIN32) + target_compile_options(${PLUGIN} PUBLIC /bigobj) endif() + +include(${CMAKE_CURRENT_LIST_DIR}/../../scripts/target_post.cmake)