Use (hacky) NRT SendReply for responses

This solves problems wih behaviour w/r/t server sync
nix
Owen Green 5 years ago
parent c0ec922564
commit 9eb2a44a69

@ -99,5 +99,103 @@ namespace client {
(convert(f + offsets[Is], std::get<Is>(t)), 0)...};
}
};
template<typename Packet>
struct ToOSCTypes
{
static index numTags(typename BufferT::type) { return 1; }
template <typename T>
static std::enable_if_t<
std::is_integral<T>::value || std::is_floating_point<T>::value, index>
numTags(T)
{
return 1;
}
static index numTags(std::string)
{
return 1;;
}
template <typename T>
static index numTags(FluidTensor<T, 1> s)
{
return s.size();
}
template <typename... Ts>
static index numTags(std::tuple<Ts...>&&)
{
return std::tuple_size<std::tuple<Ts...>>::value;
};
static void getTag(Packet& p, typename BufferT::type) { p.addtag('i'); }
template <typename T>
static std::enable_if_t<std::is_integral<std::decay_t<T>>::value>
getTag(Packet& p, T&&) { p.addtag('i'); }
template <typename T>
static std::enable_if_t<std::is_floating_point<std::decay_t<T>>::value>
getTag(Packet& p, T&&) { p.addtag('f'); }
static void getTag (Packet& p, std::string) { p.addtag('s'); }
template <typename T>
static void getTag(Packet& p, FluidTensor<T, 1> x)
{
T dummy{};
for (int i = 0; i < x.rows(); i++)
getTag(p, dummy);
}
template <typename... Ts, size_t... Is>
static void getTag(Packet& p, std::tuple<Ts...>&& t)
{
ForEach(t,[&p](auto& x){getTag(p,x);});
}
static void convert(Packet& p, typename BufferT::type buf)
{
p.addi(static_cast<SCBufferAdaptor*>(buf.get())->bufnum());
}
template <typename T>
static std::enable_if_t<std::is_integral<T>::value>
convert(Packet& p, T x)
{
p.addi(x);
}
template <typename T>
static std::enable_if_t<std::is_floating_point<T>::value>
convert(Packet& p, T x)
{
p.addf(x);
}
static void convert(Packet& p, std::string s)
{
p.adds(s.c_str());
}
template <typename T>
static void convert(Packet& p, FluidTensor<T, 1> s)
{
for(auto& x: s) convert(p,x);
}
template <typename... Ts, size_t... Is>
static void convert(Packet& p, std::tuple<Ts...>&& t)
{
ForEach(t,[&p](auto& x){ convert(p,x);});
}
};
}
}

