
Commit 384e090

Author: MarcoFalke
Merge #19509: Per-Peer Message Capture
bff7c66 Add documentation to contrib folder (Troy Giorshev)
381f77b Add Message Capture Test (Troy Giorshev)
e4f378a Add capture parser (Troy Giorshev)
4d1a582 Call CaptureMessage at appropriate locations (Troy Giorshev)
f2a77ff Add CaptureMessage (Troy Giorshev)
dbf779d Clean PushMessage and ProcessMessages (Troy Giorshev)

Pull request description:

This PR introduces per-peer message capture into Bitcoin Core. 📓

## Purpose

The purpose and scope of this feature is intentionally limited. It answers a question anyone new to Bitcoin's P2P protocol has had: "Can I see what messages my node is sending and receiving?"

## Functionality

When the new debug-only command line argument `-capturemessages` is set, any message that the node receives or sends is captured. The capture occurs in the MessageHandler thread. When receiving a message, it is captured as soon as the MessageHandler thread takes the message off of the vProcessMsg queue. When sending, the message is captured just before it is pushed onto the vSendMsg queue.

The message capture is as minimal as possible to reduce the performance impact on the node. Messages are captured to a new `message_capture` folder in the datadir. Each peer has its own subfolder, named with its IP address and port. Inside, received and sent messages are captured into two binary files, `msgs_recv.dat` and `msgs_sent.dat`, like so:

```
message_capture/203.0.113.7:56072/msgs_recv.dat
message_capture/203.0.113.7:56072/msgs_sent.dat
```

Because the messages are raw binary dumps, this PR includes a Python parsing tool to convert the binary files into human-readable JSON. This script has been placed on its own, out of the way, in the new `contrib/message-capture` folder. Its usage is simple and easily discovered via the autogenerated `-h` option.

## Future Maintenance

I sympathize greatly with anyone who says "the best code is no code". The future maintenance of this feature will be minimal. The logic to deserialize the payload of the P2P messages exists in our testing framework. As long as our testing framework works, so will this tool. Additionally, I hope that the simplicity of this tool will mean that it gets used frequently, so that problems will be discovered and solved while they are small.

## FAQ

"Why not just use Wireshark?"

Yes, Wireshark has the ability to filter and decode Bitcoin messages. However, the purpose of the message capture added in this PR is to assist with debugging, primarily for new developers looking to improve their knowledge of the Bitcoin protocol. This drives the design in a different direction than Wireshark, in two ways.

First, this tool must be convenient and simple to use. Using an external tool like Wireshark requires setup and interpretation of the results. To a new user who doesn't necessarily know what to expect, this is unnecessary difficulty. This tool, on the other hand, "just works": turn on the command line flag, run your node, run the script, read the JSON.

Second, because this tool is used for debugging, we want it to be as close to the true behavior of the node as possible. A lot can happen in the SocketHandler thread that would be missed by Wireshark. Additionally, if we were to use Wireshark, we would be at the mercy of whoever is maintaining the protocol dissector in Wireshark, both as to its accuracy and its recency. As can be seen from the **many** previous attempts to include Bitcoin in Wireshark (google "bitcoin dissector"), this is easier said than done.

Lastly, I truly believe that this tool will be used significantly more by being included in the codebase. It's just that much more discoverable.

ACKs for top commit:
  MarcoFalke: re-ACK bff7c66 only some minor changes: 👚
  jnewbery: utACK bff7c66
  theStack: re-ACK bff7c66

Tree-SHA512: e59e3160422269221f70f98720b47842775781c247c064071d546c24fa7a35a0e5534e8baa4b4591a750d7eb16de6b4ecf54cbee6d193b261f4f104e28c15f47
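The on-disk capture format implied by the implementation (and read back by the contrib parser) is a simple framed record: an 8-byte little-endian microsecond timestamp, a 12-byte NUL-padded message type, a 4-byte little-endian payload length, then the raw payload. A minimal round-trip sketch of that framing — not part of the PR, and with made-up timestamp and payload values:

```python
from io import BytesIO

# Record layout inferred from the parser constants and CaptureMessage:
# 8-byte LE microsecond timestamp, 12-byte NUL-padded message type,
# 4-byte LE payload length, then the raw payload.
TIME_SIZE = 8
MSGTYPE_SIZE = 12
LENGTH_SIZE = 4

def write_record(f, time_us: int, msgtype: bytes, payload: bytes) -> None:
    f.write(time_us.to_bytes(TIME_SIZE, "little"))
    f.write(msgtype.ljust(MSGTYPE_SIZE, b"\x00"))  # pad type with NULs
    f.write(len(payload).to_bytes(LENGTH_SIZE, "little"))
    f.write(payload)

def read_record(f):
    header = f.read(TIME_SIZE + MSGTYPE_SIZE + LENGTH_SIZE)
    if not header:
        return None  # end of file
    time_us = int.from_bytes(header[:TIME_SIZE], "little")
    msgtype = header[TIME_SIZE:TIME_SIZE + MSGTYPE_SIZE].rstrip(b"\x00")
    length = int.from_bytes(header[-LENGTH_SIZE:], "little")
    return time_us, msgtype.decode(), f.read(length)

buf = BytesIO()
write_record(buf, 1617000000000000, b"ping", b"\x01\x02\x03\x04\x05\x06\x07\x08")
buf.seek(0)
print(read_record(buf))  # timestamp, 'ping', and the 8-byte payload
```

Because each record carries its own length, the two `.dat` files can simply be read sequentially until EOF, which is exactly what the parser below does.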
2 parents 1e69800 + bff7c66 commit 384e090

File tree

9 files changed: +369 −16 lines changed

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
# Per-Peer Message Capture

## Purpose

This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"

## Usage and Functionality

* Run `bitcoind` with the `-capturemessages` option.
* Look in the `message_capture` folder in your datadir.
  * Typically this will be `~/.bitcoin/message_capture`.
  * See that there are many folders inside, one for each peer, named with its IP address and port.
  * Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
  * See the `-h` option for help.
  * To see all messages, both sent and received, for all peers use:
    ```
    ./contrib/message-capture/message-capture-parser.py -o out.json \
      ~/.bitcoin/message_capture/**/*.dat
    ```
  * Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
  * If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
* View the resulting output.
  * The output file is `JSON` formatted.
  * Suggestion: use `jq` to view the output, with `jq . out.json`
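Besides `jq`, the parser's JSON output is easy to post-process in Python. A hypothetical sketch — the field names (`msgtype`, `direction`, `time`) match the dictionaries the parser emits, but the sample records here are made up for illustration:

```python
import json

# Made-up sample mimicking the parser's output; in real use you would do:
#     with open("out.json") as f:
#         messages = json.load(f)
messages = json.loads("""[
    {"msgtype": "ping", "direction": "sent", "time": 3},
    {"msgtype": "inv",  "direction": "recv", "time": 1},
    {"msgtype": "inv",  "direction": "sent", "time": 2}
]""")

# e.g. keep only "inv" messages, in either direction
invs = [m for m in messages if m["msgtype"] == "inv"]
print(len(invs))  # 2
```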
Lines changed: 214 additions & 0 deletions
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Parse message capture binary files. To be used in conjunction with -capturemessages."""

import argparse
import os
import shutil
import sys
from io import BytesIO
import json
from pathlib import Path
from typing import Any, List, Optional

sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))

from test_framework.messages import ser_uint256  # noqa: E402
from test_framework.p2p import MESSAGEMAP  # noqa: E402

TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12

# The test framework classes store hashes as large ints in many cases.
# These are variables of type uint256 in core.
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
# As such, they are itemized here.
# Any variables with these names that are of type int are actually uint256 variables.
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
HASH_INTS = [
    "blockhash",
    "block_hash",
    "hash",
    "hashMerkleRoot",
    "hashPrevBlock",
    "hashstop",
    "prev_header",
    "sha256",
    "stop_hash",
]

HASH_INT_VECTORS = [
    "hashes",
    "headers",
    "vHave",
    "vHash",
]


class ProgressBar:
    def __init__(self, total: float):
        self.total = total
        self.running = 0

    def set_progress(self, progress: float):
        cols = shutil.get_terminal_size()[0]
        if cols <= 12:
            return
        max_blocks = cols - 9
        num_blocks = int(max_blocks * progress)
        print('\r[ {}{} ] {:3.0f}%'
              .format('#' * num_blocks,
                      ' ' * (max_blocks - num_blocks),
                      progress * 100),
              end='')

    def update(self, more: float):
        self.running += more
        self.set_progress(self.running / self.total)


def to_jsonable(obj: Any) -> Any:
    if hasattr(obj, "__dict__"):
        return obj.__dict__
    elif hasattr(obj, "__slots__"):
        ret = {}  # type: Any
        for slot in obj.__slots__:
            val = getattr(obj, slot, None)
            if slot in HASH_INTS and isinstance(val, int):
                ret[slot] = ser_uint256(val).hex()
            elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
                ret[slot] = [ser_uint256(a).hex() for a in val]
            else:
                ret[slot] = to_jsonable(val)
        return ret
    elif isinstance(obj, list):
        return [to_jsonable(a) for a in obj]
    elif isinstance(obj, bytes):
        return obj.hex()
    else:
        return obj


def process_file(path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
    with open(path, 'rb') as f_in:
        if progress_bar:
            bytes_read = 0

        while True:
            if progress_bar:
                # Update progress bar
                diff = f_in.tell() - bytes_read - 1
                progress_bar.update(diff)
                bytes_read = f_in.tell() - 1

            # Read the Header
            tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
            if not tmp_header_raw:
                break
            tmp_header = BytesIO(tmp_header_raw)
            time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")  # type: int
            msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]  # type: bytes
            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")  # type: int

            # Start converting the message to a dictionary
            msg_dict = {}
            msg_dict["direction"] = "recv" if recv else "sent"
            msg_dict["time"] = time
            msg_dict["size"] = length  # "size" is less readable here, but more readable in the output

            msg_ser = BytesIO(f_in.read(length))

            # Determine message type
            if msgtype not in MESSAGEMAP:
                # Unrecognized message type
                try:
                    msgtype_tmp = msgtype.decode()
                    if not msgtype_tmp.isprintable():
                        raise UnicodeDecodeError
                    msg_dict["msgtype"] = msgtype_tmp
                except UnicodeDecodeError:
                    msg_dict["msgtype"] = "UNREADABLE"
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unrecognized message type."
                messages.append(msg_dict)
                print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
                continue

            # Deserialize the message
            msg = MESSAGEMAP[msgtype]()
            msg_dict["msgtype"] = msgtype.decode()

            try:
                msg.deserialize(msg_ser)
            except KeyboardInterrupt:
                raise
            except Exception:
                # Unable to deserialize message body
                msg_ser.seek(0, os.SEEK_SET)
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unable to deserialize message."
                messages.append(msg_dict)
                print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
                continue

            # Convert body of message into a jsonable object
            if length:
                msg_dict["body"] = to_jsonable(msg)
            messages.append(msg_dict)

        if progress_bar:
            # Update the progress bar to the end of the current file
            # in case we exited the loop early
            f_in.seek(0, os.SEEK_END)  # Go to end of file
            diff = f_in.tell() - bytes_read - 1
            progress_bar.update(diff)


def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "capturepaths",
        nargs='+',
        help="binary message capture files to parse.")
    parser.add_argument(
        "-o", "--output",
        help="output file. If unset print to stdout")
    parser.add_argument(
        "-n", "--no-progress-bar",
        action='store_true',
        help="disable the progress bar. Automatically set if the output is not a terminal")
    args = parser.parse_args()
    capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
    output = Path.cwd() / Path(args.output) if args.output else False
    use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()

    messages = []  # type: List[Any]
    if use_progress_bar:
        total_size = sum(capture.stat().st_size for capture in capturepaths)
        progress_bar = ProgressBar(total_size)
    else:
        progress_bar = None

    for capture in capturepaths:
        process_file(str(capture), messages, "recv" in capture.stem, progress_bar)

    messages.sort(key=lambda msg: msg['time'])

    if use_progress_bar:
        progress_bar.set_progress(1)

    jsonrep = json.dumps(messages)
    if output:
        with open(str(output), 'w+', encoding="utf8") as f_out:
            f_out.write(jsonrep)
    else:
        print(jsonrep)


if __name__ == "__main__":
    main()
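The chronological interleaving that `main` performs is just a single sort by the per-record timestamp after every capture file has been processed. A small illustration with made-up timestamps:

```python
# Sketch of the parser's interleaving step: records from all capture
# files are appended to one list, then sorted by their capture
# timestamp, so sent and received messages come out in chronological
# order regardless of which file they came from.
recv_msgs = [{"direction": "recv", "time": 5}, {"direction": "recv", "time": 20}]
sent_msgs = [{"direction": "sent", "time": 10}]

messages = recv_msgs + sent_msgs
messages.sort(key=lambda msg: msg["time"])  # same sort key as the parser
print([m["time"] for m in messages])  # [5, 10, 20]
```

This works because the capture timestamps are written in microseconds at processing time, so ties across files are rare and ordering within each file is already monotonic.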

src/init.cpp

Lines changed: 5 additions & 3 deletions
@@ -519,6 +519,7 @@ void SetupServerArgs(NodeContext& node)
     argsman.AddArg("-limitdescendantcount=<n>", strprintf("Do not accept transactions if any ancestor would have <n> or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
     argsman.AddArg("-limitdescendantsize=<n>", strprintf("Do not accept transactions if any ancestor would have more than <n> kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
     argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
+    argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
     argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
         "If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ".",
         ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
@@ -1040,16 +1041,17 @@ bool AppInitParameterInteraction(const ArgsManager& args)

     // Trim requested connection counts, to fit into system limitations
     // <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
-    nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind);
+    nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE);
+
 #ifdef USE_POLL
     int fd_max = nFD;
 #else
     int fd_max = FD_SETSIZE;
 #endif
-    nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
+    nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0);
     if (nFD < MIN_CORE_FILEDESCRIPTORS)
         return InitError(_("Not enough file descriptors available."));
-    nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
+    nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections);

     if (nMaxConnections < nUserMaxConnections)
         InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));
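The init.cpp change above reserves one extra file descriptor (`NUM_FDS_MESSAGE_CAPTURE`, defined as 1 in src/net.h) in the connection-count budgeting. A sketch of that arithmetic — every value here except `NUM_FDS_MESSAGE_CAPTURE` is a hypothetical stand-in, not the real constant:

```python
# Illustrative mirror of the nMaxConnections clamp in
# AppInitParameterInteraction. Only NUM_FDS_MESSAGE_CAPTURE = 1 comes
# from the PR (src/net.h); the other numbers are made up for this sketch.
NUM_FDS_MESSAGE_CAPTURE = 1
MIN_CORE_FILEDESCRIPTORS = 150   # assumed placeholder
MAX_ADDNODE_CONNECTIONS = 8      # assumed placeholder
fd_max = 256                     # e.g. a small FD_SETSIZE
n_bind = 2
n_max_connections = 125          # user-requested -maxconnections

# Clamp the requested connection count to the descriptors actually
# available, now also subtracting the capture file's descriptor.
n_max_connections = max(
    min(n_max_connections,
        fd_max - n_bind - MIN_CORE_FILEDESCRIPTORS
        - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE),
    0)
print(n_max_connections)  # 95
```

With these numbers the budget is 256 − 2 − 150 − 8 − 1 = 95, one connection fewer than before the change, which is why the PR adjusts all three expressions consistently.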

src/net.cpp

Lines changed: 35 additions & 8 deletions
@@ -2879,6 +2879,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
 {
     size_t nMessageSize = msg.data.size();
     LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.m_type), nMessageSize, pnode->GetId());
+    if (gArgs.GetBoolArg("-capturemessages", false)) {
+        CaptureMessage(pnode->addr, msg.m_type, msg.data, /* incoming */ false);
+    }

     // make sure we use the appropriate network transport format
     std::vector<unsigned char> serializedHeader;
@@ -2894,18 +2897,14 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
         pnode->mapSendBytesPerMsgCmd[msg.m_type] += nTotalSize;
         pnode->nSendSize += nTotalSize;

-        if (pnode->nSendSize > nSendBufferMaxSize)
-            pnode->fPauseSend = true;
+        if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
         pnode->vSendMsg.push_back(std::move(serializedHeader));
-        if (nMessageSize)
-            pnode->vSendMsg.push_back(std::move(msg.data));
+        if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));

         // If write queue empty, attempt "optimistic write"
-        if (optimisticSend == true)
-            nBytesSent = SocketSendData(*pnode);
+        if (optimisticSend) nBytesSent = SocketSendData(*pnode);
     }
-    if (nBytesSent)
-        RecordBytesSent(nBytesSent);
+    if (nBytesSent) RecordBytesSent(nBytesSent);
 }

 bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
@@ -2948,3 +2947,31 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const

     return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
 }
+
+void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming)
+{
+    // Note: This function captures the message at the time of processing,
+    // not at socket receive/send time.
+    // This ensures that the messages are always in order from an application
+    // layer (processing) perspective.
+    auto now = GetTime<std::chrono::microseconds>();
+
+    // Windows folder names can not include a colon
+    std::string clean_addr = addr.ToString();
+    std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
+
+    fs::path base_path = GetDataDir() / "message_capture" / clean_addr;
+    fs::create_directories(base_path);
+
+    fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
+    CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
+
+    ser_writedata64(f, now.count());
+    f.write(msg_type.data(), msg_type.length());
+    for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
+        f << '\0';
+    }
+    uint32_t size = data.size();
+    ser_writedata32(f, size);
+    f.write((const char*)data.data(), data.size());
+}

src/net.h

Lines changed: 6 additions & 0 deletions
@@ -20,6 +20,7 @@
 #include <policy/feerate.h>
 #include <protocol.h>
 #include <random.h>
+#include <span.h>
 #include <streams.h>
 #include <sync.h>
 #include <threadinterrupt.h>
@@ -75,6 +76,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0;
 static const bool DEFAULT_BLOCKSONLY = false;
 /** -peertimeout default */
 static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
+/** Number of file descriptors required for message capture **/
+static const int NUM_FDS_MESSAGE_CAPTURE = 1;

 static const bool DEFAULT_FORCEDNSSEED = false;
 static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
@@ -1241,6 +1244,9 @@ inline std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now,
     return std::chrono::microseconds{PoissonNextSend(now.count(), average_interval.count())};
 }

+/** Dump binary message to file, with timestamp */
+void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming);
+
 struct NodeEvictionCandidate
 {
     NodeId id;

src/net_processing.cpp

Lines changed: 6 additions & 4 deletions
@@ -4045,14 +4045,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
     }

     // Don't bother if send buffer is too full to respond anyway
-    if (pfrom->fPauseSend)
-        return false;
+    if (pfrom->fPauseSend) return false;

     std::list<CNetMessage> msgs;
     {
         LOCK(pfrom->cs_vProcessMsg);
-        if (pfrom->vProcessMsg.empty())
-            return false;
+        if (pfrom->vProcessMsg.empty()) return false;
         // Just take one message
         msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
         pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
@@ -4061,6 +4059,10 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
     }
     CNetMessage& msg(msgs.front());

+    if (gArgs.GetBoolArg("-capturemessages", false)) {
+        CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), /* incoming */ true);
+    }
+
     msg.SetVersion(pfrom->GetCommonVersion());
     const std::string& msg_type = msg.m_command;