Toughen NRT processing by decoupling ownership of shared state from synth Node

nix
Owen Green 6 years ago
parent 699c648e27
commit 48dc167c1b

@ -21,6 +21,7 @@ under the European Unions Horizon 2020 research and innovation programme
#include <FluidVersion.hpp> #include <FluidVersion.hpp>
#include <SC_PlugIn.hpp> #include <SC_PlugIn.hpp>
#include <algorithm> #include <algorithm>
#include <memory>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <type_traits> #include <type_traits>
@ -35,6 +36,21 @@ namespace client {
template <typename Client> template <typename Client>
class FluidSCWrapper; class FluidSCWrapper;
template<typename Client>
struct WrapperState
{
typename Client::ParamSetType params;
Client client;
Node* mNode;
bool mCancelled{false};
bool mJobDone{false};
bool mHasTriggered{false};
bool mSynchronous{false};
bool mInNRT{false};
Result mResult{};
};
namespace impl { namespace impl {
template <size_t N, typename T> template <size_t N, typename T>
@ -169,7 +185,7 @@ public:
static void doLatency(Unit* unit, sc_msg_iter*) static void doLatency(Unit* unit, sc_msg_iter*)
{ {
float l[]{ float l[]{
static_cast<float>(static_cast<Wrapper*>(unit)->mClient.latency()) static_cast<float>(static_cast<Wrapper*>(unit)->client().latency())
}; };
auto ft = Wrapper::getInterfaceTable(); auto ft = Wrapper::getInterfaceTable();
@ -185,7 +201,7 @@ public:
void init() void init()
{ {
auto& client =static_cast<Wrapper*>(this)->mClient; auto& client =static_cast<Wrapper*>(this)->client();
assert( assert(
!(client.audioChannelsOut() > 0 && client.controlChannelsOut() > 0) && !(client.audioChannelsOut() > 0 && client.controlChannelsOut() > 0) &&
"Client can't have both audio and control outputs"); "Client can't have both audio and control outputs");
@ -233,13 +249,13 @@ public:
void next(int) void next(int)
{ {
auto& client = static_cast<Wrapper*>(this)->mClient; auto& client = mWrapper->client();
auto& params = static_cast<Wrapper*>(this)->mParams; auto& params = mWrapper->params();
static_cast<Wrapper*>(this)->mControlsIterator.reset(mInBuf + mSpecialIndex + mWrapper->mControlsIterator.reset(mInBuf + mSpecialIndex +
1); // mClient.audioChannelsIn()); 1); // mClient.audioChannelsIn());
Wrapper::setParams(static_cast<Wrapper*>(this), Wrapper::setParams(mWrapper,
params, static_cast<Wrapper*>(this)->mControlsIterator); // forward on inputs N + audio inputs as params params, mWrapper->mControlsIterator); // forward on inputs N + audio inputs as params
params.constrainParameterValues(); params.constrainParameterValues();
const Unit* unit = this; const Unit* unit = this;
for (index i = 0; i < client.audioChannelsIn(); ++i) for (index i = 0; i < client.audioChannelsIn(); ++i)
@ -280,7 +296,7 @@ template <typename Client, typename Wrapper>
class NonRealTime : public SCUnit class NonRealTime : public SCUnit
{ {
using ParamSetType = typename Client::ParamSetType; using ParamSetType = typename Client::ParamSetType;
using SharedState = std::shared_ptr<WrapperState<Client>>;
public: public:
static index ControlOffset(Unit*) { return 0; } static index ControlOffset(Unit*) { return 0; }
@ -289,32 +305,32 @@ public:
static void setup(InterfaceTable* ft, const char* name) static void setup(InterfaceTable* ft, const char* name)
{ {
ft->fDefineUnitCmd(name, "cancel", doCancel); ft->fDefineUnitCmd(name, "cancel", doCancel);
ft->fDefineUnitCmd( // ft->fDefineUnitCmd(
name, "queue_enabled", [](struct Unit* unit, struct sc_msg_iter* args) { // name, "queue_enabled", [](struct Unit* unit, struct sc_msg_iter* args) {
auto w = static_cast<Wrapper*>(unit); // auto w = static_cast<Wrapper*>(unit);
w->mQueueEnabled = args->geti(0); // w->mQueueEnabled = args->geti(0);
w->mFifoMsg.Set( // w->mFifoMsg.Set(
w->mWorld, // w->mWorld,
[](FifoMsg* f) { // [](FifoMsg* f) {
auto w = static_cast<Wrapper*>(f->mData); // auto w = static_cast<Wrapper*>(f->mData);
w->mClient.setQueueEnabled(w->mQueueEnabled); // w->client().setQueueEnabled(w->mQueueEnabled);
}, // },
nullptr, w); // nullptr, w);
Wrapper::getInterfaceTable()->fSendMsgFromRT(w->mWorld, w->mFifoMsg); // Wrapper::getInterfaceTable()->fSendMsgFromRT(w->mWorld, w->mFifoMsg);
}); // });
ft->fDefineUnitCmd( // ft->fDefineUnitCmd(
name, "synchronous", [](struct Unit* unit, struct sc_msg_iter* args) { // name, "synchronous", [](struct Unit* unit, struct sc_msg_iter* args) {
auto w = static_cast<Wrapper*>(unit); // auto w = static_cast<Wrapper*>(unit);
w->mSynchronous = args->geti(0); // w->mSynchronous = args->geti(0);
w->mFifoMsg.Set( // w->mFifoMsg.Set(
w->mWorld, // w->mWorld,
[](FifoMsg* f) { // [](FifoMsg* f) {
auto w = static_cast<Wrapper*>(f->mData); // auto w = static_cast<Wrapper*>(f->mData);
w->mClient.setSynchronous(w->mSynchronous); // w->client().setSynchronous(w->mSynchronous);
}, // },
nullptr, w); // nullptr, w);
Wrapper::getInterfaceTable()->fSendMsgFromRT(w->mWorld, w->mFifoMsg); // Wrapper::getInterfaceTable()->fSendMsgFromRT(w->mWorld, w->mFifoMsg);
}); // });
} }
/// Penultimate input is the trigger, final is blocking mode. Neither are /// Penultimate input is the trigger, final is blocking mode. Neither are
@ -325,7 +341,7 @@ public:
~NonRealTime() ~NonRealTime()
{ {
if (client().state() == ProcessState::kProcessing) if (mWrapper->client().state() == ProcessState::kProcessing)
{ {
std::cout << Wrapper::getName() << ": Processing cancelled" << std::endl; std::cout << Wrapper::getName() << ": Processing cancelled" << std::endl;
Wrapper::getInterfaceTable()->fSendNodeReply(&mParent->mNode, 1, "/done", Wrapper::getInterfaceTable()->fSendNodeReply(&mParent->mNode, 1, "/done",
@ -340,6 +356,8 @@ public:
/// UGen calc function going /// UGen calc function going
void init() void init()
{ {
mWrapper->state()->mSynchronous = mSynchronous;
mFifoMsg.Set(mWorld, initNRTJob, nullptr, this); mFifoMsg.Set(mWorld, initNRTJob, nullptr, this);
// we want to poll thread roughly every 20ms // we want to poll thread roughly every 20ms
@ -352,22 +370,31 @@ public:
/// launches tidy up when complete /// launches tidy up when complete
void poll(int) void poll(int)
{ {
out0(0) = mDone ? 1.0f : static_cast<float>(client().progress()); out0(0) = mDone ? 1.0f : static_cast<float>(mWrapper->client().progress());
index triggerInput = static_cast<index>(mInBuf[static_cast<index>(mNumInputs) - mSpecialIndex - 2][0]); index triggerInput = static_cast<index>(mInBuf[static_cast<index>(mNumInputs) - mSpecialIndex - 2][0]);
bool trigger = (mPreviousTrigger <= 0) && triggerInput > 0; bool trigger = (mPreviousTrigger <= 0) && triggerInput > 0;
mPreviousTrigger = triggerInput; mPreviousTrigger = triggerInput;
auto& sharedState = mWrapper->state();
mWrapper->mDone = sharedState->mJobDone;
if(trigger) if(trigger)
{ {
SharedState* statePtr = static_cast<SharedState*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState)));
statePtr = new (statePtr) SharedState(sharedState);
mFifoMsg.Set(mWorld, initNRTJob, nullptr, statePtr);
mWorld->ft->fSendMsgFromRT(mWorld, mFifoMsg); mWorld->ft->fSendMsgFromRT(mWorld, mFifoMsg);
return;
} }
if (0 == pollCounter++ && !mCheckingForDone && mHasTriggered) if (0 == pollCounter++ && !sharedState->mInNRT && sharedState->mHasTriggered)
{ {
mCheckingForDone = true; sharedState->mInNRT = true;
SharedState* statePtr = static_cast<SharedState*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState)));
statePtr = new (statePtr) SharedState(sharedState);
mWorld->ft->fDoAsynchronousCommand(mWorld, nullptr, Wrapper::getName(), mWorld->ft->fDoAsynchronousCommand(mWorld, nullptr, Wrapper::getName(),
this, postProcess, exchangeBuffers, statePtr, postProcess, exchangeBuffers,
tidyUp, destroy, 0, nullptr); tidyUp, destroy, 0, nullptr);
} }
pollCounter %= checkThreadInterval; pollCounter %= checkThreadInterval;
@ -378,12 +405,13 @@ public:
/// new thread /// new thread
static void initNRTJob(FifoMsg* f) static void initNRTJob(FifoMsg* f)
{ {
auto w = static_cast<Wrapper*>(f->mData); if(!f->mData) return;
w->mDone = false; auto w = static_cast<SharedState*>(f->mData);
w->mJobDone = false; SharedState& s = *w;
w->mCancelled = false; s->mInNRT = true;
s->mJobDone = false;
Result result = validateParameters(w); s->mCancelled = false;
Result result = validateParameters(s->params);
if (!result.ok()) if (!result.ok())
{ {
@ -391,18 +419,25 @@ public:
<< result.message().c_str() << std::endl; << result.message().c_str() << std::endl;
return; return;
} }
w->mClient.setSynchronous(w->mSynchronous); s->client.setSynchronous(s->mSynchronous);
w->mClient.enqueue(w->mParams); s->client.enqueue(s->params);
w->mResult = w->mClient.process(); s->mResult = s->client.process();
w->mHasTriggered = true; s->mHasTriggered = true;
s->mInNRT = false;
w->~SharedState();
f->mWorld->ft->fRTFree(f->mWorld,w);
} }
/// Check result and report if bad /// Check result and report if bad
static bool postProcess(World*, void* data) static bool postProcess(World*, void* data)
{ {
auto w = static_cast<Wrapper*>(data);
if(!data) return false;
auto& w = *static_cast<SharedState*>(data);
Result r; Result r;
ProcessState s = w->mClient.checkProgress(r); w->mInNRT = true;
ProcessState s = w->client.checkProgress(r);
if(w->mSynchronous) r = w->mResult; if(w->mSynchronous) r = w->mResult;
@ -430,7 +465,7 @@ public:
if (!r.ok()) if (!r.ok())
{ {
if(!w->mDone) if(!w->mJobDone)
std::cout << "ERROR: " << Wrapper::getName() << ": " std::cout << "ERROR: " << Wrapper::getName() << ": "
<< r.message().c_str() << std::endl; << r.message().c_str() << std::endl;
w->mJobDone = true; w->mJobDone = true;
@ -447,34 +482,48 @@ public:
/// swap NRT buffers back to RT-land /// swap NRT buffers back to RT-land
static bool exchangeBuffers(World* world, void* data) static bool exchangeBuffers(World* world, void* data)
{ {
return static_cast<Wrapper*>(data)->exchangeBuffers(world); if(!data) return false;
SharedState& s = *(static_cast<SharedState*>(data));
s->params.template forEachParamType<BufferT, AssignBuffer>(world);
// At this point, we can see if we're finished and let the language know (or
// it can wait for the doneAction, but that takes extra time) use replyID to
// convey status (0 = normal completion, 1 = cancelled)
if (s->mJobDone && !s->mCancelled)
world->ft->fSendNodeReply(s->mNode, 0, "/done", 0, nullptr);
if (s->mCancelled)
world->ft->fSendNodeReply(s->mNode, 1, "/done", 0, nullptr);
return true;
} }
/// Tidy up any temporary buffers /// Tidy up any temporary buffers
static bool tidyUp(World* world, void* data) static bool tidyUp(World* , void* data)
{ {
return static_cast<Wrapper*>(data)->tidyUp(world); if(!data) return false;
SharedState& s = *(static_cast<SharedState*>(data));
s->params.template forEachParamType<BufferT, impl::CleanUpBuffer>();
return true;
} }
/// if we're properly done set the Unit done flag /// if we're properly done set the Unit done flag
static void destroy(World*, void* data) static void destroy(World* world, void* data)
{ {
auto w = static_cast<Wrapper*>(data); if(!data) return;
w->mDone = w->mJobDone; auto& s = *static_cast<SharedState*>(data);
w->mCheckingForDone = false; s->mInNRT = false;
s.~SharedState();
world->ft->fRTFree(world,data);
} }
static void doCancel(Unit* unit, sc_msg_iter*) static void doCancel(Unit* unit, sc_msg_iter*)
{ {
static_cast<Wrapper*>(unit)->mClient.cancel(); static_cast<Wrapper*>(unit)->client().cancel();
} }
ParamSetType& params() { return mWrapper->mParams; }
Client& client() { return mWrapper->mClient; }
private: private:
static Result validateParameters(NonRealTime* nrt) static Result validateParameters(ParamSetType& p)
{ {
auto results = nrt->params().constrainParameterValues(); auto results = p.constrainParameterValues();
for (auto& r : results) for (auto& r : results)
{ {
if (!r.ok()) return r; if (!r.ok()) return r;
@ -482,25 +531,6 @@ private:
return {}; return {};
} }
bool exchangeBuffers(World* world) // RT thread
{
params().template forEachParamType<BufferT, AssignBuffer>(world);
// At this point, we can see if we're finished and let the language know (or
// it can wait for the doneAction, but that takes extra time) use replyID to
// convey status (0 = normal completion, 1 = cancelled)
if (mJobDone)
world->ft->fSendNodeReply(&mParent->mNode, 0, "/done", 0, nullptr);
if (mCancelled)
world->ft->fSendNodeReply(&mParent->mNode, 1, "/done", 0, nullptr);
return true;
}
bool tidyUp(World*) // NRT thread
{
params().template forEachParamType<BufferT, impl::CleanUpBuffer>();
return true;
}
template <size_t N, typename T> template <size_t N, typename T>
struct AssignBuffer struct AssignBuffer
{ {
@ -528,13 +558,8 @@ private:
index mPreviousTrigger{0}; index mPreviousTrigger{0};
bool mSynchronous{true}; bool mSynchronous{true};
bool mQueueEnabled{false};
bool mCheckingForDone{false}; // only write to this from RT thread kthx
bool mCancelled{false};
bool mJobDone{false};
bool mHasTriggered{false};
Wrapper* mWrapper{static_cast<Wrapper*>(this)}; Wrapper* mWrapper{static_cast<Wrapper*>(this)};
Result mResult; Result mResult;
}; };
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1260,14 +1285,10 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
ft->fRTFree(x->mWorld, msgptr); ft->fRTFree(x->mWorld, msgptr);
return; return;
} }
///
ForEach(args,[x,&inArgs](auto& arg){ ForEach(args,[x,&inArgs](auto& arg){
arg = ParamReader<sc_msg_iter*>::fromArgs(x, inArgs,arg,0); arg = ParamReader<sc_msg_iter*>::fromArgs(x, inArgs,arg,0);
}); });
// x->mDone = false;
ft->fDoAsynchronousCommand( ft->fDoAsynchronousCommand(
x->mWorld, nullptr, getName(), msg, x->mWorld, nullptr, getName(), msg,
[](World*, void* data) // NRT thread: invocation [](World*, void* data) // NRT thread: invocation
@ -1312,7 +1333,7 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
static decltype(auto) invokeImpl(FluidSCWrapper* x, ArgsTuple& args, static decltype(auto) invokeImpl(FluidSCWrapper* x, ArgsTuple& args,
std::index_sequence<Is...>) std::index_sequence<Is...>)
{ {
return x->mClient.template invoke<N>(x->mClient, std::get<Is>(args)...); return x->client().template invoke<N>(x->client(), std::get<Is>(args)...);
} }
template <typename T> // call from RT template <typename T> // call from RT
@ -1372,16 +1393,17 @@ public:
using Client = C; using Client = C;
using ParamSetType = typename C::ParamSetType; using ParamSetType = typename C::ParamSetType;
FluidSCWrapper(FloatControlsIter&& i, Client&& c, ParamSetType&& p): FluidSCWrapper(FloatControlsIter&& i, Client&& c, ParamSetType&& p):
mControlsIterator{std::move(i)}, mControlsIterator{std::move(i)},
mParams{std::move(p)}, mClient{std::move(c)} mState{new WrapperState<Client>{std::move(p),std::move(c),&SCUnit::mParent->mNode}}
{ {
mClient.setParams(mParams); //<-IMPORTANT: client's ref to params is by address, and this has just changed client().setParams(params()); //<-IMPORTANT: client's ref to params is by address, and this has just changed
impl::FluidSCWrapperBase<Client>::init(); impl::FluidSCWrapperBase<Client>::init();
} }
std::shared_ptr<WrapperState<Client>>& state() { return mState; }
static const char* getName(const char* setName = nullptr) static const char* getName(const char* setName = nullptr)
{ {
static const char* name = nullptr; static const char* name = nullptr;
@ -1449,10 +1471,10 @@ public:
} }
} }
auto& client() { return mClient; } auto& client() { return mState->client; }
auto& params() { return mParams; } auto& params() { return mState->params; }
private: private:
static void registerUnit(InterfaceTable* ft, const char* name) { static void registerUnit(InterfaceTable* ft, const char* name) {
UnitCtorFunc ctor =impl::FluidSCWrapperBase<Client>::constructClass; UnitCtorFunc ctor =impl::FluidSCWrapperBase<Client>::constructClass;
@ -1460,9 +1482,8 @@ public:
(*ft->fDefineUnit)(name, sizeof(FluidSCWrapper), ctor, dtor, 0); (*ft->fDefineUnit)(name, sizeof(FluidSCWrapper), ctor, dtor, 0);
} }
FloatControlsIter mControlsIterator; FloatControlsIter mControlsIterator;
ParamSetType mParams; std::shared_ptr<WrapperState<Client>> mState;
Client mClient;
}; };
template <typename Client> template <typename Client>

Loading…
Cancel
Save