@ -2,6 +2,7 @@
#include "ArgsFromClient.hpp"
#include "ArgsToClient.hpp"
#include <scsynthsend.h>
namespace fluid {
namespace client {
@ -27,6 +28,7 @@ struct FluidSCMessaging{
ReturnType result;
std::string name;
IndexList argIndices;
void* replyAddr{nullptr};
};
@ -123,6 +125,7 @@ struct FluidSCMessaging{
auto msg = new MessageData();
msg->id = args->geti();
msg->replyAddr = copyReplyAddress(replyAddr);
///TODO make this step contingent on verbosity or something, in the name of effieciency
bool willContinue = validateMessageArgs(msg, args);
@ -167,7 +170,7 @@ struct FluidSCMessaging{
return true;
},
[](World* world, void* data) // RT thread: response
[](World* world, void* data) // RT thread: buffer swap (and possible completion messages)
{
MessageData* m = static_cast<MessageData*>(data);
MessageData::Descriptor::template forEachArg<typename BufferT::type,
@ -175,19 +178,16 @@ struct FluidSCMessaging{
world);
return true;
},
nullptr, // NRT Thread: No-op
[](World* world, void* data) // RT thread: clean up
[](World* world, void* data) // NRT Thread: Send reply
{
MessageData* m = static_cast<MessageData*>(data);
if(m->result.status() != Result::Status::kError)
messageOutput(m->name, m->id, m->result, world);
else
{
auto ft = getInterfaceTable();
ft->fSendNodeReply(ft->fGetNode(world,0),-1, m->name.c_str(),0, nullptr);
}
messageOutput(m->name, m->id, m->result, m->replyAddr);
return false;
},
[](World*, void* data) // RT thread: clean up
{
MessageData* m = static_cast<MessageData*>(data);
delete m;
},
static_cast<int>(completionMsgSize), completionMsgData);
@ -201,61 +201,67 @@ struct FluidSCMessaging{
}
template <typename T> // call from RT
static void messageOutput(const std::string& s, index id, MessageResult<T>& result, World* world)
static void messageOutput(const std::string& s, index id, MessageResult<T>& result, void* replyAddr)
{
auto ft = getInterfaceTable();
// allocate return values
index numArgs = ToFloatArray::allocSize(static_cast<T>(result));
if(numArgs > 2048)
index numTags = ToOSCTypes<small_scpacket>::numTags(static_cast<T>(result));
if(numTags > 2048)
{
std::cout << "ERROR: Message response too big to send (" << asUnsigned(numArgs) * sizeof(float) << " bytes)." << std::endl;
std::cout << "ERROR: Message response too big to send (" << asUnsigned(numTags) * sizeof(float) << " bytes)." << std::endl;
return;
}
float* values = new float[asUnsigned(numArgs)];
// copy return data
ToFloatArray::convert(values, static_cast<T>(result));
ft->fSendNodeReply(ft->fGetNode(world,0), static_cast<int>(id), s.c_str(),
static_cast<int>(numArgs), values);
delete[] values;
small_scpacket packet;
packet.adds(s.c_str());
packet.maketags(static_cast<int>(numTags) + 2);
packet.addtag(',');
packet.addtag('i');
ToOSCTypes<small_scpacket>::getTag(packet, static_cast<T>(result));
packet.addi(static_cast<int>(id));
ToOSCTypes<small_scpacket>::convert(packet, static_cast<T>(result));
if(replyAddr)
::SendReply(static_cast<ReplyAddress*>(replyAddr),packet.data(),static_cast<int>(packet.size()));
}
static void messageOutput(const std::string& s,index id, MessageResult<void>&, World* world)
static void messageOutput(const std::string& s,index id, MessageResult<void>&, void* replyAddr)
{
auto ft = getInterfaceTable();
ft->fSendNodeReply(ft->fGetNode(world,0), static_cast<int>(id), s.c_str(), 0, nullptr);
small_scpacket packet;
packet.adds(s.c_str());
packet.maketags(2);
packet.addtag(',');
packet.addtag('i');
packet.addi(static_cast<int>(id));
if(replyAddr)
::SendReply(static_cast<ReplyAddress*>(replyAddr),packet.data(),static_cast<int>(packet.size()));
}
template <typename... Ts>
static void messageOutput(const std::string& s, index id, MessageResult<std::tuple<Ts...>>& result, World* world)
static void messageOutput(const std::string& s, index id, MessageResult<std::tuple<Ts...>>& result, void* replyAddr)
{
std::array<index, sizeof...(Ts)> offsets;
index numArgs;
std::tie(offsets, numArgs) =
ToFloatArray::allocSize(static_cast<std::tuple<Ts...>>(result));
using T = std::tuple<Ts...>;
if(numArgs > 2048)
index numTags = ToOSCTypes<small_scpacket>::numTags(static_cast<T>(result));
if(numTags > 2048)
{
std::cout << "ERROR: Message response too big to send (" << asUnsigned(numArgs) * sizeof(float) << " bytes)." << std::endl;
std::cout << "ERROR: Message response too big to send (" << asUnsigned(numTags) * sizeof(float) << " bytes)." << std::endl;
return;
}
float* values = new float[asUnsigned(numArgs)];
ToFloatArray::convert(values, std::tuple<Ts...>(result), offsets,
std::index_sequence_for<Ts...>());
auto ft = getInterfaceTable();
ft->fSendNodeReply(ft->fGetNode(world,0), static_cast<int>(id), s.c_str(),
static_cast<int>(numArgs), values);
small_scpacket packet;
packet.adds(s.c_str());
packet.maketags(static_cast<int>(numTags + 3));
packet.addtag(',');
packet.addtag('i');
ToOSCTypes<small_scpacket>::getTag(packet,static_cast<T>(result));
delete[] values;
packet.addi(static_cast<int>(id));
ToOSCTypes<small_scpacket>::convert(packet, static_cast<T>(result));
if(replyAddr)
::SendReply(static_cast<ReplyAddress*>(replyAddr),packet.data(),static_cast<int>(packet.size()));
}
};
}

