diff options
Diffstat (limited to 'src/zmq/zmqpublishnotifier.cpp')
| -rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 000000000..490e871fd --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,193 @@ +// Copyright (c) 2015-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "chainparams.h" +#include "streams.h" +#include "zmqpublishnotifier.h" +#include "validation.h" +#include "util.h" +#include "rpc/server.h" + +static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + zmq_close(psocket); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", "zmq: Reusing socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; + std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHBLOCK, data, 32); +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHTX, data, 32); +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) +{ + LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + + const Consensus::Params& consensusParams = Params().GetConsensus(pindex->nHeight); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + { + LOCK(cs_main); + CBlock block; + if(!ReadBlockFromDisk(block, pindex, consensusParams)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + ss << transaction; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); +} |