Better safety and less contention.

Fixes subtle race condition in async callback (lock NRT on line 620).
Uses a mirror cache of weak_ptrs for the RT thread.
branch: nix
Owen Green · 5 years ago
parent dcdc7f8f31 · commit 5892a60391
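
What the change does, in outline: the non-realtime (NRT) thread keeps sole ownership of the cache of shared_ptr entries (mCache), while the realtime (RT) thread reads only a mirror map of weak_ptrs (mRTCache) that is created, updated and erased exclusively on the RT thread via messages queued to it, so the audio callback never has to win a spinlock just to peek at an entry. Below is a minimal, self-contained sketch of that pattern; it deliberately avoids the SuperCollider server API used in the diff (World, FifoMsg, fSendMsgToRT are not reproduced), and Entry, cache, rtMirror, toRT, add, remove and rtTick are all illustrative names, with a std::queue standing in for the server's wait-free FIFO:

#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>

struct Entry { int value; };

// NRT side: owns the entries. A plain mutex is fine here because this map
// is never touched from the audio callback.
std::unordered_map<int, std::shared_ptr<Entry>> cache;
std::mutex cacheMutex;

// RT side: a mirror of weak_ptrs, read and written *only* by the RT thread,
// so lookups need no lock. Updates arrive as queued closures; a real server
// would use a wait-free SPSC FIFO (FifoMsg) rather than std::queue.
std::unordered_map<int, std::weak_ptr<Entry>> rtMirror;
std::queue<std::function<void()>> toRT;

void add(int id, int value)
{
  std::lock_guard<std::mutex> lock(cacheMutex);
  auto entry = std::make_shared<Entry>(Entry{value});
  cache.emplace(id, entry);
  // Ask the RT thread to mirror the new entry as a weak_ptr
  toRT.push([id, weak = std::weak_ptr<Entry>(entry)] {
    rtMirror.emplace(id, weak);
  });
}

void remove(int id)
{
  std::lock_guard<std::mutex> lock(cacheMutex);
  cache.erase(id);
  toRT.push([id] { rtMirror.erase(id); });
}

// RT callback: drain pending mirror updates, then look up lock-free.
// weak_ptr::lock() tells the RT thread whether the NRT side has already
// destroyed the entry, without ever blocking.
void rtTick(int id)
{
  while (!toRT.empty()) { toRT.front()(); toRT.pop(); }
  auto it = rtMirror.find(id);
  std::shared_ptr<Entry> ptr = it != rtMirror.end() ? it->second.lock() : nullptr;
  if (ptr)
    std::cout << "entry " << id << " -> " << ptr->value << '\n';
  else
    std::cout << "entry " << id << " is gone\n";
}

int main()
{
  add(42, 7);
  rtTick(42); // entry 42 -> 7
  remove(42);
  rtTick(42); // entry 42 is gone
}

One caveat inherent to the pattern: if the temporary shared_ptr produced by weak_ptr::lock() on the RT thread happens to be the last owner, destruction runs on the audio thread; in practice the owning shared_ptr normally lives in mCache until a free command erases it on the NRT side.
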

@@ -9,7 +9,6 @@
 #include <clients/common/FluidBaseClient.hpp>
 #include <data/FluidMeta.hpp>
 #include <SC_PlugIn.hpp>
-#include <immintrin.h>
 #include <mutex>
 #include <scsynthsend.h>
 #include <unordered_map>
@@ -48,7 +47,10 @@ class NonRealTime : public SCUnit
 public:
   using Cache = std::unordered_map<index, CacheEntryPointer>;
+  using RTCacheMirror = std::unordered_map<index, WeakCacheEntryPointer>;
   static Cache mCache;
+  static RTCacheMirror mRTCache;
 
 private:
   static bool isNull(WeakCacheEntryPointer const& weak)
@@ -57,58 +59,74 @@ private:
            !WeakCacheEntryPointer{}.owner_before(weak);
   }
 
-  // https://rigtorp.se/spinlock/
-  struct Spinlock
-  {
-    std::atomic<bool> lock_ = {0};
-
-    void lock() noexcept
-    {
-      for (;;)
-      {
-        // Optimistically assume the lock is free on the first try
-        if (!lock_.exchange(true, std::memory_order_acquire)) { return; }
-        // Wait for lock to be released without generating cache misses
-        while (lock_.load(std::memory_order_relaxed))
-        {
-          // Issue X86 PAUSE or ARM YIELD instruction to reduce contention
-          // between hyper-threads
-          _mm_pause();
-        }
-      }
-    }
-
-    bool try_lock() noexcept
-    {
-      // First do a relaxed load to check if lock is free in order to prevent
-      // unnecessary cache misses if someone does while(!try_lock())
-      return !lock_.load(std::memory_order_relaxed) &&
-             !lock_.exchange(true, std::memory_order_acquire);
-    }
-
-    void unlock() noexcept { lock_.store(false, std::memory_order_release); }
-  };
-
-  static Spinlock mSpinlock;
-  using ScopedSpinLock = std::unique_lock<Spinlock>;
-
   // shouldn't be called without at least *thinking* about getting spin lock
   // first
   static WeakCacheEntryPointer unsafeGet(index id)
   {
     auto lookup = mCache.find(id);
     return lookup == mCache.end() ? WeakCacheEntryPointer() : lookup->second;
   }
 
+  static WeakCacheEntryPointer rtget(index id)
+  {
+    auto lookup = mRTCache.find(id);
+    return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second;
+  }
+
+  using RawCacheEntry = typename Cache::value_type;
+
+  struct addToRTCache
+  {
+    void operator()(World* w, RawCacheEntry& r)
+    {
+      FifoMsg msg;
+      auto add = [](FifoMsg* m) {
+        RawCacheEntry* r = static_cast<RawCacheEntry*>(m->mData);
+        auto res = mRTCache.emplace(r->first, r->second);
+        if (!res.second) { std::cout << "ERROR: Could not add to RT cache"; }
+      };
+      msg.Set(w, add, nullptr, &r);
+      auto ft = Wrapper::getInterfaceTable();
+      if (!ft->fSendMsgToRT(w, msg))
+      {
+        std::cout << "ERROR: Message to RT failed";
+      }
+    }
+  };
+
+  struct removeFromRTCache
+  {
+    void operator()(World* world, index id)
+    {
+      index* data = new index();
+      *data = id;
+
+      FifoMsg msg;
+      auto remove = [](FifoMsg* m) {
+        int* id = static_cast<int*>(m->mData);
+        mRTCache.erase(*id);
+      };
+      auto cleanup = [](FifoMsg* m) { delete static_cast<index*>(m->mData); };
+
+      msg.Set(world, remove, cleanup, data);
+    }
+  };
 
 public:
   static WeakCacheEntryPointer get(index id) { return unsafeGet(id); }
 
-  static WeakCacheEntryPointer add(index id, const Params& params)
+  static WeakCacheEntryPointer add(World* world, index id, const Params& params)
   {
     if (isNull(get(id)))
     {
       auto result = mCache.emplace(id, std::make_shared<CacheEntry>(params));
+      addToRTCache{}(world, *(result.first));
       return result.second ? (result.first)->second
                            : WeakCacheEntryPointer(); // sob
     }
@@ -120,7 +138,11 @@ public:
     }
   }
 
-  static void remove(index id) { mCache.erase(id); }
+  static void remove(World* world, index id)
+  {
+    mCache.erase(id);
+    removeFromRTCache{}(world, id);
+  }
 
   static void printNotFound(index id)
   {
@@ -143,7 +165,6 @@ private:
       typename ClientParams<Wrapper>::template Setter<impl::FloatControlsIter,
                                                       N, T>;
 
-
   struct NRTCommand
   {
     NRTCommand(World*, sc_msg_iter* args, void* replyAddr,
@@ -224,14 +245,11 @@
     bool stage2(World* w)
     {
-      // auto entry = ;
-      ScopedSpinLock lock{mSpinlock};
       Result constraintsRes = validateParameters(mParams);
       if (!constraintsRes.ok()) Wrapper::printResult(w, constraintsRes);
 
-      mResult = (!isNull(add(NRTCommand::mID, mParams)));
+      mResult = (!isNull(add(w, NRTCommand::mID, mParams)));
 
       // Sigh. The cache entry above has both the client instance and main
       // params instance.
@@ -280,150 +298,21 @@
       return cmd.c_str();
     }
 
-    bool stage2(World*)
+    bool stage2(World* world)
     {
-      ScopedSpinLock lock(mSpinlock);
       cancelCheck(IsRTQueryModel_t(), NRTCommand::mID);
-      remove(NRTCommand::mID);
+      remove(world, NRTCommand::mID);
       NRTCommand::sendReply(name(), true);
       return true;
     }
   };
 
-  /// Not registered as a PlugInCmd. Triggered by worker thread callback
-  struct CommandAsyncComplete : public NRTCommand
-  {
-    CommandAsyncComplete(World*, index id, void* replyAddress)
-    {
-      NRTCommand::mID = id;
-      NRTCommand::mReplyAddress = replyAddress;
-    }
-
-    static const char* name() { return CommandProcess::name(); }
-
-    bool stage2(World* world)
-    {
-      // std::cout << "In Async completion\n";
-      ScopedSpinLock lock{mSpinlock};
-      if (auto ptr = get(NRTCommand::mID).lock())
-      {
-        Result r;
-        mRecord = ptr;
-        auto& client = ptr->mClient;
-        ProcessState s = client.checkProgress(r);
-        if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing)
-        {
-          if (r.status() == Result::Status::kCancelled)
-          {
-            std::cout << Wrapper::getName() << ": Processing cancelled"
-                      << std::endl;
-            ptr->mDone.store(true, std::memory_order_relaxed);
-            return false;
-          }
-          client.checkProgress(r);
-          mSuccess = !(r.status() == Result::Status::kError);
-          if (!r.ok())
-          {
-            Wrapper::printResult(world, r);
-            if (r.status() == Result::Status::kError)
-            {
-              ptr->mDone.store(true, std::memory_order_relaxed);
-              return false;
-            }
-          }
-          // if we're progressing to stage3, don't unlock the lock just yet
-          lock.release();
-          return true;
-        }
-      }
-      return false;
-    }
-
-    bool stage3(World* world)
-    {
-      ScopedSpinLock lock(mSpinlock, std::adopt_lock);
-      if (auto ptr = mRecord.lock())
-      {
-        auto& params = ptr->mParams;
-        params.template forEachParamType<BufferT, AssignBuffer>(world);
-        return true;
-      }
-      return false;
-    }
-
-    bool stage4(World*) // nrt
-    {
-      ScopedSpinLock lock(mSpinlock);
-      if (auto ptr = get(NRTCommand::mID).lock())
-      {
-        ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
-        if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress)
-        {
-          NRTCommand::sendReply(name(), mSuccess);
-        }
-        ptr->mDone.store(true, std::memory_order_relaxed); // = true;
-        return true;
-      }
-      std::cout << "ERROR: Failed to lock\n";
-      return false;
-    }
-
-    bool mSuccess;
-    WeakCacheEntryPointer mRecord;
-  };
-
-  static void doProcessCallback(World* world, index id,
-                                size_t completionMsgSize,
-                                char* completionMessage, void* replyAddress)
-  {
-    auto ft = getInterfaceTable();
-
-    struct Context
-    {
-      World* mWorld;
-      index mID;
-      size_t mCompletionMsgSize;
-      char* mCompletionMessage;
-      void* mReplyAddress;
-    };
-
-    Context* c = new Context{world, id, completionMsgSize, completionMessage,
-                             replyAddress};
-
-    auto launchCompletionFromNRT = [](FifoMsg* inmsg) {
-      auto runCompletion = [](FifoMsg* msg) {
-        Context* c = static_cast<Context*>(msg->mData);
-        World* world = c->mWorld;
-        index id = c->mID;
-        auto ft = getInterfaceTable();
-        void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete));
-        CommandAsyncComplete* cmd =
-            new (space) CommandAsyncComplete(world, id, c->mReplyAddress);
-        runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize,
-                        c->mCompletionMessage);
-        if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage);
-      };
-      auto tidyup = [](FifoMsg* msg) {
-        Context* c = static_cast<Context*>(msg->mData);
-        delete c;
-      };
-      auto ft = getInterfaceTable();
-      FifoMsg fwd = *inmsg;
-      fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData);
-      if (inmsg->mWorld->mRunning) ft->fSendMsgToRT(inmsg->mWorld, fwd);
-    };
-
-    FifoMsg msg;
-    msg.Set(world, launchCompletionFromNRT, nullptr, c);
-    if (world->mRunning) ft->fSendMsgFromRT(world, msg);
-  }
+  // struct UnitInfo
+  // {
+  //   int mSynthIndex;
+  //   int mNodeID{0};
+  // };
 
   struct CommandProcess : public NRTCommand
   {
@@ -432,7 +321,6 @@
         mParams{Client::getParameterDescriptors()}
     {
       auto& ar = *args;
-      ScopedSpinLock lock(mSpinlock);
       if (auto ptr = get(NRTCommand::mID).lock())
       {
         ptr->mDone.store(false, std::memory_order_relaxed);
@@ -453,7 +341,6 @@
       }
     }
 
-
     static const char* name()
     {
       static std::string cmd = std::string(Wrapper::getName()) + "/process";
@@ -462,7 +349,6 @@
     bool stage2(World* world)
     {
-      ScopedSpinLock lock(mSpinlock);
       mRecord = get(NRTCommand::mID);
       if (auto ptr = mRecord.lock())
       {
@@ -480,7 +366,6 @@
         size_t completionMsgSize = mCompletionMsgSize;
         char* completionMessage = mCompletionMessage;
         void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress);
-
         auto callback = [world, id, completionMsgSize, completionMessage,
                          replyAddress]() {
           doProcessCallback(world, id, completionMsgSize, completionMessage,
@@ -491,7 +376,7 @@
                       : client.enqueue(params, callback);
         Wrapper::printResult(world, result);
 
-        if (result.ok())
+        if (result.status() != Result::Status::kError)
         {
           ptr->mDone.store(false, std::memory_order_relaxed);
           mResult = client.process();
@@ -499,10 +384,10 @@
           bool error = mResult.status() == Result::Status::kError;
-          if (error) ptr->mDone.store(true, std::memory_order_relaxed);
+          if (error)
+            ptr->mDone.store(true,
+                             std::memory_order_relaxed);
           bool toStage3 = mSynchronous && !error;
-          if (toStage3) lock.release();
           return toStage3;
         }
       }
@@ -519,7 +404,6 @@
     // Only for blocking execution
     bool stage3(World* world) // rt
     {
-      ScopedSpinLock lock(mSpinlock, std::adopt_lock);
       if (auto ptr = mRecord.lock())
       {
         ptr->mParams.template forEachParamType<BufferT, AssignBuffer>(world);
@@ -529,22 +413,21 @@
     }
 
     // Only for blocking execution
-    bool stage4(World*) // nrt
+    bool stage4(World* w) // nrt
     {
-      ScopedSpinLock lock(mSpinlock);
       if (auto ptr = get(NRTCommand::mID).lock())
       {
         ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
         if (NRTCommand::mID >= 0 && mSynchronous)
-          NRTCommand::sendReply(name(), mResult.ok());
+          NRTCommand::sendReply(name(),
+                                mResult.status() != Result::Status::kError);
         ptr->mDone.store(true, std::memory_order_relaxed);
         return true;
       }
       return false;
     }
 
     bool synchronous() { return mSynchronous; }
 
     void addCompletionMessage(size_t size, char* message) //, void* addr)
@@ -561,8 +444,191 @@
     Params mParams;
     bool mOverwriteParams{false};
     WeakCacheEntryPointer mRecord;
   };
 
+  /// Not registered as a PlugInCmd. Triggered by worker thread callback
+  struct CommandAsyncComplete : public NRTCommand
+  {
+    CommandAsyncComplete(World*, index id, void* replyAddress)
+    {
+      NRTCommand::mID = id;
+      NRTCommand::mReplyAddress = replyAddress;
+    }
+
+    static const char* name() { return CommandProcess::name(); }
+
+    bool stage2(World* world)
+    {
+      if (auto ptr = get(NRTCommand::mID).lock())
+      {
+        Result r;
+        mRecord = ptr;
+        auto& client = ptr->mClient;
+        ProcessState s = client.checkProgress(r);
+        if (s == ProcessState::kDone || s == ProcessState::kDoneStillProcessing)
+        {
+          if (r.status() == Result::Status::kCancelled)
+          {
+            std::cout << Wrapper::getName() << ": Processing cancelled"
+                      << std::endl;
+            ptr->mDone.store(true, std::memory_order_relaxed);
+            return false;
+          }
+          client.checkProgress(r);
+          mSuccess = !(r.status() == Result::Status::kError);
+          Wrapper::printResult(world, r);
+          if (!mSuccess)
+          {
+            ptr->mDone.store(true, std::memory_order_relaxed);
+            return false;
+          }
+          // if we're progressing to stage3, don't unlock the lock just yet
+          // lock.release();
+          return true;
+        }
+      }
+      std::cout << "Error: Entered callback but thread not done!?\n";
+      return false;
+    }
+
+    bool stage3(World* world)
+    {
+      if (auto ptr = mRecord.lock())
+      {
+        auto& params = ptr->mParams;
+        params.template forEachParamType<BufferT, AssignBuffer>(world);
+      }
+      return true;
+    }
+
+    bool stage4(World* w) // nrt
+    {
+      if (auto ptr = get(NRTCommand::mID).lock())
+      {
+        ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
+        if (NRTCommand::mID >= 0 && NRTCommand::mReplyAddress)
+        {
+          NRTCommand::sendReply(name(), mSuccess);
+        }
+        ptr->mDone.store(true, std::memory_order_relaxed); // = true;
+        return true;
+      }
+      std::cout << "ERROR: Failed to lock\n";
+      return false;
+    }
+
+    // void notifyUnit(World* w)
+    // {
+    //   if(mUnitInfo.mNodeID > 0)
+    //   {
+    //     auto ft = Wrapper::getInterfaceTable();
+    //
+    //     NRTDoneCount++;
+    //     FifoMsg msg;
+    //
+    //     auto updateUnitDone = [](FifoMsg* m)
+    //     {
+    //       UnitInfo* info = static_cast<UnitInfo*>(m->mData);
+    //
+    //       auto ft = Wrapper::getInterfaceTable();
+    //       Graph* g = ft->fGetGraph(m->mWorld,info->mNodeID);
+    //       if(g)
+    //       {
+    //         Unit* u = g->mUnits[info->mSynthIndex];
+    //         if(u)
+    //         {
+    //           RTDoneCount++;
+    //           u->mDone = true;
+    //         }
+    //       }
+    //     };
+    //
+    //     msg.Set(w, updateUnitDone, nullptr, &mUnitInfo);
+    //     ft->fSendMsgToRT(w,msg);
+    //   }
+    // }
+
+    bool mSuccess;
+    WeakCacheEntryPointer mRecord;
+  };
+
+  static void doProcessCallback(World* world, index id,
+                                size_t completionMsgSize,
+                                char* completionMessage, void* replyAddress)
+  {
+    auto ft = getInterfaceTable();
+
+    struct Context
+    {
+      World* mWorld;
+      index mID;
+      size_t mCompletionMsgSize;
+      char* mCompletionMessage;
+      void* mReplyAddress;
+    };
+
+    Context* c = new Context{world, id, completionMsgSize, completionMessage,
+                             replyAddress};
+
+    auto launchCompletionFromNRT = [](FifoMsg* inmsg) {
+      auto runCompletion = [](FifoMsg* msg) {
+        Context* c = static_cast<Context*>(msg->mData);
+        World* world = c->mWorld;
+        index id = c->mID;
+        auto ft = getInterfaceTable();
+        void* space = ft->fRTAlloc(world, sizeof(CommandAsyncComplete));
+        CommandAsyncComplete* cmd =
+            new (space) CommandAsyncComplete(world, id, c->mReplyAddress);
+        if (runAsyncCommand(world, cmd, c->mReplyAddress, c->mCompletionMsgSize,
+                            c->mCompletionMessage) != 0)
+        {
+          std::cout << "ERROR: Async cmd failed in callback" << std::endl;
+        }
+        if (c->mCompletionMsgSize) ft->fRTFree(world, c->mCompletionMessage);
+      };
+      auto tidyup = [](FifoMsg* msg) {
+        Context* c = static_cast<Context*>(msg->mData);
+        delete c;
+      };
+      auto ft = getInterfaceTable();
+      FifoMsg fwd = *inmsg;
+      fwd.Set(inmsg->mWorld, runCompletion, tidyup, inmsg->mData);
+      if (inmsg->mWorld->mRunning)
+        if (!ft->fSendMsgToRT(inmsg->mWorld, fwd))
+        {
+          std::cout << "ERROR: Failed to queue -> RT\n";
+        }
+    };
+
+    FifoMsg msg;
+    msg.Set(world, launchCompletionFromNRT, nullptr, c);
+    if (world->mRunning)
+    {
+      ft->fNRTLock(world);
+      msg.Perform();
+      // if(!ft->fSendMsgFromRT(world, msg))
+      // {
+      //   std::cout << "ERROR: Failed to queue -> NRT\n";
+      // }
+      ft->fNRTUnlock(world);
+    }
+    else
+      std::cout << "ERROR: World not running??";
+  }
 
   struct CommandProcessNew : public NRTCommand
   {
     CommandProcessNew(World* world, sc_msg_iter* args, void* replyAddr)
@@ -572,10 +638,6 @@
       mProcess.mReplyAddress = mNew.mReplyAddress;
     }
 
-    CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x)
-        : mNew{id, world, args, x}, mProcess{id}
-    {}
-
     static const char* name()
     {
       static std::string cmd = std::string(Wrapper::getName()) + "/processNew";
@@ -630,13 +692,14 @@
     bool stage2(World*)
     {
-      ScopedSpinLock lock(mSpinlock);
       if (auto ptr = get(NRTCommand::mID).lock())
       {
         auto& client = ptr->mClient;
         if (!client.synchronous())
         {
           client.cancel();
+          std::cout << Wrapper::getName() << ": Processing cancelled"
+                    << std::endl;
           return true;
         }
       }
@@ -650,7 +713,6 @@
         : NRTCommand{world, args, replyAddr}
     {
       auto& ar = *args;
-      ScopedSpinLock lock(mSpinlock);
       if (auto ptr = get(NRTCommand::mID).lock())
       {
         ptr->mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr,
@@ -773,33 +835,31 @@
     NRTProgressUnit()
     {
       mInterval = static_cast<index>(0.02 / controlDur());
+      mID = static_cast<index>(mInBuf[0][0]);
+      std::cout << mID << std::endl;
+      mRecord = rtget(mID);
       set_calc_function<NRTProgressUnit, &NRTProgressUnit::next>();
       Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
     }
 
     void next(int)
     {
-      if (0 == mCounter)
-      {
-        ScopedSpinLock lock(mSpinlock, std::try_to_lock);
-        if (!lock.owns_lock()) return;
-        index id = static_cast<index>(mInBuf[0][0]);
-        auto record = get(id);
-        mCounter++;
-        if (auto ptr = record.lock())
+      if (isNull(mRecord)) { mRecord = rtget(mID); };
+
+      if (0 == mCounter++)
+      {
+        if (auto ptr = mRecord.lock())
         {
           mInit = true;
-          if (ptr->mClient.done()) mDone = 1;
+          mDone = ptr->mDone.load(std::memory_order_relaxed);
           out0(0) = static_cast<float>(ptr->mClient.progress());
         }
         else
         {
           if (!mInit)
             std::cout << "WARNING: No " << Wrapper::getName() << " with ID "
-                      << id << std::endl;
+                      << mID << std::endl;
           else
             mDone = 1;
         }
@@ -808,9 +868,11 @@
       }
     }
 
   private:
+    index mID;
     index mInterval;
     index mCounter{0};
     bool mInit{false};
+    WeakCacheEntryPointer mRecord;
   };
@@ -845,7 +907,11 @@
       if (mID == -1) mID = count();
       auto cmd = NonRealTime::rtalloc<CommandNew>(mWorld, mID, mWorld,
                                                   mControlsIterator, this);
-      runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
+      if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0)
+      {
+        std::cout << "ERROR: Async command failed in NRTTriggerUnit()"
+                  << std::endl;
+      }
       set_calc_function<NRTTriggerUnit, &NRTTriggerUnit::next>();
       Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
     }
@@ -854,12 +920,17 @@
     {
       set_calc_function<NRTTriggerUnit, &NRTTriggerUnit::clear>();
       auto cmd = NonRealTime::rtalloc<CommandFree>(mWorld, mID);
-      runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
+      if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0)
+      {
+        std::cout << "ERROR: Async command failed in ~NRTTriggerUnit()"
+                  << std::endl;
+      }
     }
 
     void clear(int)
     {
-      Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs);
+      Wrapper::getInterfaceTable()->fClearUnitOutputs(
+          this, static_cast<int>(mNumOutputs));
     }
 
     void next(int)
@@ -879,18 +950,18 @@
         bool blocking = mInBuf[mNumInputs - 1][0] > 0;
-        CommandProcess* cmd =
-            rtalloc<CommandProcess>(mWorld, mID, blocking, &mParams);
-        runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr);
+        CommandProcess* cmd = rtalloc<CommandProcess>(mWorld, mID, blocking,
+                                                      &mParams);
+        if (runAsyncCommand(mWorld, cmd, nullptr, 0, nullptr) != 0)
+        {
+          std::cout << "ERROR: Async command failed in NRTTriggerUnit::next()"
+                    << std::endl;
+        }
         mRunCount++;
       }
       else
       {
-        ScopedSpinLock lock(mSpinlock, std::try_to_lock);
-        if (!lock.owns_lock()) return;
-        auto record = get(mID);
+        auto record = rtget(mID);
         if (auto ptr = record.lock())
         {
           mInit = true;
@@ -902,7 +973,6 @@
           mDone = mInit;
       }
     }
-
   private:
     bool mPreviousTrigger{false};
     bool mTrigger{false};
@@ -944,13 +1014,9 @@
     }
 
     void init()
     {
-      ScopedSpinLock lock(mSpinlock, std::try_to_lock);
-      if (lock.owns_lock())
-      {
-        mInit = false;
-        mInst = get(mID);
+      mInit = false;
+      mInst = rtget(mID);
       if (auto ptr = mInst.lock())
       {
         auto& client = ptr->mClient;
@@ -958,30 +1024,24 @@
         mInit = true;
       }
-      }
     }
 
     void next(int)
     {
-      Wrapper::getInterfaceTable()->fClearUnitOutputs(this, mNumOutputs);
+      Wrapper::getInterfaceTable()->fClearUnitOutputs(this,
+                                                      asSigned(mNumOutputs));
       index id = static_cast<index>(in0(1));
       if (mID != id) init();
       if (!mInit) return;
-      ScopedSpinLock lock(mSpinlock, std::try_to_lock);
-      if (lock.owns_lock())
-        ;
-      {
       if (auto ptr = mInst.lock())
       {
         auto& client = ptr->mClient;
         auto& params = ptr->mParams;
         mControls.reset(mInBuf + ControlOffset());
-        mDelegate.next(*this, client, params, mControls,
-                       ptr.use_count() == 2);
+        mDelegate.next(*this, client, params, mControls, ptr.use_count() == 2);
       }
       else
         printNotFound(id);
-      }
     }
 
   private:
     Delegate mDelegate;
@@ -1012,7 +1072,6 @@
     {
       void operator()()
       {
-        // std::cout << CommandType::name() << std::endl;
        defineNRTCommand<CommandType>();
      }
    };
@@ -1032,13 +1091,11 @@
     }
   };
 
   using IsRTQueryModel_t = typename Client::isRealTime;
-
   static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value;
-
   static constexpr bool IsModel = Client::isModelObject::value;
 
 public:
   static void setup(InterfaceTable* ft, const char*)
   {
@@ -1113,9 +1170,8 @@ typename NonRealTime<Client, Wrapper>::Cache
     NonRealTime<Client, Wrapper>::mCache{};
 
 template <typename Client, typename Wrapper>
-typename NonRealTime<Client, Wrapper>::Spinlock
-    NonRealTime<Client, Wrapper>::mSpinlock{};
+typename NonRealTime<Client, Wrapper>::RTCacheMirror
+    NonRealTime<Client, Wrapper>::mRTCache;
 
 } // namespace impl
 } // namespace client
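
On the race fix itself: before this commit, doProcessCallback posted launchCompletionFromNRT from the worker thread with fSendMsgFromRT, so the first completion stage ran whenever the NRT stage queue got to it; the new version instead takes the NRT lock and performs that stage immediately on the worker thread, and only the NRT -> RT hop still travels through a FifoMsg. Below is a runnable mock-up of that hand-off shape; World, FifoMsg, NRTLock/NRTUnlock and onWorkerDone here are stubs standing in for the SuperCollider server API, not the real types:

#include <iostream>
#include <mutex>

struct World { bool mRunning = true; std::mutex mNRTLock; };

struct FifoMsg
{
  void (*func)(FifoMsg*) = nullptr;
  void* mData = nullptr;
  void Perform() { if (func) func(this); }
};

void NRTLock(World* w) { w->mNRTLock.lock(); }
void NRTUnlock(World* w) { w->mNRTLock.unlock(); }

// Worker-thread completion: rather than queueing the first completion stage
// behind other NRT traffic (the old fSendMsgFromRT path), take the NRT lock
// and perform the stage inline; only the NRT -> RT hop still uses a FIFO.
void onWorkerDone(World* world, FifoMsg& msg)
{
  if (!world->mRunning) { std::cout << "ERROR: World not running??\n"; return; }
  NRTLock(world);   // serialise with other NRT-thread work
  msg.Perform();    // run the completion stage right here, under the lock
  NRTUnlock(world); // any RT-bound message was queued inside Perform()
}

int main()
{
  World w;
  FifoMsg msg;
  msg.func = [](FifoMsg*) { std::cout << "completion ran under NRT lock\n"; };
  onWorkerDone(&w, msg);
}

The trade-off is that the worker thread now blocks on the NRT lock instead of racing it, which is the "lock NRT" the commit message refers to.
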
