Functional attempt at NRT threading:

* changes to NRT SC Wrapper makes it a UGen, using the RT thread for polling and delegating to SC NRT thread for launching
* changes to FluidBufNMF: Now a UGen with kr (emitting progress updates); process() wraps .kr via Function.play. Sync at end managed through use of doneAction
* TODO: cancelling, checking progress, all the other client (and then the help...)
nix
Owen Green 7 years ago
parent b0a05ce44d
commit b3895c5be6

@ -118,11 +118,7 @@ public:
for (int i = 0; i < static_cast<int>(mClient.controlChannelsOut()); ++i) { mOutputs.emplace_back(nullptr, 0, 0); }
mCalcFunc = make_calc_function<RealTime, &RealTime::next>();
// set_calc_function<RealTime, &RealTime::next>();
Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
}
void next(int)
@ -158,61 +154,92 @@ protected:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Non Real Time Processor
/// This is also a UGen, but the main action is delegated off to a worker thread, via the NRT thread.
/// The RT bit is there to allow us (a) to poll our thread and (b) emit a kr progress update
template <typename Client, typename Wrapper>
class NonRealTime
class NonRealTime: public SCUnit
{
using ParamSetType = typename Client::ParamSetType;
public:
static void setup(InterfaceTable *ft, const char *name) { DefinePlugInCmd(name, launch, nullptr); }
NonRealTime(World* w, sc_msg_iter* args)
: mParams{Client::getParameterDescriptors()}
, mClient{Wrapper::setParams(mParams, false, w, args)}
static void setup(InterfaceTable *ft, const char *name) { registerUnit<Wrapper>(ft, name); }
/// Final input is the doneAction, not a param, so we skip it in the controlsIterator
NonRealTime() :
mControlsIterator{mInBuf,static_cast<size_t>(static_cast<ptrdiff_t>(mNumInputs) - mSpecialIndex - 1)}
, mParams{Wrapper::Client::getParameterDescriptors()}
, mClient{Wrapper::setParams(mParams,mWorld->mVerbosity > 0, mWorld, mControlsIterator,true)}
{}
void init(){};
static void launch(World *world, void */*inUserData*/, struct sc_msg_iter *args, void *replyAddr)
/// No option of not using a worker thread for now
/// init() sets up the NRT process via the SC NRT thread, and then sets our UGen calc function going
void init()
{
if (args->tags && ((std::string{args->tags}.size() - 1) != Client::getParameterDescriptors().count()))
mClient.setSynchronous(false);
mFifoMsg.Set(mWorld, initNRTJob, nullptr, this);
mWorld->ft->fSendMsgFromRT(mWorld,mFifoMsg);
set_calc_function<NonRealTime, &NonRealTime::poll>();
};
/// The calc function. Checks to see if we've cancelled, spits out progress, launches tidy up when complete
void poll(int)
{
if(!mClient.done())
{
std::cout << "ERROR: " << Wrapper::getName() << " wrong number of arguments. Expected "
<< Client::getParameterDescriptors().count() << ", got " << (std::string{args->tags}.size() - 1)
<< ". Your .sc file and binary plugin might be different versions." << std::endl;
return;
out0(0) = static_cast<float>(mClient.progress());
// if(in0(0) > 0) mClient.cancel();
}
else {
mDone = true;
mCalcFunc = mWorld->ft->fClearUnitOutputs;
mWorld->ft->fDoAsynchronousCommand(mWorld, nullptr, Wrapper::getName(), this,
postProcess, exchangeBuffers, tidyUp, destroy,
0, nullptr);
}
}
Wrapper *w = new Wrapper(
world, args); // this has to be on the heap, because it doesn't get destroyed until the async command is done
/// To be called on NRT thread. Validate parameters and commence processing in new thread
static void initNRTJob(FifoMsg* f)
{
auto w = static_cast<Wrapper*>(f->mData);
Result result = validateParameters(w);
if (!result.ok())
{
std::cout << "ERROR: " << Wrapper::getName() << ": " << result.message().c_str() << std::endl;
delete w;
return;
w->mDone = true;
return;
}
size_t msgSize = args->getbsize();
std::vector<char> completionMessage(msgSize);
// char * completionMsgData = 0;
if (msgSize) { args->getb(completionMessage.data(), msgSize); }
w->mClient.process();
}
world->ft->fDoAsynchronousCommand(world, replyAddr, Wrapper::getName(), w, process, exchangeBuffers, tidyUp, destroy,
static_cast<int>(msgSize), completionMessage.data());
/// Check result and report if bad
static bool postProcess(World*, void *data)
{
auto w = static_cast<Wrapper*>(data);
Result r;
w->mClient.checkProgress(r);
if(!r.ok())
{
std::cout << "ERROR: " << Wrapper::getName() << ": " << r.message().c_str() << '\n';
return false;
}
return true;
}
static bool process(World *world, void *data) { return static_cast<Wrapper *>(data)->process(world); }
/// swap NRT buffers back to RT-land
static bool exchangeBuffers(World *world, void *data) { return static_cast<Wrapper *>(data)->exchangeBuffers(world); }
/// Tidy up any temporary buffers
static bool tidyUp(World *world, void *data) { return static_cast<Wrapper *>(data)->tidyUp(world); }
static void destroy(World *, void *data) { delete static_cast<Wrapper *>(data); }
protected:
ParamSetType mParams;
Client mClient;
/// Now we're actually properly done, call the UGen's done action (possibly destroying this instance)
static void destroy(World *, void *data)
{
auto w = static_cast<Wrapper*>(data);
w->mWorld->ft->fDoneAction(static_cast<int>(w->mInBuf[w->mNumInputs - 1][0]),w);
}
private:
@ -226,19 +253,6 @@ private:
return {};
}
bool process(World *)
{
Result r = mClient.process();
if (!r.ok())
{
std::cout << "ERROR: " << Wrapper::getName() << ": " << r.message().c_str() << '\n';
return false;
}
return true;
}
bool exchangeBuffers(World *world)
{
mParams.template forEachParamType<BufferT, AssignBuffer>(world);
@ -268,10 +282,16 @@ private:
if (auto b = static_cast<SCBufferAdaptor *>(p.get())) b->cleanUp();
}
};
FloatControlsIter mControlsIterator;
FifoMsg mFifoMsg;
char * mCompletionMessage = nullptr;
void * mReplyAddr = nullptr;
const char *mName = nullptr;
protected:
ParamSetType mParams;
Client mClient;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -298,8 +318,8 @@ template <typename Client, typename Wrapper>
class FluidSCWrapperImpl<Client, Wrapper, std::true_type, std::false_type>
: public NonRealTime<Client, Wrapper>
{
public:
FluidSCWrapperImpl(World* w, sc_msg_iter *args): NonRealTime<Client, Wrapper>(w,args){};
//public:
// FluidSCWrapperImpl(World* w, sc_msg_iter *args): NonRealTime<Client, Wrapper>(w,args){};
};
template <typename Client, typename Wrapper>
@ -322,39 +342,33 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
{
using FloatControlsIter = impl::FloatControlsIter;
// Iterate over arguments in sc_msg_iter, via callbacks from params object
// Iterate over arguments via callbacks from params object
template <typename ArgType, size_t N, typename T>
struct Setter
{
static constexpr size_t argSize = C::getParameterDescriptors().template get<N>().fixedSize;
auto fromArgs(World *, FloatControlsIter& args, LongT::type, int) { return args.next(); }
auto fromArgs(World *, FloatControlsIter& args, FloatT::type, int) { return args.next(); }
auto fromArgs(World *, sc_msg_iter* args, LongT::type, int defVal) { return args->geti(defVal); }
auto fromArgs(World *, sc_msg_iter* args, FloatT::type, int) { return args->getf(); }
auto fromArgs(World *w, ArgType args, BufferT::type, int)
{
typename LongT::type bufnum = static_cast<LongT::type>(fromArgs(w, args, LongT::type(), -1));
return BufferT::type(bufnum >= 0 ? new SCBufferAdaptor(bufnum, w) : nullptr);
}
typename T::type operator()(World *w, ArgType args)
{
ParamLiteralConvertor<T, argSize> a;
using LiteralType = typename ParamLiteralConvertor<T, argSize>::LiteralType;
for (size_t i = 0; i < argSize; i++)
a[i] = static_cast<LiteralType>(fromArgs(w, args, a[0], 0));
return a.value();
}
};
template <size_t N, typename T>
using ArgumentSetter = Setter<sc_msg_iter*, N, T>;
template <size_t N, typename T>
using ControlSetter = Setter<FloatControlsIter&, N, T>;
@ -362,12 +376,7 @@ public:
using Client = C;
using ParameterSetType = typename C::ParamSetType;
FluidSCWrapper() // mParams{*getParamDescriptors()}, //impl::FluidSCWrapperBase<Client,Params>()
{
impl::FluidSCWrapperBase<Client>::init();
}
FluidSCWrapper(World* w, sc_msg_iter *args): impl::FluidSCWrapperBase<Client>(w,args)
FluidSCWrapper()
{
impl::FluidSCWrapperBase<Client>::init();
}
@ -398,15 +407,12 @@ public:
{
p.template setParameterValues<ControlSetter>(verbose, world, inputs);
if (constrain)p.constrainParameterValues();
} else {
std::cout << "ERROR: " << getName() << ": parameter count mismatch. Perhaps your binary plugins and SC sources are different versions";
//TODO: work out how to bring any further work to a halt
}
return p;
}
static auto& setParams(ParameterSetType& p, bool verbose, World* world, sc_msg_iter *args)
{
p.template setParameterValues<ArgumentSetter>(verbose,world, args);
return p;
}
};
template <template<typename T> class Client>

