From b3895c5be6512ffcf2c852e52a88e7c2eb470a74 Mon Sep 17 00:00:00 2001 From: Owen Green Date: Thu, 25 Jul 2019 01:07:58 +0100 Subject: [PATCH] Functional attempt at NRT threading: * changes to NRT SC Wrapper makes it a UGen, using the RT thread for polling and delegating to SC NRT thread for launching * changes to FluidBufNMF: Now a UGen with kr (emitting progress updates); process() wraps .kr via Function.play. Sync at end managed through use of doneAction * TODO: cancelling, checking progress, all the other client (and then the help...) --- include/FluidSCWrapper.hpp | 160 ++++++++++++----------- release-packaging/Classes/FluidBufNMF.sc | 70 +++++++--- 2 files changed, 133 insertions(+), 97 deletions(-) diff --git a/include/FluidSCWrapper.hpp b/include/FluidSCWrapper.hpp index 4bed7f7..ac624e9 100644 --- a/include/FluidSCWrapper.hpp +++ b/include/FluidSCWrapper.hpp @@ -118,11 +118,7 @@ public: for (int i = 0; i < static_cast(mClient.controlChannelsOut()); ++i) { mOutputs.emplace_back(nullptr, 0, 0); } mCalcFunc = make_calc_function(); -// set_calc_function(); Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1); - - - } void next(int) @@ -158,61 +154,92 @@ protected: //////////////////////////////////////////////////////////////////////////////////////////////////////////////// /// Non Real Time Processor - +/// This is also a UGen, but the main action is delegated off to a worker thread, via the NRT thread. +/// The RT bit is there to allow us (a) to poll our thread and (b) emit a kr progress update template -class NonRealTime +class NonRealTime: public SCUnit { using ParamSetType = typename Client::ParamSetType; - public: - static void setup(InterfaceTable *ft, const char *name) { DefinePlugInCmd(name, launch, nullptr); } - NonRealTime(World* w, sc_msg_iter* args) - : mParams{Client::getParameterDescriptors()} - , mClient{Wrapper::setParams(mParams, false, w, args)} + static void setup(InterfaceTable *ft, const char *name) { registerUnit(ft, name); } + + /// Final input is the doneAction, not a param, so we skip it in the controlsIterator + NonRealTime() : + mControlsIterator{mInBuf,static_cast(static_cast(mNumInputs) - mSpecialIndex - 1)} + , mParams{Wrapper::Client::getParameterDescriptors()} + , mClient{Wrapper::setParams(mParams,mWorld->mVerbosity > 0, mWorld, mControlsIterator,true)} {} - void init(){}; - - static void launch(World *world, void */*inUserData*/, struct sc_msg_iter *args, void *replyAddr) + /// No option of not using a worker thread for now + /// init() sets up the NRT process via the SC NRT thread, and then sets our UGen calc function going + void init() { - - if (args->tags && ((std::string{args->tags}.size() - 1) != Client::getParameterDescriptors().count())) + mClient.setSynchronous(false); + mFifoMsg.Set(mWorld, initNRTJob, nullptr, this); + mWorld->ft->fSendMsgFromRT(mWorld,mFifoMsg); + set_calc_function(); + }; + + /// The calc function. Checks to see if we've cancelled, spits out progress, launches tidy up when complete + void poll(int) + { + if(!mClient.done()) { - std::cout << "ERROR: " << Wrapper::getName() << " wrong number of arguments. Expected " - << Client::getParameterDescriptors().count() << ", got " << (std::string{args->tags}.size() - 1) - << ". Your .sc file and binary plugin might be different versions." << std::endl; - return; + out0(0) = static_cast(mClient.progress()); +// if(in0(0) > 0) mClient.cancel(); } + else { + mDone = true; + mCalcFunc = mWorld->ft->fClearUnitOutputs; + mWorld->ft->fDoAsynchronousCommand(mWorld, nullptr, Wrapper::getName(), this, + postProcess, exchangeBuffers, tidyUp, destroy, + 0, nullptr); + + } + } - Wrapper *w = new Wrapper( - world, args); // this has to be on the heap, because it doesn't get destroyed until the async command is done - + /// To be called on NRT thread. Validate parameters and commence processing in new thread + static void initNRTJob(FifoMsg* f) + { + auto w = static_cast(f->mData); + Result result = validateParameters(w); if (!result.ok()) { std::cout << "ERROR: " << Wrapper::getName() << ": " << result.message().c_str() << std::endl; - delete w; - return; + w->mDone = true; + return; } - size_t msgSize = args->getbsize(); - std::vector completionMessage(msgSize); - // char * completionMsgData = 0; - if (msgSize) { args->getb(completionMessage.data(), msgSize); } + w->mClient.process(); + } - world->ft->fDoAsynchronousCommand(world, replyAddr, Wrapper::getName(), w, process, exchangeBuffers, tidyUp, destroy, - static_cast(msgSize), completionMessage.data()); + /// Check result and report if bad + static bool postProcess(World*, void *data) + { + auto w = static_cast(data); + Result r; + w->mClient.checkProgress(r); + if(!r.ok()) + { + std::cout << "ERROR: " << Wrapper::getName() << ": " << r.message().c_str() << '\n'; + return false; + } + return true; } - static bool process(World *world, void *data) { return static_cast(data)->process(world); } + /// swap NRT buffers back to RT-land static bool exchangeBuffers(World *world, void *data) { return static_cast(data)->exchangeBuffers(world); } + /// Tidy up any temporary buffers static bool tidyUp(World *world, void *data) { return static_cast(data)->tidyUp(world); } - static void destroy(World *, void *data) { delete static_cast(data); } - -protected: - ParamSetType mParams; - Client mClient; + + /// Now we're actually properly done, call the UGen's done action (possibly destroying this instance) + static void destroy(World *, void *data) + { + auto w = static_cast(data); + w->mWorld->ft->fDoneAction(static_cast(w->mInBuf[w->mNumInputs - 1][0]),w); + } private: @@ -226,19 +253,6 @@ private: return {}; } - bool process(World *) - { - Result r = mClient.process(); - - if (!r.ok()) - { - std::cout << "ERROR: " << Wrapper::getName() << ": " << r.message().c_str() << '\n'; - return false; - } - - return true; - } - bool exchangeBuffers(World *world) { mParams.template forEachParamType(world); @@ -268,10 +282,16 @@ private: if (auto b = static_cast(p.get())) b->cleanUp(); } }; - + + FloatControlsIter mControlsIterator; + FifoMsg mFifoMsg; char * mCompletionMessage = nullptr; void * mReplyAddr = nullptr; const char *mName = nullptr; +protected: + ParamSetType mParams; + Client mClient; + }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -298,8 +318,8 @@ template class FluidSCWrapperImpl : public NonRealTime { -public: - FluidSCWrapperImpl(World* w, sc_msg_iter *args): NonRealTime(w,args){}; +//public: +// FluidSCWrapperImpl(World* w, sc_msg_iter *args): NonRealTime(w,args){}; }; template @@ -322,39 +342,33 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase { using FloatControlsIter = impl::FloatControlsIter; - // Iterate over arguments in sc_msg_iter, via callbacks from params object - + // Iterate over arguments via callbacks from params object template struct Setter { static constexpr size_t argSize = C::getParameterDescriptors().template get().fixedSize; - + auto fromArgs(World *, FloatControlsIter& args, LongT::type, int) { return args.next(); } auto fromArgs(World *, FloatControlsIter& args, FloatT::type, int) { return args.next(); } - auto fromArgs(World *, sc_msg_iter* args, LongT::type, int defVal) { return args->geti(defVal); } - auto fromArgs(World *, sc_msg_iter* args, FloatT::type, int) { return args->getf(); } auto fromArgs(World *w, ArgType args, BufferT::type, int) { typename LongT::type bufnum = static_cast(fromArgs(w, args, LongT::type(), -1)); return BufferT::type(bufnum >= 0 ? new SCBufferAdaptor(bufnum, w) : nullptr); } - + typename T::type operator()(World *w, ArgType args) { ParamLiteralConvertor a; using LiteralType = typename ParamLiteralConvertor::LiteralType; - + for (size_t i = 0; i < argSize; i++) a[i] = static_cast(fromArgs(w, args, a[0], 0)); - + return a.value(); } }; - - template - using ArgumentSetter = Setter; - + template using ControlSetter = Setter; @@ -362,12 +376,7 @@ public: using Client = C; using ParameterSetType = typename C::ParamSetType; - FluidSCWrapper() // mParams{*getParamDescriptors()}, //impl::FluidSCWrapperBase() - { - impl::FluidSCWrapperBase::init(); - } - - FluidSCWrapper(World* w, sc_msg_iter *args): impl::FluidSCWrapperBase(w,args) + FluidSCWrapper() { impl::FluidSCWrapperBase::init(); } @@ -398,15 +407,12 @@ public: { p.template setParameterValues(verbose, world, inputs); if (constrain)p.constrainParameterValues(); + } else { + std::cout << "ERROR: " << getName() << ": parameter count mismatch. Perhaps your binary plugins and SC sources are different versions"; + //TODO: work out how to bring any further work to a halt } return p; } - - static auto& setParams(ParameterSetType& p, bool verbose, World* world, sc_msg_iter *args) - { - p.template setParameterValues(verbose,world, args); - return p; - } }; template class Client> diff --git a/release-packaging/Classes/FluidBufNMF.sc b/release-packaging/Classes/FluidBufNMF.sc index a46879b..1a37e85 100644 --- a/release-packaging/Classes/FluidBufNMF.sc +++ b/release-packaging/Classes/FluidBufNMF.sc @@ -1,27 +1,57 @@ -FluidBufNMF { - *process { arg server, source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, action; +FluidBufNMF : UGen { + *kr {|source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, doneAction = 0| - source = source.asUGenInput; - destination = destination.asUGenInput; - bases = bases.asUGenInput; - activations = activations.asUGenInput; + source = source.asUGenInput; + destination = destination.asUGenInput; + bases = bases.asUGenInput; + activations = activations.asUGenInput; - source.isNil.if {"FluidBufNMF: Invalid source buffer".throw}; + source.isNil.if {"FluidBufNMF: Invalid source buffer".throw}; - server = server ? Server.default; + destination = destination ? -1; + bases = bases ? -1; + activations = activations ? -1; - destination = destination ? -1; - bases = bases ? -1; - activations = activations ? -1; + ^this.multiNew('control',source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize, doneAction); - forkIfNeeded{ - server.sendMsg(\cmd, \BufNMF, source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize); - server.sync; - if (destination != -1) {destination = server.cachedBufferAt(destination); destination.updateInfo; server.sync;} {destination = nil}; - if (bases != -1) {bases = server.cachedBufferAt(bases); bases.updateInfo; server.sync;} {bases = nil}; - if (activations != -1) {activations = server.cachedBufferAt(activations); activations.updateInfo; server.sync;} {activations = nil}; - action.value(destination, bases, activations); - }; - } + } + + + *process { |server, source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, action| + + var synth; + source = source.asUGenInput; + destination = destination.asUGenInput; + bases = bases.asUGenInput; + activations = activations.asUGenInput; + + source.isNil.if {"FluidBufNMF: Invalid source buffer".throw}; + + destination = destination ? -1; + bases = bases ? -1; + activations = activations ? -1; + + synth = {FluidBufNMF.kr(source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize, doneAction: Done.freeSelf)}.play(server); + + synth.waitForFree({ forkIfNeeded{ + server.sync; + if (destination != -1) { + destination = server.cachedBufferAt(destination); + destination.updateInfo; + server.sync; + } {destination = nil}; + if (bases != -1) { + bases = server.cachedBufferAt(bases); + bases.updateInfo; + server.sync; + } {bases = nil}; + if (activations != -1) { + activations = server.cachedBufferAt(activations); + activations.updateInfo; + server.sync; + } {activations = nil}; + action.value(destination, bases, activations); + }}); + } }