aboutsummaryrefslogtreecommitdiff
path: root/src/zmq
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq')
-rw-r--r--src/zmq/zmqabstractnotifier.cpp22
-rw-r--r--src/zmq/zmqabstractnotifier.h44
-rw-r--r--src/zmq/zmqconfig.h24
-rw-r--r--src/zmq/zmqnotificationinterface.cpp162
-rw-r--r--src/zmq/zmqnotificationinterface.h37
-rw-r--r--src/zmq/zmqpublishnotifier.cpp193
-rw-r--r--src/zmq/zmqpublishnotifier.h55
7 files changed, 537 insertions, 0 deletions
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
new file mode 100644
index 000000000..9f5cb3ba6
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -0,0 +1,22 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqabstractnotifier.h"
+#include "util.h"
+
+
+CZMQAbstractNotifier::~CZMQAbstractNotifier()
+{
+ assert(!psocket);
+}
+
+bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
+{
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
+{
+ return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
new file mode 100644
index 000000000..77cf5141e
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.h
@@ -0,0 +1,44 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+
+#include "zmqconfig.h"
+
+class CBlockIndex;
+class CZMQAbstractNotifier;
+
+typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
+
+class CZMQAbstractNotifier
+{
+public:
+ CZMQAbstractNotifier() : psocket(0) { }
+ virtual ~CZMQAbstractNotifier();
+
+ template <typename T>
+ static CZMQAbstractNotifier* Create()
+ {
+ return new T();
+ }
+
+ std::string GetType() const { return type; }
+ void SetType(const std::string &t) { type = t; }
+ std::string GetAddress() const { return address; }
+ void SetAddress(const std::string &a) { address = a; }
+
+ virtual bool Initialize(void *pcontext) = 0;
+ virtual void Shutdown() = 0;
+
+ virtual bool NotifyBlock(const CBlockIndex *pindex);
+ virtual bool NotifyTransaction(const CTransaction &transaction);
+
+protected:
+ void *psocket;
+ std::string type;
+ std::string address;
+};
+
+#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h
new file mode 100644
index 000000000..610d7fbda
--- /dev/null
+++ b/src/zmq/zmqconfig.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2014 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
+#define BITCOIN_ZMQ_ZMQCONFIG_H
+
+#if defined(HAVE_CONFIG_H)
+#include "config/bitcoin-config.h"
+#endif
+
+#include <stdarg.h>
+#include <string>
+
+#if ENABLE_ZMQ
+#include <zmq.h>
+#endif
+
+#include "primitives/block.h"
+#include "primitives/transaction.h"
+
+void zmqError(const char *str);
+
+#endif // BITCOIN_ZMQ_ZMQCONFIG_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644
index 000000000..431d8c9ac
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -0,0 +1,162 @@
+// Copyright (c) 2015-2016 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "validation.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+ LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+ Shutdown();
+
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ delete *i;
+ }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::Create()
+{
+ CZMQNotificationInterface* notificationInterface = NULL;
+ std::map<std::string, CZMQNotifierFactory> factories;
+ std::list<CZMQAbstractNotifier*> notifiers;
+
+ factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+ factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+ factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+ factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+ for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+ {
+ std::string arg("-zmq" + i->first);
+ if (IsArgSet(arg))
+ {
+ CZMQNotifierFactory factory = i->second;
+ std::string address = GetArg(arg, "");
+ CZMQAbstractNotifier *notifier = factory();
+ notifier->SetType(i->first);
+ notifier->SetAddress(address);
+ notifiers.push_back(notifier);
+ }
+ }
+
+ if (!notifiers.empty())
+ {
+ notificationInterface = new CZMQNotificationInterface();
+ notificationInterface->notifiers = notifiers;
+
+ if (!notificationInterface->Initialize())
+ {
+ delete notificationInterface;
+ notificationInterface = NULL;
+ }
+ }
+
+ return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+ LogPrint("zmq", "zmq: Initialize notification interface\n");
+ assert(!pcontext);
+
+ pcontext = zmq_init(1);
+
+ if (!pcontext)
+ {
+ zmqError("Unable to initialize context");
+ return false;
+ }
+
+ std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+ for (; i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->Initialize(pcontext))
+ {
+ LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ }
+ else
+ {
+ LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ break;
+ }
+ }
+
+ if (i!=notifiers.end())
+ {
+ return false;
+ }
+
+ return true;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+ LogPrint("zmq", "zmq: Shutdown notification interface\n");
+ if (pcontext)
+ {
+ for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ notifier->Shutdown();
+ }
+ zmq_ctx_destroy(pcontext);
+
+ pcontext = 0;
+ }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
+{
+ if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
+ return;
+
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyBlock(pindexNew))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
+{
+ for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+ {
+ CZMQAbstractNotifier *notifier = *i;
+ if (notifier->NotifyTransaction(tx))
+ {
+ i++;
+ }
+ else
+ {
+ notifier->Shutdown();
+ i = notifiers.erase(i);
+ }
+ }
+}
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
new file mode 100644
index 000000000..beabb78da
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.h
@@ -0,0 +1,37 @@
+// Copyright (c) 2015-2016 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+
+#include "validationinterface.h"
+#include <string>
+#include <map>
+
+class CBlockIndex;
+class CZMQAbstractNotifier;
+
+class CZMQNotificationInterface : public CValidationInterface
+{
+public:
+ virtual ~CZMQNotificationInterface();
+
+ static CZMQNotificationInterface* Create();
+
+protected:
+ bool Initialize();
+ void Shutdown();
+
+ // CValidationInterface
+ void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock);
+ void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
+
+private:
+ CZMQNotificationInterface();
+
+ void *pcontext;
+ std::list<CZMQAbstractNotifier*> notifiers;
+};
+
+#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
new file mode 100644
index 000000000..490e871fd
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -0,0 +1,193 @@
+// Copyright (c) 2015-2016 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "chainparams.h"
+#include "streams.h"
+#include "zmqpublishnotifier.h"
+#include "validation.h"
+#include "util.h"
+#include "rpc/server.h"
+
+static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+
+static const char *MSG_HASHBLOCK = "hashblock";
+static const char *MSG_HASHTX = "hashtx";
+static const char *MSG_RAWBLOCK = "rawblock";
+static const char *MSG_RAWTX = "rawtx";
+
+// Internal function to send multipart message
+static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
+{
+ va_list args;
+ va_start(args, size);
+
+ while (1)
+ {
+ zmq_msg_t msg;
+
+ int rc = zmq_msg_init_size(&msg, size);
+ if (rc != 0)
+ {
+ zmqError("Unable to initialize ZMQ msg");
+ return -1;
+ }
+
+ void *buf = zmq_msg_data(&msg);
+ memcpy(buf, data, size);
+
+ data = va_arg(args, const void*);
+
+ rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
+ if (rc == -1)
+ {
+ zmqError("Unable to send ZMQ msg");
+ zmq_msg_close(&msg);
+ return -1;
+ }
+
+ zmq_msg_close(&msg);
+
+ if (!data)
+ break;
+
+ size = va_arg(args, size_t);
+ }
+ return 0;
+}
+
+bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
+{
+ assert(!psocket);
+
+ // check if address is being used by other publish notifier
+ std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
+
+ if (i==mapPublishNotifiers.end())
+ {
+ psocket = zmq_socket(pcontext, ZMQ_PUB);
+ if (!psocket)
+ {
+ zmqError("Failed to create socket");
+ return false;
+ }
+
+ int rc = zmq_bind(psocket, address.c_str());
+ if (rc!=0)
+ {
+ zmqError("Failed to bind address");
+ zmq_close(psocket);
+ return false;
+ }
+
+ // register this notifier for the address, so it can be reused for other publish notifier
+ mapPublishNotifiers.insert(std::make_pair(address, this));
+ return true;
+ }
+ else
+ {
+ LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
+
+ psocket = i->second->psocket;
+ mapPublishNotifiers.insert(std::make_pair(address, this));
+
+ return true;
+ }
+}
+
+void CZMQAbstractPublishNotifier::Shutdown()
+{
+ assert(psocket);
+
+ int count = mapPublishNotifiers.count(address);
+
+ // remove this notifier from the list of publishers using this address
+ typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
+ std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
+
+ for (iterator it = iterpair.first; it != iterpair.second; ++it)
+ {
+ if (it->second==this)
+ {
+ mapPublishNotifiers.erase(it);
+ break;
+ }
+ }
+
+ if (count == 1)
+ {
+ LogPrint("zmq", "Close socket at address %s\n", address);
+ int linger = 0;
+ zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
+ zmq_close(psocket);
+ }
+
+ psocket = 0;
+}
+
+bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
+{
+ assert(psocket);
+
+ /* send three parts, command & data & a LE 4byte sequence number */
+ unsigned char msgseq[sizeof(uint32_t)];
+ WriteLE32(&msgseq[0], nSequence);
+ int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
+ if (rc == -1)
+ return false;
+
+ /* increment memory only sequence number after sending */
+ nSequence++;
+
+ return true;
+}
+
+bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
+{
+ uint256 hash = pindex->GetBlockHash();
+ LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
+ char data[32];
+ for (unsigned int i = 0; i < 32; i++)
+ data[31 - i] = hash.begin()[i];
+ return SendMessage(MSG_HASHBLOCK, data, 32);
+}
+
+bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
+ char data[32];
+ for (unsigned int i = 0; i < 32; i++)
+ data[31 - i] = hash.begin()[i];
+ return SendMessage(MSG_HASHTX, data, 32);
+}
+
+bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
+{
+ LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
+
+ const Consensus::Params& consensusParams = Params().GetConsensus(pindex->nHeight);
+ CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
+ {
+ LOCK(cs_main);
+ CBlock block;
+ if(!ReadBlockFromDisk(block, pindex, consensusParams))
+ {
+ zmqError("Can't read block from disk");
+ return false;
+ }
+
+ ss << block;
+ }
+
+ return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
+}
+
+bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
+ CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
+ ss << transaction;
+ return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
new file mode 100644
index 000000000..bcbecf1bd
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.h
@@ -0,0 +1,55 @@
+// Copyright (c) 2015-2016 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+
+#include "zmqabstractnotifier.h"
+
+class CBlockIndex;
+
+class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
+{
+private:
+ uint32_t nSequence; //!< upcounting per message sequence number
+
+public:
+
+ /* send zmq multipart message
+ parts:
+ * command
+ * data
+ * message sequence number
+ */
+ bool SendMessage(const char *command, const void* data, size_t size);
+
+ bool Initialize(void *pcontext);
+ void Shutdown();
+};
+
+class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyBlock(const CBlockIndex *pindex);
+};
+
+class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyTransaction(const CTransaction &transaction);
+};
+
+class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyBlock(const CBlockIndex *pindex);
+};
+
+class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyTransaction(const CTransaction &transaction);
+};
+
+#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H