@ -1,27 +1,57 @@
FluidBufNMF {
*process { arg server, source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, action;
FluidBufNMF : UGen {
*kr {|source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, doneAction = 0|
source = source.asUGenInput;
destination = destination.asUGenInput;
bases = bases.asUGenInput;
activations = activations.asUGenInput;
source = source.asUGenInput;
destination = destination.asUGenInput;
bases = bases.asUGenInput;
activations = activations.asUGenInput;
source.isNil.if {"FluidBufNMF: Invalid source buffer".throw};
source.isNil.if {"FluidBufNMF: Invalid source buffer".throw};
server = server ? Server.default;
destination = destination ? -1;
bases = bases ? -1;
activations = activations ? -1;
destination = destination ? -1;
bases = bases ? -1;
activations = activations ? -1;
^this.multiNew('control',source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize, doneAction);
forkIfNeeded{
server.sendMsg(\cmd, \BufNMF, source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize);
server.sync;
if (destination != -1) {destination = server.cachedBufferAt(destination); destination.updateInfo; server.sync;} {destination = nil};
if (bases != -1) {bases = server.cachedBufferAt(bases); bases.updateInfo; server.sync;} {bases = nil};
if (activations != -1) {activations = server.cachedBufferAt(activations); activations.updateInfo; server.sync;} {activations = nil};
action.value(destination, bases, activations);
};
}
}
*process { |server, source, startFrame = 0, numFrames = -1, startChan = 0, numChans = -1, destination, bases, basesMode = 0, activations, actMode = 0, components = 1, iterations = 100, windowSize = 1024, hopSize = -1, fftSize = -1, windowType = 0, randomSeed = -1, action|
var synth;
source = source.asUGenInput;
destination = destination.asUGenInput;
bases = bases.asUGenInput;
activations = activations.asUGenInput;
source.isNil.if {"FluidBufNMF: Invalid source buffer".throw};
destination = destination ? -1;
bases = bases ? -1;
activations = activations ? -1;
synth = {FluidBufNMF.kr(source, startFrame, numFrames, startChan, numChans, destination, bases, basesMode, activations, actMode, components, iterations, windowSize, hopSize, fftSize, doneAction: Done.freeSelf)}.play(server);
synth.waitForFree({ forkIfNeeded{
server.sync;
if (destination != -1) {
destination = server.cachedBufferAt(destination);
destination.updateInfo;
server.sync;
} {destination = nil};
if (bases != -1) {
bases = server.cachedBufferAt(bases);
bases.updateInfo;
server.sync;
} {bases = nil};
if (activations != -1) {
activations = server.cachedBufferAt(activations);
activations.updateInfo;
server.sync;
} {activations = nil};
action.value(destination, bases, activations);
}});
}
}

Loading…
Cancel
Save