Better safety, worse contention

Hold on to locks more strictly. Improves safety under load, but is 
slower.
nix
Owen Green 5 years ago
parent 5956e97925
commit dcdc7f8f31

@ -9,6 +9,8 @@
#include <clients/common/FluidBaseClient.hpp> #include <clients/common/FluidBaseClient.hpp>
#include <data/FluidMeta.hpp> #include <data/FluidMeta.hpp>
#include <SC_PlugIn.hpp> #include <SC_PlugIn.hpp>
#include <immintrin.h>
#include <mutex>
#include <scsynthsend.h> #include <scsynthsend.h>
#include <unordered_map> #include <unordered_map>
@ -33,138 +35,119 @@ namespace impl {
/// Instance cache /// Instance cache
struct CacheEntry struct CacheEntry
{ {
CacheEntry(const Params& p) : mParams{p}, mClient{mParams} {}
CacheEntry(const Params& p):mParams{p},mClient{mParams}
{}
Params mParams; Params mParams;
Client mClient; Client mClient;
bool mDone{false}; std::atomic<bool> mDone{false};
}; };
using CacheEntryPointer = std::shared_ptr<CacheEntry>; using CacheEntryPointer = std::shared_ptr<CacheEntry>;
using WeakCacheEntryPointer = std::weak_ptr<CacheEntry>; //could use weak_type in 17 using WeakCacheEntryPointer =
std::weak_ptr<CacheEntry>; // could use weak_type in 17
public: public:
using Cache = std::unordered_map<index, CacheEntryPointer>; using Cache = std::unordered_map<index, CacheEntryPointer>;
static Cache mCache; static Cache mCache;
private: private:
static bool isNull(WeakCacheEntryPointer const& weak) { static bool isNull(WeakCacheEntryPointer const& weak)
return !weak.owner_before(WeakCacheEntryPointer{}) && !WeakCacheEntryPointer{}.owner_before(weak); {
return !weak.owner_before(WeakCacheEntryPointer{}) &&
!WeakCacheEntryPointer{}.owner_before(weak);
} }
// https://rigtorp.se/spinlock/ // https://rigtorp.se/spinlock/
struct Spinlock { struct Spinlock
{
std::atomic<bool> lock_ = {0}; std::atomic<bool> lock_ = {0};
void lock() noexcept { void lock() noexcept
for (;;) { {
for (;;)
{
// Optimistically assume the lock is free on the first try // Optimistically assume the lock is free on the first try
if (!lock_.exchange(true, std::memory_order_acquire)) { if (!lock_.exchange(true, std::memory_order_acquire)) { return; }
return;
}
// Wait for lock to be released without generating cache misses // Wait for lock to be released without generating cache misses
while (lock_.load(std::memory_order_relaxed)) { while (lock_.load(std::memory_order_relaxed))
// Issue X86 PAUSE or ARM YIELD instruction to reduce contention between {
// hyper-threads // Issue X86 PAUSE or ARM YIELD instruction to reduce contention
//__builtin_ia32_pause(); // between hyper-threads
_mm_pause();
} }
} }
} }
bool tryLock() noexcept { bool try_lock() noexcept
{
// First do a relaxed load to check if lock is free in order to prevent // First do a relaxed load to check if lock is free in order to prevent
// unnecessary cache misses if someone does while(!try_lock()) // unnecessary cache misses if someone does while(!try_lock())
return !lock_.load(std::memory_order_relaxed) && return !lock_.load(std::memory_order_relaxed) &&
!lock_.exchange(true, std::memory_order_acquire); !lock_.exchange(true, std::memory_order_acquire);
} }
void unlock() noexcept { void unlock() noexcept { lock_.store(false, std::memory_order_release); }
lock_.store(false, std::memory_order_release);
}
};
//RAII for above
struct ScopedSpinlock
{
ScopedSpinlock(Spinlock& _l) noexcept: mLock{_l}
{
mLock.lock();
}
~ScopedSpinlock() { mLock.unlock(); }
private:
Spinlock& mLock;
}; };
static Spinlock mSpinlock; static Spinlock mSpinlock;
using ScopedSpinLock = std::unique_lock<Spinlock>;
// shouldn't be called without at least *thinking* about getting spin lock first // shouldn't be called without at least *thinking* about getting spin lock
static inline WeakCacheEntryPointer unsafeGet(index id) // first
static WeakCacheEntryPointer unsafeGet(index id)
{ {
auto lookup = mCache.find(id); auto lookup = mCache.find(id);
return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second; return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second;
} }
public: public:
static WeakCacheEntryPointer get(index id) static WeakCacheEntryPointer get(index id) { return unsafeGet(id); }
{
ScopedSpinlock{mSpinlock};
return unsafeGet(id);
}
static WeakCacheEntryPointer tryGet(index id)
{
if(mSpinlock.tryLock())
{
auto ret = unsafeGet(id);
mSpinlock.unlock();
return ret;
}
return WeakCacheEntryPointer{};
}
static WeakCacheEntryPointer add(index id, const Params& params) static WeakCacheEntryPointer add(index id, const Params& params)
{ {
ScopedSpinlock{mSpinlock};
if (isNull(get(id))) if (isNull(get(id)))
{ {
auto result = mCache.emplace(id, auto result = mCache.emplace(id, std::make_shared<CacheEntry>(params));
std::make_shared<CacheEntry>(params));
return result.second ? (result.first)->second : WeakCacheEntryPointer(); //sob return result.second ? (result.first)->second
: WeakCacheEntryPointer(); // sob
} }
else // client has screwed up else // client has screwed up
{ {
std::cout << "ERROR: " << Wrapper::getName() << " ID " << id << " already in use\n"; std::cout << "ERROR: " << Wrapper::getName() << " ID " << id
<< " already in use\n";
return {}; return {};
} }
} }
static void remove(index id) static void remove(index id) { mCache.erase(id); }
{
ScopedSpinlock{mSpinlock};
mCache.erase(id);
}
static void printNotFound(index id) static void printNotFound(index id)
{ {
std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID " << id << std::endl; std::cout << "ERROR: " << Wrapper::getName() << " no instance with ID "
<< id << std::endl;
} }
private: private:
static InterfaceTable* getInterfaceTable() { return Wrapper::getInterfaceTable() ;} static InterfaceTable* getInterfaceTable()
{
return Wrapper::getInterfaceTable();
}
template <size_t N, typename T> template <size_t N, typename T>
using ParamsFromOSC = typename ClientParams<Wrapper>::template Setter<sc_msg_iter, N, T>; using ParamsFromOSC =
typename ClientParams<Wrapper>::template Setter<sc_msg_iter, N, T>;
template <size_t N, typename T> template <size_t N, typename T>
using ParamsFromSynth = typename ClientParams<Wrapper>::template Setter<impl::FloatControlsIter, N, T>; using ParamsFromSynth =
typename ClientParams<Wrapper>::template Setter<impl::FloatControlsIter,
N, T>;
struct NRTCommand struct NRTCommand
{ {
NRTCommand(World*, sc_msg_iter* args, void* replyAddr, bool consumeID = true) NRTCommand(World*, sc_msg_iter* args, void* replyAddr,
bool consumeID = true)
{ {
auto count = args->count; auto count = args->count;
auto pos = args->rdpos; auto pos = args->rdpos;
@ -177,8 +160,7 @@ namespace impl {
args->rdpos = pos; args->rdpos = pos;
} }
if(replyAddr) if (replyAddr) mReplyAddress = copyReplyAddress(replyAddr);
mReplyAddress = copyReplyAddress(replyAddr);
} }
~NRTCommand() ~NRTCommand()
@ -209,7 +191,8 @@ namespace impl {
packet.addi(success); packet.addi(success);
packet.addi(static_cast<int>(mID)); packet.addi(static_cast<int>(mID));
SendReply(mReplyAddress,packet.data(), static_cast<int>(packet.size())); SendReply(mReplyAddress, packet.data(),
static_cast<int>(packet.size()));
} }
} }
// protected: // protected:
@ -223,12 +206,12 @@ namespace impl {
: NRTCommand{world, args, replyAddr, !IsNamedShared_v<Client>}, : NRTCommand{world, args, replyAddr, !IsNamedShared_v<Client>},
mParams{Client::getParameterDescriptors()} mParams{Client::getParameterDescriptors()}
{ {
mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, *args); mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world,
*args);
} }
CommandNew(index id, World*, FloatControlsIter& args, Unit* x) CommandNew(index id, World*, FloatControlsIter& args, Unit* x)
:NRTCommand{id}, : NRTCommand{id}, mParams{Client::getParameterDescriptors()}
mParams{Client::getParameterDescriptors()}
{ {
mParams.template setParameterValuesRT<ParamsFromSynth>(nullptr, x, args); mParams.template setParameterValuesRT<ParamsFromSynth>(nullptr, x, args);
} }
@ -242,7 +225,7 @@ namespace impl {
bool stage2(World* w) bool stage2(World* w)
{ {
// auto entry = ; // auto entry = ;
ScopedSpinLock lock{mSpinlock};
Result constraintsRes = validateParameters(mParams); Result constraintsRes = validateParameters(mParams);
@ -250,10 +233,12 @@ namespace impl {
mResult = (!isNull(add(NRTCommand::mID, mParams))); mResult = (!isNull(add(NRTCommand::mID, mParams)));
//Sigh. The cache entry above has both the client instance and main params instance. // Sigh. The cache entry above has both the client instance and main
// The client is linked to the params by reference; I've not got the in-place constrction // params instance.
// working properly so that params are in their final resting place by the time we make the client // The client is linked to the params by reference; I've not got the
// so (for) now we need to manually repoint the client to the correct place. Or badness. // in-place constrction working properly so that params are in their final
// resting place by the time we make the client so (for) now we need to
// manually repoint the client to the correct place. Or badness.
if (mResult) if (mResult)
{ {
auto ptr = get(NRTCommand::mID).lock(); auto ptr = get(NRTCommand::mID).lock();
@ -279,9 +264,9 @@ namespace impl {
if (auto ptr = get(id).lock()) if (auto ptr = get(id).lock())
{ {
auto& client = ptr->mClient; auto& client = ptr->mClient;
if(!client.synchronous() && client.state() == ProcessState::kProcessing) if (!client.synchronous() &&
std::cout << Wrapper::getName() client.state() == ProcessState::kProcessing)
<< ": Processing cancelled" std::cout << Wrapper::getName() << ": Processing cancelled"
<< std::endl; << std::endl;
} }
} }
@ -297,12 +282,12 @@ namespace impl {
bool stage2(World*) bool stage2(World*)
{ {
ScopedSpinLock lock(mSpinlock);
cancelCheck(IsRTQueryModel_t(), NRTCommand::mID); cancelCheck(IsRTQueryModel_t(), NRTCommand::mID);
remove(NRTCommand::mID); remove(NRTCommand::mID);
NRTCommand::sendReply(name(), true); NRTCommand::sendReply(name(), true);
return true; return true;
} }
}; };
@ -321,6 +306,7 @@ namespace impl {
{ {
// std::cout << "In Async completion\n"; // std::cout << "In Async completion\n";
ScopedSpinLock lock{mSpinlock};
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
Result r; Result r;
@ -331,10 +317,9 @@ namespace impl {
{ {
if (r.status() == Result::Status::kCancelled) if (r.status() == Result::Status::kCancelled)
{ {
std::cout << Wrapper::getName() std::cout << Wrapper::getName() << ": Processing cancelled"
<< ": Processing cancelled"
<< std::endl; << std::endl;
ptr->mDone = true; ptr->mDone.store(true, std::memory_order_relaxed);
return false; return false;
} }
@ -345,11 +330,12 @@ namespace impl {
Wrapper::printResult(world, r); Wrapper::printResult(world, r);
if (r.status() == Result::Status::kError) if (r.status() == Result::Status::kError)
{ {
ptr->mDone = true; ptr->mDone.store(true, std::memory_order_relaxed);
return false; return false;
} }
} }
// if we're progressing to stage3, don't unlock the lock just yet
lock.release();
return true; return true;
} }
} }
@ -358,6 +344,7 @@ namespace impl {
bool stage3(World* world) bool stage3(World* world)
{ {
ScopedSpinLock lock(mSpinlock, std::adopt_lock);
if (auto ptr = mRecord.lock()) if (auto ptr = mRecord.lock())
{ {
auto& params = ptr->mParams; auto& params = ptr->mParams;
@ -369,6 +356,7 @@ namespace impl {
bool stage4(World*) // nrt bool stage4(World*) // nrt
{ {
ScopedSpinLock lock(mSpinlock);
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>(); ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
@ -377,9 +365,10 @@ namespace impl {
{ {
NRTCommand::sendReply(name(), mSuccess); NRTCommand::sendReply(name(), mSuccess);
} }
ptr->mDone = true; ptr->mDone.store(true, std::memory_order_relaxed); // = true;
return true; return true;
} }
std::cout << "ERROR: Failed to lock\n";
return false; return false;
} }
@ -388,10 +377,13 @@ namespace impl {
}; };
static void doProcessCallback(World* world, index id,size_t completionMsgSize,char* completionMessage,void* replyAddress) static void doProcessCallback(World* world, index id,
size_t completionMsgSize,
char* completionMessage, void* replyAddress)
{ {
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
struct Context{ struct Context
{
World* mWorld; World* mWorld;
index mID; index mID;
size_t mCompletionMsgSize; size_t mCompletionMsgSize;
@ -399,23 +391,24 @@ namespace impl {
void* mReplyAddress; void* mReplyAddress;
}; };
Context* c = new Context{world,id,completionMsgSize,completionMessage,replyAddress}; Context* c = new Context{world, id, completionMsgSize, completionMessage,
replyAddress};
auto launchCompletionFromNRT = [](FifoMsg* inmsg) auto launchCompletionFromNRT = [](FifoMsg* inmsg) {
{
auto runCompletion = [](FifoMsg* msg) { auto runCompletion = [](FifoMsg* msg) {
Context* c = static_cast<Context*>(msg->mData); Context* c = static_cast<Context*>(msg->mData);
World* world = c->mWorld; World* world = c->mWorld;
index id = c->mID; index id = c->mID;
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete)); void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete));
CommandAsyncComplete* cmd = new (space) CommandAsyncComplete(world, id,c->mReplyAddress); CommandAsyncComplete* cmd =
runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize, c->mCompletionMessage); new (space) CommandAsyncComplete(world, id, c->mReplyAddress);
runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize,
c->mCompletionMessage);
if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage); if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage);
}; };
auto tidyup = [](FifoMsg* msg) auto tidyup = [](FifoMsg* msg) {
{
Context* c = static_cast<Context*>(msg->mData); Context* c = static_cast<Context*>(msg->mData);
delete c; delete c;
}; };
@ -423,8 +416,7 @@ namespace impl {
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
FifoMsg fwd = *inmsg; FifoMsg fwd = *inmsg;
fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData); fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData);
if(inmsg->mWorld->mRunning) if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd);
ft->fSendMsgToRT(inmsg->mWorld,fwd);
}; };
FifoMsg msg; FifoMsg msg;
@ -435,20 +427,24 @@ namespace impl {
struct CommandProcess : public NRTCommand struct CommandProcess : public NRTCommand
{ {
CommandProcess(World* world, sc_msg_iter* args, void* replyAddr): NRTCommand{world, args, replyAddr},mParams{Client::getParameterDescriptors()} CommandProcess(World* world, sc_msg_iter* args, void* replyAddr)
: NRTCommand{world, args, replyAddr},
mParams{Client::getParameterDescriptors()}
{ {
auto& ar = *args; auto& ar = *args;
ScopedSpinLock lock(mSpinlock);
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
ptr->mDone = false; ptr->mDone.store(false, std::memory_order_relaxed);
mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, ar); mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world,
ar);
mSynchronous = static_cast<bool>(ar.geti()); mSynchronous = static_cast<bool>(ar.geti());
} // if this fails, we'll hear about it in stage2 anyway } // if this fails, we'll hear about it in stage2 anyway
} }
explicit CommandProcess(index id,bool synchronous,Params* params):NRTCommand{id},mSynchronous(synchronous), explicit CommandProcess(index id, bool synchronous, Params* params)
mParams{Client::getParameterDescriptors()} : NRTCommand{id},
mSynchronous(synchronous), mParams{Client::getParameterDescriptors()}
{ {
if (params) if (params)
{ {
@ -466,6 +462,7 @@ namespace impl {
bool stage2(World* world) bool stage2(World* world)
{ {
ScopedSpinLock lock(mSpinlock);
mRecord = get(NRTCommand::mID); mRecord = get(NRTCommand::mID);
if (auto ptr = mRecord.lock()) if (auto ptr = mRecord.lock())
{ {
@ -474,48 +471,46 @@ namespace impl {
if (mOverwriteParams) params = mParams; if (mOverwriteParams) params = mParams;
auto& client = ptr->mClient; auto& client = ptr->mClient;
// if(mOSCData)
// {
// params.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, *mOSCData);
// mSynchronous = static_cast<bool>(mOSCData->geti());
// }
Result result = validateParameters(params); Result result = validateParameters(params);
Wrapper::printResult(world, result); Wrapper::printResult(world, result);
if (result.status() != Result::Status::kError) if (result.status() != Result::Status::kError)
{ {
// client.done()
client.setSynchronous(mSynchronous); client.setSynchronous(mSynchronous);
index id = NRTCommand::mID; index id = NRTCommand::mID;
size_t completionMsgSize = mCompletionMsgSize; size_t completionMsgSize = mCompletionMsgSize;
char* completionMessage = mCompletionMessage; char* completionMessage = mCompletionMessage;
void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress); void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress);
auto callback = [world,id,completionMsgSize,completionMessage,replyAddress](){ auto callback = [world, id, completionMsgSize, completionMessage,
doProcessCallback(world,id,completionMsgSize,completionMessage,replyAddress); replyAddress]() {
doProcessCallback(world, id, completionMsgSize, completionMessage,
replyAddress);
}; };
result = mSynchronous ? client.enqueue(params) : client.enqueue(params,callback); result = mSynchronous ? client.enqueue(params)
: client.enqueue(params, callback);
Wrapper::printResult(world, result); Wrapper::printResult(world, result);
if (result.ok()) if (result.ok())
{ {
ptr->mDone = false; ptr->mDone.store(false, std::memory_order_relaxed);
mResult = client.process(); mResult = client.process();
Wrapper::printResult(world, mResult); Wrapper::printResult(world, mResult);
bool error = mResult.status() == Result::Status::kError; bool error = mResult.status() == Result::Status::kError;
if(error) ptr->mDone = true; if (error) ptr->mDone.store(true, std::memory_order_relaxed);
return mSynchronous && !error; bool toStage3 = mSynchronous && !error;
if (toStage3) lock.release();
return toStage3;
} }
} }
} }
else else
{ {
mResult = Result{Result::Status::kError, "No ", Wrapper::getName(), " with ID ", NRTCommand::mID}; mResult = Result{Result::Status::kError, "No ", Wrapper::getName(),
" with ID ", NRTCommand::mID};
Wrapper::printResult(world, mResult); Wrapper::printResult(world, mResult);
} }
return false; return false;
@ -524,36 +519,33 @@ namespace impl {
// Only for blocking execution // Only for blocking execution
bool stage3(World* world) // rt bool stage3(World* world) // rt
{ {
ScopedSpinLock lock(mSpinlock, std::adopt_lock);
if (auto ptr = mRecord.lock()) if (auto ptr = mRecord.lock())
{ {
ptr->mParams.template forEachParamType<BufferT, AssignBuffer>(world); ptr->mParams.template forEachParamType<BufferT, AssignBuffer>(world);
// NRTCommand::sendReply(world, name(), mResult.ok());
return true; return true;
} }
// std::cout << "Ohno\n";
return false; return false;
} }
// Only for blocking execution // Only for blocking execution
bool stage4(World*) // nrt bool stage4(World*) // nrt
{ {
ScopedSpinLock lock(mSpinlock);
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>(); ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
if (NRTCommand::mID >= 0 && mSynchronous) if (NRTCommand::mID >= 0 && mSynchronous)
NRTCommand::sendReply(name(), mResult.ok()); NRTCommand::sendReply(name(), mResult.ok());
ptr->mDone = true; ptr->mDone.store(true, std::memory_order_relaxed);
return true; return true;
} }
return false; return false;
} }
bool synchronous() bool synchronous() { return mSynchronous; }
{
return mSynchronous;
}
void addCompletionMessage(size_t size, char* message) //, void* addr) void addCompletionMessage(size_t size, char* message) //, void* addr)
{ {
@ -574,16 +566,14 @@ namespace impl {
struct CommandProcessNew : public NRTCommand struct CommandProcessNew : public NRTCommand
{ {
CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr) CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr)
: mNew{world, args, replyAddr}, : mNew{world, args, replyAddr}, mProcess{mNew.mID, false, nullptr}
mProcess{mNew.mID,false,nullptr}
{ {
mProcess.mSynchronous = args->geti(); mProcess.mSynchronous = args->geti();
mProcess.mReplyAddress = mNew.mReplyAddress; mProcess.mReplyAddress = mNew.mReplyAddress;
} }
CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x) CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x)
: mNew{id, world, args, x}, : mNew{id, world, args, x}, mProcess{id}
mProcess{id}
{} {}
static const char* name() static const char* name()
@ -613,10 +603,7 @@ namespace impl {
mProcess.cleanup(world); mProcess.cleanup(world);
} }
bool synchronous() bool synchronous() { return mProcess.synchronous(); }
{
return mProcess.synchronous();
}
void addCompletionMessage(size_t size, char* message) void addCompletionMessage(size_t size, char* message)
{ {
@ -643,6 +630,7 @@ namespace impl {
bool stage2(World*) bool stage2(World*)
{ {
ScopedSpinLock lock(mSpinlock);
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
auto& client = ptr->mClient; auto& client = ptr->mClient;
@ -662,12 +650,16 @@ namespace impl {
: NRTCommand{world, args, replyAddr} : NRTCommand{world, args, replyAddr}
{ {
auto& ar = *args; auto& ar = *args;
ScopedSpinLock lock(mSpinlock);
if (auto ptr = get(NRTCommand::mID).lock()) if (auto ptr = get(NRTCommand::mID).lock())
{ {
ptr->mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, ar); ptr->mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr,
world, ar);
Result result = validateParameters(ptr->mParams); Result result = validateParameters(ptr->mParams);
ptr->mClient.setParams(ptr->mParams); ptr->mClient.setParams(ptr->mParams);
} else printNotFound(NRTCommand::mID); }
else
printNotFound(NRTCommand::mID);
} }
static const char* name() static const char* name()
@ -684,12 +676,12 @@ namespace impl {
{ {
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
return ft->fDoAsynchronousCommand(world, replyAddr,Command::name(),cmd, return ft->fDoAsynchronousCommand(
world, replyAddr, Command::name(), cmd,
[](World* w, void* d) { return static_cast<Command*>(d)->stage2(w); }, [](World* w, void* d) { return static_cast<Command*>(d)->stage2(w); },
[](World* w, void* d) { return static_cast<Command*>(d)->stage3(w); }, [](World* w, void* d) { return static_cast<Command*>(d)->stage3(w); },
[](World* w, void* d) { return static_cast<Command*>(d)->stage4(w); }, [](World* w, void* d) { return static_cast<Command*>(d)->stage4(w); },
[](World* w, void* d) [](World* w, void* d) {
{
auto cmd = static_cast<Command*>(d); auto cmd = static_cast<Command*>(d);
cmd->cleanup(w); cmd->cleanup(w);
cmd->~Command(); cmd->~Command();
@ -699,31 +691,40 @@ namespace impl {
} }
static auto runAsyncCommand(World* world, CommandProcess* cmd, void* replyAddr, static auto runAsyncCommand(World* world, CommandProcess* cmd,
size_t completionMsgSize, char* completionMsgData) void* replyAddr, size_t completionMsgSize,
char* completionMsgData)
{ {
if (!cmd->synchronous()) if (!cmd->synchronous())
{ {
auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); auto msgcopy =
(char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize);
memcpy(msgcopy, completionMsgData, completionMsgSize); memcpy(msgcopy, completionMsgData, completionMsgSize);
cmd->addCompletionMessage(completionMsgSize, msgcopy); cmd->addCompletionMessage(completionMsgSize, msgcopy);
return runAsyncCommand<CommandProcess>(world, cmd, replyAddr, 0, nullptr); return runAsyncCommand<CommandProcess>(world, cmd, replyAddr, 0, nullptr);
} }
else return runAsyncCommand<CommandProcess>(world, cmd, replyAddr, completionMsgSize, completionMsgData); else
return runAsyncCommand<CommandProcess>(
world, cmd, replyAddr, completionMsgSize, completionMsgData);
} }
static auto runAsyncCommand(World* world, CommandProcessNew* cmd, void* replyAddr, static auto runAsyncCommand(World* world, CommandProcessNew* cmd,
size_t completionMsgSize, char* completionMsgData) void* replyAddr, size_t completionMsgSize,
char* completionMsgData)
{ {
if (!cmd->synchronous()) if (!cmd->synchronous())
{ {
auto msgcopy = (char*)getInterfaceTable()->fRTAlloc(world,completionMsgSize); auto msgcopy =
(char*) getInterfaceTable()->fRTAlloc(world, completionMsgSize);
memcpy(msgcopy, completionMsgData, completionMsgSize); memcpy(msgcopy, completionMsgData, completionMsgSize);
cmd->addCompletionMessage(completionMsgSize, msgcopy); cmd->addCompletionMessage(completionMsgSize, msgcopy);
return runAsyncCommand<CommandProcessNew>(world, cmd, replyAddr, 0, nullptr); return runAsyncCommand<CommandProcessNew>(world, cmd, replyAddr, 0,
nullptr);
} }
else return runAsyncCommand<CommandProcessNew>(world, cmd, replyAddr, completionMsgSize, completionMsgData); else
return runAsyncCommand<CommandProcessNew>(
world, cmd, replyAddr, completionMsgSize, completionMsgData);
} }
@ -731,34 +732,35 @@ namespace impl {
static void defineNRTCommand() static void defineNRTCommand()
{ {
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
auto commandRunner = [](World* world, void*, struct sc_msg_iter* args, void* replyAddr) auto commandRunner = [](World* world, void*, struct sc_msg_iter* args,
{ void* replyAddr) {
auto ft = getInterfaceTable(); auto ft = getInterfaceTable();
void* space = ft->fRTAlloc(world, sizeof(Command)); void* space = ft->fRTAlloc(world, sizeof(Command));
Command* cmd = new (space) Command(world, args, replyAddr); Command* cmd = new (space) Command(world, args, replyAddr);
// This is brittle, but can't think of something better offhand // This is brittle, but can't think of something better offhand
//This is the only place we can check for a completion message at the end of the OSC packet // This is the only place we can check for a completion message at the end
//beause it has to be passed on to DoAsynhronousCommand at this point. However, detecting correctly // of the OSC packet beause it has to be passed on to DoAsynhronousCommand
//relies on the Command type having fully consumed arguments from the args iterator in the constructor for cmd // at this point. However, detecting correctly relies on the Command type
// having fully consumed arguments from the args iterator in the
// constructor for cmd
size_t completionMsgSize{args ? args->getbsize() : 0}; size_t completionMsgSize{args ? args->getbsize() : 0};
assert(completionMsgSize <= std::numeric_limits<int>::max()); assert(completionMsgSize <= std::numeric_limits<int>::max());
char* completionMsgData = nullptr; char* completionMsgData = nullptr;
if (completionMsgSize) { if (completionMsgSize)
{
completionMsgData = (char*) ft->fRTAlloc(world, completionMsgSize); completionMsgData = (char*) ft->fRTAlloc(world, completionMsgSize);
args->getb(completionMsgData, completionMsgSize); args->getb(completionMsgData, completionMsgSize);
} }
runAsyncCommand(world, cmd, replyAddr, completionMsgSize, completionMsgData); runAsyncCommand(world, cmd, replyAddr, completionMsgSize,
completionMsgData);
if (completionMsgSize) ft->fRTFree(world, completionMsgData); if (completionMsgSize) ft->fRTFree(world, completionMsgData);
}; };
ft->fDefinePlugInCmd(Command::name(), commandRunner, nullptr); ft->fDefinePlugInCmd(Command::name(), commandRunner, nullptr);
} }
struct NRTProgressUnit : SCUnit struct NRTProgressUnit : SCUnit
{ {
@ -777,10 +779,17 @@ namespace impl {
void next(int) void next(int)
{ {
if (0 == mCounter++) if (0 == mCounter)
{ {
ScopedSpinLock lock(mSpinlock, std::try_to_lock);
if (!lock.owns_lock()) return;
index id = static_cast<index>(mInBuf[0][0]); index id = static_cast<index>(mInBuf[0][0]);
if(auto ptr = tryGet(id).lock()) auto record = get(id);
mCounter++;
if (auto ptr = record.lock())
{ {
mInit = true; mInit = true;
if (ptr->mClient.done()) mDone = 1; if (ptr->mClient.done()) mDone = 1;
@ -789,8 +798,10 @@ namespace impl {
else else
{ {
if (!mInit) if (!mInit)
std::cout << "WARNING: No " << Wrapper::getName() << " with ID " << id << std::endl; std::cout << "WARNING: No " << Wrapper::getName() << " with ID "
else mDone = 1; << id << std::endl;
else
mDone = 1;
} }
} }
mCounter %= mInterval; mCounter %= mInterval;
@ -806,7 +817,8 @@ namespace impl {
struct NRTTriggerUnit : SCUnit struct NRTTriggerUnit : SCUnit
{ {
static index count(){ static index count()
{
static index counter = -1; static index counter = -1;
return counter--; return counter--;
} }
@ -815,8 +827,7 @@ namespace impl {
index ControlSize() index ControlSize()
{ {
return index(mNumInputs) return index(mNumInputs) - mSpecialIndex // used for oddball cases
- mSpecialIndex //used for oddball cases
- 3; // id + trig + blocking; - 3; // id + trig + blocking;
} }
@ -827,68 +838,75 @@ namespace impl {
} }
NRTTriggerUnit() NRTTriggerUnit()
: mControlsIterator{mInBuf + ControlOffset(),ControlSize()},mParams{Client::getParameterDescriptors()} : mControlsIterator{mInBuf + ControlOffset(), ControlSize()},
mParams{Client::getParameterDescriptors()}
{ {
mID = static_cast<index>(mInBuf[0][0]); mID = static_cast<index>(mInBuf[0][0]);
if (mID == -1) mID = count(); if (mID == -1) mID = count();
auto cmd = NonRealTime::rtalloc<CommandNew>(mWorld,mID,mWorld, mControlsIterator, this); auto cmd = NonRealTime::rtalloc<CommandNew>(mWorld, mID, mWorld,
mControlsIterator, this);
runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
// mInst = get(mID);
set_calc_function<NRTTriggerUnit, &NRTTriggerUnit::next>(); set_calc_function<NRTTriggerUnit, &NRTTriggerUnit::next>();
Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
} }
~NRTTriggerUnit() ~NRTTriggerUnit()
{ {
// if(auto ptr = mInst.lock()) set_calc_function<NRTTriggerUnit, &NRTTriggerUnit::clear>();
// {
auto cmd = NonRealTime::rtalloc<CommandFree>(mWorld, mID); auto cmd = NonRealTime::rtalloc<CommandFree>(mWorld, mID);
runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
// }
} }
void next(int) void clear(int)
{ {
Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs);
}
void next(int)
index triggerInput = static_cast<index>(mInBuf[static_cast<index>(mNumInputs) - 2][0]); {
index triggerInput =
static_cast<index>(mInBuf[static_cast<index>(mNumInputs) - 2][0]);
mTrigger = mTrigger || triggerInput; mTrigger = mTrigger || triggerInput;
// if(auto ptr = mInst->lock())
// if(auto ptr = get(mID).lock())
// {
bool trigger = (!mPreviousTrigger) && triggerInput; // mTrigger; bool trigger = (!mPreviousTrigger) && triggerInput; // mTrigger;
mPreviousTrigger = triggerInput; mPreviousTrigger = triggerInput;
mTrigger = 0; mTrigger = 0;
// auto& client = ptr->mClient;
if (trigger) if (trigger)
{ {
mControlsIterator.reset(1 + mInBuf); // add one for ID mControlsIterator.reset(1 + mInBuf); // add one for ID
// auto& params = ptr->mParams;
Wrapper::setParams(this, mParams, mControlsIterator, true, false); Wrapper::setParams(this, mParams, mControlsIterator, true, false);
bool blocking = mInBuf[mNumInputs - 1][0] > 0; bool blocking = mInBuf[mNumInputs - 1][0] > 0;
CommandProcess* cmd = rtalloc<CommandProcess>(mWorld,mID,blocking,&mParams);
CommandProcess* cmd =
rtalloc<CommandProcess>(mWorld, mID, blocking, &mParams);
runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr); runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
mRunCount++; mRunCount++;
} }
else else
{ {
if(auto ptr = tryGet(mID).lock()) ScopedSpinLock lock(mSpinlock, std::try_to_lock);
if (!lock.owns_lock()) return;
auto record = get(mID);
if (auto ptr = record.lock())
{ {
mInit = true; mInit = true;
auto& client = ptr->mClient; auto& client = ptr->mClient;
mDone = ptr->mDone; mDone = ptr->mDone.load(std::memory_order_relaxed);
out0(0) = mDone ? 1 : static_cast<float>(client.progress()); out0(0) = mDone ? 1 : static_cast<float>(client.progress());
} else mDone = mInit;
} }
// } else
// else printNotFound(id); mDone = mInit;
}
} }
private: private:
bool mPreviousTrigger{false}; bool mPreviousTrigger{false};
bool mTrigger{false}; bool mTrigger{false};
bool mBeingFreed{false};
Result mResult; Result mResult;
impl::FloatControlsIter mControlsIterator; impl::FloatControlsIter mControlsIterator;
index mID; index mID;
@ -905,8 +923,7 @@ namespace impl {
index ControlOffset() { return mSpecialIndex + 2; } index ControlOffset() { return mSpecialIndex + 2; }
index ControlSize() index ControlSize()
{ {
return index(mNumInputs) return index(mNumInputs) - mSpecialIndex // used for oddball cases
- mSpecialIndex //used for oddball cases
- 2; // trig + id - 2; // trig + id
} }
@ -922,29 +939,24 @@ namespace impl {
{ {
mID = static_cast<index>(in0(1)); mID = static_cast<index>(in0(1));
init(); init();
// mInst = get(id);
// if(auto ptr = mInst.lock())
// {
// auto& client = ptr->mClient;
// mDelegate.init(*this,client,mControls);
set_calc_function<NRTModelQueryUnit, &NRTModelQueryUnit::next>(); set_calc_function<NRTModelQueryUnit, &NRTModelQueryUnit::next>();
Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
// }else printNotFound(mID);
} }
void init() void init()
{ {
if(mSpinlock.tryLock()) ScopedSpinLock lock(mSpinlock, std::try_to_lock);
if (lock.owns_lock())
{ {
mInit = false; mInit = false;
mInst = unsafeGet(mID); mInst = get(mID);
if (auto ptr = mInst.lock()) if (auto ptr = mInst.lock())
{ {
auto& client = ptr->mClient; auto& client = ptr->mClient;
mDelegate.init(*this, client, mControls); mDelegate.init(*this, client, mControls);
mInit = true; mInit = true;
}//else printNotFound(mID); }
mSpinlock.unlock();
} }
} }
@ -954,18 +966,21 @@ namespace impl {
index id = static_cast<index>(in0(1)); index id = static_cast<index>(in0(1));
if (mID != id) init(); if (mID != id) init();
if (!mInit) return; if (!mInit) return;
if(mSpinlock.tryLock()) ScopedSpinLock lock(mSpinlock, std::try_to_lock);
if (lock.owns_lock())
;
{ {
if (auto ptr = mInst.lock()) if (auto ptr = mInst.lock())
{ {
auto& client = ptr->mClient; auto& client = ptr->mClient;
auto& params = ptr->mParams; auto& params = ptr->mParams;
mControls.reset(mInBuf + ControlOffset()); mControls.reset(mInBuf + ControlOffset());
mDelegate.next(*this,client,params,mControls, ptr.use_count() == 2); mDelegate.next(*this, client, params, mControls,
}else printNotFound(id); ptr.use_count() == 2);
mSpinlock.unlock(); }
else
printNotFound(id);
} }
} }
private: private:
@ -980,7 +995,9 @@ namespace impl {
using ParamSetType = typename Client::ParamSetType; using ParamSetType = typename Client::ParamSetType;
template <size_t N, typename T> template <size_t N, typename T>
using SetupMessageCmd = typename FluidSCMessaging<Wrapper,Client>::template SetupMessageCmd<N,T>; using SetupMessageCmd =
typename FluidSCMessaging<Wrapper, Client>::template SetupMessageCmd<N,
T>;
template <bool, typename CommandType> template <bool, typename CommandType>
@ -993,7 +1010,8 @@ namespace impl {
template <typename CommandType> template <typename CommandType>
struct DefineCommandIf<true, CommandType> struct DefineCommandIf<true, CommandType>
{ {
void operator()() { void operator()()
{
// std::cout << CommandType::name() << std::endl; // std::cout << CommandType::name() << std::endl;
defineNRTCommand<CommandType>(); defineNRTCommand<CommandType>();
} }
@ -1008,7 +1026,10 @@ namespace impl {
template <typename UnitType> template <typename UnitType>
struct RegisterUnitIf<true, UnitType> struct RegisterUnitIf<true, UnitType>
{ {
void operator()(InterfaceTable* ft) { registerUnit<UnitType>(ft,UnitType::name()); } void operator()(InterfaceTable* ft)
{
registerUnit<UnitType>(ft, UnitType::name());
}
}; };
@ -1038,9 +1059,10 @@ namespace impl {
static std::string flushCmd = std::string(Wrapper::getName()) + "/flush"; static std::string flushCmd = std::string(Wrapper::getName()) + "/flush";
ft->fDefinePlugInCmd(flushCmd.c_str(),[](World*, void*, struct sc_msg_iter*, void* ){ ft->fDefinePlugInCmd(
mCache.clear(); flushCmd.c_str(),
},nullptr); [](World*, void*, struct sc_msg_iter*, void*) { mCache.clear(); },
nullptr);
} }
@ -1087,12 +1109,14 @@ namespace impl {
}; };
template <typename Client, typename Wrapper> template <typename Client, typename Wrapper>
typename NonRealTime<Client, Wrapper>::Cache NonRealTime<Client,Wrapper>::mCache{}; typename NonRealTime<Client, Wrapper>::Cache
NonRealTime<Client, Wrapper>::mCache{};
template <typename Client, typename Wrapper> template <typename Client, typename Wrapper>
typename NonRealTime<Client, Wrapper>::Spinlock NonRealTime<Client,Wrapper>::mSpinlock{}; typename NonRealTime<Client, Wrapper>::Spinlock
NonRealTime<Client, Wrapper>::mSpinlock{};
} } // namespace impl
} } // namespace client
} } // namespace fluid

Loading…
Cancel
Save