diff --git a/service/rest_proxy.cc b/service/rest_proxy.cc deleted file mode 100644 index b60c1703..00000000 --- a/service/rest_proxy.cc +++ /dev/null @@ -1,380 +0,0 @@ -/* rest_proxy.cc - Jeremy Barnes, 14 November 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - -*/ - -#include "rest_proxy.h" -#include "jml/arch/exception_handler.h" - -using namespace std; -using namespace ML; - -namespace Datacratic { - -/*****************************************************************************/ -/* REST PROXY */ -/*****************************************************************************/ - -RestProxy:: -RestProxy() - : operationQueue(1024), - numMessagesOutstanding_(0), - currentOpId(1) -{ - // What to do when we get a new entry in the queue? - operationQueue.onEvent = std::bind(&RestProxy::handleOperation, - this, std::placeholders::_1); - -} - -RestProxy:: -RestProxy(const std::shared_ptr & context) - : operationQueue(1024), - connection(context), - numMessagesOutstanding_(0), - currentOpId(1) -{ - // What to do when we get a new entry in the queue? - operationQueue.onEvent = std::bind(&RestProxy::handleOperation, - this, std::placeholders::_1); - -} - -RestProxy:: -~RestProxy() -{ - shutdown(); -} - -void -RestProxy:: -sleepUntilIdle() -{ - for (;;) { - int o = numMessagesOutstanding_; - //cerr << "numMessagesOustanding = " << o << endl; - if (!o) - return; - ML::futex_wait(numMessagesOutstanding_, o, 0.01); - } -} - -void -RestProxy:: -shutdown() -{ - // Stop processing messages - MessageLoop::shutdown(); - - connection.shutdown(); -} - -void -RestProxy:: -init(std::shared_ptr config, - const std::string & serviceName, - const std::string & endpointName) -{ - serviceName_ = serviceName; - - connection.init(config, ZMQ_XREQ); - connection.connect(serviceName + "/" + endpointName); - - addSource("RestProxy::operationQueue", operationQueue); - - // What to do when we get something back from zeromq? - addSource("RestProxy::handleZmqResponse", - std::make_shared - (connection.socket(), - std::bind(&RestProxy::handleZmqResponse, - this, - std::placeholders::_1))); -} - -void -RestProxy:: -initServiceClass(std::shared_ptr config, - const std::string & serviceClass, - const std::string & serviceEndpoint, - bool local) -{ - connection.init(config, ZMQ_XREQ); - connection.connectToServiceClass(serviceClass, serviceEndpoint, local); - - addSource("RestProxy::operationQueue", operationQueue); - - // What to do when we get something back from zeromq? - addSource("RestProxy::handleZmqResponse", - std::make_shared - (connection.socket(), - std::bind(&RestProxy::handleZmqResponse, - this, - std::placeholders::_1))); -} - -void -RestProxy:: -push(const RestRequest & request, const OnDone & onDone) -{ - Operation op; - op.request = request; - op.onDone = onDone; - if (operationQueue.tryPush(std::move(op))) - ML::atomic_inc(numMessagesOutstanding_); - else - throw ML::Exception("queue is full"); -} - -void -RestProxy:: -push(const OnDone & onDone, - const std::string & method, - const std::string & resource, - const RestParams & params, - const std::string & payload) -{ - RestRequest request(method, resource, params, payload); - push(request, onDone); -} - -void -RestProxy:: -handleOperation(const Operation & op) -{ - // Gets called when someone calls our API to make something happen; - // this is run by the main worker thread to actually do the work. - // It forwards the request off to the master banker. - uint64_t opId = 0; - if (op.onDone) - opId = currentOpId++; - - //cerr << "sending with payload " << op.request.payload - // << " and response id " << opId << endl; - - if (trySendMessage(connection.socket(), - std::to_string(opId), - op.request.verb, - op.request.resource, - op.request.params.toBinary(), - op.request.payload)) { - if (opId) - outstanding[opId] = op.onDone; - else { - int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1); - if (no == 0) - futex_wake(numMessagesOutstanding_); - } - } - else { - if (op.onDone) { - ML::Set_Trace_Exceptions notrace(false); - string exc_msg = ("connection to '" + serviceName_ - + "' is unavailable"); - op.onDone(make_exception_ptr(exc_msg), 0, ""); - } - int no = __sync_add_and_fetch(&numMessagesOutstanding_, -1); - if (no == 0) - futex_wake(numMessagesOutstanding_); - } -} - -void -RestProxy:: -handleZmqResponse(const std::vector & message) -{ - // Gets called when we get a response back from the master banker in - // response to one of our calls. - - // We call the callback associated with this code. - - //cerr << "response is " << message << endl; - - uint64_t opId = boost::lexical_cast(message.at(0)); - int responseCode = boost::lexical_cast(message.at(1)); - std::string body = message.at(2); - - ExcAssert(opId); - - auto it = outstanding.find(opId); - if (it == outstanding.end()) { - cerr << "unknown op ID " << endl; - return; - } - try { - if (responseCode >= 200 && responseCode < 300) - it->second(nullptr, responseCode, body); - else - it->second(std::make_exception_ptr(ML::Exception(body)), - responseCode, ""); - } catch (const std::exception & exc) { - cerr << "warning: exception handling banker result: " - << exc.what() << endl; - } catch (...) { - cerr << "warning: unknown exception handling banker result" - << endl; - } - - outstanding.erase(it); - - ML::atomic_dec(numMessagesOutstanding_); -} - - -/******************************************************************************/ -/* MULTI REST PROXY */ -/******************************************************************************/ - -void -MultiRestProxy:: -shutdown() -{ - if (!connected) return; - - MessageLoop::shutdown(); - - lock_guard guard(connectionsLock); - - for (auto& conn: connections) { - if (!conn.second) continue; - conn.second->shutdown(); - } - - connections.clear(); - connected = false; -} - -namespace { - -RestProxy::OnDone -makeResponseFn( - const std::string& serviceName, const MultiRestProxy::OnResponse& fn) -{ - return [=] (std::exception_ptr ex, int code, const std::string& msg) { - if (fn) fn(serviceName, ex, code, msg); - }; -} - -} // namespace anonymous - - - -void -MultiRestProxy:: -push(const RestRequest & request, const OnResponse & onResponse) -{ - lock_guard guard(connectionsLock); - - for (const auto& conn : connections) { - if (!conn.second) continue; - - auto onDone = makeResponseFn(conn.first, onResponse); - conn.second->push(request, onDone); - } -} - - -void -MultiRestProxy:: -push( const OnResponse & onResponse, - const string & method, - const string & resource, - const RestParams & params, - const string & payload) -{ - lock_guard guard(connectionsLock); - - for (const auto& conn : connections) { - if (!conn.second) continue; - - auto onDone = makeResponseFn(conn.first, onResponse); - conn.second->push(onDone , method, resource, params, payload); - } -} - - -void -MultiRestProxy:: -connectAllServiceProviders( - const string& serviceClass, const string& endpointName, bool local) -{ - ExcCheck(!connected, "Already connectoed to a service provider"); - - this->serviceClass = serviceClass; - this->endpointName = endpointName; - this->localized = local; - - serviceProvidersWatch.init( - [=] (const string&, ConfigurationService::ChangeType) { - onServiceProvidersChanged("serviceClass/" + serviceClass, local); - }); - - onServiceProvidersChanged("serviceClass/" + serviceClass, local); - connected = true; -} - -void -MultiRestProxy:: -connectServiceProvider(const string& serviceName) -{ - { - lock_guard guard(connectionsLock); - - auto& conn = connections[serviceName]; - if (conn) return; - - shared_ptr newConn(new RestProxy(context)); - newConn->init(config, serviceName, endpointName); - conn = std::move(newConn); - - addSource("MultiRestProxy::" + serviceName, conn); - } - - onConnect(serviceName); -} - -void -MultiRestProxy:: -onServiceProvidersChanged(const string& path, bool local) -{ - vector children = config->getChildren(path, serviceProvidersWatch); - - for (const auto& child : children) { - Json::Value value = config->getJson(path + "/" + child); - - string location = value["serviceLocation"].asString(); - if (local && location != config->currentLocation) { - cerr << "dropping " << location - << " != " << config->currentLocation - << endl; - continue; - } - - connectServiceProvider(value["serviceName"].asString()); - } - - vector disconnected; - { - lock_guard guard(connectionsLock); - - for (const auto& conn : connections) { - if (!conn.second) continue; - - auto it = find(children.begin(), children.end(), conn.first); - if (it != children.end()) continue; - - removeSource(conn.second.get()); - disconnected.push_back(conn.first); - } - - // We don't have to worry about invalidating iterators anymore. - for (const auto& conn : disconnected) - connections.erase(conn); - } - - // Lock has been released and it's now safe to trigger the callbacks. - for (const auto& conn : disconnected) - onDisconnect(conn); -} - -} // namespace Datacratic diff --git a/service/rest_proxy.h b/service/rest_proxy.h deleted file mode 100644 index 62255fba..00000000 --- a/service/rest_proxy.h +++ /dev/null @@ -1,246 +0,0 @@ -/* rest_proxy.h -*- C++ -*- - Jeremy Banres, 14 November 2012 - Copyright (c) 2012 Datacratic. All rights reserved. - -*/ - -#pragma once - -#include "soa/service/message_loop.h" -#include "soa/service/zmq_endpoint.h" -#include "soa/service/typed_message_channel.h" -#include "soa/service/rest_service_endpoint.h" - -namespace Datacratic { - - -/*****************************************************************************/ -/* HELPER FUNCTIONS */ -/*****************************************************************************/ - -/** Pass through a REST response to the given function. */ - -template -void decodeRestResponseJson(const std::string & functionName, - std::exception_ptr exc, - int resultCode, - const std::string & body, - std::function onDone) -{ - Result result; - - try { - if (exc) { - onDone(exc, std::move(result)); - return; - } - else if (resultCode < 200 || resultCode >= 300) { - onDone(std::make_exception_ptr - (ML::Exception("%s REST request failed: %d: %s", - functionName.c_str(), - resultCode, - body.c_str())), - std::move(result)); - return; - } - else { - onDone(nullptr, std::move(static_cast(Result::fromJson(Json::parse(body))))); - } - } catch (...) { - onDone(std::current_exception(), std::move(result)); - } -} - -template -std::function -makeRestResponseJsonDecoder(std::string functionName, - std::function onDone) -{ - return [=] (std::exception_ptr exc, - int resultCode, - std::string body) - { - if (!onDone) - return; - decodeRestResponseJson(functionName, - exc, resultCode, body, - onDone); - }; -} - - -/*****************************************************************************/ -/* REST PROXY */ -/*****************************************************************************/ - -/** Proxy that handles a set of outstanding asynchronous operations to a - rest endpoint and deals with getting the results back. -*/ - -struct RestProxy: public MessageLoop { - - RestProxy(); - - RestProxy(const std::shared_ptr & context); - - ~RestProxy(); - - void sleepUntilIdle(); - - void shutdown(); - - /** Initialize and connect to the given service on the "zeromq" endpoint. */ - void init(std::shared_ptr config, - const std::string & serviceName, - const std::string & endpointName = "zeromq"); - - /** Initialize and connect to an instance of the given service class. */ - void initServiceClass(std::shared_ptr config, - const std::string & serviceClass, - const std::string & endpointName, - bool local = true); - - typedef std::function OnDone; - - /** Push the given request to be performed. When the result comes - back, it will be passed to OnDone. - */ - void push(const RestRequest & request, const OnDone & onDone); - - /** Push the given request to be performed. */ - void push(const OnDone & onDone, - const std::string & method, - const std::string & resource, - const RestParams & params = RestParams(), - const std::string & payload = ""); - - size_t numMessagesOutstanding() const - { - return numMessagesOutstanding_; - } - -protected: - std::string serviceName_; - std::string endpointName_; - - struct Operation { - RestRequest request; - OnDone onDone; - }; - - TypedMessageSink operationQueue; - ZmqNamedProxy connection; - - std::map outstanding; - int numMessagesOutstanding_; // atomic so can be read with no lock - uint64_t currentOpId; - - void handleOperation(const Operation & op); - void handleZmqResponse(const std::vector & message); -}; - - -/******************************************************************************/ -/* MULTI REST PROXY */ -/******************************************************************************/ - -/** Provides a way to connect to all services under a given service class - through a REST interface. Note that this sets a watch on the service class - such that whenever a service goes down or comes up the rest proxy will be - notified and will adjust itself accordingly. - - Note: This class does not and should not provide a sleepUntilIdle interface - as it is very prone to deadlock when coupled with services being shutdown - and restarted asynchronously. - - \todo Provides just about the same pattern as ZmqMultipleNamedClientBusProxy - so there's common functionality here that could be merged. - */ -struct MultiRestProxy : public MessageLoop -{ - MultiRestProxy(std::shared_ptr context) : - connected(false), - context(std::move(context)) - {} - - ~MultiRestProxy() { shutdown(); } - - - void init(std::shared_ptr config) - { - this->config = std::move(config); - } - - void shutdown(); - - - typedef std::function ConnectionHandler; - - /** Called whenever we connect a new service. */ - ConnectionHandler connectHandler; - virtual void onConnect(const std::string& serviceName) - { - if (connectHandler) connectHandler(serviceName); - } - - /** Called whenever a service we were connected is disconnected. */ - ConnectionHandler disconnectHandler; - virtual void onDisconnect(const std::string& serviceName) - { - if (disconnectHandler) disconnectHandler(serviceName); - } - - - /** Connects our class to every service under the given service class. */ - void connectAllServiceProviders( - const std::string& serviceClass, - const std::string& endpointName, - bool local = true); - - - typedef std::function OnResponse; - - /** Send a REST message to every connected service. The response callback - will be invoked once for every answer we receive such that if we're - connected to 3 services then we can receive up to 5 messages. - */ - void push(const RestRequest & request, const OnResponse & onResponse); - void push( - const OnResponse & onResponse, - const std::string & method, - const std::string & resource, - const RestParams & params = RestParams(), - const std::string & payload = ""); - -private: - - bool connected; - - std::shared_ptr context; - std::shared_ptr config; - - std::string serviceClass; - std::string endpointName; - bool localized; - - ML::Spinlock connectionsLock; - - typedef std::map > ConnectionsMap; - ConnectionsMap connections; - - ConfigurationService::Watch serviceProvidersWatch; - - void onServiceProvidersChanged(const std::string& path, bool local); - void connectServiceProvider(const std::string& serviceName); - void disconnectServiceProvider(const std::string& serviceName); - -}; - -} // namespace Datacratic diff --git a/service/rest_service_endpoint.cc b/service/rest_service_endpoint.cc index dfc30e13..16dc9a19 100644 --- a/service/rest_service_endpoint.cc +++ b/service/rest_service_endpoint.cc @@ -45,20 +45,7 @@ sendResponse(int responseCode, itl->endpoint->logResponse(*this, responseCode, response, contentType); - if (itl->http) - itl->http->sendResponse(responseCode, response, contentType); - else { - std::vector message; - message.push_back(itl->zmqAddress); - message.push_back(itl->requestId); - message.push_back(std::to_string(responseCode)); - message.push_back(response); - - //std::cerr << "sending response to " << itl->requestId - // << std::endl; - itl->endpoint->zmqEndpoint.sendMessage(message); - } - + itl->http->sendResponse(responseCode, response, contentType); itl->responseSent = true; } @@ -79,17 +66,7 @@ sendResponse(int responseCode, itl->endpoint->logResponse(*this, responseCode, response.toString(), contentType); - if (itl->http) - itl->http->sendResponse(responseCode, response, contentType); - else { - std::vector message; - message.push_back(itl->zmqAddress); - message.push_back(itl->requestId); - message.push_back(std::to_string(responseCode)); - message.push_back(response.toString()); - itl->endpoint->zmqEndpoint.sendMessage(message); - } - + itl->http->sendResponse(responseCode, response, contentType); itl->responseSent = true; } @@ -111,17 +88,7 @@ sendErrorResponse(int responseCode, itl->endpoint->logResponse(*this, responseCode, error, contentType); - if (itl->http) - itl->http->sendResponse(responseCode, error); - else { - std::vector message; - message.push_back(itl->zmqAddress); - message.push_back(itl->requestId); - message.push_back(std::to_string(responseCode)); - message.push_back(error); - itl->endpoint->zmqEndpoint.sendMessage(message); - } - + itl->http->sendResponse(responseCode, error); itl->responseSent = true; } @@ -140,17 +107,7 @@ sendErrorResponse(int responseCode, const Json::Value & error) const itl->endpoint->logResponse(*this, responseCode, error.toString(), "application/json"); - if (itl->http) - itl->http->sendResponse(responseCode, error); - else { - std::vector message; - message.push_back(itl->zmqAddress); - message.push_back(itl->requestId); - message.push_back(std::to_string(responseCode)); - message.push_back(error.toString()); - itl->endpoint->zmqEndpoint.sendMessage(message); - } - + itl->http->sendResponse(responseCode, error); itl->responseSent = true; } @@ -165,13 +122,8 @@ sendRedirect(int responseCode, const std::string & location) const itl->endpoint->logResponse(*this, responseCode, location, "REDIRECT"); - if (itl->http) - itl->http->sendResponse(responseCode, string(""), "", - { { "Location", location } }); - else { - throw ML::Exception("zeromq redirects not done yet"); - } - + itl->http->sendResponse(responseCode, string(""), "", + { { "Location", location } }); itl->responseSent = true; } @@ -189,13 +141,8 @@ sendHttpResponse(int responseCode, itl->endpoint->logResponse(*this, responseCode, response, contentType); - if (itl->http) - itl->http->sendResponse(responseCode, response, contentType, - headers); - else { - throw ML::Exception("zeromq redirects not done yet"); - } - + itl->http->sendResponse(responseCode, response, contentType, + headers); itl->responseSent = true; } @@ -209,9 +156,6 @@ sendHttpResponseHeader(int responseCode, if (itl->responseSent) throw ML::Exception("response already sent"); - if (!itl->http) - throw ML::Exception("sendHttpResponseHeader only works on HTTP connections"); - if (itl->endpoint->logResponse) itl->endpoint->logResponse(*this, responseCode, "", contentType); @@ -265,8 +209,7 @@ finishResponse() /*****************************************************************************/ RestServiceEndpoint:: -RestServiceEndpoint(std::shared_ptr context) - : zmqEndpoint(context) +RestServiceEndpoint() { } @@ -286,10 +229,6 @@ shutdown() // 2. Shut down the message loop MessageLoop::shutdown(); - - // 3. Shut down the zmq endpoint now we know that the message loop is not using - // it. - zmqEndpoint.shutdown(); } void @@ -300,31 +239,8 @@ init(std::shared_ptr config, int numThreads) { MessageLoop::init(numThreads, maxAddedLatency); - zmqEndpoint.init(config, ZMQ_XREP, endpointName + "/zeromq"); httpEndpoint.init(config, endpointName + "/http"); - auto zmqHandler = [=] (std::vector && message) - { - using namespace std; - - if (message.size() < 6) { - cerr << "ignored message with invalid number of members:" - << message.size() - << endl; - return; - } - //cerr << "got REST message at " << this << " " << message << endl; - this->doHandleRequest(ConnectionId(message.at(0), - message.at(1), - this), - RestRequest(message.at(2), - message.at(3), - RestParams::fromBinary(message.at(4)), - message.at(5))); - }; - - zmqEndpoint.messageHandler = zmqHandler; - httpEndpoint.onRequest = [=] (std::shared_ptr connection, const HttpHeader & header, @@ -335,19 +251,15 @@ init(std::shared_ptr config, RestRequest(header, payload)); }; - addSource("RestServiceEndpoint::zmqEndpoint", zmqEndpoint); addSource("RestServiceEndpoint::httpEndpoint", httpEndpoint); } -std::pair +std::string RestServiceEndpoint:: -bindTcp(PortRange const & zmqRange, PortRange const & httpRange, - std::string host) +bindTcp(PortRange const & httpRange, std::string host) { - std::string httpAddr = httpEndpoint.bindTcp(httpRange, host); - std::string zmqAddr = zmqEndpoint.bindTcp(zmqRange, host); - return std::make_pair(zmqAddr, httpAddr); + return httpEndpoint.bindTcp(httpRange, host); } void diff --git a/service/rest_service_endpoint.h b/service/rest_service_endpoint.h index 71ef538f..e5088060 100644 --- a/service/rest_service_endpoint.h +++ b/service/rest_service_endpoint.h @@ -1,15 +1,14 @@ /* json_service_endpoint.h -*- C++ -*- Jeremy Barnes, 9 November 2012 - Copyright (c) 2012 Datacratic. All rights reserved. + Copyright (c) 2012-2016 Datacratic. All rights reserved. */ -#ifndef __service__zmq_json_endpoint_h__ -#define __service__zmq_json_endpoint_h__ +#pragma once -#include "zmq_endpoint.h" #include "jml/utils/vector_utils.h" -#include "http_named_endpoint.h" +#include "soa/service/message_loop.h" +#include "soa/service/http_named_endpoint.h" namespace Datacratic { @@ -72,7 +71,7 @@ struct RestServiceEndpoint: public MessageLoop { then the service will bind to those specific ports for the given endpoints, and so no service discovery will need to be done. */ - RestServiceEndpoint(std::shared_ptr context); + RestServiceEndpoint(); virtual ~RestServiceEndpoint(); @@ -88,14 +87,6 @@ struct RestServiceEndpoint: public MessageLoop { { } - /// Initialize for zeromq - ConnectionId(const std::string & zmqAddress, - const std::string & requestId, - RestServiceEndpoint * endpoint) - : itl(new Itl(zmqAddress, requestId, endpoint)) - { - } - /// Initialize for http ConnectionId(std::shared_ptr http, const std::string & requestId, @@ -118,27 +109,12 @@ struct RestServiceEndpoint: public MessageLoop { { } - Itl(const std::string & zmqAddress, - const std::string & requestId, - RestServiceEndpoint * endpoint) - : zmqAddress(zmqAddress), - requestId(requestId), - http(0), - endpoint(endpoint), - responseSent(false), - startDate(Date::now()), - chunkedEncoding(false), - keepAlive(true) - { - } - ~Itl() { if (!responseSent) throw ML::Exception("no response sent on connection"); } - std::string zmqAddress; std::string requestId; std::shared_ptr http; RestServiceEndpoint * endpoint; @@ -230,9 +206,7 @@ struct RestServiceEndpoint: public MessageLoop { bool isConnected() const { - if (itl->http) - return !itl->http->isZombie; // NOTE: race condition - else return true; // zmq is always "connected" + return !itl->http->isZombie; // NOTE: race condition } }; @@ -244,10 +218,8 @@ struct RestServiceEndpoint: public MessageLoop { /** Bind to TCP/IP ports. There is one for zeromq and one for http. */ - std::pair - bindTcp(PortRange const & zmqRange = PortRange(), - PortRange const & httpRange = PortRange(), - std::string host = ""); + std::string bindTcp(PortRange const & httpRange = PortRange(), + std::string host = ""); /** Bind to a fixed URI for the HTTP endpoint. This will throw an exception if it can't bind. @@ -276,7 +248,6 @@ struct RestServiceEndpoint: public MessageLoop { virtual void handleRequest(const ConnectionId & connection, const RestRequest & request) const; - ZmqNamedEndpoint zmqEndpoint; HttpNamedEndpoint httpEndpoint; std::function logRequest; @@ -304,5 +275,3 @@ struct RestServiceEndpoint: public MessageLoop { }; } // namespace Datacratic - -#endif /* __service__zmq_json_endpoint_h__ */ diff --git a/service/service.mk b/service/service.mk index 71c1c290..ffb0ce9f 100644 --- a/service/service.mk +++ b/service/service.mk @@ -12,16 +12,6 @@ $(eval $(call library,opstats,$(LIBOPSTATS_SOURCES),$(LIBOPSTATS_LINK))) -LIBRECOSET_ZEROMQ_SOURCES := \ - socket_per_thread.cc \ - zmq_utils.cc - -LIBRECOSET_ZEROMQ_LINK := \ - zmq - -$(eval $(call library,zeromq,$(LIBRECOSET_ZEROMQ_SOURCES),$(LIBRECOSET_ZEROMQ_LINK))) - - LIBRECOSET_RUNNERCOMMON_SOURCES := \ runner_common.cc @@ -48,13 +38,11 @@ LIBSERVICES_SOURCES := \ message_loop.cc \ loop_monitor.cc \ named_endpoint.cc \ - zmq_endpoint.cc \ async_event_source.cc \ async_writer_source.cc \ tcp_client.cc \ rest_service_endpoint.cc \ http_named_endpoint.cc \ - rest_proxy.cc \ rest_request_router.cc \ rest_request_binding.cc \ runner.cc \ @@ -74,7 +62,7 @@ LIBSERVICES_SOURCES := \ event_subscriber.cc \ nsq_client.cc -LIBSERVICES_LINK := opstats curl boost_regex runner_common zeromq ACE arch utils jsoncpp boost_thread zmq types tinyxml2 boost_system value_description crypto +LIBSERVICES_LINK := opstats curl boost_regex runner_common ACE arch utils jsoncpp boost_thread types tinyxml2 boost_system value_description crypto $(eval $(call library,services,$(LIBSERVICES_SOURCES),$(LIBSERVICES_LINK))) $(eval $(call set_compile_option,runner.cc,-DBIN=\"$(BIN)\")) diff --git a/service/service_base.cc b/service/service_base.cc index 1c502ff3..87844d05 100644 --- a/service/service_base.cc +++ b/service/service_base.cc @@ -15,7 +15,6 @@ #include #include #include -#include "zmq.hpp" #include "soa/jsoncpp/reader.h" #include "soa/jsoncpp/value.h" #include @@ -474,8 +473,7 @@ ServiceProxies:: ServiceProxies() : events(new NullEventService()), config(new InternalConfigurationService()), - ports(new DefaultPortRangeService()), - zmqContext(new zmq::context_t(1 /* num worker threads */)) + ports(new DefaultPortRangeService()) { checkSysLimits(); bootstrap(bootstrapConfigPath()); @@ -552,7 +550,6 @@ ServiceProxies::getEndpointInstances(std::string const & name, for(auto & entry: json) { std::string key; if(protocol == "http") key = "httpUri"; - if(protocol == "zeromq") key = "zmqConnectUri"; if(key.empty() || !entry.isMember(key)) continue; diff --git a/service/service_base.h b/service/service_base.h index 7abec03f..f3fed3f8 100644 --- a/service/service_base.h +++ b/service/service_base.h @@ -27,10 +27,6 @@ #include "jml/utils/unnamed_bool.h" -namespace zmq { -struct context_t; -} // namespace zmq - namespace Datacratic { @@ -404,9 +400,6 @@ struct ServiceProxies { std::string bankerUri; - /** Zeromq context for communication. */ - std::shared_ptr zmqContext; - template JML_ALWAYS_INLINE std::shared_ptr configAs() @@ -701,15 +694,6 @@ struct ServiceBase: public EventRecorder { const std::vector & serviceClasses); - /*************************************************************************/ - /* ZEROMQ CONTEXT */ - /*************************************************************************/ - - std::shared_ptr getZmqContext() const - { - return services_->zmqContext; - } - /*************************************************************************/ /* CONFIGURATION SERVICE */ /*************************************************************************/ diff --git a/service/socket_per_thread.cc b/service/socket_per_thread.cc deleted file mode 100644 index f996f859..00000000 --- a/service/socket_per_thread.cc +++ /dev/null @@ -1,203 +0,0 @@ -/* socket_per_thread.cc - Jeremy Barnes, 5 March 2012 - Copyright (c) 2012 Datacratic. All rights reserved. - - One socket per thread. -*/ - -#include "soa/service/socket_per_thread.h" -#include "jml/arch/format.h" -#include "ace/OS_NS_Thread.h" -#include "jml/arch/backtrace.h" -#include "jml/arch/spinlock.h" -#include "jml/arch/atomic_ops.h" -#include "jml/arch/exception.h" -#include "jml/arch/timers.h" -#include "jml/utils/exc_assert.h" -#include "soa/service/zmq_utils.h" - - -using namespace std; -using namespace ML; - - -namespace Datacratic { - -/*****************************************************************************/ -/* SOCKET PER THREAD */ -/*****************************************************************************/ - -SocketPerThread:: -SocketPerThread() - : context(0), type(0), numOpen(0), state(NOTINITIALIZED), - entries(onFreeEntry) -{ -} - -SocketPerThread:: -SocketPerThread(zmq::context_t & context, - int type, - const std::string & uri, - bool allowForceClose) - : context(&context), type(type), uri(uri), - allowForceClose(allowForceClose), numOpen(0), - state(READY), entries(onFreeEntry) -{ - //std::cerr << "finished init of socket " << uri << std::endl; -} - -SocketPerThread:: -~SocketPerThread() -{ - shutdown(); -} - -void -SocketPerThread:: -init(zmq::context_t & context, - int type, - const std::string & uri, - bool allowForceClose) -{ - if (this->context) - throw ML::Exception("attempt to double initialize a SocketPerThread"); - this->context = &context; - this->type = type; - this->uri = uri; - this->allowForceClose = allowForceClose; - - state = READY; - //using namespace std; - //cerr << "initializing SocketPerThread with uri " << uri << endl; -} - -void -SocketPerThread:: -shutdown() -{ - state = FINISHED; - - entries.reset(); - context = 0; - //using namespace std; - //cerr << "destroying SocketPerThread with uri " << uri << " and " - // << numOpen << " open entries" << endl; - - if (numOpen > 0) { - if (!allowForceClose) { - throw ML::Exception("attempt to destroy SocketPerThread with %d open entries", - numOpen); - } - - while (!allThreads.empty()) { - auto it = allThreads.begin(); - onFreeEntry(*it); - } - - ExcAssertEqual(numOpen, 0); - } -} - -void -SocketPerThread:: -initForThisThread() const -{ - if (entries.get()) - return; - - //cerr << "initializing zeromq socket for this thread to connect to " - // << uri << endl; - - if (!context) - throw ML::Exception("attempt to use a SocketPerThread " - "without initializing"); - - auto mThis = const_cast(this); - - std::auto_ptr newEntry(new Entry()); - newEntry->owner = mThis; - - std::auto_ptr newPtr - (new zmq::socket_t(*context, type)); - setIdentity(*newPtr, ML::format("thr%lld", - (long long)ACE_OS::thr_self())); - newPtr->connect(uri.c_str()); - - newEntry->sock = newPtr.release(); - //if (!allForThread.get()) - // allForThread.reset(new std::set()); - //allForThread->insert(mThis); - entries.reset(newEntry.release()); - - mThis->addThreadEntry(entries.get()); - ML::atomic_inc(numOpen); - - // wait for ZMQ when connecting... - ML::sleep(0.1); -} - -void -SocketPerThread:: -onFreeEntry(Entry * entry) -{ - using namespace std; - //cerr << "onFreeEntry " << entry << endl; - //cerr << "closing zmq socket" << entry->sock << " with owner " - // << entry->owner << endl; - delete entry->sock; - //cerr << "erasing" << endl; - //allForThread->erase(entry->owner); - //cerr << "unowning" << endl; - ML::atomic_dec(entry->owner->numOpen); - - entry->owner->removeThreadEntry(entry); - - delete entry; - - //Datacratic::close(*sock); - //ML::backtrace(); - //cerr << endl << endl; -} - -void -SocketPerThread:: -cleanupThisThread() -{ - //cerr << "cleaning up socket " << entries->sock << endl; - entries.reset(); -} - -void -SocketPerThread:: -cleanupAllForThread() -{ - if (!allForThread.get()) return; - //cerr << "cleaning up " << allForThread->size() << " sockets" << endl; - - for (auto it = allForThread->begin(); it != allForThread->end(); - it = allForThread->begin()) - (*it)->cleanupThisThread(); - - //cerr << "we now have " << allForThread->size() << " sockets left" << endl; -} - -void -SocketPerThread:: -removeThreadEntry(Entry * entry) -{ - Guard guard(allThreadsLock); - allThreads.erase(entry); -} - -void -SocketPerThread:: -addThreadEntry(Entry * entry) -{ - Guard guard(allThreadsLock); - allThreads.insert(entry); -} - -boost::thread_specific_ptr > -SocketPerThread::allForThread; - -} // namespace Datacratic diff --git a/service/socket_per_thread.h b/service/socket_per_thread.h deleted file mode 100644 index 80d8ca53..00000000 --- a/service/socket_per_thread.h +++ /dev/null @@ -1,137 +0,0 @@ -/* socket_per_thread.h -*- C++ -*- - Jeremy Barnes, 14 April 2011 - Copyright (c) 2011 Datacratic. All rights reserved. - - ZeroMQ Socket per thread. -*/ - -#ifndef __zmq__socket_per_thread_h__ -#define __zmq__socket_per_thread_h__ - - -#include "zmq.hpp" -#include -#include -#include -#include "jml/compiler/compiler.h" -#include "jml/arch/spinlock.h" -#include "jml/arch/exception.h" -#include - - -namespace Datacratic { - - -/*****************************************************************************/ -/* SOCKET PER THREAD */ -/*****************************************************************************/ - -/** A simple structure that creates on-demand a different zeromq socket per - thread that will connect to a given endpoint. Using this you can make - sure that you don't share sockets amongst multiple threads. - - Note that if you use this from the same thread that will be shutting down - the zeromq context, then you need to call cleanupAllForThread() before - you try to terminate zeromq, otherwise it will hang on shutdown. -*/ - -struct SocketPerThread : boost::noncopyable { - - /** Default constructor. You must call init() before accessing it. */ - SocketPerThread(); - - /** Constructor to create a socket of the given type within the given - context that will connect to the given URI on demand. */ - SocketPerThread(zmq::context_t & context, - int type, - const std::string & uri, - bool allowForceClose = true); - - ~SocketPerThread(); - - /** Initialize to create a socket of the given type within the given - context that will connect to the given URI on demand. */ - void init(zmq::context_t & context, - int type, - const std::string & uri, - bool allowForceClose = true); - - void shutdown(); - - zmq::context_t * context; ///< Owning zeromq context - int type; ///< Type to create - std::string uri; ///< URI to connect to - bool allowForceClose; ///< Cleanup open sockets on destruction? - mutable int numOpen; ///< Num of open connections to detect misuse - - /** Return (creating if necessary) the socket for this thread. */ - inline zmq::socket_t & operator () () const - { - if (state != READY) - throw ML::Exception("socket not ready: %d", state); - - if (JML_UNLIKELY(!entries.get())) { - initForThisThread(); - } - - return *entries->sock; - } - - /** Initialize the socket for this thread. */ - void initForThisThread() const; - - /** Prematurely pretend that this thread has exited for this socket. */ - void cleanupThisThread(); - - /** Prematurely pretend that this thread has exited for all of the - SocketPerThread instances that are open. Commonly called just - before shutdown in the main thread. - */ - static void cleanupAllForThread(); - -private: - enum { - NOTINITIALIZED = 12321, - READY = 349244, - FINISHED = 293845 - }; - int state; - - /** How we store the actual sockets internally. */ - struct Entry { - zmq::socket_t * sock; - SocketPerThread * owner; - }; - - /** All threads that are alive */ - std::set allThreads; - - /** Lock to protect allThreads. */ - typedef ML::Spinlock Lock; - typedef boost::unique_lock Guard; - mutable Lock allThreadsLock; - - /** Remove the given entry from allThreads. */ - void removeThreadEntry(Entry * entry); - - /** Add the given entry to allThreads. */ - void addThreadEntry(Entry * entry); - - /** Function called whenever a thread exits to clean up its entry. */ - static void onFreeEntry(Entry * entry); - - /** Thread-specific pointer to our entry that contains our socket. */ - mutable boost::thread_specific_ptr entries; - - /** Global thread-specific pointer to a set of all active sockets for this - thread. - */ - static boost::thread_specific_ptr > allForThread; -}; - -} // namespace Datacratic - - - -#endif /* __zmq__socket_per_thread_h__ */ - diff --git a/service/testing/http_client_bench.cc b/service/testing/http_client_bench.cc index 16b7b4ed..dcddcc9a 100644 --- a/service/testing/http_client_bench.cc +++ b/service/testing/http_client_bench.cc @@ -16,7 +16,6 @@ #include "soa/service/http_endpoint.h" #include "soa/service/named_endpoint.h" #include "soa/service/message_loop.h" -#include "soa/service/rest_proxy.h" #include "soa/service/rest_service_endpoint.h" #include "soa/service/runner.h" diff --git a/service/testing/http_client_test.cc b/service/testing/http_client_test.cc index 04e77087..dcc7aafc 100644 --- a/service/testing/http_client_test.cc +++ b/service/testing/http_client_test.cc @@ -10,7 +10,6 @@ #include "jml/arch/timers.h" #include "jml/utils/testing/watchdog.h" #include "soa/service/message_loop.h" -#include "soa/service/rest_proxy.h" #include "soa/service/http_client.h" #include "soa/utils/print_utils.h" diff --git a/service/testing/message_channel_test.cc b/service/testing/message_channel_test.cc index 722e0a54..61f9bc15 100644 --- a/service/testing/message_channel_test.cc +++ b/service/testing/message_channel_test.cc @@ -21,7 +21,6 @@ #include "jml/utils/vector_utils.h" #include "jml/arch/timers.h" #include -#include "soa/service/zmq_utils.h" #include #include "jml/utils/testing/watchdog.h" diff --git a/service/testing/named_endpoint_test.cc b/service/testing/named_endpoint_test.cc deleted file mode 100644 index 748adbef..00000000 --- a/service/testing/named_endpoint_test.cc +++ /dev/null @@ -1,143 +0,0 @@ -/* named_endpoint_test.cc -*- C++ -*- - Jeremy Barnes, 24 September 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - - Test for named endpoint. -*/ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include -#include "soa/service/named_endpoint.h" -#include "soa/service/message_loop.h" -#include "soa/service/zmq_endpoint.h" -#include -#include "jml/utils/guard.h" -#include "jml/arch/exception_handler.h" -#include "jml/utils/testing/watchdog.h" -#include "jml/utils/testing/fd_exhauster.h" -#include "jml/utils/vector_utils.h" -#include "jml/arch/timers.h" -#include -#include "soa/service/zmq_utils.h" - - -using namespace std; -using namespace ML; -using namespace Datacratic; - - -/*****************************************************************************/ -/* ECHO SERVICE */ -/*****************************************************************************/ - -/** Simple test service that listens on zeromq and simply echos everything - that it gets back. -*/ - -struct EchoService : public ServiceBase { - - EchoService(std::shared_ptr proxies, - const std::string & serviceName) - : ServiceBase(serviceName, proxies), - context(new zmq::context_t(1)), - endpoint(context), - loop(1 /* num threads */, 0.0001 /* maxAddedLatency */) - { - proxies->config->removePath(serviceName); - //registerService(); - endpoint.init(proxies->config, ZMQ_XREP, serviceName + "/echo"); - - auto handler = [=] (vector message) - { - //cerr << "got message " << message << endl; - ExcAssertEqual(message.size(), 3); - ExcAssertEqual(message[1], "ECHO"); - message[1] = "REPLY"; - - endpoint.sendMessage(message); - }; - - endpoint.messageHandler = handler; - - loop.addSource("EchoService::endpoint", endpoint); - } - - void start() - { - loop.start(); - } - - void shutdown() - { - loop.shutdown(); - } - - std::string bindTcp() - { - return endpoint.bindTcp(); - } - - std::shared_ptr context; - ZmqNamedEndpoint endpoint; - MessageLoop loop; -}; - -BOOST_AUTO_TEST_CASE( test_named_endpoint ) -{ - auto proxies = std::make_shared(); - - EchoService service(proxies, "echo"); - auto addr = service.bindTcp(); - cerr << "echo service is listening on " << addr << endl; - - service.start(); - - proxies->config->dump(cerr); - - - volatile int numPings = 0; - - auto runThread = [&] () - { - ZmqNamedProxy proxy; - proxy.init(proxies->config, ZMQ_XREQ); - proxy.connect("echo/echo"); - - ML::sleep(0.1); - - cerr << "connected" << endl; - - while (numPings < 100000) { - int i = __sync_add_and_fetch(&numPings, 1); - - if (i && i % 1000 == 0) - cerr << i << endl; - - vector request; - request.push_back("ECHO"); - request.push_back(to_string(i)); - - sendAll(proxy.socket(), request); - - vector res = recvAll(proxy.socket()); - - ExcAssertEqual(res.size(), 2); - ExcAssertEqual(res[0], "REPLY"); - ExcAssertEqual(res[1], to_string(i)); - } - }; - - boost::thread_group threads; - for (unsigned i = 0; i < 10; ++i) { - threads.create_thread(runThread); - } - - threads.join_all(); - - cerr << "finished requests" << endl; - - service.shutdown(); -} diff --git a/service/testing/rest_service_endpoint_test.cc b/service/testing/rest_service_endpoint_test.cc deleted file mode 100644 index ddaddbc8..00000000 --- a/service/testing/rest_service_endpoint_test.cc +++ /dev/null @@ -1,172 +0,0 @@ -/* json_service_endpoint_test.cc - Jeremy Barnes, 9 November 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - - Test for the JSON service endpoint. -*/ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include -#include "soa/service/named_endpoint.h" -#include "soa/service/message_loop.h" -#include "soa/service/rest_service_endpoint.h" -#include "soa/service/rest_proxy.h" -#include -#include "jml/utils/guard.h" -#include "jml/arch/exception_handler.h" -#include "jml/utils/testing/watchdog.h" -#include "jml/utils/testing/fd_exhauster.h" -#include "jml/utils/vector_utils.h" -#include "jml/arch/timers.h" -#include -#include "soa/service/zmq_utils.h" - - -using namespace std; -using namespace ML; -using namespace Datacratic; - - -/*****************************************************************************/ -/* ECHO SERVICE */ -/*****************************************************************************/ - -/** Simple test service that listens on zeromq and simply echos everything - that it gets back. -*/ - -struct EchoService : public ServiceBase, public RestServiceEndpoint { - - EchoService(std::shared_ptr proxies, - const std::string & serviceName) - : ServiceBase(serviceName, proxies), - RestServiceEndpoint(proxies->zmqContext) - { - proxies->config->removePath(serviceName); - RestServiceEndpoint::init(proxies->config, - serviceName, 0.0005 /* maxAddedLatency */); - } - - ~EchoService() - { - shutdown(); - } - - virtual void handleRequest(const ConnectionId & connection, - const RestRequest & request) const - { - //cerr << "handling request " << request << endl; - if (request.verb != "POST") - throw ML::Exception("echo service needs POST"); - if (request.resource != "/echo") - throw ML::Exception("echo service only responds to /echo"); - connection.sendResponse(200, request.payload, "text/plain"); - } -}; - -BOOST_AUTO_TEST_CASE( test_named_endpoint ) -{ - auto proxies = std::make_shared(); - - int totalPings = 1000; - - EchoService service(proxies, "echo"); - auto addr = service.bindTcp(); - cerr << "echo service is listening on " << addr.first << " and " - << addr.second << endl; - - service.start(); - - proxies->config->dump(cerr); - - - volatile int numPings = 0; - - auto runZmqThread = [=, &numPings] () - { - RestProxy proxy(proxies->zmqContext); - proxy.init(proxies->config, "echo"); - proxy.start(); - cerr << "connected" << endl; - - volatile int numOutstanding = 0; - - while (numPings < totalPings) { - int i = __sync_add_and_fetch(&numPings, 1); - - if (i && i % 1000 == 0) - cerr << i << " with " << numOutstanding << " outstanding" - << endl; - - auto onResponse = [=, &numOutstanding] - (std::exception_ptr ptr, - int responseCode, - std::string body) - { - //cerr << "got response " << responseCode - // << endl; - ML::atomic_dec(numOutstanding); - - if (ptr) - throw ML::Exception("response returned exception"); - ExcAssertEqual(responseCode, 200); - ExcAssertEqual(body, to_string(i)); - - futex_wake(numOutstanding); - }; - - proxy.push(onResponse, - "POST", "/echo", {}, to_string(i)); - ML::atomic_inc(numOutstanding); - } - - proxy.sleepUntilIdle(); - - //ML::sleep(1.0); - - cerr << "shutting down proxy " << this << endl; - proxy.shutdown(); - cerr << "done proxy shutdown" << endl; - }; - -#if 0 - auto runHttpThread = [&] () - { - HttpNamedRestProxy proxy; - proxy.init(proxies->config); - proxy.connect("echo/http"); - - while (numPings < totalPings) { - int i = __sync_add_and_fetch(&numPings, 1); - - if (i && i % 1000 == 0) - cerr << i << endl; - - auto response = proxy.post("/echo", to_string(i)); - - ExcAssertEqual(response.code_, 200); - ExcAssertEqual(response.body_, to_string(i)); - } - - }; -#endif - - boost::thread_group threads; - - for (unsigned i = 0; i < 8; ++i) { - threads.create_thread(runZmqThread); - } - - //for (unsigned i = 0; i < 5; ++i) { - // threads.create_thread(runHttpThread); - //} - - threads.join_all(); - - cerr << "finished requests" << endl; - - service.shutdown(); -} diff --git a/service/testing/service_proxies_test.cc b/service/testing/service_proxies_test.cc index f5da2464..1727a54b 100644 --- a/service/testing/service_proxies_test.cc +++ b/service/testing/service_proxies_test.cc @@ -22,8 +22,7 @@ struct MockRestService : public ServiceBase, { MockRestService(std::shared_ptr proxies, const string & serviceName) - : ServiceBase(serviceName, proxies), - RestServiceEndpoint(proxies->zmqContext) + : ServiceBase(serviceName, proxies) {} ~MockRestService() @@ -51,24 +50,20 @@ BOOST_AUTO_TEST_CASE( test_service_proxies_getServiceClassInstances ) MockRestService testService1(proxies, "testServiceName1"); testService1.init({"testServiceClass", "otherServiceClass"}); testService1.bindTcp(); - vector uris(testService1.zmqEndpoint.getPublishedUris()); - expectedUris.insert(uris.begin(), uris.end()); - uris = testService1.httpEndpoint.getPublishedUris(); + auto uris = testService1.httpEndpoint.getPublishedUris(); expectedUris.insert(uris.begin(), uris.end()); MockRestService testService2(proxies, "testServiceName2"); testService2.init({"testServiceClass"}); testService2.bindTcp(); - uris = testService2.zmqEndpoint.getPublishedUris(); - expectedUris.insert(uris.begin(), uris.end()); uris = testService2.httpEndpoint.getPublishedUris(); expectedUris.insert(uris.begin(), uris.end()); set instanceUris; uris = proxies->getServiceClassInstances("testServiceClass", "http"); instanceUris.insert(uris.begin(), uris.end()); - uris = proxies->getServiceClassInstances("testServiceClass", "zeromq"); - instanceUris.insert(uris.begin(), uris.end()); + /* Test fails because 127.0.0.1 is returned by getServiceClassInstances and + not the others. */ BOOST_CHECK_EQUAL(instanceUris, expectedUris); } diff --git a/service/testing/service_testing.mk b/service/testing/service_testing.mk index 55dc2a2e..a7626370 100644 --- a/service/testing/service_testing.mk +++ b/service/testing/service_testing.mk @@ -3,11 +3,7 @@ $(eval $(call library,mongo_tmp_server,mongo_temporary_server.cc, services)) $(eval $(call test,epoll_test,services,boost)) $(eval $(call test,epoll_wait_test,services,boost manual)) -$(eval $(call test,named_endpoint_test,services,boost manual)) -$(eval $(call test,zmq_named_pub_sub_test,services,boost manual)) -$(eval $(call test,zmq_endpoint_test,services,boost manual)) $(eval $(call test,message_channel_test,services,boost)) -$(eval $(call test,rest_service_endpoint_test,services,boost)) $(eval $(call test,aws_test,cloud,boost)) @@ -40,7 +36,6 @@ $(eval $(call test,runner_stress_test,services,boost)) $(TESTS)/runner_test $(TESTS)/runner_stress_test: $(BIN)/runner_test_helper $(eval $(call test,sink_test,services,boost)) -#$(eval $(call test,zmq_tcp_bench,services,boost manual timed)) $(eval $(call test,nprobe_test,services,boost manual)) $(eval $(call library,test_services,test_http_services.cc,services)) @@ -59,7 +54,6 @@ $(eval $(call test,http_parsers_test,services test_services,boost valgrind)) $(eval $(call test,logs_test,services,boost)) $(eval $(call test,sns_mock_test,cloud services,boost)) -$(eval $(call test,zmq_message_loop_test,services,boost)) $(eval $(call test,event_handler_test,cloud services,boost manual)) $(eval $(call test,mongo_basic_test,services boost_filesystem mongo_tmp_server,boost manual)) diff --git a/service/testing/zmq_endpoint_test.cc b/service/testing/zmq_endpoint_test.cc deleted file mode 100644 index 24fbaa64..00000000 --- a/service/testing/zmq_endpoint_test.cc +++ /dev/null @@ -1,41 +0,0 @@ -/** zmq_endpoint_test.cc -*- C++ -*- - RĂ©mi Attab, 13 Feb 2013 - Copyright (c) 2013 Datacratic. All rights reserved. - - Tests for the ZMQ endpoints. - -*/ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include - -#include "jml/arch/timers.h" -#include "jml/utils/testing/watchdog.h" - -#include "soa/service/zmq_endpoint.h" - -using namespace std; -using namespace ML; -using namespace Datacratic; - - -/** Tests a specific edge case where a message loop attached to a - ZmqNamedClientBusProxy would get blocked in zmq because it was attempting to - send a HEARTBEAT message to a socket that wasn't connected to anything. - */ -BOOST_AUTO_TEST_CASE( test_no_connect ) -{ - Watchdog watchdog(10.0); - - auto proxies = std::make_shared(); - - ZmqNamedClientBusProxy socket; - socket.init(proxies->config); - socket.connectToServiceClass("foo", "bar"); - - socket.start(); - ML::sleep(1.0); - socket.shutdown(); -} diff --git a/service/testing/zmq_message_loop_test.cc b/service/testing/zmq_message_loop_test.cc deleted file mode 100644 index 16e601ac..00000000 --- a/service/testing/zmq_message_loop_test.cc +++ /dev/null @@ -1,92 +0,0 @@ -/** zmq_message_loop_test.cc -*- C++ -*- - Jeremy Barnes, 13 Feb 2013 - Copyright (c) 2013 Datacratic. All rights reserved. - - Tests for the ZMQ endpoints. - -*/ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include - -#include "jml/arch/timers.h" -#include "jml/utils/testing/watchdog.h" -#include - -#include "soa/service/zmq_endpoint.h" - -using namespace std; -using namespace ML; -using namespace Datacratic; - - -BOOST_AUTO_TEST_CASE( test_plain_message_loop ) -{ - //Watchdog watchdog(10.0); - - auto proxies = std::make_shared(); - - zmq::socket_t sock1(*proxies->zmqContext, ZMQ_PULL); - zmq::socket_t sock2(*proxies->zmqContext, ZMQ_PUSH); - - PortRange ports(10000, 20000); - std::string uri = bindToOpenTcpPort(sock1, ports, "127.0.0.1"); - sock2.connect(uri); - - ZmqEventSource source(sock1); - - // Allow the message loop to sleep for 2 seconds at a time so that - // we can easily tell if we're blocking or not - MessageLoop loop(1, 0.5 /* max added time (seconds) */); - loop.addSource("asyncMessageSource", source); - - - MessageLoop loop2(1, 0.5); - loop2.addSource("loop1", loop); - - - MessageLoop loop3(1, 0.05); - loop3.addSource("loop2", loop2); - - loop3.start(); - - - std::atomic latency(0); - std::atomic numSent(0); - std::atomic numDone(0); - - source.asyncMessageHandler = [&] (const std::vector & msg) - { - cerr << "got message " << msg << endl; - Date date = Date::parseIso8601(msg.at(2)); - double elapsed = date.secondsUntil(Date::now()); - latency = elapsed * 1000; - cerr << "elapsed " << elapsed * 1000.0 << "ms" << endl; - ++numDone; - }; - - - while (numSent < 10) { - sendAll(sock2, { "hello", "world", Date::now().printIso8601() }); - ++numSent; - ML::sleep(0.1); - } - - while (numDone < numSent) { - ML::sleep(0.1); - } - - BOOST_CHECK_LE(latency, 100); - - //cerr << loop.stats() << endl; - - cerr << "shutting down" << endl; - Date before = Date::now(); - loop.shutdown(); - Date after = Date::now(); - cerr << "shutdown done" << endl; - - BOOST_CHECK_LE(after.secondsSince(before), 0.1); -} diff --git a/service/testing/zmq_named_pub_sub_test.cc b/service/testing/zmq_named_pub_sub_test.cc deleted file mode 100644 index 115e9879..00000000 --- a/service/testing/zmq_named_pub_sub_test.cc +++ /dev/null @@ -1,184 +0,0 @@ -/* named_endpoint_test.cc -*- C++ -*- - Jeremy Barnes, 24 September 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - - Test for named endpoint. -*/ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include -#include -#include "jml/utils/guard.h" -#include "jml/arch/exception_handler.h" -#include "jml/utils/vector_utils.h" -#include "jml/arch/timers.h" -#include -#include "soa/service/zmq_utils.h" -#include "soa/service/zmq_named_pub_sub.h" - -using namespace std; -using namespace ML; -using namespace Datacratic; - -struct Publisher : public ServiceBase, public ZmqNamedPublisher { - - Publisher(const std::string & name, - std::shared_ptr proxies) - : ServiceBase(name, proxies), - ZmqNamedPublisher(proxies->zmqContext) - { - } - - ~Publisher() - { - unregisterServiceProvider(serviceName(), { "publisher" }); - shutdown(); - } - - void init() - { - ZmqNamedPublisher::init(getServices()->config, serviceName() + "/publish"); - registerServiceProvider(serviceName(), { "publisher" }); - } -}; - -BOOST_AUTO_TEST_CASE( test_named_publisher ) -{ - auto proxies = std::make_shared(); - // proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); - - ZmqNamedSubscriber subscriber(*proxies->zmqContext); - subscriber.init(proxies->config); - - int numMessages = 0; - vector > subscriberMessages; - - subscriber.messageHandler = [&] (const std::vector & message) - { - vector msg2; - for (unsigned i = 0; i < message.size(); ++i) { - msg2.push_back(message[i].toString()); - } - cerr << "got subscriber message " << msg2 << endl; - - subscriberMessages.push_back(msg2); - ++numMessages; - futex_wake(numMessages); - }; - - subscriber.connectToEndpoint("pub/publish"); - subscriber.start(); - subscriber.subscribe("hello"); - - int numIter = 1; - //numIter = 10; // TODO: test fails here - - for (unsigned i = 0; i < numIter; ++i) { - ML::sleep(0.1); - - cerr << endl << endl << endl << endl; - - BOOST_CHECK_NE(subscriber.getConnectionState(), - ZmqNamedSubscriber::CONNECTED); - - Publisher pub("pub", proxies); - - pub.init(); - pub.bindTcp(); - pub.start(); - - proxies->config->dump(cerr); - - ML::sleep(0.1); - - BOOST_CHECK_EQUAL(subscriber.getConnectionState(), - ZmqNamedSubscriber::CONNECTED); - -#if 1 - { - //auto subp = new ZmqNamedSubscriber(*proxies->zmqContext); - //auto & sub = *subp; - - ZmqNamedSubscriber sub(*proxies->zmqContext); - sub.init(proxies->config); - sub.start(); - - vector > subscriberMessages; - volatile int numMessages = 0; - - auto onSubscriptionMessage = [&] (const std::vector & message) - { - vector msg2; - for (unsigned i = 0; i < message.size(); ++i) { - msg2.push_back(message[i].toString()); - } - cerr << "got message " << msg2 << endl; - - subscriberMessages.push_back(msg2); - ++numMessages; - futex_wake(numMessages); - }; - - - sub.messageHandler = onSubscriptionMessage; - - sub.connectToEndpoint("pub/publish"); - - // Busy wait (for now) - for (unsigned i = 0; subscriber.getConnectionState() != ZmqNamedSubscriber::CONNECTED; ++i) { - ML::sleep(0.01); - if (i && i % 10 == 0) - cerr << "warning: waited " << i / 10 << "ds for subscriber to connect" << endl; - //if (i == 200) - // throw ML::Exception("no connection in 2 seconds"); - } - - sub.subscribe("hello"); - - // Give the subscription message time to percolate through - ML::sleep(0.5); - - // Publish some messages - pub.publish("hello", "world"); - pub.publish("dog", "eats", "dog"); - pub.publish("hello", "stranger"); - - cerr << "published" << endl; - - // Wait until they are received - for (;;) { - int nm = numMessages; - if (nm == 2) break; - ML::futex_wait(numMessages, nm); - } - - BOOST_CHECK_EQUAL(subscriberMessages.size(), 2); - BOOST_CHECK_EQUAL(subscriberMessages.at(0), vector({ "hello", "world"}) ); - BOOST_CHECK_EQUAL(subscriberMessages.at(1), vector({ "hello", "stranger"}) ); - - sub.shutdown(); - } -#else - // Publish some messages - pub.publish("hello", "world"); - pub.publish("dog", "eats", "dog"); - pub.publish("hello", "stranger"); -#endif - - ML::sleep(0.1); - - cerr << "unregistered publisher" << endl; - - pub.shutdown(); - } - - ML::sleep(0.1); - - cerr << "got a total of " << numMessages << " subscriber messages" << endl; - - // Check that it got all of the messages - BOOST_CHECK_EQUAL(numMessages, numIter * 2); -} diff --git a/service/testing/zmq_tcp_bench.cc b/service/testing/zmq_tcp_bench.cc deleted file mode 100644 index 843a521f..00000000 --- a/service/testing/zmq_tcp_bench.cc +++ /dev/null @@ -1,163 +0,0 @@ -/* zmq_tcp_bench.cc - Wolfgang Sourdeau - April 2013 */ - -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include -#include -#include -#include -#include - -#include - -#include - -#include "jml/arch/exception.h" -#include "jml/arch/futex.h" - -#include "soa/service/service_base.h" -#include "soa/service/rest_service_endpoint.h" - -const int NbrMsgs = 1000000; - -using namespace std; -using namespace ML; - -using namespace Datacratic; - -#if 1 -BOOST_AUTO_TEST_CASE( test_zmq ) -{ - MessageLoop mainLoop; - - auto proxies = make_shared(); - int recvMsgs(0), sendMsgs(0); - struct timeval start, end; - - ZmqNamedEndpoint server(proxies->zmqContext); - server.init(proxies->config, ZMQ_XREP, "server"); - auto onServerMessage = [&] (vector && messages) { - const zmq::message_t & msg = messages[1]; - string message((const char *)msg.data(), - ((const char *)msg.data()) + msg.size()); - string expected("test" + to_string(recvMsgs)); - ExcAssertEqual(message, expected); - recvMsgs++; - if (recvMsgs == sendMsgs) { - futex_wake(recvMsgs); - } - }; - server.rawMessageHandler = onServerMessage; - server.bindTcp(); - mainLoop.addSource("server", server); - - proxies->config->dump(cerr); - - ZmqNamedProxy client(proxies->zmqContext); - client.init(proxies->config, ZMQ_XREQ, "client"); - mainLoop.addSource("client", client); - - client.connect("server"); - - cerr << "awaiting connection\n"; - while (!client.isConnected()) { - ML::sleep(0.1); - } - - mainLoop.start(); - cerr << "connected and sending\n"; - - gettimeofday(&start, NULL); - - for (int i = 0; i < NbrMsgs; i++) { - client.sendMessage("test" + to_string(sendMsgs)); - sendMsgs++; - } - - while (recvMsgs < sendMsgs) { - // cerr << "awaiting end of messages: " << recvMsgs << "\n"; - ML::futex_wait(recvMsgs, recvMsgs); - } - cerr << "zmq test: received messages: " << recvMsgs << "\n"; - - gettimeofday(&end, NULL); - - int delta_sec = (end.tv_sec - start.tv_sec); - if (start.tv_usec > end.tv_usec) { - delta_sec--; - end.tv_usec += 1000000; - } - printf ("delta: %d.%.6ld secs\n", delta_sec, (end.tv_usec - start.tv_usec)); -} -#endif - -#if 1 -#include "tcpsockets.h" - -BOOST_AUTO_TEST_CASE( test_unix_tcp ) -{ - auto proxies = make_shared(); - MessageLoop mainLoop; - int recvMsgs(0), sendMsgs(0); - struct timeval start, end; - - TcpNamedEndpoint server; - server.init(proxies->config, "server"); - auto onServerMessage = [&] (const string & message) { - // cerr << "received tcp message: " << message << endl; - string expected("test" + to_string(recvMsgs)); - ExcAssertEqual(message, expected); - recvMsgs++; - if (recvMsgs == sendMsgs) { - futex_wake(recvMsgs); - } - }; - server.onMessage_ = onServerMessage; - server.bindTcp(9876); - ML::sleep(1); - - TcpNamedProxy client; - client.init(proxies->config); - - mainLoop.addSource("server", server); - mainLoop.addSource("client", client); - mainLoop.start(); - - client.connectTo("127.0.0.1", 9876); - - cerr << "awaiting connection\n"; - while (!client.isConnected()) { - ML::sleep(0.1); - } - cerr << "connected and sending\n"; - - gettimeofday(&start, NULL); - - for (sendMsgs = 0; sendMsgs < NbrMsgs;) { - if (client.sendMessage("test" + to_string(sendMsgs))) { - sendMsgs++; - } - // else { - // ML::sleep(0.1); - // } - } - - cerr << "sent " << sendMsgs << " messages\n"; - while (recvMsgs < sendMsgs) { - // cerr << "awaiting end of messages: " << recvMsgs << "\n"; - ML::futex_wait(recvMsgs, recvMsgs); - } - cerr << "tcp test: received messages: " << recvMsgs << "\n"; - - gettimeofday(&end, NULL); - - int delta_sec = (end.tv_sec - start.tv_sec); - if (start.tv_usec > end.tv_usec) { - delta_sec--; - end.tv_usec += 1000000; - } - printf ("delta: %d.%.6ld secs\n", delta_sec, (end.tv_usec - start.tv_usec)); -} -#endif diff --git a/service/zmq.hpp b/service/zmq.hpp deleted file mode 100644 index 44748a00..00000000 --- a/service/zmq.hpp +++ /dev/null @@ -1,416 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_HPP_INCLUDED__ -#define __ZMQ_HPP_INCLUDED__ - -#include "zmq.h" - -#include -#include -#include -#include -#include -#include - -#include -#include -#include "jml/utils/exc_assert.h" - -namespace zmq -{ - - typedef zmq_free_fn free_fn; - typedef zmq_pollitem_t pollitem_t; - - class socket_t; - - class error_t : public std::exception - { - public: - - error_t () : errnum (zmq_errno ()) {} - - virtual const char *what () const throw () - { - return zmq_strerror (errnum); - } - - int num () const - { - return errnum; - } - - private: - - int errnum; - }; - - inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) - { - int rc = zmq_poll (items_, nitems_, timeout_); - if (rc < 0) - throw error_t (); - return rc; - } - - inline void device (int device_, void * insocket_, void* outsocket_) - { - int rc = zmq_device (device_, insocket_, outsocket_); - if (rc != 0) - throw error_t (); - } - - class message_t : public zmq_msg_t - { - friend class socket_t; - - public: - - inline message_t () - { - int rc = zmq_msg_init (this); - if (rc != 0) - throw error_t (); - } - - inline message_t (size_t size_) - { - int rc = zmq_msg_init_size (this, size_); - if (rc != 0) - throw error_t (); - } - - inline message_t (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); - } - - inline message_t (const std::string & s) - { - int rc = zmq_msg_init_size (this, s.size()); - if (rc != 0) - throw error_t (); - std::copy(s.begin(), s.end(), (char *)data()); - } - - inline message_t (const message_t & other) noexcept - { - int rc = zmq_msg_init (this); - if (rc == 0) - rc = zmq_msg_copy(this, const_cast(&other)); - if (rc != 0) - throw error_t (); - } - - inline message_t (message_t && other) noexcept - { - int rc = zmq_msg_init (this); - if (rc == 0) - rc = zmq_msg_move(this, &other); - if (rc != 0) - throw error_t (); - } - - inline message_t & operator = (const message_t & other) noexcept - { - int rc = zmq_msg_copy(this, const_cast(&other)); - if (rc != 0) - throw error_t (); - return *this; - } - - inline message_t & operator = (message_t && other) noexcept - { - int rc = zmq_msg_move(this, &other); - if (rc != 0) - throw error_t (); - return *this; - } - - inline ~message_t () - { - int rc = zmq_msg_close (this); - assert (rc == 0); - (void)rc; - } - - inline void rebuild () - { - int rc = zmq_msg_close (this); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init (this); - if (rc != 0) - throw error_t (); - } - - inline void rebuild (size_t size_) - { - int rc = zmq_msg_close (this); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_size (this, size_); - if (rc != 0) - throw error_t (); - } - - inline void rebuild (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_close (this); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); - } - - inline void move (message_t *msg_) - { - int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); - if (rc != 0) - throw error_t (); - } - - inline void copy (message_t *msg_) - { - int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); - if (rc != 0) - throw error_t (); - } - - inline char *data () - { - return (char *)zmq_msg_data (this); - } - - inline const char *data () const - { - return (const char *)zmq_msg_data (const_cast(this)); - } - - inline size_t size () const - { - return zmq_msg_size (const_cast(this)); - } - - std::string toString() const - { - return std::string(data(), data() + size()); - } - - private: - - // Disable implicit message copying, so that users won't use shared - // messages (less efficient) without being aware of the fact. - //message_t (const message_t&); - //void operator = (const message_t&); - }; - - class context_t - { - friend class socket_t; - - public: - - inline context_t (int io_threads_) - { - ptr = zmq_init (io_threads_); - if (ptr == NULL) - throw error_t (); - } - - inline ~context_t () - { - // If this throws, it means that you forgot to destroy a zmq_socket_t object - // created with this context before you closed the context. - ExcAssertEqual(sockets.size(), 0); - - int rc = zmq_term (ptr); - assert (rc == 0); - (void)rc; - } - - // Be careful with this, it's probably only useful for - // using the C api together with an existing C++ api. - // Normally you should never need to use this. - inline operator void* () - { - return ptr; - } - - void registerSocket(socket_t * sock) - { - std::unique_lock guard(lock); - ExcAssertEqual(sockets.count(sock), 0); - sockets.insert(sock); - } - - void unregisterSocket(socket_t * sock) - { - std::unique_lock guard(lock); - ExcAssertEqual(sockets.count(sock), 1); - sockets.erase(sock); - } - - private: - - void *ptr; - - context_t (const context_t&); - void operator = (const context_t&); - - std::mutex lock; - std::set sockets; - }; - - class socket_t - { - public: - context_t * context_; - - inline socket_t (context_t &context_, int type_) - : context_(&context_) - { - ptr = zmq_socket (context_.ptr, type_); - if (ptr == NULL) - throw error_t (); - - context_.registerSocket(this); - } - - inline ~socket_t () - { - int linger = 0; - setsockopt (ZMQ_LINGER, &linger, sizeof(linger)); - int rc = zmq_close (ptr); - assert (rc == 0); - (void)rc; - - context_->unregisterSocket(this); - } - - inline operator void* () - { - return ptr; - } - - inline void setsockopt (int option_, const void *optval_, - size_t optvallen_) - { - int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } - - inline void getsockopt (int option_, void *optval_, - size_t *optvallen_) - { - int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } - - inline void bind (const std::string & addr_) - { - int rc = zmq_bind (ptr, addr_.c_str()); - if (rc != 0) - throw error_t (); - } - - inline void unbind (const std::string & addr_) - { - int rc = zmq_unbind (ptr, addr_.c_str()); - if (rc != 0) - throw error_t (); - } - - inline int tryUnbind (const std::string & addr_) - { - int rc = zmq_unbind (ptr, addr_.c_str()); - return rc; - } - - inline void connect (const std::string & addr_) - { - int rc = zmq_connect (ptr, addr_.c_str()); - if (rc != 0) - throw error_t (); - } - - inline void disconnect (const std::string & addr_) - { - int rc = zmq_disconnect (ptr, addr_.c_str()); - if (rc != 0) - throw error_t (); - } - - inline int tryDisconnect (const std::string & addr_) - { - return zmq_disconnect (ptr, addr_.c_str()); - } - - inline bool send (message_t &msg_, int flags_ = 0) - { - int rc = zmq_sendmsg (ptr, &msg_, flags_); - if (rc >= 0) - return true; - if (rc == -1 && zmq_errno () == EAGAIN) - return false; - throw error_t (); - } - - inline bool send (message_t && msg_, int flags_ = 0) - { - int rc = zmq_sendmsg (ptr, &msg_, flags_); - if (rc >= 0) - return true; - if (rc == -1 && zmq_errno () == EAGAIN) - return false; - throw error_t (); - } - - inline bool recv (message_t *msg_, int flags_ = 0) - { - int rc = zmq_recvmsg (ptr, msg_, flags_); - if (rc >= 0) - return true; - if (rc == -1 && zmq_errno () == EAGAIN) - return false; - throw error_t (); - } - - private: - - void *ptr; - - socket_t (const socket_t&); - socket_t (socket_t&&); - void operator = (const socket_t&); - void operator = (socket_t&&); - }; - -} - -#endif diff --git a/service/zmq_endpoint.cc b/service/zmq_endpoint.cc deleted file mode 100644 index 4e300a16..00000000 --- a/service/zmq_endpoint.cc +++ /dev/null @@ -1,741 +0,0 @@ -/* zmq_endpoint.cc - Jeremy Barnes, 9 November 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - -*/ - -#include "zmq_endpoint.h" -#include "jml/utils/smart_ptr_utils.h" -#include -#include -#include "jml/arch/timers.h" -#include "jml/arch/info.h" - -using namespace std; - - -namespace Datacratic { - -/******************************************************************************/ -/* ZMQ LOGS */ -/******************************************************************************/ - -Logging::Category ZmqLogs::print("ZMQ"); -Logging::Category ZmqLogs::error("ZMQ Error", ZmqLogs::print); -Logging::Category ZmqLogs::trace("ZMQ Trace", ZmqLogs::print); - - - -/*****************************************************************************/ -/* ZMQ EVENT SOURCE */ -/*****************************************************************************/ - -ZmqEventSource:: -ZmqEventSource() - : socket_(0), socketLock_(nullptr) -{ - needsPoll = false; -} - -ZmqEventSource:: -ZmqEventSource(zmq::socket_t & socket, SocketLock * socketLock) - : socket_(&socket), socketLock_(socketLock) -{ - needsPoll = false; - updateEvents(); -} - -void -ZmqEventSource:: -init(zmq::socket_t & socket, SocketLock * socketLock) -{ - socket_ = &socket; - socketLock_ = socketLock; - needsPoll = false; - updateEvents(); -} - -int -ZmqEventSource:: -selectFd() const -{ - int res = -1; - size_t resSize = sizeof(int); - socket().getsockopt(ZMQ_FD, &res, &resSize); - if (res == -1) - THROW(ZmqLogs::error) << "no fd for zeromq socket" << endl; - return res; -} - -bool -ZmqEventSource:: -poll() const -{ - if (currentEvents & ZMQ_POLLIN) - return true; - - std::unique_lock guard; - - if (socketLock_) - guard = std::unique_lock(*socketLock_); - - updateEvents(); - - return currentEvents & ZMQ_POLLIN; -} - -void -ZmqEventSource:: -updateEvents() const -{ - size_t events_size = sizeof(currentEvents); - socket().getsockopt(ZMQ_EVENTS, ¤tEvents, &events_size); -} - -bool -ZmqEventSource:: -processOne() -{ - using namespace std; - if (debug_) - cerr << "called processOne on " << this << ", poll = " << poll() << endl; - - if (!poll()) - return false; - - std::vector msg; - - // We process all events, as otherwise the select fd can't be guaranteed to wake us up - for (;;) { - { - std::unique_lock guard; - if (socketLock_) - guard = std::unique_lock(*socketLock_); - - msg = recvAllNonBlocking(socket()); - - if (msg.empty()) { - if (currentEvents & ZMQ_POLLIN) - throw ML::Exception("empty message with currentEvents"); - return false; // no more events - } - - updateEvents(); - } - - if (debug_) - cerr << "got message of length " << msg.size() << endl; - handleMessage(msg); - } - - return currentEvents & ZMQ_POLLIN; -} - -void -ZmqEventSource:: -handleMessage(const std::vector & message) -{ - if (asyncMessageHandler) { - asyncMessageHandler(message); - return; - } - - auto reply = handleSyncMessage(message); - if (!reply.empty()) { - sendAll(socket(), reply); - } -} - -std::vector -ZmqEventSource:: -handleSyncMessage(const std::vector & message) -{ - if (!syncMessageHandler) - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - - return syncMessageHandler(message); -} - - -/*****************************************************************************/ -/* NAMED ZEROMQ ENDPOINT */ -/*****************************************************************************/ - -ZmqNamedEndpoint:: -ZmqNamedEndpoint(std::shared_ptr context) - : context_(context) -{ -} - -void -ZmqNamedEndpoint:: -init(std::shared_ptr config, - int socketType, - const std::string & endpointName) -{ - NamedEndpoint::init(config, endpointName); - this->socketType = socketType; - this->socket_.reset(new zmq::socket_t(*context_, socketType)); - setHwm(*socket_, 65536); - - addSource("ZmqNamedEndpoint::socket", - std::make_shared - (*socket_, [=] (std::vector && message) - { - handleRawMessage(std::move(message)); - })); -} - -std::string -ZmqNamedEndpoint:: -bindTcp(PortRange const & portRange, std::string host) -{ - std::unique_lock guard(lock); - - if (!socket_) - THROW(ZmqLogs::error) << "bind called before init" << std::endl; - - using namespace std; - - if (host == "") - host = "*"; - - int port = bindAndReturnOpenTcpPort(*socket_, portRange, host); - - auto getUri = [&] (const std::string & host) - { - return "tcp://" + host + ":" + to_string(port); - }; - - Json::Value config; - - auto addEntry = [&] (const std::string& addr, - const std::string& hostScope) - { - std::string uri; - - if(hostScope != "*") { - uri = "tcp://" + addr + ":" + to_string(port); - } - else { - uri = "tcp://" + ML::fqdn_hostname(to_string(port)) + ":" + to_string(port); - } - - Json::Value & entry = config[config.size()]; - entry["zmqConnectUri"] = uri; - - Json::Value & transports = entry["transports"]; - transports[0]["name"] = "tcp"; - transports[0]["addr"] = addr; - transports[0]["hostScope"] = hostScope; - transports[0]["port"] = port; - transports[1]["name"] = "zeromq"; - transports[1]["socketType"] = socketType; - transports[1]["uri"] = uri; - }; - - if (host == "*") { - auto interfaces = getInterfaces({AF_INET}); - for (unsigned i = 0; i < interfaces.size(); ++i) { - addEntry(interfaces[i].addr, interfaces[i].hostScope); - } - publishAddress("tcp", config); - return getUri(host); - } - else { - string host2 = addrToIp(host); - // TODO: compute the host scope; don't just assume "*" - addEntry(host2, "*"); - publishAddress("tcp", config); - return getUri(host2); - } -} - - - -/*****************************************************************************/ -/* NAMED ZEROMQ PROXY */ -/*****************************************************************************/ - -ZmqNamedProxy:: -ZmqNamedProxy() : - context_(new zmq::context_t(1)), - local(true), - shardIndex(-1) -{ -} - -ZmqNamedProxy:: -ZmqNamedProxy(std::shared_ptr context, int shardIndex) : - context_(context), - local(true), - shardIndex(shardIndex) -{ -} - -void -ZmqNamedProxy:: -init(std::shared_ptr config, - int socketType, - const std::string & identity) -{ - this->connectionType = NO_CONNECTION; - this->connectionState = NOT_CONNECTED; - - this->config = config; - socket_.reset(new zmq::socket_t(*context_, socketType)); - if (identity != "") - setIdentity(*socket_, identity); - setHwm(*socket_, 65536); - - serviceWatch.init(std::bind(&ZmqNamedProxy::onServiceNodeChange, - this, - std::placeholders::_1, - std::placeholders::_2)); - - endpointWatch.init(std::bind(&ZmqNamedProxy::onEndpointNodeChange, - this, - std::placeholders::_1, - std::placeholders::_2)); -} - -bool -ZmqNamedProxy:: -connect(const std::string & endpointName, - ConnectionStyle style) -{ - if (!config) { - THROW(ZmqLogs::error) - << "attempt to connect to " << endpointName - << " without calling init()" << endl; - } - - if (connectionState == CONNECTED) - THROW(ZmqLogs::error) << "already connected" << endl; - - this->connectedService = endpointName; - if (connectionType == NO_CONNECTION) - connectionType = CONNECT_DIRECT; - - LOG(ZmqLogs::print) << "connecting to " << endpointName << endl; - - vector children - = config->getChildren(endpointName, endpointWatch); - - auto setPending = [&] - { - std::lock_guard guard(socketLock_); - - if (connectionState == NOT_CONNECTED) - connectionState = CONNECTION_PENDING; - }; - - for (auto c: children) { - ExcAssertNotEqual(connectionState, CONNECTED); - string key = endpointName + "/" + c; - Json::Value epConfig = config->getJson(key); - - for (auto & entry: epConfig) { - - if (!entry.isMember("zmqConnectUri")) - return true; - - string uri = entry["zmqConnectUri"].asString(); - - auto hs = entry["transports"][0]["hostScope"]; - if (!hs) - continue; - - string hostScope = hs.asString(); - if (hs != "*") { - utsname name; - if (uname(&name)) { - THROW(ZmqLogs::error) - << "uname error: " << strerror(errno) << std::endl; - } - if (hostScope != name.nodename) - continue; // wrong host scope - } - - { - std::lock_guard guard(socketLock_); - socket().connect(uri.c_str()); - connectedUri = uri; - connectionState = CONNECTED; - } - - LOG(ZmqLogs::print) << "connected to " << uri << endl; - onConnect(uri); - return true; - } - - setPending(); - return false; - } - - if (style == CS_MUST_SUCCEED && connectionState != CONNECTED) { - THROW(ZmqLogs::error) - << "couldn't connect to any services of class " << serviceClass - << endl; - } - - setPending(); - return connectionState == CONNECTED; -} - -bool -ZmqNamedProxy:: -connectToServiceClass(const std::string & serviceClass, - const std::string & endpointName, - bool local_, - ConnectionStyle style) -{ - local = local_; - - // TODO: exception safety... if we bail don't screw around the auction - ExcAssertNotEqual(connectionType, CONNECT_DIRECT); - ExcAssertNotEqual(serviceClass, ""); - ExcAssertNotEqual(endpointName, ""); - - this->serviceClass = serviceClass; - this->endpointName = endpointName; - - if (connectionType == NO_CONNECTION) - connectionType = CONNECT_TO_CLASS; - - if (!config) { - THROW(ZmqLogs::error) - << "attempt to connect to " << endpointName - << " without calling init()" << endl; - } - - if (connectionState == CONNECTED) - THROW(ZmqLogs::error) << "attempt to double connect connection" << endl; - - vector children - = config->getChildren("serviceClass/" + serviceClass, serviceWatch); - - for (auto c: children) { - string key = "serviceClass/" + serviceClass + "/" + c; - Json::Value value = config->getJson(key); - std::string name = value["serviceName"].asString(); - std::string path = value["servicePath"].asString(); - - std::string location = value["serviceLocation"].asString(); - if (local && location != config->currentLocation) { - LOG(ZmqLogs::trace) - << path << " / " << name << " dropped while connecting to " - << serviceClass << "/" << endpointName - << "(" << location << " != " << config->currentLocation << ")" - << std::endl; - continue; - } - - bool ok = connect( - path + "/" + endpointName, - style == CS_ASYNCHRONOUS ? CS_ASYNCHRONOUS : CS_SYNCHRONOUS); - if (!ok) continue; - - shardIndex = value.get("shardIndex", -1).asInt(); - return true; - } - - if (style == CS_MUST_SUCCEED && connectionState != CONNECTED) { - THROW(ZmqLogs::error) - << "couldn't connect to any services of class " << serviceClass - << endl; - } - - { - std::lock_guard guard(socketLock_); - - if (connectionState == NOT_CONNECTED) - connectionState = CONNECTION_PENDING; - } - - return connectionState == CONNECTED; -} - -void -ZmqNamedProxy:: -onServiceNodeChange(const std::string & path, - ConfigurationService::ChangeType change) -{ - if (connectionState != CONNECTION_PENDING) - return; // no need to watch anymore - - connectToServiceClass(serviceClass, endpointName, local, CS_ASYNCHRONOUS); -} - -void -ZmqNamedProxy:: -onEndpointNodeChange(const std::string & path, - ConfigurationService::ChangeType change) -{ - if (connectionState != CONNECTION_PENDING) - return; // no need to watch anymore - - connect(connectedService, CS_ASYNCHRONOUS); -} - - -/******************************************************************************/ -/* ZMQ MULTIPLE NAMED CLIENT BUS PROXY */ -/******************************************************************************/ - -void -ZmqMultipleNamedClientBusProxy:: -connectAllServiceProviders(const std::string & serviceClass, - const std::string & endpointName, - bool local) -{ - if (connected) { - THROW(ZmqLogs::error) - << "already connected to " - << serviceClass << " / " << endpointName - << std::endl; - } - - this->serviceClass = serviceClass; - this->endpointName = endpointName; - - serviceProvidersWatch.init([=] (const std::string & path, - ConfigurationService::ChangeType change) - { - ++changesCount[change]; - onServiceProvidersChanged("serviceClass/" + serviceClass, local); - }); - - onServiceProvidersChanged("serviceClass/" + serviceClass, local); - connected = true; -} - -/** Zookeeper makes the onServiceProvidersChanged calls re-entrant which is - annoying to deal with. Instead, when a re-entrant call is detected we defer - the call until we're done with the original call. - */ -bool -ZmqMultipleNamedClientBusProxy:: -enterProvidersChanged(const std::string& path, bool local) -{ - std::lock_guard guard(providersChangedLock); - - if (!inProvidersChanged) { - inProvidersChanged = true; - return true; - } - - LOG(ZmqLogs::trace) - << "defering providers changed for " << path << std::endl; - - deferedProvidersChanges.emplace_back(path, local); - return false; -} - -void -ZmqMultipleNamedClientBusProxy:: -exitProvidersChanged() -{ - std::vector defered; - - { - std::lock_guard guard(providersChangedLock); - - defered = std::move(deferedProvidersChanges); - inProvidersChanged = false; - } - - for (const auto& item : defered) - onServiceProvidersChanged(item.first, item.second); -} - -void -ZmqMultipleNamedClientBusProxy:: -onServiceProvidersChanged(const std::string & path, bool local) -{ - if (!enterProvidersChanged(path, local)) return; - - // The list of service providers has changed - - std::vector children - = config->getChildren(path, serviceProvidersWatch); - - for (auto c: children) { - Json::Value value = config->getJson(path + "/" + c); - std::string name = value["serviceName"].asString(); - std::string path = value["servicePath"].asString(); - int shardIndex = value.get("shardIndex", -1).asInt(); - - std::string location = value["serviceLocation"].asString(); - if (local && location != config->currentLocation) { - LOG(ZmqLogs::trace) - << path << " / " << name << " dropped (" - << location << " != " << config->currentLocation << ")" - << std::endl; - continue; - } - - watchServiceProvider(name, path, shardIndex); - } - - // deleting the connection could trigger a callback which is a bad idea - // while we're holding the connections lock. So instead we move all the - // connections to be deleted to a temp map which we'll wipe once the - // lock is released. - ConnectionMap pendingDisconnects; - { - std::unique_lock guard(connectionsLock); - - // Services that are no longer in zookeeper are considered to be - // disconnected so remove them from our connection map. - for (auto& conn : connections) { - auto it = find(children.begin(), children.end(), conn.first); - if (it != children.end()) continue; - - // Erasing from connections in this loop would invalidate our - // iterator so defer until we're done with the connections map. - removeSource(conn.second.get()); - pendingDisconnects[conn.first] = std::move(conn.second); - } - - for (const auto& conn : pendingDisconnects) - connections.erase(conn.first); - } - // We're no longer holding the lock so any delayed. Time to really - // disconnect and trigger the callbacks. - pendingDisconnects.clear(); - - exitProvidersChanged(); -} - - -/** Encapsulates a lock-free state machine that manages the logic of the on - config callback. The problem being solved is that the onConnect callback - should not call the user's callback while we're holding the connection - lock but should do it when the lock is released. - - The lock-free state machine guarantees that no callbacks are lost and - that no callbacks will be triggered before a call to release is made. - The overhead of this class amounts to at most 2 CAS when the callback is - triggered; one if there's no contention. - - Note that this isn't an ideal solution to this problem because we really - should get rid of the locks when manipulating these events. - - \todo could be generalized if we need this pattern elsewhere. -*/ -struct ZmqMultipleNamedClientBusProxy::OnConnectCallback -{ - OnConnectCallback(const ConnectionHandler& fn, std::string name) : - fn(fn), name(name), state(DEFER) - {} - - /** Should ONLY be called AFTER the lock is released. */ - void release() - { - State old = state; - - ExcAssertNotEqual(old, CALL); - - // If the callback wasn't triggered while we were holding the lock - // then trigger it the next time we see it. - if (old == DEFER && ML::cmp_xchg(state, old, CALL)) return; - - ExcAssertEqual(old, DEFERRED); - fn(name); - } - - void operator() (std::string blah) - { - State old = state; - ExcAssertNotEqual(old, DEFERRED); - - // If we're still in the locked section then trigger the callback - // when release is called. - if (old == DEFER && ML::cmp_xchg(state, old, DEFERRED)) return; - - // We're out of the locked section so just trigger the callback. - ExcAssertEqual(old, CALL); - fn(name); - } - -private: - - ConnectionHandler fn; - std::string name; - - enum State { - DEFER, // We're holding the lock so defer an incoming callback. - DEFERRED, // We were called while holding the lock. - CALL // We were not called while holding the lock. - } state; -}; - - -void -ZmqMultipleNamedClientBusProxy:: -watchServiceProvider(const std::string & name, const std::string & path, int shardIndex) -{ - // Protects the connections map... I think. - std::unique_lock guard(connectionsLock); - - auto & c = connections[name]; - - // already connected - if (c) { - LOG(ZmqLogs::trace) - << path << " / " << name << " is already connected" - << std::endl; - return; - } - - LOG(ZmqLogs::trace) - << "connecting to " << path << " / " << name << std::endl; - - try { - auto newClient = std::make_shared(zmqContext, shardIndex); - newClient->init(config, identity); - - // The connect call below could trigger this callback while we're - // holding the connectionsLock which is a big no-no. This fancy - // wrapper ensures that it's only called after we call its release - // function. - if (connectHandler) - newClient->connectHandler = OnConnectCallback(connectHandler, name); - - newClient->disconnectHandler = [=] (std::string s) - { - // TODO: chain in so that we know it's not around any more - this->onDisconnect(s); - }; - - newClient->connect(path + "/" + endpointName); - newClient->messageHandler = [=] (const std::vector & msg) - { - this->handleMessage(name, msg); - }; - - c = std::move(newClient); - - // Add it to our message loop so that it can process messages - addSource("ZmqMultipleNamedClientBusProxy child " + name, c); - - guard.unlock(); - if (connectHandler) - c->connectHandler.target()->release(); - - } catch (...) { - // Avoid triggering the disconnect callbacks while holding the - // connectionsLock by defering the delete of the connection until - // we've manually released the lock. - ConnectionMap::mapped_type conn(std::move(connections[name])); - connections.erase(name); - - guard.unlock(); - // conn is a unique_ptr so it gets destroyed here. - throw; - } -} - - -} // namespace Datacratic diff --git a/service/zmq_endpoint.h b/service/zmq_endpoint.h deleted file mode 100644 index 8bf9e91b..00000000 --- a/service/zmq_endpoint.h +++ /dev/null @@ -1,1267 +0,0 @@ -/* zmq_endpoint.h -*- C++ -*- - Jeremy Barnes, 25 September 2012 - Copyright (c) 2012 Datacratic Inc. All rights reserved. - - Endpoints for zeromq. -*/ - -#ifndef __service__zmq_endpoint_h__ -#define __service__zmq_endpoint_h__ - -#include "named_endpoint.h" -#include "message_loop.h" -#include "logs.h" -#include -#include -#include "jml/utils/smart_ptr_utils.h" -#include "jml/utils/vector_utils.h" -#include -#include "jml/arch/backtrace.h" -#include "jml/arch/timers.h" -#include "jml/arch/cmp_xchg.h" -#include "zmq_utils.h" - -namespace Datacratic { - - -/******************************************************************************/ -/* ZMQ LOGS */ -/******************************************************************************/ - -struct ZmqLogs -{ - static Logging::Category print; - static Logging::Category error; - static Logging::Category trace; -}; - - -/*****************************************************************************/ -/* ZMQ EVENT SOURCE */ -/*****************************************************************************/ - -/** Adaptor that allows any zeromq socket to hook into an event loop. */ - -struct ZmqEventSource : public AsyncEventSource { - - typedef std::function)> - AsyncMessageHandler; - AsyncMessageHandler asyncMessageHandler; - - typedef std::function (std::vector)> - SyncMessageHandler; - SyncMessageHandler syncMessageHandler; - - typedef std::mutex SocketLock; - - ZmqEventSource(); - - ZmqEventSource(zmq::socket_t & socket, SocketLock * lock = nullptr); - - /** Construct the event source from a function object that returns - something that is not convertible to a std::vector. - This will cause the asynchronous message handler to be replaced - by the passed function. - */ - template - ZmqEventSource(zmq::socket_t & socket, - const T & handler, - SocketLock * lock = nullptr, - typename std::enable_if()(std::declval >())), - std::vector >::value, void>::type * = 0) - : asyncMessageHandler(handler) - { - init(socket, lock); - } - - /** Construct the event source from a function object that returns a - std::vector. This will cause the synchronous message - handler to be replaced by the passed function. - */ - template - ZmqEventSource(zmq::socket_t & socket, - const T & handler, - SocketLock * lock = nullptr, - typename std::enable_if()(std::declval >())), - std::vector >::value, void>::type * = 0) - : syncMessageHandler(handler) - { - init(socket, lock); - } - - void init(zmq::socket_t & socket, SocketLock * lock = nullptr); - - virtual int selectFd() const; - - virtual bool poll() const; - - virtual bool processOne(); - - /** Handle a message. The default implementation will call - syncMessageHandler if it is defined; otherwise it calls - handleSyncMessage and writes back the response to the socket. - */ - virtual void handleMessage(const std::vector & message); - - /** Handle a message and write a synchronous response. This will forward - to asyncMessageHandler if defined, or otherwise throw an exception. - */ - virtual std::vector - handleSyncMessage(const std::vector & message); - - zmq::socket_t & socket() const - { - ExcAssert(socket_); - return *socket_; - } - - SocketLock * socketLock() const - { - return socketLock_; - } - -protected: - zmq::socket_t * socket_; - - SocketLock * socketLock_; - - /** Update the current cached event mask. Note that this requires that the - socketLock be taken if non-null. - */ - void updateEvents() const; - - /// Mask of current events that are pending on the socket. - mutable int currentEvents; -}; - - -/*****************************************************************************/ -/* ZMQ BINARY EVENT SOURCE */ -/*****************************************************************************/ - -/** Adaptor that allows any zeromq socket to hook into an event loop. */ - -struct ZmqBinaryEventSource : public AsyncEventSource { - - typedef std::function &&)> - MessageHandler; - MessageHandler messageHandler; - - ZmqBinaryEventSource() - : socket_(0) - { - needsPoll = true; - } - - ZmqBinaryEventSource(zmq::socket_t & socket, - MessageHandler messageHandler = MessageHandler()) - : messageHandler(std::move(messageHandler)), - socket_(&socket) - { - needsPoll = true; - } - - void init(zmq::socket_t & socket) - { - socket_ = &socket; - needsPoll = true; - } - - virtual int selectFd() const - { - int res = -1; - size_t resSize = sizeof(int); - socket().getsockopt(ZMQ_FD, &res, &resSize); - if (res == -1) - THROW(ZmqLogs::error) << "no fd for zeromq socket" << std::endl; - return res; - } - - virtual bool poll() const - { - return getEvents(socket()).first; - } - - virtual bool processOne() - { - ExcAssert(socket_); - - std::vector messages; - - int64_t more = 1; - size_t more_size = sizeof (more); - - while (more) { - zmq::message_t message; - bool got = socket_->recv(&message, messages.empty() ? ZMQ_NOBLOCK: 0); - if (!got) return false; // no first part available - messages.emplace_back(std::move(message)); - socket_->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } - - handleMessage(std::move(messages)); - - return poll(); - } - - /** Handle a message. The default implementation will call - syncMessageHandler if it is defined; otherwise it calls - handleSyncMessage and writes back the response to the socket. - */ - virtual void handleMessage(std::vector && message) - { - if (messageHandler) - messageHandler(std::move(message)); - else { - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - } - - zmq::socket_t & socket() const - { - ExcAssert(socket_); - return *socket_; - } - - zmq::socket_t * socket_; - -}; - - -/*****************************************************************************/ -/* ZMQ BINARY TYPED EVENT SOURCE */ -/*****************************************************************************/ - -/** An adaptor that is used to deal with a zeromq connection that sends - length one messages of a binary data structure. -*/ - -template -struct ZmqBinaryTypedEventSource: public AsyncEventSource { - - typedef std::function MessageHandler; - MessageHandler messageHandler; - - ZmqBinaryTypedEventSource() - : socket_(0) - { - needsPoll = true; - } - - ZmqBinaryTypedEventSource(zmq::socket_t & socket, - MessageHandler messageHandler = MessageHandler()) - : messageHandler(std::move(messageHandler)), - socket_(&socket) - { - needsPoll = true; - } - - void init(zmq::socket_t & socket) - { - socket_ = &socket; - needsPoll = true; - } - - virtual int selectFd() const - { - int res = -1; - size_t resSize = sizeof(int); - socket().getsockopt(ZMQ_FD, &res, &resSize); - if (res == -1) - THROW(ZmqLogs::error) << "no fd for zeromq socket" << std::endl; - return res; - } - - virtual bool poll() const - { - return getEvents(socket()).first; - } - - virtual bool processOne() - { - zmq::message_t message; - ExcAssert(socket_); - bool got = socket_->recv(&message, ZMQ_NOBLOCK); - if (!got) return false; - - handleMessage(* reinterpret_cast(message.data())); - - return poll(); - } - - virtual void handleMessage(const Arg & arg) - { - if (messageHandler) - messageHandler(arg); - else { - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - } - - zmq::socket_t & socket() const - { - ExcAssert(socket_); - return *socket_; - } - - zmq::socket_t * socket_; -}; - - -/*****************************************************************************/ -/* ZMQ TYPED EVENT SOURCE */ -/*****************************************************************************/ - -/** A sink that listens for zeromq messages of the format: - - 1. address (optional) - 2. message kind - 3. message type - 4. binary payload - - and calls a callback on the decoded message. -*/ - -template -struct ZmqTypedEventSource: public ZmqEventSource { - typedef std::function - OnMessage; - - OnMessage onMessage; - bool routable; - std::string messageTopic; - - ZmqTypedEventSource() - { - } - - ZmqTypedEventSource(zmq::socket_t & socket, - bool routable, - const std::string & messageTopic) - { - init(routable, messageTopic); - } - - void init(zmq::socket_t & socket, - bool routable, - const std::string messageTopic) - { - this->routable = routable; - this->messageTopic = messageTopic; - } - - virtual void handleMessage(const std::vector & message) - { - int expectedSize = routable + 2; - if (message.size() != expectedSize) { - THROW(ZmqLogs::error) << "unexpected message size: " - << "expected=" << expectedSize << ", got=" << message.size() - << std::endl; - } - - int i = routable; - if (message[i + 1] != messageTopic) { - THROW(ZmqLogs::error) << "unexpected message kind: " - << "expected=" << message[i + 1] << ", got=" << messageTopic - << std::endl; - } - - std::istringstream stream(message[i + 2]); - ML::DB::Store_Reader store(stream); - T result; - store >> result; - - handleTypedMessage(std::move(result), routable ? message[0] : ""); - } - - virtual void handleTypedMessage(T && message, const std::string & address) - { - if (onMessage) - onMessage(message, address); - else { - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - } -}; - - -/*****************************************************************************/ -/* ZEROMQ NAMED ENDPOINT */ -/*****************************************************************************/ - -/** An endpoint that exposes a zeromq interface that is passive (bound to - one or more listening ports) and is published in a configuration service. - - Note that the endpoint may be connected to more than one thing. -*/ - -struct ZmqNamedEndpoint : public NamedEndpoint, public MessageLoop { - - ZmqNamedEndpoint(std::shared_ptr context); - - ~ZmqNamedEndpoint() - { - shutdown(); - } - - void init(std::shared_ptr config, - int socketType, - const std::string & endpointName); - - void shutdown() - { - MessageLoop::shutdown(); - - if (socket_) { - unbindAll(); - socket_.reset(); - } - } - - /** Bind into a tcp port. If the preferred port is not available, it will - scan until it finds one that is. - - Returns the uri to connect to. - */ - std::string bindTcp(PortRange const & portRange = PortRange(), std::string host = ""); - - /** Bind to the given zeromq uri, but don't publish it. */ - void bind(const std::string & address) - { - if (!socket_) - THROW(ZmqLogs::error) << "bind called before init" << std::endl; - - std::unique_lock guard(lock); - socket_->bind(address); - boundAddresses[address]; - } - - /** Unbind to all addresses. */ - void unbindAll() - { - std::unique_lock guard(lock); - ExcAssert(socket_); - for (auto addr: boundAddresses) - socket_->tryUnbind(addr.first); - } - - template - void sendMessage(Args&&... args) - { - using namespace std; - std::unique_lock guard(lock); - ExcAssert(socket_); - Datacratic::sendMessage(*socket_, std::forward(args)...); - } - - void sendMessage(const std::vector & message) - { - using namespace std; - std::unique_lock guard(lock); - ExcAssert(socket_); - Datacratic::sendAll(*socket_, message); - } - - /** Send a raw message on. */ - void sendMessage(std::vector && message) - { - using namespace std; - std::unique_lock guard(lock); - ExcAssert(socket_); - for (unsigned i = 0; i < message.size(); ++i) { - socket_->send(message[i], - i == message.size() - 1 - ? 0 : ZMQ_SNDMORE); - } - } - - /** Very unsafe method as it bypasses all thread safety. */ - zmq::socket_t & getSocketUnsafe() const - { - ExcAssert(socket_); - return *socket_; - } - - typedef std::function &&)> - RawMessageHandler; - RawMessageHandler rawMessageHandler; - - typedef std::function &&)> MessageHandler; - MessageHandler messageHandler; - - /** Handle a message. The default implementation will call - syncMessageHandler if it is defined; otherwise it converts the message - to strings and calls handleMessage. - */ - virtual void handleRawMessage(std::vector && message) - { - if (rawMessageHandler) - rawMessageHandler(std::move(message)); - else { - std::vector msg2; - for (unsigned i = 0; i < message.size(); ++i) { - msg2.push_back(message[i].toString()); - } - handleMessage(std::move(msg2)); - } - } - - virtual void handleMessage(std::vector && message) - { - if (messageHandler) - messageHandler(std::move(message)); - else - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - - typedef std::function - ConnectionEventHandler; - - /** Callback for when we accept an incoming connection. */ - ConnectionEventHandler acceptEventHandler; - - /** Callback for when an incoming connection is closed. */ - ConnectionEventHandler disconnectEventHandler; - - /** Callback for when we are no longer bound to an address. */ - ConnectionEventHandler closeEventHandler; - - /** Callback for when we accept an incoming connection. */ - virtual void handleAcceptEvent(std::string boundAddress) - { - if (acceptEventHandler) - acceptEventHandler(boundAddress); - } - - /** Callback for when an incoming connection is closed. */ - virtual void handleDisconnectEvent(std::string boundAddress) - { - if (disconnectEventHandler) - disconnectEventHandler(boundAddress); - } - - /** Callback for when we are no longer bound to an address. */ - virtual void handleCloseEvent(std::string boundAddress) - { - if (closeEventHandler) - closeEventHandler(boundAddress); - } - - /** Interrogates the number of addresses we're bound to. */ - size_t numBoundAddresses() const - { - std::unique_lock guard(lock); - return boundAddresses.size(); - } - - /** Interrogate the number of connections. If addr is the empty string - then it is over all bound addresses. - */ - size_t numActiveConnections(std::string addr = "") const - { - std::unique_lock guard(lock); - if (addr == "") { - size_t result = 0; - for (auto & addr: boundAddresses) - result += addr.second.connectedFds.size(); - return result; - } - else { - auto it = boundAddresses.find(addr); - if (it == boundAddresses.end()) - return 0; - return it->second.connectedFds.size(); - } - } - -private: - typedef std::mutex Lock; - mutable Lock lock; - - std::shared_ptr context_; - std::shared_ptr socket_; - - struct AddressInfo { - AddressInfo() - : listeningFd(-1) - { - } - - /// File descriptor we're listening on - int listeningFd; - - /// Set of file descriptors we're connected to - std::set connectedFds; - }; - - /// Information for each bound address - std::map boundAddresses; - - /// Zeromq socket type - int socketType; -}; - - -/*****************************************************************************/ -/* ZMQ NAMED CLIENT BUS */ -/*****************************************************************************/ - -/** A named service endpoint that keeps track of the clients that are - connected and will notify on connection and disconnection. -*/ -struct ZmqNamedClientBus: public ZmqNamedEndpoint { - - ZmqNamedClientBus(std::shared_ptr context, - double deadClientDelay = 5.0) - : ZmqNamedEndpoint(context), deadClientDelay(deadClientDelay) - { - } - - void init(std::shared_ptr config, - const std::string & endpointName) - { - ZmqNamedEndpoint::init(config, ZMQ_XREP, endpointName); - addPeriodic("ZmqNamedClientBus::checkClient", 1.0, - [=] (uint64_t v) { this->onCheckClient(v); }); - } - - virtual ~ZmqNamedClientBus() - { - shutdown(); - } - - void shutdown() - { - MessageLoop::shutdown(); - ZmqNamedEndpoint::shutdown(); - } - - /** How long until we decide a client that's not sending a heartbeat is - dead. - */ - double deadClientDelay; - - /** Function called when something connects to the bus */ - std::function onConnection; - - /** Function called when something disconnects from the bus (we can tell - due to timeouts). - */ - std::function onDisconnection; - - - - template - void sendMessage(const std::string & address, - const std::string & topic, - Args&&... args) - { - ZmqNamedEndpoint::sendMessage(address, topic, - std::forward(args)...); - } - - virtual void handleMessage(std::vector && message) - { - using namespace std; - - const std::string & agent = message.at(0); - const std::string & topic = message.at(1); - - if (topic == "HEARTBEAT") { - // Not the first message from the client - auto it = clientInfo.find(agent); - if (it == clientInfo.end()) { - // Disconnection then reconnection - if (onConnection) - onConnection(agent); - it = clientInfo.insert(make_pair(agent, ClientInfo())).first; - } - it->second.lastHeartbeat = Date::now(); - sendMessage(agent, "HEARTBEAT"); - } - - else if (topic == "HELLO") { - // First message from client - auto it = clientInfo.find(agent); - if (it == clientInfo.end()) { - // New connection - if (onConnection) - onConnection(agent); - it = clientInfo.insert(make_pair(agent, ClientInfo())).first; - } - else { - // Client must have disappeared then reappared without us - // noticing. - // Do this disconnection, then the reconnection - if (onDisconnection) - onDisconnection(agent); - if (onConnection) - onConnection(agent); - } - it->second.lastHeartbeat = Date::now(); - sendMessage(agent, "HEARTBEAT"); - } - else { - handleClientMessage(message); - } - - } - - typedef std::function)> - ClientMessageHandler; - ClientMessageHandler clientMessageHandler; - - virtual void handleClientMessage(const std::vector & message) - { - if (clientMessageHandler) - clientMessageHandler(message); - else { - THROW(ZmqLogs::error) - << "no message handler set for client " << message.at(1) - << std::endl; - } - } - -private: - void onCheckClient(uint64_t numEvents) - { - Date now = Date::now(); - Date expiry = now.plusSeconds(-deadClientDelay); - - std::vector deadClients; - - for (auto & c: clientInfo) - if (c.second.lastHeartbeat < expiry) - deadClients.push_back(c.first); - - for (auto d: deadClients) { - if (onDisconnection) - onDisconnection(d); - clientInfo.erase(d); - } - } - - struct ClientInfo { - ClientInfo() - : lastHeartbeat(Date::now()) - { - } - - Date lastHeartbeat; - }; - - std::map clientInfo; -}; - - -/** Flags to modify how we create a connection. */ -enum ConnectionStyle { - CS_ASYNCHRONOUS, ///< Asynchronous; returns true - CS_SYNCHRONOUS, ///< Synchronous; returns state of connection - CS_MUST_SUCCEED ///< Synchronous; returns true or throws -}; - - - - -/*****************************************************************************/ -/* ZEROMQ NAMED PROXY */ -/*****************************************************************************/ - -/** Proxy to connect to a named zeromq-based service. */ - -// THIS SHOULD BE REPLACED BY ZmqNamedSocket - -struct ZmqNamedProxy: public MessageLoop { - - ZmqNamedProxy(); - - ZmqNamedProxy(std::shared_ptr context, int shardIndex = -1); - - ~ZmqNamedProxy() - { - shutdown(); - } - - void shutdown() - { - MessageLoop::shutdown(); - if(socket_) { - std::lock_guard guard(socketLock_); - socket_.reset(); - } - } - - bool isConnected() const { return connectionState == CONNECTED; } - - /** Type of callback for a new connection. */ - typedef std::function ConnectionHandler; - - /** Callback that will be called when we get a new connection if - onConnect() is not overridden. */ - ConnectionHandler connectHandler; - - /** Function that will be called when we make a new connection to a - remote service provider. - */ - virtual void onConnect(const std::string & source) - { - if (connectHandler) - connectHandler(source); - } - - /** Callback that will be called when we get a disconnection if - onDisconnect() is not overridden. */ - ConnectionHandler disconnectHandler; - - /** Function that will be called when we lose a connection to a - remote service provider. - */ - virtual void onDisconnect(const std::string & source) - { - if (disconnectHandler) - disconnectHandler(source); - } - - void init(std::shared_ptr config, - int socketType, - const std::string & identity = ""); - - /** Connect to the given endpoint via zeromq. Returns whether the - connection could be established. - - If synchronous is true, then the method will not return until - the connection is really established. - - If mustSucceed is true, then an exception will be thrown if the - connection cannot be established. - */ - bool connect(const std::string & endpointName, - ConnectionStyle style = CS_ASYNCHRONOUS); - - /** Connect to one of the endpoints that provides the given service. - The endpointName tells which endpoint of the service class that - should be connected to. - - Looks up the service providers in the configuration service. - */ - bool connectToServiceClass(const std::string & serviceClass, - const std::string & endpointName, - bool local = true, - ConnectionStyle style = CS_ASYNCHRONOUS); - - /** Called back when one of our endpoints either changes or disappears. */ - bool onConfigChange(ConfigurationService::ChangeType change, - const std::string & key, - const Json::Value & newValue); - - /** Get the zeromq socket to listen to. */ - zmq::socket_t & socket() const - { - ExcAssert(socket_); - return *socket_; - } - - ZmqEventSource::SocketLock * socketLock() const - { - return &socketLock_; - } - - template - void sendMessage(Args&&... args) - { - std::lock_guard guard(socketLock_); - - ExcCheckNotEqual(connectionState, NOT_CONNECTED, - "sending on an unconnected socket: " + endpointName); - - if (connectionState == CONNECTION_PENDING) { - LOG(ZmqLogs::error) - << "dropping message for " << endpointName << std::endl; - return; - } - - Datacratic::sendMessage(socket(), std::forward(args)...); - } - - void disconnect() - { - if (connectionState == NOT_CONNECTED) return; - - { - std::lock_guard guard(socketLock_); - - if (connectionState == CONNECTED) - socket_->disconnect(connectedUri); - - connectionState = NOT_CONNECTED; - } - - onDisconnect(connectedUri); - } - - size_t getShardIndex() const - { - return shardIndex; - } - -protected: - ConfigurationService::Watch serviceWatch, endpointWatch; - std::shared_ptr config; - std::shared_ptr context_; - std::shared_ptr socket_; - - mutable ZmqEventSource::SocketLock socketLock_; - - enum ConnectionType { - NO_CONNECTION, ///< No connection type yet - CONNECT_DIRECT, ///< Connect directly to a named service - CONNECT_TO_CLASS, ///< Connect to a service class - } connectionType; - - enum ConnectionState { - NOT_CONNECTED, // connect() was not called - CONNECTION_PENDING, // connect() was called but service is not available - CONNECTED // connect() was called and the socket was connected - } connectionState; - - void onServiceNodeChange(const std::string & path, - ConfigurationService::ChangeType change); - void onEndpointNodeChange(const std::string & path, - ConfigurationService::ChangeType change); - - std::string serviceClass; ///< Service class we're connecting to - std::string endpointName; ///< Name of endpoint to connect to - std::string connectedService; ///< Name of service we're connected to - std::string connectedUri; ///< URI we're connected to - bool local; - int shardIndex; -}; - - -/*****************************************************************************/ -/* ZEROMQ NAMED CLIENT BUS PROXY */ -/*****************************************************************************/ - -/** Class designed to go on the other end of a zeromq named client bus. This - takes care of sending the keepalives and will allow the other end to - detect when the connection is broken. - -*/ - -struct ZmqNamedClientBusProxy : public ZmqNamedProxy { - - ZmqNamedClientBusProxy() - : timeout(2.0) - { - } - - ZmqNamedClientBusProxy(std::shared_ptr context, int shardIndex = -1) - : ZmqNamedProxy(context, shardIndex), timeout(2.0) - { - } - - ~ZmqNamedClientBusProxy() - { - shutdown(); - } - - void init(std::shared_ptr config, - const std::string & identity = "") - { - ZmqNamedProxy::init(config, ZMQ_XREQ, identity); - - auto doMessage = [=] (const std::vector & message) - { - const std::string & topic = message.at(0); - if (topic == "HEARTBEAT") - this->lastHeartbeat = Date::now(); - else handleMessage(message); - }; - - addSource("ZmqNamedClientBusProxy::doMessage", - std::make_shared(socket(), doMessage, socketLock())); - - auto doHeartbeat = [=] (int64_t skipped) - { - if (connectionState != CONNECTED) return; - - sendMessage("HEARTBEAT"); - }; - - addPeriodic("ZmqNamedClientBusProxy::doHeartbeat", 1.0, doHeartbeat); - } - - void shutdown() - { - MessageLoop::shutdown(); - ZmqNamedProxy::shutdown(); - } - - virtual void onConnect(const std::string & where) - { - lastHeartbeat = Date::now(); - - sendMessage("HELLO"); - - if (connectHandler) - connectHandler(where); - } - - virtual void onDisconnect(const std::string & where) - { - if (disconnectHandler) - disconnectHandler(where); - } - - ZmqEventSource::AsyncMessageHandler messageHandler; - - virtual void handleMessage(const std::vector & message) - { - if (messageHandler) - messageHandler(message); - else - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - - Date lastHeartbeat; - double timeout; -}; - - -/*****************************************************************************/ -/* ZEROMQ MULTIPLE NAMED CLIENT BUS PROXY */ -/*****************************************************************************/ - -/** Class designed to go on the other end of a zeromq named client bus. This - takes care of sending the keepalives and will allow the other end to - detect when the connection is broken. - -*/ - -struct ZmqMultipleNamedClientBusProxy: public MessageLoop { - friend class ServiceDiscoveryScenario; - friend class ServiceDiscoveryScenarioTest; - -#define CHANGES_MAP_INITIALIZER \ - { \ - { ConfigurationService::VALUE_CHANGED, 0 }, \ - { ConfigurationService::DELETED, 0 }, \ - { ConfigurationService::CREATED, 0 }, \ - { ConfigurationService::NEW_CHILD, 0 } \ - } - - ZmqMultipleNamedClientBusProxy() - : zmqContext(new zmq::context_t(1)), - changesCount( CHANGES_MAP_INITIALIZER ) - { - connected = false; - inProvidersChanged = false; - } - - ZmqMultipleNamedClientBusProxy(std::shared_ptr context) - : zmqContext(context), - changesCount( CHANGES_MAP_INITIALIZER ) - { - connected = false; - inProvidersChanged = false; - } - -#undef CHANGES_MAP_INITIALIZER - - ~ZmqMultipleNamedClientBusProxy() - { - shutdown(); - } - - void init(std::shared_ptr config, - const std::string & identity = "") - { - this->config = config; - this->identity = identity; - } - - void shutdown() - { - MessageLoop::shutdown(); - for (auto & c: connections) - if (c.second) - c.second->shutdown(); - } - - template - void sendMessage(const std::string & recipient, - const std::string & topic, - Args&&... args) const - { - std::unique_lock guard(connectionsLock); - auto it = connections.find(recipient); - if (it == connections.end()) { - THROW(ZmqLogs::error) - << "unable to deliver " << topic - << " to unknown client " << recipient - << std::endl; - } - it->second->sendMessage(topic, std::forward(args)...); - } - - - template - bool sendMessageToShard(size_t shard, - const std::string & topic, - Args&&... args) const - { - std::unique_lock guard(connectionsLock); - for (const auto& conn : connections) { - if (conn.second->getShardIndex() != shard) continue; - - conn.second->sendMessage(topic, std::forward(args)...); - return true; - } - return false; - } - - bool isConnectedToShard(size_t shard) const - { - std::unique_lock guard(connectionsLock); - for (const auto& conn : connections) { - if (conn.second->getShardIndex() != shard) continue; - - return conn.second->isConnected(); - } - return false; - } - - /** Connect to all instances of the given service type, and make sure - that we listen and connect to any further ones that appear. - */ - void connectAllServiceProviders(const std::string & serviceClass, - const std::string & endpointName, - bool local = true); - - /** Connect to a single named service. */ - void connectSingleServiceProvider(const std::string & service); - - /** Connect directly to a zeromq uri. */ - void connectUri(const std::string & zmqUri); - - size_t connectionCount() const - { - std::unique_lock guard(connectionsLock); - return connections.size(); - } - - /** Type of callback for a new connection. */ - typedef std::function ConnectionHandler; - - /** Callback that will be called when we get a new connection if - onConnect() is not overridden. */ - ConnectionHandler connectHandler; - - /** Function that will be called when we make a new connection to a - remote service provider. - */ - virtual void onConnect(const std::string & source) - { - if (connectHandler) - connectHandler(source); - } - - /** Callback that will be called when we get a disconnection if - onDisconnect() is not overridden. */ - ConnectionHandler disconnectHandler; - - /** Function that will be called when we lose a connection to a - remote service provider. - */ - virtual void onDisconnect(const std::string & source) - { - if (disconnectHandler) - disconnectHandler(source); - } - - /** Type of handler to override what we do when we get a message. */ - typedef std::function)> - MessageHandler; - - /** Function to call when we get a message from a remote connection - if handleMessage() is not overridden. - */ - MessageHandler messageHandler; - - /** Handle a message from a remote service. Source is the name of the - service (this can be used to send something back). Message is the - content of the message. - */ - virtual void handleMessage(const std::string & source, - const std::vector & message) - { - if (messageHandler) - messageHandler(source, message); - else - THROW(ZmqLogs::error) << "no message handler set" << std::endl; - } - - -private: - /** Are we connected? */ - bool connected; - - /** Common zeromq context for all connections. */ - std::shared_ptr zmqContext; - - /** Number of times a particular change has occured. This is only meant for tests - * purposes. - */ - std::map changesCount; - - /** Configuration service from where we learn where to connect. */ - std::shared_ptr config; - - /** Class of service we're connecting to. */ - std::string serviceClass; - - /** Name of the endpoint on the service we're connecting to. */ - std::string endpointName; - - /** Identity for our zeromq socket. */ - std::string identity; - - typedef ML::Spinlock Lock; - - /** Lock to be used when modifying the list of connections. */ - mutable Lock connectionsLock; - - /** List of currently connected connections. */ - typedef std::map > ConnectionMap; - ConnectionMap connections; - - /** Current watch on the list of service providers. */ - ConfigurationService::Watch serviceProvidersWatch; - - /** Are we currently in onServiceProvidersChanged? **/ - bool inProvidersChanged; - ML::Spinlock providersChangedLock; - - /** Queue for defered onServiceProvidersChanged calls */ - typedef std::pair DeferedProvidersChanges; - std::vector deferedProvidersChanges; - - bool enterProvidersChanged(const std::string& path, bool local); - void exitProvidersChanged(); - - /** Callback that will be called when the list of service providers has changed. */ - void onServiceProvidersChanged(const std::string & path, bool local); - - struct OnConnectCallback; - - /** Call this to watch for a given service provider. */ - void watchServiceProvider(const std::string & name, const std::string & path, int shardIndex); - -}; - - - -} // namespace Datacratic - -#endif /* __service__zmq_endpoint_h__ */ diff --git a/service/zmq_message_router.h b/service/zmq_message_router.h deleted file mode 100644 index f67595bf..00000000 --- a/service/zmq_message_router.h +++ /dev/null @@ -1,61 +0,0 @@ -/* zmq_message_router.h -*- C++ -*- - Jeremy Barnes, 22 November 2012 - Copyright (c) 2012 Datacratic. All rights reserved. - - Router object to hook up zeromq messages (identified with a topic) - to callback functions. -*/ - -#pragma once - -#include "named_endpoint.h" -#include "message_loop.h" - - -namespace Datacratic { - - -/*****************************************************************************/ -/* ZMQ MESSAGE ROUTER */ -/*****************************************************************************/ - -struct ZmqMessageRouter: public ZmqEventSource { - - ZmqMessageRouter(bool routable = true) - : routable(routable) - { - } - - void addRoute(const std::string & topic, - AsyncMessageHandler handler) - { - messageHandlers[topic] = handler; - } - - void bind(const std::string & topic, - const std::function & args)> & handler) - { - messageHandlers[topic] = handler; - } - - virtual void handleMessage(const std::vector & message) - { - std::string topic = message.at(routable); - - //using namespace std; - //cerr << "got message " << topic << " " << message << endl; - - auto it = messageHandlers.find(topic); - if (it == messageHandlers.end()) - defaultHandler(message); - else it->second(message); - } - - bool routable; - std::map messageHandlers; - AsyncMessageHandler defaultHandler; -}; - - -} // namespace Datacratic - diff --git a/service/zmq_named_pub_sub.h b/service/zmq_named_pub_sub.h deleted file mode 100644 index a36b0d02..00000000 --- a/service/zmq_named_pub_sub.h +++ /dev/null @@ -1,1089 +0,0 @@ -/* zmq_named_pub_sub.h -*- C++ -*- - Jeremy Barnes, 8 January 2013 - Copyright (c) 2013 Datacratic Inc. All rights reserved. - - Named publish/subscribe endpoint. -*/ - -#pragma once - -#include "zmq_endpoint.h" -#include "typed_message_channel.h" -#include -#include "jml/arch/backtrace.h" - -namespace Datacratic { - - - -/*****************************************************************************/ -/* ZMQ NAMED PUBLISHER */ -/*****************************************************************************/ - -/** Class that publishes messages. It also knows what is connected to it. - */ -struct ZmqNamedPublisher: public MessageLoop { - - ZmqNamedPublisher(std::shared_ptr context, - int messageBufferSize = 65536) - : publishEndpoint(context), - publishQueue(messageBufferSize) - { - } - - virtual ~ZmqNamedPublisher() - { - shutdown(); - } - - void init(std::shared_ptr config, - const std::string & endpointName, - const std::string & identity = "") - { - using namespace std; - - // Initialize the publisher endpoint - publishEndpoint.init(config, ZMQ_XPUB, endpointName); - - // Called when we queue up a message to be published - publishQueue.onEvent = [=] (std::vector && message) - { - using namespace std; - //cerr << "popped message to publish" << endl; - publishEndpoint.sendMessage(std::move(message)); - }; - - // Called when there is a new subscription. - // The first byte is 1 (subscription) or 0 (unsubscription). - // The rest of the message is the filter for the subscription - auto doPublishMessage = [=] (const std::vector & msg) - { -#if 0 - cerr << "got publish subscription" << msg << endl; - cerr << "msg.size() = " << msg.size() << endl; - cerr << "msg[0].size() = " << msg[0].size() << endl; - cerr << "msg[0][0] = " << (int)msg[0][0] << endl; -#endif - }; - - publishEndpoint.messageHandler = doPublishMessage; - - // Hook everything into the message loop - addSource("ZmqNamedPublisher::publishEndpoint", publishEndpoint); - addSource("ZmqNamedPublisher::publishQueue", publishQueue); - - } - - std::string bindTcp(PortRange const & portRange = PortRange(), std::string host = "") - { - return publishEndpoint.bindTcp(portRange, host); - } - - void shutdown() - { - MessageLoop::shutdown(); - publishEndpoint.shutdown(); - //publishEndpointMonitor.shutdown(); - } - - //std::vector getSubscribers() - //{ - //} - - template - void encodeAll(std::vector & messages, - Head head, - Tail&&... tail) - { - messages.emplace_back(std::move(encodeMessage(head))); - encodeAll(messages, std::forward(tail)...); - } - - // Vectors treated specially... they are copied - template - void encodeAll(std::vector & messages, - const std::vector & head, - Tail&&... tail) - { - for (auto & m: head) - messages.emplace_back(std::move(encodeMessage(m))); - encodeAll(messages, std::forward(tail)...); - } - - void encodeAll(std::vector & messages) - { - } - - template - void publish(const std::string & channel, Args&&... args) - { - std::vector messages; - messages.reserve(sizeof...(Args) + 1); - - encodeAll(messages, channel, - std::forward(args)...); - publishQueue.push(messages); - } - -private: - /// Zeromq endpoint on which messages are published - ZmqNamedEndpoint publishEndpoint; - - /// Queue of things to be published - TypedMessageSink > publishQueue; -}; - - -/*****************************************************************************/ -/* ENDPOINT CONNECTOR */ -/*****************************************************************************/ - -/** This class keeps track of watching and connecting to a particular - endpoint. - - Specifically, an endpoint is in the configuration service under the - path - - /serviceName/endpoint - - It can have one or more entries under that path for different versions - of the endpoint: - - /serviceName/endpoint/tcp - /serviceName/endpoint/ipc - /serviceName/endpoint/shm - - etc. - - This class will: - 1. If there are no entries under the endpoint, then it will wait until - one or more appear in the object. - 2. Once there is one or more endpoints, it will attempt to connect to - each of them until it finds one that works. - 3. If there is a disconnection from an endpoint, it will attempt to - re-establish a connection. - 4. If the configuration string of an endpoint changes, it will attempt - a re-connection. - - It is designed to integrate into an existing message loop so that the - notifications will be processed in the same thread as the other messages - and locking is not required. -*/ - -struct EndpointConnector : public MessageLoop { - - EndpointConnector() - : changes(32) - { - } - - void init(std::shared_ptr config) - { - this->config = config; - - changes.onEvent - = std::bind(&EndpointConnector::handleEndpointChange, - this, - std::placeholders::_1); - - addSource("EndpointConnector::changes", changes); - } - - void watchEndpoint(const std::string & endpointPath) - { - std::unique_lock guard(lock); - - using namespace std; - //cerr << "watching endpoint " << endpointPath << endl; - - auto & entry = endpoints[endpointPath]; - - if (!entry.watch) { - // First time that we watch this service; set it up - - //cerr << "=============== initializing watch for " << endpointPath << endl; - - entry.watch.init([=] (const std::string & path, - ConfigurationService::ChangeType change) - { - using namespace std; - //cerr << "endpoint changed path " << path << " " << this << endl; - - { - std::unique_lock guard(lock); - if (endpoints.count(endpointPath)) { - auto & entry = endpoints[endpointPath]; - entry.watchIsSet = false; - } - } - - changes.push(endpointPath); - }); - - changes.push(endpointPath); - } - } - - /** Stop watching the given endpoint, which should have already been - watched via watchEndpoint. - */ - void unwatchEndpoint(const std::string & endpoint, - bool forceDisconnect) - { - throw ML::Exception("unwatchEndpoint not done"); - } - - std::function connectHandler; - - /** This method will be called when a connection is needed to a service. - It should call the onDone function with a boolean indicating whether - or not a connection was established. - - Return code: - false = connection failed - true = connection succeeded - - TODO: make this work with asynchronous connections - */ - virtual bool connect(const std::string & endpointPath, - const std::string & entryName, - const Json::Value & params) - { - if (connectHandler) - return connectHandler(endpointPath, entryName, params); - throw ML::Exception("no connect override"); - } - - /** Method to be called back when there is a disconnection. */ - void handleDisconnection(const std::string & endpointPath, - const std::string & entryName) - { - std::unique_lock guard(lock); - - if (!endpoints.count(endpointPath)) - return; - - auto & entry = endpoints[endpointPath]; - - ExcAssertEqual(entry.connectedTo, entryName); - entry.connectedTo = ""; - - // And we attempt to reconnect - // Note that we can't push endpointPath on changes - changes.push(endpointPath); - } - -private: - typedef std::mutex Lock; - mutable Lock lock; - - /** We put zookeeper messages on this queue so that they get handled in - the message loop thread and don't cause locking problems. - */ - TypedMessageSink changes; - - struct Entry { - Entry() - : watchIsSet(false) - { - } - - ConfigurationService::Watch watch; - std::string connectedTo; - bool watchIsSet; - }; - - std::map endpoints; - - std::shared_ptr config; - - void handleEndpointChange(const std::string & endpointPath) - { - using namespace std; - - //cerr << "handleEndpointChange " << endpointPath << endl; - - std::unique_lock guard(lock); - - if (!endpoints.count(endpointPath)) - return; - - auto & entry = endpoints[endpointPath]; - - //cerr << "handling service class change for " << endpointPath << endl; - - vector children; - if (entry.watchIsSet) - children = config->getChildren(endpointPath); - else children = config->getChildren(endpointPath, entry.watch); - entry.watchIsSet = true; - - //cerr << "children = " << children << endl; - - // If we're connected, look for a change in the endpoints - if (!entry.connectedTo.empty()) { - - // Does our connected node still exist? - if (std::find(children.begin(), children.end(), entry.connectedTo) - == children.end()) { - // Node disappeared; we need to disconnect - guard.unlock(); - handleDisconnection(endpointPath, entry.connectedTo); - return; // handleDisconnection will call into us recursively - } - else { - // Node is still there - // TODO: check for change in value - return; - } - } - - guard.unlock(); - - // If we got here, we're not connected - for (auto & c: children) { - Json::Value cfg = config->getJson(endpointPath + "/" + c); - if (connect(endpointPath, c, cfg)) { - notifyConnectionStatus(endpointPath, c, true); - return; - } - } - - cerr << "warning: could not connect to " << endpointPath << " immediately" - << endl; - } - - void notifyConnectionStatus(const std::string & endpointPath, - const std::string & entryName, - bool status) - { - std::unique_lock guard(lock); - - if (!endpoints.count(endpointPath)) - return; - - auto & entry = endpoints[endpointPath]; - - if (status) { - if (!entry.connectedTo.empty()) { - // TODO - throw ML::Exception("TODO: handle connection with one already " - "active"); - } - entry.connectedTo = entryName; - // TODO: watch config - } - else { - if (entry.connectedTo.empty()) - return; - if (entry.connectedTo != entryName) { - throw ML::Exception("TODO: handle disconnection from non-active"); - } - entry.connectedTo = ""; - } - } -}; - - -/*****************************************************************************/ -/* ZMQ NAMED SOCKET */ -/*****************************************************************************/ - -/** Active socket that attempts to connect into an endpoint. */ - -struct ZmqNamedSocket: public MessageLoop { - - enum ConnectionState { - NO_CONNECTION, ///< No connection was requested - CONNECTING, ///< Connection is attempting to connect - CONNECTED, ///< Connection is connected - DISCONNECTED ///< Connection was connected but has disconnected - }; - - ZmqNamedSocket(zmq::context_t & context, int type) - : context(&context), - socketType(type), - connectionState(NO_CONNECTION) - { - //using namespace std; - //cerr << "created zmqNamedSocket at " << this << endl; - } - - virtual ~ZmqNamedSocket() - { - shutdown(); - } - - /** Initialization. Can only be called once and is not protected from - multiple threads. - */ - void init(std::shared_ptr config) - { - if (socket) - throw ML::Exception("socket already initialized"); - socket.reset(new zmq::socket_t(*context, socketType)); - - using namespace std; - - connector.connectHandler - = std::bind(&ZmqNamedSocket::doConnect, - this, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3); - - connector.init(config); - - addSource("ZmqNamedSocket::connector", connector); - addSource("ZmqNamedSocket::socket", - std::make_shared - (*socket, [=] (std::vector && message) - { - this->handleMessage(std::move(message)); - })); - } - - void shutdown() - { - MessageLoop::shutdown(); - - if (!socket) - return; - - disconnect(); - connector.shutdown(); - - socket->tryDisconnect(this->connectedAddress); - socket.reset(); - } - - void connectToEndpoint(const std::string & endpointPath) - { - if (connectedEndpoint != "") - throw ML::Exception("attempt to connect a ZmqNamedSocket " - "to an enpoint that is already connected"); - - this->connectedEndpoint = connectedEndpoint; - this->connectionState = CONNECTING; - - // No lock needed as the connector has its own lock - - // Tell the connector to watch the endpoint. When an entry pops - // up, it will connect to it. - connector.watchEndpoint(endpointPath); - } - - /// Disconnect from the connected endpoint - void disconnect() - { - std::unique_lock guard(lock); - ExcAssert(socket); - socket->tryDisconnect(connectedAddress); - this->connectedEndpoint = ""; - this->connectedAddress = ""; - this->connectionState = DISCONNECTED; - //connector.disconnect(); - } - - /// Endpoint we're configured to connect to - std::string getConnectedEndpoint() const - { - std::unique_lock guard(lock); - return connectedEndpoint; - } - - /// Address we're currently connecting/connected to - std::string getConnectedAddress() const - { - std::unique_lock guard(lock); - return connectedAddress; - } - - /// Current state of the connection - ConnectionState getConnectionState() const - { - // No lock needed as performed atomically - return connectionState; - } - - /** Callback that will be called when a message is received if - handlerMessage is not overridden. - */ - typedef std::function &&)> MessageHandler; - MessageHandler messageHandler; - - /** Function called when a message is received on the socket. Default action - is to call messageHandler. Can be overridden or messageHandler can be - assigned to. - */ - virtual void handleMessage(std::vector && message) - { - using namespace std; - //cerr << "named socket got message " << message.at(0).toString() - // << endl; - if (messageHandler) - messageHandler(std::move(message)); - else throw ML::Exception("need to override either messageHandler " - "or handleMessage"); - } - - /** Call the given function synchronously with locking set up such that no - other socket operation can occur at the same time. This allows for - socket operations to be performed from any thread. Note that the - operation must not call any methods on this object or a locking - error will occur. - - Function signature must match - - void fn(zmq::socket_t &) - */ - template - void performSocketOpSync(Fn fn) - { - std::unique_lock guard(lock); - fn(*socket); - } - - /** Send the given message synchronously. */ - void sendSync(std::vector && message) - { - std::unique_lock guard(lock); - for (unsigned i = 0; i < message.size(); ++i) { - socket->send(message[i], i == message.size() - 1 - ? 0 : ZMQ_SNDMORE); - } - } - -private: - // This lock is used to allow the synchronous methods to work without - // needing ping-pong with the message loop. Normally it should be - // uncontended. - typedef std::mutex Lock; - mutable Lock lock; - - /** Function called by the endpoint connector to actually connect to - an endpoint. This may be called when we are already connected, - which means that we should disconnect from the old endpoint and - connect to the new one. - */ - virtual bool doConnect(const std::string & endpointPath, - const std::string & entryName, - const Json::Value & epConfig) - { - using namespace std; - - //cerr << " ((((((( doConnect for " << endpointPath << " " << entryName - // << endl; - - std::unique_lock guard(lock); - - for (auto & entry: epConfig) { - - //cerr << "entry is " << entry << endl; - - if (!entry.isMember("zmqConnectUri")) - continue; - - auto hs = entry["transports"][0]["hostScope"]; - if (!hs) - continue; - - string hostScope = hs.asString(); - if (hs != "*") { - utsname name; - if (uname(&name)) - throw ML::Exception(errno, "uname"); - if (hostScope != name.nodename) - continue; // wrong host scope - } - - string uri = entry["zmqConnectUri"].asString(); - - if (connectedAddress != "") { - // Already connected... - if (connectedAddress == uri) - return true; - else { - // Need to disconnect from the current address and connect to the new one - //cerr << "connectedAddress = " << connectedAddress << " uri = " << uri << endl; - socket->tryDisconnect(connectedAddress); - //throw ML::Exception("need to handle disconnect and reconnect to different " - // "address"); - } - } - - connectedAddress = uri; - connectedEndpointPath = endpointPath; - connectedEntryName = entryName; - socket->connect(uri); - - //cerr << "connection in progress to " << uri << endl; - connectionState = CONNECTED; - return true; - } - - return false; - } - - /// Zmq context we're working with - zmq::context_t * context; - - /// Socket type to create; one of the ZMQ_ constants - int socketType; - - /// Endpoint we're configured to connect to - std::string connectedEndpoint; - - /// Address we're currently connecting/connected to - std::string connectedAddress; - - /// Path of the endpoint we're currently connected to - std::string connectedEndpointPath; - - /// Name of the entry under the endpoint name which we're connected to - std::string connectedEntryName; - - /// File descriptor that our connection is on. Mostly used for testing. - int connectedFd; - - /// Current state of the connection - ConnectionState connectionState; - - /// Handles actually connecting to the socket - EndpointConnector connector; - - /// Socket that we connect - std::unique_ptr socket; -}; - - -/*****************************************************************************/ -/* ZMQ NAMED SUBSCRIBER */ -/*****************************************************************************/ - -/** A subscriber class built on top of the ZmqNamedSocket. */ - -struct ZmqNamedSubscriber: public ZmqNamedSocket { - - ZmqNamedSubscriber(zmq::context_t & context) - : ZmqNamedSocket(context, ZMQ_SUB) - { - } - - /// Subscribe to the given message prefix - void subscribe(const std::string & prefix) - { - auto doSubscribe = [&] (zmq::socket_t & socket) - { - subscribeChannel(socket, prefix); - }; - - performSocketOpSync(doSubscribe); - } - - /// Subscribe to the given message prefix - void subscribe(const std::vector & prefixes) - { - auto doSubscribe = [&] (zmq::socket_t & socket) - { - for (const auto& prefix : prefixes) - subscribeChannel(socket, prefix); - }; - - performSocketOpSync(doSubscribe); - } - -}; - - - - -/*****************************************************************************/ -/* SERVICE PROVIDER WATCHER */ -/*****************************************************************************/ - -/** This class keeps track of watching for service providers and making sure - that there is a connection to each of them. - - It performs the following actions: - * Watches the configuration node for a given service class - * If a new service provider appears, calls a connect callback - * If a service provider disappears, calls a disconnect callback - - It is designed to integrate into an existing message loop so that the - notifications will be processed in the same thread as the other messages - and locking is not required. -*/ - -struct ServiceProviderWatcher: public MessageLoop { - - ServiceProviderWatcher() - : currentToken(1), changes(128) - { - } - - ~ServiceProviderWatcher() - { - using namespace std; - //cerr << "shutting down service provider watcher" << endl; - shutdown(); - //cerr << "done" << endl; - } - - void init(std::shared_ptr config) - { - this->config = config; - - changes.onEvent - = std::bind(&ServiceProviderWatcher::handleServiceClassChange, - this, - std::placeholders::_1); - - addSource("ServiceProviderWatcher::changes", changes); - } - - /** Type of a function that will be called when there is a change on - the service providers. - */ - typedef std::function ChangeHandler; - ChangeHandler changeHandler; - - /** Notify whenever an instance of the given service class goes down or - up. - - Whenever there is a change, the onChange handler will be called - from within the owning message loop. - - Returns a token that can be used to unregister the watch. - */ - uint64_t watchServiceClass(const std::string & serviceClass, - ChangeHandler onChange) - { - using namespace std; - //cerr << "watching service class " << serviceClass << endl; - - // Allocate a token, then push the message on to the thread to be - // processed. - uint64_t token = __sync_fetch_and_add(¤tToken, 1); - - std::unique_lock guard(lock); - - auto & entry = serviceClasses[serviceClass]; - - //cerr << "entry.watch = " << entry.watch << endl; - - if (!entry.watch) { - // First time that we watch this service; set it up - - entry.watch.init([=] (const std::string & path, - ConfigurationService::ChangeType change) - { - using namespace std; - //cerr << "changed path " << path << endl; - - changes.push(serviceClass); - }); - - changes.push(serviceClass); - } - - entry.entries[token].onChange = onChange; - - return token; - } - - /** Stop watching the given service class. */ - void unwatchServiceClass(const std::string & serviceClass, - uint64_t token) - { - // TODO: synchronous? Yes, as otherwise we can't guarantee anything - // about keeping the entries valid - - throw ML::Exception("unwatchServiceClass: not done"); - -#if 0 - int done = 0; - - messages.push([&] () { this->doUnwatchServiceClass(serviceClass, token); done = 1; futex_wake(done); }); - - while (!done) - futex_wait(done, 0); -#endif - } - -private: - typedef std::mutex Lock; - mutable Lock lock; - - // Shared variable for the tokens to give out to allow unwatching - uint64_t currentToken; - - struct WatchEntry { - ChangeHandler onChange; - }; - - struct ServiceClassEntry { - /// Watch on the service providers node - ConfigurationService::Watch watch; - std::map entries; - - /** Current set of known children of the service nodes. Used to - perform a diff to detect changes. - */ - std::vector knownChildren; - }; - - /// Map from service classes to list of watches - std::map serviceClasses; - - std::shared_ptr config; - - /** Deal with a change in the service class node. */ - void handleServiceClassChange(const std::string & serviceClass) - { - using namespace std; - - //cerr << "handleServiceClassChange " << serviceClass << endl; - - std::unique_lock guard(lock); - - auto & entry = serviceClasses[serviceClass]; - - //cerr << "handling service class change for " << serviceClass << endl; - - vector children - = config->getChildren("serviceClass/" + serviceClass, - entry.watch); - - //cerr << "children = " << children << endl; - - std::sort(children.begin(), children.end()); - - auto & knownChildren = entry.knownChildren; - - // Perform a diff between previously and currently known children - vector addedChildren, deletedChildren; - std::set_difference(children.begin(), children.end(), - knownChildren.begin(), knownChildren.end(), - std::back_inserter(addedChildren)); - std::set_difference(knownChildren.begin(), knownChildren.end(), - children.begin(), children.end(), - std::back_inserter(deletedChildren)); - - knownChildren.swap(children); - - guard.unlock(); - - for (auto & c: addedChildren) { - for (auto & e: entry.entries) { - e.second.onChange("serviceClass/" + serviceClass + "/" + c, - true); - } - } - - for (auto & c: deletedChildren) { - for (auto & e: entry.entries) { - e.second.onChange("serviceClass/" + serviceClass + "/" + c, - false); - } - } - } - - /** We put zookeeper messages on this queue so that they get handled in - the message loop thread and don't cause locking problems. - */ - TypedMessageSink changes; -}; - - -/*****************************************************************************/ -/* ZMQ NAMED MULTIPLE SUBSCRIBER */ -/*****************************************************************************/ - -/** Counterpart to the ZmqNamedPublisher. It subscribes to all publishers - of a given class. -*/ - -struct ZmqNamedMultipleSubscriber: public MessageLoop { - - ZmqNamedMultipleSubscriber(std::shared_ptr context) - : context(context) - { - needsPoll = true; - } - - ~ZmqNamedMultipleSubscriber() - { - shutdown(); - } - - void init(std::shared_ptr config) - { - this->config = config; - - serviceWatcher.init(config); - - addSource("ZmqNamedMultipleSubscriber::serviceWatcher", serviceWatcher); - - //debug(true); - } - - void shutdown() - { - MessageLoop::shutdown(); - - for (auto & sub: subscribers) - sub.second->shutdown(); - subscribers.clear(); - } - - void connectAllServiceProviders(const std::string & serviceClass, - const std::string & endpointName, - const std::vector & prefixes - = std::vector(), - std::function filter - = nullptr, - bool local = true) - { - auto onServiceChange = [=] (const std::string & service, - bool created) - { - using namespace std; - //cerr << "onServiceChange " << serviceClass << " " << endpointName - //<< " " << service << " created " << created << endl; - - if (filter && !filter(service)) - return; - - if (created) - connectService(serviceClass, service, endpointName, local); - else - disconnectService(serviceClass, service, endpointName); - }; - - this->prefixes = prefixes; - serviceWatcher.watchServiceClass(serviceClass, onServiceChange); - } - - /** Callback that will be called when a message is received if - handlerMessage is not overridden. - */ - typedef std::function &&)> MessageHandler; - MessageHandler messageHandler; - - /** Function called when a message is received on the socket. Default action - is to call messageHandler. Can be overridden or messageHandler can be - assigned to. - */ - virtual void handleMessage(std::vector && message) - { - if (messageHandler) - messageHandler(std::move(message)); - else throw ML::Exception("need to override either messageHandler " - "or handleMessage"); - } - - /** Connect to the given service. */ - void connectService(std::string serviceClass, std::string service, - std::string endpointName, - bool local = true) - { - using namespace std; - - Json::Value value = config->getJson(service); - - std::string location = value["serviceLocation"].asString(); - if(local && location != config->currentLocation) { - std::cerr << "dropping " << location - << " != " << config->currentLocation << std::endl; - return; - } - - std::unique_lock guard(lock); - - SubscriberMap::const_iterator found = subscribers.find(service); - if(found != subscribers.end()) - { - if(found->second->getConnectionState() == ZmqNamedSocket::CONNECTED) - { - std::cerr << "Already connected to service " << service << std::endl; - return; - } - else - { - std::cerr << "we already had a connection entry to service " << service <<" - reuse " << std::endl; - std::string path = value["servicePath"].asString(); - found->second->connectToEndpoint(path); - return ; - } - } - - std::unique_ptr sub - (new ZmqNamedSubscriber(*context)); - - // Set up to handle the message - sub->messageHandler = [=] (std::vector && msg) - { - //cerr << "SUB MESSAGE HANDLER " << this << endl; - this->handleMessage(std::move(msg)); - }; - - sub->init(config); - - // TODO: put a watch in to reconnect if this changes - std::string path = value["servicePath"].asString(); - - //cerr << "(((((((((((( connecting to service " << service - // << " at endpoint " << path + "/" + endpointName << endl; - - //cerr << "+-+-+-+-+- connecting to endpoint " << path + "/" + endpointName << endl; - //cerr << config->getChildren(path + "/" + endpointName) << endl; - - if (!prefixes.empty()) - sub->subscribe(prefixes); - else sub->subscribe(""); // Subscribe to everything. - - sub->connectToEndpoint(path + "/" + endpointName); - addSource(service, *sub); - subscribers[service] = std::move(sub); - } - - /** Disconnect from the given service. */ - void disconnectService(std::string serviceClass, std::string service, - std::string endpointName) - { - using namespace std; - cerr << "need to disconenct from " << serviceClass << " " - << service << " " << endpointName << endl; -// cerr << "aborting as disconnect not done yet" << endl; - SubscriberMap::const_iterator found = subscribers.find(service); - if(found != subscribers.end()) - { - found->second->disconnect(); - } - //abort(); - } - - /** This is responsible for watching the service providers and connecting - to new ones when they pop up. - */ - ServiceProviderWatcher serviceWatcher; - - std::shared_ptr context; - - typedef std::mutex Lock; - mutable Lock lock; - - /** Map of service name to subscribers */ - typedef std::map > SubscriberMap; - SubscriberMap subscribers; - - std::shared_ptr config; - - std::vector prefixes; -}; - - -} // namespace Datacratic diff --git a/service/zmq_utils.cc b/service/zmq_utils.cc deleted file mode 100644 index 5e17eb21..00000000 --- a/service/zmq_utils.cc +++ /dev/null @@ -1,46 +0,0 @@ -/* zmq_utils.cc - Jeremy Barnes, 14 January 2013 - Copyright (c) 2013 Datacratic Inc. All rights reserved. - -*/ - -#include "zmq_utils.h" - -namespace Datacratic { - -std::string printZmqEvent(int event) -{ -#define printZmqEventImpl(ev) case ev: return #ev - - switch (event) { - printZmqEventImpl(ZMQ_EVENT_LISTENING); - printZmqEventImpl(ZMQ_EVENT_BIND_FAILED); - printZmqEventImpl(ZMQ_EVENT_ACCEPTED); - printZmqEventImpl(ZMQ_EVENT_ACCEPT_FAILED); - printZmqEventImpl(ZMQ_EVENT_CONNECTED); - printZmqEventImpl(ZMQ_EVENT_CONNECT_DELAYED); - printZmqEventImpl(ZMQ_EVENT_CONNECT_RETRIED); - printZmqEventImpl(ZMQ_EVENT_CLOSE_FAILED); - printZmqEventImpl(ZMQ_EVENT_CLOSED); - printZmqEventImpl(ZMQ_EVENT_DISCONNECTED); - default: - return ML::format("ZMQ_EVENT_UNKNOWN(%d)", event); - } - -#undef printZmqEventImpl -} - -bool zmqEventIsError(int event) -{ - switch (event) { - case ZMQ_EVENT_BIND_FAILED: - case ZMQ_EVENT_ACCEPT_FAILED: - case ZMQ_EVENT_CONNECT_DELAYED: - case ZMQ_EVENT_CLOSE_FAILED: - return true; - default: - return false; - } -} - -} // namespace Datacratic diff --git a/service/zmq_utils.h b/service/zmq_utils.h deleted file mode 100644 index b39a7106..00000000 --- a/service/zmq_utils.h +++ /dev/null @@ -1,471 +0,0 @@ -/* zmq_utils.h -*- C++ -*- - Jeremy Barnes, 15 March 2011 - Copyright (c) 2011 Datacratic. All rights reserved. - - Utilities for zmq. -*/ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include "soa/service/zmq.hpp" -#include "soa/jsoncpp/value.h" -#include "soa/types/date.h" -#include "soa/types/string.h" -#include "soa/service/port_range_service.h" -#include "jml/arch/format.h" -#include "jml/arch/exception.h" -#include "jml/compiler/compiler.h" - -#if 0 -#define BLOCK_FLAG 0 -#else -#define BLOCK_FLAG ZMQ_DONTWAIT -#endif - -namespace Datacratic { - -inline void setIdentity(zmq::socket_t & sock, const std::string & identity) -{ - sock.setsockopt(ZMQ_IDENTITY, (void *)identity.c_str(), identity.size()); -} - -inline void subscribeChannel(zmq::socket_t & sock, const std::string & channel) -{ - sock.setsockopt(ZMQ_SUBSCRIBE, channel.c_str(), channel.length()); -} - -inline void unsubscribeChannel(zmq::socket_t & sock, const std::string & channel) -{ - sock.setsockopt(ZMQ_UNSUBSCRIBE, channel.c_str(), channel.length()); -} - -inline void setHwm(zmq::socket_t & sock, int hwm) -{ - sock.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - sock.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); -} - -inline void throwSocketError(const char *data) -{ - throw ML::Exception(errno, - "unhandled error (" + std::to_string(errno) + ")", - data); -} - -/** Returns events available on a socket: (input, output). */ -inline std::pair -getEvents(zmq::socket_t & sock) -{ - int events = 0; - size_t events_size = sizeof(events); - sock.getsockopt(ZMQ_EVENTS, &events, &events_size); - return std::make_pair(events & ZMQ_POLLIN, events & ZMQ_POLLOUT); -} - -inline std::string recvMesg(zmq::socket_t & sock) -{ - zmq::message_t message; - while (!sock.recv(&message, 0)) ; - - return std::string((const char *)message.data(), - ((const char *)message.data()) + message.size()); -} - -inline std::pair -recvMesgNonBlocking(zmq::socket_t & sock) -{ - zmq::message_t message; - bool got = sock.recv(&message, ZMQ_NOBLOCK); - if (!got) - return std::make_pair("", false); - return std::make_pair - (std::string((const char *)message.data(), - ((const char *)message.data()) + message.size()), - true); -} - -inline void recvMesg(zmq::socket_t & sock, char & val) -{ - zmq::message_t message; - while (!sock.recv(&message, 0)) ; - if (message.size() != 1) - throw ML::Exception("invalid char message size"); - val = *(const char *)message.data(); -} - -inline std::vector recvAll(zmq::socket_t & sock) -{ - std::vector result; - - int64_t more = 1; - size_t more_size = sizeof (more); - - while (more) { - result.push_back(recvMesg(sock)); - sock.getsockopt(ZMQ_RCVMORE, &more, &more_size); - } - - return result; -} - -inline std::vector -recvAllNonBlocking(zmq::socket_t & sock) -{ - std::vector result; - - std::string msg0; - bool got; - std::tie(msg0, got) = recvMesgNonBlocking(sock); - if (got) { - result.push_back(msg0); - - for (;;) { - int64_t more = 1; - size_t more_size = sizeof (more); - sock.getsockopt(ZMQ_RCVMORE, &more, &more_size); - //using namespace std; - //cerr << "result.size() " << result.size() - // << " more " << more << endl; - if (!more) break; - result.push_back(recvMesg(sock)); - } - } - - return result; -} - -inline zmq::message_t encodeMessage(const std::string & message) -{ - return message; -} - -inline zmq::message_t encodeMessage(const Utf8String & message) -{ - return message.rawString(); -} - -inline zmq::message_t encodeMessage(const Utf32String & message) -{ - return message.utf8String(); -} - -inline zmq::message_t encodeMessage(const char * msg) -{ - size_t sz = strlen(msg); - zmq::message_t zmsg(sz); - std::copy(msg, msg + sz, (char *)zmsg.data()); - return zmsg; -} - -inline zmq::message_t encodeMessage(const Date & date) -{ - return ML::format("%.5f", date.secondsSinceEpoch()); -} - -inline zmq::message_t encodeMessage(int i) -{ - return ML::format("%d", i); -} - -inline zmq::message_t encodeMessage(unsigned int i) -{ - return ML::format("%u", i); -} - -inline zmq::message_t encodeMessage(double i) -{ - return ML::format("%f", i); -} - -inline zmq::message_t encodeMessage(long i) -{ - return ML::format("%ld", i); -} - -inline zmq::message_t encodeMessage(unsigned long i) -{ - return ML::format("%lu", i); -} - -inline zmq::message_t encodeMessage(char c) -{ - return ML::format("%c", c); -} - -inline std::string chomp(const std::string & s) -{ - const char * start = s.c_str(); - const char * end = start + s.length(); - - while (end > start && end[-1] == '\n') --end; - - if (end == start + s.length()) return s; - return std::string(start, end); -} - -inline zmq::message_t encodeMessage(const Json::Value & j) -{ - return chomp(j.toString()); -} - -inline bool sendMesg(zmq::socket_t & sock, - const std::string & msg, - int options = 0) -{ - zmq::message_t msg1(msg.size()); - std::copy(msg.begin(), msg.end(), (char *)msg1.data()); - return sock.send(msg1, options); -} - -inline bool sendMesg(zmq::socket_t & sock, - const char * msg, - int options = 0) -{ - size_t sz = strlen(msg); - zmq::message_t msg1(sz); - std::copy(msg, msg + sz, (char *)msg1.data()); - return sock.send(msg1, options); -} - -inline bool sendMesg(zmq::socket_t & sock, - const void * msg, - size_t sz, - int options = 0) -{ - zmq::message_t msg1(sz); - memcpy(msg1.data(), msg, sz); - return sock.send(msg1, options); -} - -template -inline bool sendMesg(zmq::socket_t & sock, - const T & obj, - int options = 0) -{ - return sock.send(encodeMessage(obj), options); -} - -template -bool sendMesg(zmq::socket_t & socket, const std::shared_ptr & val, - int flags = 0) -{ - return sendMesg(socket, sharedPtrToMessage(val), flags); -} - -inline void sendAll(zmq::socket_t & sock, - const std::vector & message, - int lastFlags = 0) -{ - if (message.empty()) - throw ML::Exception("can't send an empty message vector"); - - for (unsigned i = 0; i < message.size() - 1; ++i) - if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) { - throwSocketError(__FUNCTION__); - } - if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) { - throwSocketError(__FUNCTION__); - } -} - -inline void sendAll(zmq::socket_t & sock, - const std::initializer_list & message, - int lastFlags = 0) -{ - sendAll(sock, std::vector(message)); -} - -#if 0 -template -inline void sendAll(zmq::socket_t & socket, - const std::vector & vals, - int lastFlags) -{ - if (vals.empty()) { - throw ML::Exception("can't send empty vector"); - } - for (int i = 0; i < vals.size() - 1; ++i) - sendMesg(socket, vals[i], ZMQ_SNDMORE | BLOCK_FLAG); - sendMesg(socket, vals.back(), lastFlags | BLOCK_FLAG); -} -#endif - -template -void sendMessage(zmq::socket_t & socket, - const Arg1 & arg1) -{ - if (!sendMesg(socket, arg1, 0)) { - throwSocketError(__FUNCTION__); - } -} - -inline void sendMessage(zmq::socket_t & socket, - const std::vector & args) -{ - sendAll(socket, args, 0); -} - -template -void sendMessage(zmq::socket_t & socket, - const Arg1 & arg1, - Args... args) -{ - if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) { - throwSocketError(__FUNCTION__); - } - sendMessage(socket, args...); -} - -/* non-throwing versions, where EAGAIN cases would return false */ -template -bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1) -{ - if (!sendMesg(socket, arg1, 0)) { - if (errno == EAGAIN) - return false; - else - throwSocketError(__FUNCTION__); - } - - return true; -} - -template -bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1, Args... args) -{ - if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) { - if (errno == EAGAIN) - return false; - else - throwSocketError(__FUNCTION__); - } - return trySendMessage(socket, args...); -} - -inline bool trySendAll(zmq::socket_t & sock, - const std::vector & message, - int lastFlags = 0) -{ - if (message.empty()) - throw ML::Exception("can't send an empty message vector"); - - for (unsigned i = 0; i < message.size() - 1; ++i) { - if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) { - if (errno == EAGAIN) - return false; - else - throwSocketError(__FUNCTION__); - } - } - - if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) { - if (errno == EAGAIN) - return false; - else - throwSocketError(__FUNCTION__); - } - - return true; -} - -inline bool trySendAll(zmq::socket_t & sock, - const std::initializer_list & message, - int lastFlags = 0) -{ - return trySendAll(sock, std::vector(message), lastFlags); -} - -/* We take a copy of the shared pointer in a heap-allocated object that - makes sure that it continues to have a reference. The control - connection then takes control of the pointer. This allows us to - effectively transfer a shared pointer over a zeromq socket. -*/ -template -std::string sharedPtrToMessage(std::shared_ptr ptr) -{ - static const int mypid = getpid(); - - std::auto_ptr > ptrToTransfer - (new std::shared_ptr(ptr)); - - std::string ptrMsg - = ML::format("%p:%d:%p", - ptrToTransfer.get(), mypid, - typeid(T).name()); - - ptrToTransfer.release(); - - return ptrMsg; -} - -template -std::shared_ptr -sharedPtrFromMessage(const std::string & message) -{ - static const int mypid = getpid(); - - std::shared_ptr * ptr; - int pid; - const char * name; - - int res = std::sscanf(message.c_str(), "%p:%d:%p", &ptr, &pid, &name); - if (res != 3) - throw ML::Exception("failure reconstituting auction"); - if (pid != mypid) - throw ML::Exception("message comes from different pid"); - if (name != typeid(T).name()) - throw ML::Exception("wrong name for type info: %s vs %s", - name, typeid(T).name()); - - std::auto_ptr > ptrHolder(ptr); - return *ptr; -} - -template -void recvMesg(zmq::socket_t & sock, std::shared_ptr & val) -{ - return sharedPtrFromMessage(recvMesg(sock)); -} - -inline void close(zmq::socket_t & sock) -{ - zmq_close(sock); -} - -inline int -bindAndReturnOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) { - std::string uri; - int port = portRange.bindPort([&](int port) { - uri = ML::format("tcp://%s:%d", host.c_str(), port); - return zmq_bind(socket, uri.c_str()) == 0; - }); - - if(port == -1) - throw ML::Exception("no open TCP port '%s': %s %s", - uri.c_str(), - zmq_strerror(zmq_errno()), - strerror(errno)); - return port; -} - -inline std::string -bindToOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) { - int port = bindAndReturnOpenTcpPort(socket, portRange, host); - return ML::format("tcp://%s:%d", host.c_str(), port); -} - -/** Return a human readable string for a zeromq event name. */ -std::string printZmqEvent(int event); - -/** Return if the given error event represents some kind of failure or - not. -*/ -bool zmqEventIsError(int event); - -} // namespace Datacratic diff --git a/soa.mk b/soa.mk index ff4b32e2..4fb3d7ac 100644 --- a/soa.mk +++ b/soa.mk @@ -9,7 +9,7 @@ endif $(eval $(call include_sub_make,sigslot)) $(eval $(call include_sub_make,gc)) $(eval $(call include_sub_make,service)) -$(eval $(call include_sub_make,logger)) +# $(eval $(call include_sub_make,logger)) $(eval $(call include_sub_make,launcher)) $(eval $(call include_sub_make,utils)) $(eval $(call include_sub_make,pipeline))