Fix instability in batch processing: todo revert to SC memory pool and tidy up

nix
Owen Green 5 years ago
parent 63832bc512
commit d060839475

@ -59,9 +59,12 @@ struct WrapperState
std::cout << "Processing Cancelled" << std::endl;
}
}
};
template <typename Client>
using SharedState = std::shared_ptr<WrapperState<Client>>;
/// Named, shared clients already have a lookup table in their adaptor class
template <typename T>
struct IsPersistent
@ -348,7 +351,7 @@ template <typename Client, typename Wrapper>
class NonRealTime : public SCUnit
{
using ParamSetType = typename Client::ParamSetType;
using SharedState = std::shared_ptr<WrapperState<Client>>;
// using SharedState = std::shared_ptr<WrapperState<Client>>;
public:
static index ControlOffset(Unit*) { return 0; }
@ -436,11 +439,14 @@ public:
auto& sharedState = mWrapper->state();
mWrapper->mDone = sharedState->mJobDone;
if(trigger)
{
SharedState* statePtr = static_cast<SharedState*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState)));
statePtr = new (statePtr) SharedState(sharedState);
mFifoMsg.Set(mWorld, initNRTJob, nullptr, statePtr);
if(trigger && !sharedState->mInNRT)
{
// SharedState<Client>* statePtr = static_cast<SharedState<Client>*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState<Client>)));
SharedState<Client>* statePtr = new SharedState<Client>(sharedState);
// statePtr = new (statePtr) SharedState<Client>(sharedState);
mFifoMsg.Set(mWorld, initNRTJob, [](FifoMsg* m){
delete static_cast<SharedState<Client>*>(m->mData);
}, statePtr);
mWorld->ft->fSendMsgFromRT(mWorld, mFifoMsg);
return;
}
@ -449,12 +455,13 @@ public:
{
sharedState->mInNRT = true;
SharedState* statePtr = static_cast<SharedState*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState)));
statePtr = new (statePtr) SharedState(sharedState);
// SharedState<Client>* statePtr = static_cast<SharedState<Client>*>(mWorld->ft->fRTAlloc(mWorld, sizeof(SharedState<Client>)));
SharedState<Client>* statePtr = new SharedState<Client>(sharedState);
// statePtr = new (statePtr) SharedState<Client>(sharedState);
mWorld->ft->fDoAsynchronousCommand(mWorld, nullptr, Wrapper::getName(),
statePtr, postProcess, exchangeBuffers,
tidyUp, destroy, 0, nullptr);
}
}
pollCounter %= checkThreadInterval;
}
@ -464,34 +471,37 @@ public:
static void initNRTJob(FifoMsg* f)
{
if(!f->mData) return;
auto w = static_cast<SharedState*>(f->mData);
SharedState& s = *w;
auto w = static_cast<SharedState<Client>*>(f->mData);
SharedState<Client>& s = *w;
Result result = validateParameters(s->params);
if (!result.ok())
{
std::cout << "ERROR: " << Wrapper::getName() << ": "
<< result.message().c_str() << std::endl;
s->mInNRT = false;
return;
}
s->client.setSynchronous(s->mSynchronous);
result = s->client.enqueue(s->params);
if (!result.ok())
else
{
std::cout << "ERROR: " << Wrapper::getName() << ": "
<< result.message().c_str() << std::endl;
s->mInNRT = false;
return;
s->client.setSynchronous(s->mSynchronous);
result = s->client.enqueue(s->params);
if (!result.ok())
{
std::cout << "ERROR: " << Wrapper::getName() << ": "
<< result.message().c_str() << std::endl;
}
else
{
s->mJobDone = false;
s->mCancelled = false;
s->mResult = s->client.process();
s->mHasTriggered = true;
}
}
s->mJobDone = false;
s->mCancelled = false;
s->mHasTriggered = true;
s->mResult = s->client.process();
s->mInNRT = false;
w->~SharedState();
f->mWorld->ft->fRTFree(f->mWorld,w);
using namespace std;
// w->~shared_ptr<WrapperState<Client>>();
// f->mWorld->ft->fRTFree(f->mWorld,w);
// delete w;
}
/// Check result and report if bad
@ -500,8 +510,9 @@ public:
if(!data) return false;
auto& w = *static_cast<SharedState*>(data);
auto& w = *static_cast<SharedState<Client>*>(data);
Result r;
assert(w->mNode != nullptr) ;
w->mInNRT = true;
ProcessState s = w->client.checkProgress(r);
@ -549,7 +560,7 @@ public:
static bool exchangeBuffers(World* world, void* data)
{
if(!data) return false;
SharedState& s = *(static_cast<SharedState*>(data));
SharedState<Client>& s = *(static_cast<SharedState<Client>*>(data));
if(!s->mNodeAlive) return false;
s->params.template forEachParamType<BufferT, AssignBuffer>(world);
// At this point, we can see if we're finished and let the language know (or
@ -562,23 +573,25 @@ public:
return true;
}
/// Tidy up any temporary buffers
static bool tidyUp(World* , void* data)
{
if(!data) return false;
SharedState& s = *(static_cast<SharedState*>(data));
SharedState<Client>& s = *(static_cast<SharedState<Client>*>(data));
s->params.template forEachParamType<BufferT, impl::CleanUpBuffer>();
return true;
}
/// if we're properly done set the Unit done flag
static void destroy(World* world, void* data)
{
if(!data) return;
auto& s = *static_cast<SharedState*>(data);
auto& s = *static_cast<SharedState<Client>*>(data);
s->mInNRT = false;
s.~SharedState();
world->ft->fRTFree(world,data);
using namespace std;
// s.~shared_ptr<WrapperState<Client>>();
// world->ft->fRTFree(world,data);
delete static_cast<SharedState<Client>*>(data);
}
static void doCancel(Unit* unit, sc_msg_iter*)
@ -875,7 +888,7 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
using FloatControlsIter = impl::FloatControlsIter;
using SharedState = std::shared_ptr<WrapperState<C>>;
// using SharedState = std::shared_ptr<WrapperState<C>>;
//I would like to template these to something more scaleable, but baby steps
friend class impl::RealTime<C,FluidSCWrapper>;
friend class impl::NonRealTime<C,FluidSCWrapper>;
@ -978,22 +991,24 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
auto ft = FluidSCWrapper::getInterfaceTable();
auto w = x->mWorld;
char* chunk =
static_cast<char*>(ft->fRTAlloc(w, asUnsigned(size + 1)));
if (!chunk)
{
std::cout << "ERROR: " << FluidSCWrapper::getName()
<< ": RT memory allocation failed\n";
return std::string{""};
}
// char* chunk =
// static_cast<char*>(ft->fRTAlloc(w, asUnsigned(size + 1)));
std::string res;
res.resize(size);
// if (!chunk)
// {
// std::cout << "ERROR: " << FluidSCWrapper::getName()
// << ": RT memory allocation failed\n";
// return std::string{""};
// }
for (index i = 0; i < size; ++i)
chunk[i] = static_cast<char>(args.next());
res[i] = static_cast<char>(args.next());
chunk[size] = 0; // terminate string
auto res = std::string{chunk};
ft->fRTFree(w,chunk);
// res[size] = 0; // terminate string
// auto res = std::string{chunk};
// ft->fRTFree(w,chunk);
return res;
}
@ -1232,7 +1247,7 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
struct MessageDispatch
{
static constexpr size_t Message = N;
SharedState state;
SharedState<C> state;
ArgTuple args;
Ret result;
std::string name;
@ -1276,8 +1291,9 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
using IndexList = typename MessageDescriptor::IndexList;
using MessageData = MessageDispatch<N, ReturnType, ArgTuple>;
auto ft = getInterfaceTable();
void* msgptr = ft->fRTAlloc(x->mWorld, sizeof(MessageData));
MessageData* msg = new (msgptr) MessageData;
// void* msgptr = ft->fRTAlloc(x->mWorld, sizeof(MessageData));
// MessageData* msg = new (msgptr) MessageData;
MessageData* msg = new MessageData();
msg->name = '/' + Client::getMessageDescriptors().template name<N>();
msg->state = x->state();
ArgTuple& args = msg->args;
@ -1345,8 +1361,9 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
if(!willContinue)
{
msg->~MessageData();
ft->fRTFree(x->mWorld, msgptr);
// msg->~MessageData();
// ft->fRTFree(x->mWorld, msgptr);
delete msg;
return;
}
ForEach(args,[x,&inArgs](auto& arg){
@ -1390,21 +1407,22 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
[](World* w, void* data) // RT thread: clean up
{
MessageData* m = static_cast<MessageData*>(data);
m->~MessageData();
getInterfaceTable()->fRTFree(w, data);
// m->~MessageData();
// getInterfaceTable()->fRTFree(w, data);
delete m;
},
0, nullptr);
}
template <size_t N, typename ArgsTuple, size_t... Is> // Call from NRT
static decltype(auto) invokeImpl(SharedState& x, ArgsTuple& args,
static decltype(auto) invokeImpl(SharedState<C>& x, ArgsTuple& args,
std::index_sequence<Is...>)
{
return x->client.template invoke<N>(x->client, std::get<Is>(args)...);
}
template <typename T> // call from RT
static void messageOutput(SharedState& x, const std::string& s,
static void messageOutput(SharedState<C>& x, const std::string& s,
MessageResult<T>& result)
{
auto ft = getInterfaceTable();
@ -1419,17 +1437,22 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
return;
}
float* values = static_cast<float*>(
ft->fRTAlloc(x->mNode->mWorld, asUnsigned(numArgs) * sizeof(float)));
// float* values = static_cast<float*>(
// ft->fRTAlloc(x->mNode->mWorld, asUnsigned(numArgs) * sizeof(float)));
float* values = new float[numArgs];
// copy return data
ToFloatArray::convert(values, static_cast<T>(result));
ft->fSendNodeReply(x->mNode, -1, s.c_str(),
static_cast<int>(numArgs), values);
// ft->fRTFree(x->mNode->mWorld,values);
delete[] values;
}
static void messageOutput(SharedState& x, const std::string& s,
static void messageOutput(SharedState<C>& x, const std::string& s,
MessageResult<void>&)
{
auto ft = getInterfaceTable();
@ -1438,7 +1461,7 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
}
template <typename... Ts>
static void messageOutput(SharedState& x, const std::string& s,
static void messageOutput(SharedState<C>& x, const std::string& s,
MessageResult<std::tuple<Ts...>>& result)
{
auto ft = getInterfaceTable();
@ -1455,14 +1478,18 @@ class FluidSCWrapper : public impl::FluidSCWrapperBase<C>
return;
}
float* values = static_cast<float*>(
ft->fRTAlloc(x->mNode->mWorld, asUnsigned(numArgs) * sizeof(float)));
// float* values = static_cast<float*>(
// ft->fRTAlloc(x->mNode->mWorld, asUnsigned(numArgs) * sizeof(float)));
float* values = new float[numArgs];
ToFloatArray::convert(values, std::tuple<Ts...>(result), offsets,
std::index_sequence_for<Ts...>());
ft->fSendNodeReply(x->mNode, -1, s.c_str(),
static_cast<int>(numArgs), values);
// ft->fRTFree(x->mNode->mWorld,values);
delete[] values;
}
@ -1493,7 +1520,7 @@ public:
}
std::shared_ptr<WrapperState<Client>>& state() { return mState; }
SharedState<Client>& state() { return mState; }
static const char* getName(const char* setName = nullptr)
@ -1539,7 +1566,7 @@ public:
return p;
}
static void printResult(SharedState& x, Result& r)
static void printResult(SharedState<C>& x, Result& r)
{
if (!x.get() || !x->mNodeAlive) return;

Loading…
Cancel
Save