Use a thread_local to manage rt cache lifetime.

Rollback to immutable allocator
nix
Owen Green 5 years ago
parent ac163c4a11
commit 52bb79a581
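
A minimal sketch of the function-local thread_local pattern the commit title refers to. The names below (World stand-in, Cache, rtCache) are illustrative only, not the FluCoMa API; the point is that the object is constructed lazily on a thread's first call and destroyed automatically at thread exit, which is what ties the cache's lifetime to the thread that owns it.

    #include <string>
    #include <unordered_map>

    struct World;   // opaque stand-in for the server's World

    struct Cache
    {
      explicit Cache(World* w) : mWorld{w} {}
      World*                               mWorld;   // captured at construction
      std::unordered_map<int, std::string> entries;
    };

    // One Cache per thread: constructed lazily on the first call that thread makes,
    // destroyed automatically when that thread exits.
    Cache& rtCache(World* world)
    {
      thread_local static Cache instance(world);
      return instance;
    }

    int main()
    {
      rtCache(nullptr).entries.emplace(1, "hello");  // first call constructs this thread's cache
      return rtCache(nullptr).entries.count(1) == 1 ? 0 : 1;
    }

Because the thread_local initializer runs only on a thread's first call, later calls from that thread ignore their world argument; the cache, and whatever allocator state it was built with, then lives and dies with the calling thread.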

@@ -49,20 +49,37 @@ class NonRealTime : public SCUnit
public:
using Cache = std::unordered_map<index, CacheEntryPointer>;
using RTCacheAllocator =
SCWorldAllocator<std::pair<const index, WeakCacheEntryPointer>,Wrapper>;
struct RTCacheMirror: public std::unordered_map<index, WeakCacheEntryPointer, std::hash<index>,
std::equal_to<index>, RTCacheAllocator>
SCWorldAllocator<std::pair<const index, WeakCacheEntryPointer>, Wrapper>;
struct RTCacheMirror
: public std::unordered_map<index, WeakCacheEntryPointer,
std::hash<index>, std::equal_to<index>,
RTCacheAllocator>
{
RTCacheMirror(RTCacheAllocator&& alloc)
: std::unordered_map<index, WeakCacheEntryPointer, std::hash<index>,
std::equal_to<index>, RTCacheAllocator>{
std::move(alloc)}
{
// std::cout << "Warning: And up...\n" << std::endl;
}
~RTCacheMirror()
{
if(mWorld) mWorld = nullptr; //if we get as far as destroying this, then the world is gone and we should accept that and move on
// std::cout << "Warning: And down...\n" << std::endl;
}
};
static Cache mCache;
static RTCacheMirror mRTCache;
static Cache mCache;
static RTCacheMirror& rtCache(World* world)
{
thread_local static RTCacheMirror mRTCache(
RTCacheAllocator(world, Wrapper::getInterfaceTable()));
return mRTCache;
}
private:
static bool isNull(WeakCacheEntryPointer const& weak)
{
@@ -70,10 +87,11 @@ private:
!WeakCacheEntryPointer{}.owner_before(weak);
}
static WeakCacheEntryPointer rtget(index id)
static WeakCacheEntryPointer rtget(World* world, index id)
{
auto lookup = mRTCache.find(id);
return lookup == mRTCache.end() ? WeakCacheEntryPointer() : lookup->second;
auto lookup = rtCache(world).find(id);
return lookup == rtCache(world).end() ? WeakCacheEntryPointer()
: lookup->second;
}
using RawCacheEntry = typename Cache::value_type;
@@ -84,17 +102,8 @@ private:
{
FifoMsg msg;
auto add = [](FifoMsg* m) {
if(!mWorld) mWorld = m->mWorld;
else if (mWorld != m->mWorld) //internal server has restarted
{
mWorld = nullptr;
mRTCache.clear();
mWorld = m->mWorld;
}
RawCacheEntry* r = static_cast<RawCacheEntry*>(m->mData);
mRTCache.emplace(r->first, r->second);
rtCache(m->mWorld).emplace(r->first, r->second);
};
msg.Set(w, add, nullptr, &r);
auto ft = Wrapper::getInterfaceTable();
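
For context, a compilable stand-in for the FifoMsg hand-off these hunks rely on: the NRT side packs a pointer into a message, the perform callback runs against it on the RT side, and a separate free callback releases the payload afterwards. Msg and sendToRT below are simplified substitutes for SuperCollider's FifoMsg and ft->fSendMsgToRT, not the real API.

    #include <iostream>

    struct World;                      // opaque here; the server owns the real one

    struct Msg                         // stand-in for FifoMsg
    {
      using Func = void (*)(Msg*);
      World* mWorld{nullptr};
      Func   mPerform{nullptr};
      Func   mFree{nullptr};
      void*  mData{nullptr};

      void Set(World* w, Func perform, Func freeFn, void* data)
      {
        mWorld = w; mPerform = perform; mFree = freeFn; mData = data;
      }
    };

    // In the plugin this would be ft->fSendMsgToRT(world, msg); here it just runs inline.
    void sendToRT(Msg& m)
    {
      if (m.mPerform) m.mPerform(&m);
      if (m.mFree) m.mFree(&m);
    }

    int main()
    {
      Msg  msg;
      int* id = new int{42};           // payload outlives the call that queues it

      auto perform = [](Msg* m) {      // runs on the RT thread in the real server
        std::cout << "erase id " << *static_cast<int*>(m->mData) << '\n';
      };
      auto cleanup = [](Msg* m) { delete static_cast<int*>(m->mData); };

      msg.Set(nullptr, perform, cleanup, id);
      sendToRT(msg);
    }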
@@ -115,28 +124,15 @@ private:
FifoMsg msg;
auto remove = [](FifoMsg* m) {
if(!mWorld)
{
mRTCache.clear();
mWorld = m->mWorld; //
}
else if (mWorld != m->mWorld) //internal server has restarted
{
mWorld = nullptr;
mRTCache.clear();
mWorld = m->mWorld;
}
int* id = static_cast<int*>(m->mData);
mRTCache.erase(*id);
rtCache(m->mWorld).erase(*id);
};
auto cleanup = [](FifoMsg* m) { delete static_cast<index*>(m->mData); };
msg.Set(world, remove, cleanup, data);
auto ft = Wrapper::getInterfaceTable();
ft->fSendMsgToRT(world,msg);
ft->fSendMsgToRT(world, msg);
}
};
@@ -330,7 +326,7 @@ private:
bool stage2(World* world)
{
cancelCheck(IsRTQueryModel_t(), NRTCommand::mID);
remove(world, NRTCommand::mID);
remove(world, NRTCommand::mID);
NRTCommand::sendReply(name(), true);
return true;
}
@@ -464,7 +460,6 @@ private:
Params mParams;
bool mOverwriteParams{false};
WeakCacheEntryPointer mRecord;
};
@@ -559,14 +554,12 @@ private:
char* mCompletionMessage;
void* mReplyAddress;
};
Context* c = new Context{world, id, completionMsgSize, completionMessage,
replyAddress};
auto launchCompletionFromNRT = [](FifoMsg* inmsg) {
auto runCompletion = [](FifoMsg* msg) {
Context* c = static_cast<Context*>(msg->mData);
World* world = c->mWorld;
index id = c->mID;
@@ -817,7 +810,7 @@ private:
mInterval = static_cast<index>(0.02 / controlDur());
mID = static_cast<index>(mInBuf[0][0]);
std::cout << mID << std::endl;
mRecord = rtget(mID);
mRecord = rtget(mWorld, mID);
set_calc_function<NRTProgressUnit, &NRTProgressUnit::next>();
Wrapper::getInterfaceTable()->fClearUnitOutputs(this, 1);
}
@@ -825,7 +818,7 @@ private:
void next(int)
{
if (isNull(mRecord)) { mRecord = rtget(mID); };
if (isNull(mRecord)) { mRecord = rtget(mWorld, mID); };
if (0 == mCounter++)
{
@@ -941,7 +934,7 @@ private:
}
else
{
auto record = rtget(mID);
auto record = rtget(mWorld, mID);
if (auto ptr = record.lock())
{
mInit = true;
@@ -953,6 +946,7 @@ private:
mDone = mInit;
}
}
private:
bool mPreviousTrigger{false};
bool mTrigger{false};
@@ -996,7 +990,7 @@ private:
void init()
{
mInit = false;
mInst = rtget(mID);
mInst = rtget(mWorld, mID);
if (auto ptr = mInst.lock())
{
auto& client = ptr->mClient;
@@ -1072,7 +1066,7 @@ private:
static constexpr bool IsRTQueryModel = IsRTQueryModel_t::value;
static constexpr bool IsModel = Client::isModelObject::value;
public:
static void setup(InterfaceTable* ft, const char*)
{
@@ -1101,15 +1095,10 @@ public:
void init(){};
static World* getWorld()
{
return mWorld;
}
private:
static World* getWorld() { return mWorld; }
private:
static World* mWorld;
static Result validateParameters(ParamSetType& p)
@@ -1152,16 +1141,12 @@ private:
};
template <typename Client, typename Wrapper>
World* NonRealTime<Client,Wrapper>::mWorld{nullptr};
World* NonRealTime<Client, Wrapper>::mWorld{nullptr};
template <typename Client, typename Wrapper>
typename NonRealTime<Client, Wrapper>::Cache
NonRealTime<Client, Wrapper>::mCache{};
template <typename Client, typename Wrapper>
typename NonRealTime<Client, Wrapper>::RTCacheMirror
NonRealTime<Client, Wrapper>::mRTCache{};
} // namespace impl
} // namespace client
} // namespace fluid

@@ -19,6 +19,9 @@ namespace fluid {
template <typename T, typename Wrapper>
class SCWorldAllocator
{
World* mWorld;
InterfaceTable* mInterface;
public:
using propagate_on_container_move_assignment = std::true_type;
using value_type = T;
@@ -26,22 +29,24 @@ public:
template <typename U, typename W>
friend class SCWorldAllocator;
SCWorldAllocator() = default;
template <typename U,typename W>
SCWorldAllocator(const SCWorldAllocator<U,W>&) noexcept
SCWorldAllocator(World* w, InterfaceTable* interface)
: mWorld{w}, mInterface{interface}
{}
template <typename U, typename W>
SCWorldAllocator(const SCWorldAllocator<U, W>& other) noexcept
{
mWorld = other.mWorld;
mInterface = other.mInterface;
}
T* allocate(std::size_t n)
{
if (n > std::numeric_limits<std::size_t>::max() / sizeof(T))
throw std::bad_array_new_length();
World* world = Wrapper::getWorld();
InterfaceTable* interface = Wrapper::getInterfaceTable();
if (world && interface)
if (auto p = static_cast<T*>(interface->fRTAlloc(world, n * sizeof(T))))
if (mWorld && mInterface)
if (auto p = static_cast<T*>(mInterface->fRTAlloc(mWorld, n * sizeof(T))))
return p;
throw std::bad_alloc();
@@ -49,9 +54,7 @@ public:
void deallocate(T* p, std::size_t /*n*/) noexcept
{
World* world = Wrapper::getWorld();
InterfaceTable* interface = Wrapper::getInterfaceTable();
if(world && interface) interface->fRTFree(world, p);
if (mWorld && mInterface) mInterface->fRTFree(mWorld, p);
}
};
} // namespace fluid
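
The allocator file above ends up with a stateful allocator whose World and interface table are bound at construction rather than fetched through the Wrapper on every call. A rough, self-contained sketch of that shape, with Pool standing in for World/InterfaceTable and malloc/free standing in for fRTAlloc/fRTFree; everything else follows the standard allocator requirements, including the converting constructor that unordered_map needs for its node type.

    #include <cstddef>
    #include <cstdlib>
    #include <functional>
    #include <limits>
    #include <new>
    #include <unordered_map>
    #include <utility>

    struct Pool                                   // stand-in for World + InterfaceTable
    {
      void* alloc(std::size_t bytes) { return std::malloc(bytes); }
      void  free(void* p) { std::free(p); }
    };

    template <typename T>
    struct PoolAllocator
    {
      using value_type = T;
      using propagate_on_container_move_assignment = std::true_type;

      Pool* mPool{nullptr};

      PoolAllocator() = default;
      explicit PoolAllocator(Pool* p) : mPool{p} {}

      template <typename U>
      PoolAllocator(const PoolAllocator<U>& other) noexcept : mPool{other.mPool} {}

      T* allocate(std::size_t n)
      {
        if (n > std::numeric_limits<std::size_t>::max() / sizeof(T))
          throw std::bad_array_new_length();
        if (mPool)
          if (auto p = static_cast<T*>(mPool->alloc(n * sizeof(T)))) return p;
        throw std::bad_alloc();                   // no pool, or the pool refused
      }

      void deallocate(T* p, std::size_t) noexcept
      {
        if (mPool) mPool->free(p);
      }
    };

    template <typename T, typename U>
    bool operator==(const PoolAllocator<T>& a, const PoolAllocator<U>& b) { return a.mPool == b.mPool; }
    template <typename T, typename U>
    bool operator!=(const PoolAllocator<T>& a, const PoolAllocator<U>& b) { return !(a == b); }

    int main()
    {
      Pool pool;
      using Alloc = PoolAllocator<std::pair<const int, int>>;
      std::unordered_map<int, int, std::hash<int>, std::equal_to<int>, Alloc> map{Alloc(&pool)};
      map.emplace(1, 2);
      return map.at(1) == 2 ? 0 : 1;
    }

Binding the pool at construction keeps allocate/deallocate free of global lookups, at the cost that a default-constructed allocator cannot allocate, which is why both calls check the pointer first.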
