diff --git a/include/FluidSCWrapper.hpp b/include/FluidSCWrapper.hpp index ce7926a..1c52928 100644 --- a/include/FluidSCWrapper.hpp +++ b/include/FluidSCWrapper.hpp @@ -20,6 +20,7 @@ under the European Union’s Horizon 2020 research and innovation programme #include #include #include +#include #include #include #include @@ -43,11 +44,12 @@ struct WrapperState typename Client::ParamSetType params; Client client; Node* mNode; - bool mCancelled{false}; - bool mJobDone{false}; - bool mHasTriggered{false}; - bool mSynchronous{false}; - bool mInNRT{false}; + std::atomic mCancelled{false}; + std::atomic mJobDone{false}; + std::atomic mHasTriggered{false}; + std::atomic mSynchronous{false}; + std::atomic mInNRT{false}; + std::atomic mNodeAlive{true}; Result mResult{}; }; @@ -341,7 +343,8 @@ public: ~NonRealTime() { - if (mWrapper->client().state() == ProcessState::kProcessing) + auto state = mWrapper->client().state(); + if (state == ProcessState::kProcessing) { std::cout << Wrapper::getName() << ": Processing cancelled" << std::endl; Wrapper::getInterfaceTable()->fSendNodeReply(&mParent->mNode, 1, "/done", @@ -371,7 +374,7 @@ public: void poll(int) { out0(0) = mDone ? 1.0f : static_cast(mWrapper->client().progress()); - + index triggerInput = static_cast(mInBuf[static_cast(mNumInputs) - mSpecialIndex - 2][0]); bool trigger = (mPreviousTrigger <= 0) && triggerInput > 0; mPreviousTrigger = triggerInput; @@ -408,21 +411,29 @@ public: if(!f->mData) return; auto w = static_cast(f->mData); SharedState& s = *w; - s->mInNRT = true; - s->mJobDone = false; - s->mCancelled = false; Result result = validateParameters(s->params); if (!result.ok()) { std::cout << "ERROR: " << Wrapper::getName() << ": " << result.message().c_str() << std::endl; + s->mInNRT = false; return; } s->client.setSynchronous(s->mSynchronous); - s->client.enqueue(s->params); - s->mResult = s->client.process(); + result = s->client.enqueue(s->params); + if (!result.ok()) + { + std::cout << "ERROR: " << Wrapper::getName() << ": " + << result.message().c_str() << std::endl; + s->mInNRT = false; + return; + } + + s->mJobDone = false; + s->mCancelled = false; s->mHasTriggered = true; + s->mResult = s->client.process(); s->mInNRT = false; w->~SharedState(); f->mWorld->ft->fRTFree(f->mWorld,w); @@ -483,8 +494,8 @@ public: static bool exchangeBuffers(World* world, void* data) { if(!data) return false; - SharedState& s = *(static_cast(data)); + if(!s->mNodeAlive) return false; s->params.template forEachParamType(world); // At this point, we can see if we're finished and let the language know (or // it can wait for the doneAction, but that takes extra time) use replyID to @@ -1401,6 +1412,12 @@ public: impl::FluidSCWrapperBase::init(); } + ~FluidSCWrapper() + { + mState->mNodeAlive = false; + } + + std::shared_ptr>& state() { return mState; }