@ -10,6 +10,7 @@
#include <clients/common/FluidBaseClient.hpp>
#include <SC_PlugIn.hpp>
#include <SC_ReplyImpl.hpp>
#include <scsynthsend.h>
#include <map>
namespace fluid {
@ -96,7 +97,7 @@ namespace impl {
struct NRTCommand
{
NRTCommand(World*, sc_msg_iter* args, bool consumeID = true)
NRTCommand(World*, sc_msg_iter* args, void* replyAddr, bool consumeID = true)
{
auto count = args->count;
auto pos = args->rdpos;
@ -108,34 +109,51 @@ namespace impl {
args->count = count;
args->rdpos = pos;
}
if(replyAddr)
mReplyAddress = copyReplyAddress(replyAddr);
}
~NRTCommand()
{
if(mReplyAddress) deleteReplyAddress(mReplyAddress);
}
NRTCommand(){}
explicit NRTCommand(index id):mID{id}{}
bool stage2(World*) { std::cout << "Nope\n"; return true; } //nrt
bool stage2(World*) { return true; } //nrt
bool stage3(World*) { return true; } //rt
bool stage4(World*) { return true; } //nrt
bool stage4(World*) { return false; } //nrt
void cleanup(World*) {} //rt
void sendReply(World* world, const char* name,bool success)
void sendReply(const char* name,bool success)
{
if(mID < 0) return;
auto ft = getInterfaceTable();
auto root = ft->fGetNode(world,0);
float res[2] = {static_cast<float>(success),static_cast<float>(mID)};
std::string slashname = std::string{'/'} + name;
ft->fSendNodeReply(root,0, slashname.c_str(), 2, res);
if(mReplyAddress)
{
std::string slash{"/"};
small_scpacket packet;
packet.adds((slash+name).c_str());
packet.maketags(3);
packet.addtag(',');
packet.addtag('i');
packet.addtag('i');
packet.addi(success);
packet.addi(static_cast<int>(mID));
::SendReply(static_cast<ReplyAddress*>(mReplyAddress),packet.data(), static_cast<int>(packet.size()));
}
}
// protected:
index mID;
void* mReplyAddress{nullptr};
};
struct CommandNew : public NRTCommand
{
CommandNew(World* world, sc_msg_iter* args)
: NRTCommand{world,args, !IsNamedShared_v<Client>},
CommandNew(World* world, sc_msg_iter* args,void* replyAddr)
: NRTCommand{world,args, replyAddr, !IsNamedShared_v<Client>},
mParams{Client::getParameterDescriptors()}
{
mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, *args);
@ -168,15 +186,12 @@ namespace impl {
auto ptr = get(NRTCommand::mID).lock();
ptr->mClient.setParams(ptr->mParams);
}
NRTCommand::sendReply(name(),mResult);
return mResult;
}
bool stage3(World* w)
{
NRTCommand::sendReply(w, name(), mResult);
return true;
}
private:
bool mResult;
Params mParams;
@ -218,14 +233,10 @@ namespace impl {
{
CancelCheck<IsRTQueryModel>()(NRTCommand::mID);
remove(NRTCommand::mID);
NRTCommand::sendReply(name(), true);
return true;
}
bool stage3(World* w)
{
NRTCommand::sendReply(w, name(), true);
return true;
}
};
@ -235,7 +246,7 @@ namespace impl {
CommandAsyncComplete(World*, index id, void* replyAddress)
{
NRTCommand::mID = id;
mReplyAddress = replyAddress;
NRTCommand::mReplyAddress = replyAddress;
}
static const char* name() { return CommandProcess::name(); }
@ -243,7 +254,7 @@ namespace impl {
bool stage2(World* world)
{
std::cout << "In Async completion\n";
// std::cout << "In Async completion\n";
if(auto ptr = get(NRTCommand::mID).lock())
{
Result r;
@ -278,7 +289,6 @@ namespace impl {
{
auto& params = ptr->mParams;
params.template forEachParamType<BufferT, AssignBuffer>(world);
//NRTCommand::sendReply(world, name(), true);
return true;
}
return false;
@ -289,29 +299,28 @@ namespace impl {
if(auto ptr = get(NRTCommand::mID).lock())
{
ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
if(NRTCommand::mID >= 0 && NRTCommand::mReplyAddress)
{
NRTCommand::sendReply(name(),mSuccess);
}
return true;
}
return false;
}
void cleanup(World* world) {
if(auto ptr = get(NRTCommand::mID).lock() && NRTCommand::mID >= 0)
NRTCommand::sendReply(world, name(), mSuccess);
if(mReplyAddress)
{
ReplyAddress* r = static_cast<ReplyAddress*>(mReplyAddress);
delete r;
}
void cleanup(World*) {
if(NRTCommand::mReplyAddress)
deleteReplyAddress(NRTCommand::mReplyAddress);
} //rt
bool mSuccess;
void* mReplyAddress{nullptr};
};
static void doProcessCallback(World* world, index id,size_t completionMsgSize,char* completionMessage,void* replyAddress)
{
std::cout << "In callback\n";
// std::cout << "In callback\n";
auto ft = getInterfaceTable();
struct Context{
@ -325,7 +334,7 @@ namespace impl {
Context* c = new Context{world,id,completionMsgSize,completionMessage,replyAddress};
auto runCompletion = [](FifoMsg* msg){
std::cout << "In FIFOMsg\n";
// std::cout << "In FIFOMsg\n";
Context* c = static_cast<Context*>(msg->mData);
World* world = c->mWorld;
index id = c->mID;
@ -348,7 +357,7 @@ namespace impl {
struct CommandProcess: public NRTCommand
{
CommandProcess(World* world, sc_msg_iter* args): NRTCommand{world, args}
CommandProcess(World* world, sc_msg_iter* args, void* replyAddr): NRTCommand{world, args, replyAddr}
{
auto& ar = *args;
@ -391,7 +400,7 @@ namespace impl {
index id = NRTCommand::mID;
size_t completionMsgSize = mCompletionMsgSize;
char* completionMessage = mCompletionMessage;
void* replyAddress = copyReplyAddress(mReplyAddr);
void* replyAddress = copyReplyAddress(NRTCommand::mReplyAddress);
auto callback = [world,id,completionMsgSize,completionMessage,replyAddress](){
doProcessCallback(world,id,completionMsgSize,completionMessage,replyAddress);
@ -425,7 +434,7 @@ namespace impl {
// NRTCommand::sendReply(world, name(), mResult.ok());
return true;
}
std::cout << "Ohno\n";
// std::cout << "Ohno\n";
return false;
}
@ -435,31 +444,25 @@ namespace impl {
if(auto ptr = get(NRTCommand::mID).lock())
{
ptr->mParams.template forEachParamType<BufferT, impl::CleanUpBuffer>();
if(NRTCommand::mID >= 0 && mSynchronous)
NRTCommand::sendReply(name(), mResult.ok());
return true;
}
return false;
}
void cleanup(World* world) {
if(auto ptr = get(NRTCommand::mID).lock() && NRTCommand::mID >= 0 && mSynchronous)
NRTCommand::sendReply(world, name(), mResult.ok());
if(mReplyAddr)
deleteReplyAddress(mReplyAddr);
// getInterfaceTable()->fRTFree(world,mReplyAddr);
} //rt
bool synchronous()
{
return mSynchronous;
}
void addCompletionMessage(size_t size, char* message, void* addr)
void addCompletionMessage(size_t size, char* message)//, void* addr)
{
mCompletionMsgSize = size;
mCompletionMessage = message;
mReplyAddr = copyReplyAddress(addr);
// ReplyAddress* rr = static_cast<ReplyAddress*>(addr);
// if(addr)mReplyAddr = *rr;
}
// private:
@ -467,17 +470,16 @@ namespace impl {
bool mSynchronous;
size_t mCompletionMsgSize{0};
char* mCompletionMessage{nullptr};
void* mReplyAddr{nullptr};
// sc_msg_iter* mOSCData;
};
struct CommandProcessNew: public NRTCommand
{
CommandProcessNew(World* world, sc_msg_iter* args)
: mNew{world,args},
CommandProcessNew(World* world, sc_msg_iter* args,void* replyAddr)
: mNew{world, args, replyAddr},
mProcess{mNew.mID,false}
{
mProcess.mSynchronous = args->geti();
mProcess.mReplyAddress = mNew.mReplyAddress;
}
CommandProcessNew(index id, World* world, FloatControlsIter& args, Unit* x)
@ -501,14 +503,15 @@ namespace impl {
return mProcess.stage3(world);
}
bool stage4(World* world) //rt
bool stage4(World* world) //nrt
{
return mProcess.stage4(world);
}
void cleanup(World* world)
{
mProcess.cleanup(world);
mProcess.mReplyAddress = nullptr;
mProcess.cleanup(world);
}
bool synchronous()
@ -516,9 +519,9 @@ namespace impl {
return mProcess.synchronous();
}
void addCompletionMessage(size_t size, char* message,void* addr)
void addCompletionMessage(size_t size, char* message)
{
mProcess.addCompletionMessage(size, message,addr);
mProcess.addCompletionMessage(size, message);
}
private:
@ -529,7 +532,8 @@ namespace impl {
struct CommandCancel: public NRTCommand
{
CommandCancel(World* world, sc_msg_iter* args): NRTCommand{world, args}
CommandCancel(World* world, sc_msg_iter* args, void* replyAddr)
: NRTCommand{world, args, replyAddr}
{}
static const char* name()
@ -555,8 +559,8 @@ namespace impl {
struct CommandSetParams: public NRTCommand
{
CommandSetParams(World* world, sc_msg_iter* args)
: NRTCommand{world, args}
CommandSetParams(World* world, sc_msg_iter* args, void* replyAddr)
: NRTCommand{world, args, replyAddr}
{
auto& ar = *args;
if(auto ptr = get(NRTCommand::mID).lock())
@ -564,7 +568,6 @@ namespace impl {
ptr->mParams.template setParameterValuesRT<ParamsFromOSC>(nullptr, world, ar);
Result result = validateParameters(ptr->mParams);
ptr->mClient.setParams(ptr->mParams);
NRTCommand::sendReply(world, name(), result.ok());
} else printNotFound(NRTCommand::mID);
}
@ -602,7 +605,7 @@ namespace impl {
{
if(!cmd->synchronous())
{
cmd->addCompletionMessage(completionMsgSize,completionMsgData,replyAddr);
cmd->addCompletionMessage(completionMsgSize,completionMsgData);
return runAsyncCommand<CommandProcess>(world, cmd, replyAddr, 0, nullptr);
}
else return runAsyncCommand<CommandProcess>(world, cmd, replyAddr, completionMsgSize, completionMsgData);
@ -613,7 +616,7 @@ namespace impl {
{
if(!cmd->synchronous())
{
cmd->addCompletionMessage(completionMsgSize,completionMsgData,replyAddr);
cmd->addCompletionMessage(completionMsgSize,completionMsgData);
return runAsyncCommand<CommandProcessNew>(world, cmd, replyAddr, 0, nullptr);
}
else return runAsyncCommand<CommandProcessNew>(world, cmd, replyAddr, completionMsgSize, completionMsgData);
@ -629,7 +632,7 @@ namespace impl {
auto ft = getInterfaceTable();
void* space = ft->fRTAlloc(world,sizeof(Command));
Command* cmd = new (space) Command(world, args);
Command* cmd = new (space) Command(world, args, replyAddr);
//This is brittle, but can't think of something better offhand
//This is the only place we can check for a completion message at the end of the OSC packet
//beause it has to be passed on to DoAsynhronousCommand at this point. However, detecting correctly
@ -840,7 +843,7 @@ namespace impl {
struct DefineCommandIf<true, CommandType>
{
void operator()() {
std::cout << CommandType::name() << std::endl;
// std::cout << CommandType::name() << std::endl;
defineNRTCommand<CommandType>();
}
};
@ -873,7 +876,6 @@ namespace impl {
DefineCommandIf<!IsRTQueryModel, CommandCancel>()();
DefineCommandIf<IsModel,CommandSetParams>()();
// DefineCommandIf<IsRTQueryModel,CommandGetParams>()();
defineNRTCommand<CommandFree>();
RegisterUnitIf<!IsRTQueryModel,NRTProgressUnit>()(ft);

@ -17,15 +17,12 @@ FluidMessageResponse : Object
}
*string{ |a, offset|
var split = a.find([0],offset);
var res;
if(split.isNil) {"ERROR: can't parse string from server".throw};
^[[a.copyRange(offset,split-1).keep(split).collectAs({|x|x.asInteger.asAscii},String)], split + 1]
^[a]
}
*strings {|a,offset|
//TODO add an n argument as with numbers() to make this less omnivorous
^[a.drop(offset).drop(-1).collectAs({|x|x.asInteger.asAscii},String).split(0.asAscii)]
^[a.drop(offset)];
}
*numbers{ |a, n, offset|
@ -38,7 +35,7 @@ FluidMessageResponse : Object
}
*buffer{ |a,server,offset|
server = server ? Server.default ;
server = server ? Server.default ;
^[Buffer.cachedBufferAt(server, a[offset]), offset + 1]
}
}

@ -199,23 +199,18 @@ FluidDataObject : FluidServerObject
if(serverCaches[this].includesKey(server,\messageResponder).not)
{
serverCaches[this].put(server,\messageResponder,OSCFunc.new({|m|
var id = m[2].asInteger;
var method;
// "I'm in the maccydees".postln;
serverCaches[this].at(server,id) !? { |p|
// "I'm in the burger king".postln ;
// m.postln;
method = m[0].asString.findRegexp("/"++this.name++"/(.*)")[1][1].asSymbol;
/* p.postln;
p.actions[method].postln;*/
p.actions[method] !? {|a|
//two items: parser and action
var parser = a[0];
var action = a[1];
var result = FluidMessageResponse.collectArgs(parser,m[3..]);
action.value(result);
}
var id = m[1].asInteger;
var method;
serverCaches[this].at(server,id) !? { |p|
method = m[0].asString.findRegexp("/"++this.name++"/(.*)")[1][1].asSymbol;
p.actions[method] !? {|a|
//two items: parser and action
var parser = a[0];
var action = a[1];
var result = FluidMessageResponse.collectArgs(parser,m[2..]);
action.value(result);
}
}
},'/' ++ this.objectClassName ++ '/*',server.addr, dispatcher:FluidOSCPatternInversion.new).fix)
}
}

Loading…
Cancel
Save