diff options
Diffstat (limited to 'src/zmq')
| -rw-r--r-- | src/zmq/zmqabstractnotifier.cpp | 22 | ||||
| -rw-r--r-- | src/zmq/zmqabstractnotifier.h | 44 | ||||
| -rw-r--r-- | src/zmq/zmqconfig.h | 24 | ||||
| -rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 162 | ||||
| -rw-r--r-- | src/zmq/zmqnotificationinterface.h | 37 | ||||
| -rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 193 | ||||
| -rw-r--r-- | src/zmq/zmqpublishnotifier.h | 55 |
7 files changed, 537 insertions, 0 deletions
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp new file mode 100644 index 000000000..9f5cb3ba6 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqabstractnotifier.h" +#include "util.h" + + +CZMQAbstractNotifier::~CZMQAbstractNotifier() +{ + assert(!psocket); +} + +bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h new file mode 100644 index 000000000..77cf5141e --- /dev/null +++ b/src/zmq/zmqabstractnotifier.h @@ -0,0 +1,44 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H +#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H + +#include "zmqconfig.h" + +class CBlockIndex; +class CZMQAbstractNotifier; + +typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); + +class CZMQAbstractNotifier +{ +public: + CZMQAbstractNotifier() : psocket(0) { } + virtual ~CZMQAbstractNotifier(); + + template <typename T> + static CZMQAbstractNotifier* Create() + { + return new T(); + } + + std::string GetType() const { return type; } + void SetType(const std::string &t) { type = t; } + std::string GetAddress() const { return address; } + void SetAddress(const std::string &a) { address = a; } + + virtual bool Initialize(void *pcontext) = 0; + virtual void Shutdown() = 0; + + virtual bool NotifyBlock(const CBlockIndex *pindex); + virtual bool NotifyTransaction(const CTransaction &transaction); + +protected: + void *psocket; + std::string type; + std::string address; +}; + +#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h new file mode 100644 index 000000000..610d7fbda --- /dev/null +++ b/src/zmq/zmqconfig.h @@ -0,0 +1,24 @@ +// Copyright (c) 2014 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQCONFIG_H +#define BITCOIN_ZMQ_ZMQCONFIG_H + +#if defined(HAVE_CONFIG_H) +#include "config/bitcoin-config.h" +#endif + +#include <stdarg.h> +#include <string> + +#if ENABLE_ZMQ +#include <zmq.h> +#endif + +#include "primitives/block.h" +#include "primitives/transaction.h" + +void zmqError(const char *str); + +#endif // BITCOIN_ZMQ_ZMQCONFIG_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp new file mode 100644 index 000000000..431d8c9ac --- /dev/null +++ b/src/zmq/zmqnotificationinterface.cpp @@ -0,0 +1,162 @@ +// Copyright (c) 2015-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqnotificationinterface.h" +#include "zmqpublishnotifier.h" + +#include "version.h" +#include "validation.h" +#include "streams.h" +#include "util.h" + +void zmqError(const char *str) +{ + LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); +} + +CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) +{ +} + +CZMQNotificationInterface::~CZMQNotificationInterface() +{ + Shutdown(); + + for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + delete *i; + } +} + +CZMQNotificationInterface* CZMQNotificationInterface::Create() +{ + CZMQNotificationInterface* notificationInterface = NULL; + std::map<std::string, CZMQNotifierFactory> factories; + std::list<CZMQAbstractNotifier*> notifiers; + + factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; + factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; + factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; + factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; + + for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) + { + std::string arg("-zmq" + i->first); + if (IsArgSet(arg)) + { + CZMQNotifierFactory factory = i->second; + std::string address = GetArg(arg, ""); + CZMQAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) + { + notificationInterface = new CZMQNotificationInterface(); + notificationInterface->notifiers = notifiers; + + if (!notificationInterface->Initialize()) + { + delete notificationInterface; + notificationInterface = NULL; + } + } + + return notificationInterface; +} + +// Called at startup to conditionally set up ZMQ socket(s) +bool CZMQNotificationInterface::Initialize() +{ + LogPrint("zmq", "zmq: Initialize notification interface\n"); + assert(!pcontext); + + pcontext = zmq_init(1); + + if (!pcontext) + { + zmqError("Unable to initialize context"); + return false; + } + + std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); + for (; i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->Initialize(pcontext)) + { + LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } + else + { + LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i!=notifiers.end()) + { + return false; + } + + return true; +} + +// Called during shutdown sequence +void CZMQNotificationInterface::Shutdown() +{ + LogPrint("zmq", "zmq: Shutdown notification interface\n"); + if (pcontext) + { + for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + notifier->Shutdown(); + } + zmq_ctx_destroy(pcontext); + + pcontext = 0; + } +} + +void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) +{ + if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones + return; + + for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(pindexNew)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock) +{ + for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h new file mode 100644 index 000000000..beabb78da --- /dev/null +++ b/src/zmq/zmqnotificationinterface.h @@ -0,0 +1,37 @@ +// Copyright (c) 2015-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H +#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H + +#include "validationinterface.h" +#include <string> +#include <map> + +class CBlockIndex; +class CZMQAbstractNotifier; + +class CZMQNotificationInterface : public CValidationInterface +{ +public: + virtual ~CZMQNotificationInterface(); + + static CZMQNotificationInterface* Create(); + +protected: + bool Initialize(); + void Shutdown(); + + // CValidationInterface + void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock); + void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); + +private: + CZMQNotificationInterface(); + + void *pcontext; + std::list<CZMQAbstractNotifier*> notifiers; +}; + +#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 000000000..490e871fd --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,193 @@ +// Copyright (c) 2015-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "chainparams.h" +#include "streams.h" +#include "zmqpublishnotifier.h" +#include "validation.h" +#include "util.h" +#include "rpc/server.h" + +static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + zmq_close(psocket); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", "zmq: Reusing socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; + std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHBLOCK, data, 32); +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHTX, data, 32); +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + + const Consensus::Params& consensusParams = Params().GetConsensus(pindex->nHeight); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + { + LOCK(cs_main); + CBlock block; + if(!ReadBlockFromDisk(block, pindex, consensusParams)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + ss << transaction; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h new file mode 100644 index 000000000..bcbecf1bd --- /dev/null +++ b/src/zmq/zmqpublishnotifier.h @@ -0,0 +1,55 @@ +// Copyright (c) 2015-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H +#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H + +#include "zmqabstractnotifier.h" + +class CBlockIndex; + +class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier +{ +private: + uint32_t nSequence; //!< upcounting per message sequence number + +public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + + bool Initialize(void *pcontext); + void Shutdown(); +}; + +class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const CBlockIndex *pindex); +}; + +class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H |