Skip to content

Commit 0978d77

Browse files
committed
Init singleWorker with queue and inventory
1 parent e03b1df commit 0978d77

File tree

2 files changed

+39
-34
lines changed

2 files changed

+39
-34
lines changed

src/class_singleWorker.py

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from struct import pack
1313
from subprocess import call # nosec
1414

15+
from six.moves import configparser, queue
16+
1517
import defaults
1618
import helper_inbox
1719
import helper_msgcoding
@@ -30,9 +32,7 @@
3032
)
3133
from bmconfigparser import BMConfigParser
3234
from helper_sql import sqlExecute, sqlQuery
33-
from inventory import Inventory
3435
from network import knownnodes, StoppableThread
35-
from six.moves import configparser, queue
3636

3737

3838
def sizeof_fmt(num, suffix='h/s'):
@@ -48,15 +48,17 @@ def sizeof_fmt(num, suffix='h/s'):
4848
class singleWorker(StoppableThread):
4949
"""Thread for performing PoW"""
5050

51-
def __init__(self):
51+
def __init__(self, queue, inventory):
5252
super(singleWorker, self).__init__(name="singleWorker")
53+
self.inventory = inventory
54+
self.queue = queue
5355
proofofwork.init()
5456

5557
def stopThread(self):
5658
"""Signal through the queue that the thread should be stopped"""
5759

5860
try:
59-
queues.workerQueue.put(("stopThread", "data"))
61+
self.queue.put(("stopThread", "data"))
6062
except queue.Full:
6163
self.logger.error('workerQueue is Full')
6264
super(singleWorker, self).stopThread()
@@ -119,7 +121,8 @@ def run(self):
119121
# For the case if user deleted knownnodes
120122
# but is still having onionpeer objects in inventory
121123
if not knownnodes.knownNodesActual:
122-
for item in Inventory().by_type_and_tag(protocol.OBJECT_ONIONPEER):
124+
for item in self.inventory.by_type_and_tag(
125+
protocol.OBJECT_ONIONPEER):
123126
queues.objectProcessorQueue.put((
124127
protocol.OBJECT_ONIONPEER, item.payload
125128
))
@@ -134,17 +137,17 @@ def run(self):
134137

135138
# just in case there are any pending tasks for msg
136139
# messages that have yet to be sent.
137-
queues.workerQueue.put(('sendmessage', ''))
140+
self.queue.put(('sendmessage', ''))
138141
# just in case there are any tasks for Broadcasts
139142
# that have yet to be sent.
140-
queues.workerQueue.put(('sendbroadcast', ''))
143+
self.queue.put(('sendbroadcast', ''))
141144

142145
# send onionpeer object
143-
queues.workerQueue.put(('sendOnionPeerObj', ''))
146+
self.queue.put(('sendOnionPeerObj', ''))
144147

145148
while state.shutdown == 0:
146149
self.busy = 0
147-
command, data = queues.workerQueue.get()
150+
command, data = self.queue.get()
148151
self.busy = 1
149152
if command == 'sendmessage':
150153
try:
@@ -191,7 +194,7 @@ def run(self):
191194
command
192195
)
193196

194-
queues.workerQueue.task_done()
197+
self.queue.task_done()
195198
self.logger.info("Quitting...")
196199

197200
def _getKeysForAddress(self, address):
@@ -290,13 +293,12 @@ def doPOWForMyV2Pubkey(self, adressHash):
290293

291294
inventoryHash = calculateInventoryHash(payload)
292295
objectType = 1
293-
Inventory()[inventoryHash] = (
294-
objectType, streamNumber, payload, embeddedTime, '')
296+
self.inventory.put(
297+
inventoryHash, objectType, streamNumber, payload, embeddedTime)
295298

296299
self.logger.info(
297300
'broadcasting inv with hash: %s', hexlify(inventoryHash))
298301

299-
queues.invQueue.put((streamNumber, inventoryHash))
300302
queues.UISignalQueue.put(('updateStatusBar', ''))
301303
try:
302304
BMConfigParser().set(
@@ -378,13 +380,11 @@ def sendOutOrStoreMyV3Pubkey(self, adressHash):
378380

379381
inventoryHash = calculateInventoryHash(payload)
380382
objectType = 1
381-
Inventory()[inventoryHash] = (
382-
objectType, streamNumber, payload, embeddedTime, '')
383+
self.inventory.put(objectType, streamNumber, payload, embeddedTime)
383384

384385
self.logger.info(
385386
'broadcasting inv with hash: %s', hexlify(inventoryHash))
386387

387-
queues.invQueue.put((streamNumber, inventoryHash))
388388
queues.UISignalQueue.put(('updateStatusBar', ''))
389389
try:
390390
BMConfigParser().set(
@@ -471,15 +471,13 @@ def sendOutOrStoreMyV4Pubkey(self, myAddress):
471471

472472
inventoryHash = calculateInventoryHash(payload)
473473
objectType = 1
474-
Inventory()[inventoryHash] = (
474+
self.inventory.put(
475475
objectType, streamNumber, payload, embeddedTime,
476-
doubleHashOfAddressData[32:]
477-
)
476+
doubleHashOfAddressData[32:])
478477

479478
self.logger.info(
480479
'broadcasting inv with hash: %s', hexlify(inventoryHash))
481480

482-
queues.invQueue.put((streamNumber, inventoryHash))
483481
queues.UISignalQueue.put(('updateStatusBar', ''))
484482
try:
485483
BMConfigParser().set(
@@ -507,7 +505,7 @@ def sendOnionPeerObj(self, peer=None):
507505
objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host)
508506
tag = calculateInventoryHash(objectPayload)
509507

510-
if Inventory().by_type_and_tag(objectType, tag):
508+
if self.inventory.by_type_and_tag(objectType, tag):
511509
return # not expired
512510

513511
payload = pack('>Q', embeddedTime)
@@ -520,14 +518,14 @@ def sendOnionPeerObj(self, peer=None):
520518
payload, TTL, log_prefix='(For onionpeer object)')
521519

522520
inventoryHash = calculateInventoryHash(payload)
523-
Inventory()[inventoryHash] = (
521+
self.inventory.put(
524522
objectType, streamNumber, buffer(payload),
525-
embeddedTime, buffer(tag)
526-
)
523+
embeddedTime, buffer(tag))
524+
527525
self.logger.info(
528526
'sending inv (within sendOnionPeerObj function) for object: %s',
529527
hexlify(inventoryHash))
530-
queues.invQueue.put((streamNumber, inventoryHash))
528+
queues.UISignalQueue.put(('updateStatusBar', ''))
531529

532530
def sendBroadcast(self):
533531
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@@ -688,14 +686,14 @@ def sendBroadcast(self):
688686

689687
inventoryHash = calculateInventoryHash(payload)
690688
objectType = 3
691-
Inventory()[inventoryHash] = (
689+
self.inventory.put(
692690
objectType, streamNumber, payload, embeddedTime, tag)
691+
693692
self.logger.info(
694693
'sending inv (within sendBroadcast function)'
695694
' for object: %s',
696695
hexlify(inventoryHash)
697696
)
698-
queues.invQueue.put((streamNumber, inventoryHash))
699697

700698
queues.UISignalQueue.put((
701699
'updateSentItemStatusByAckdata', (
@@ -847,7 +845,8 @@ def sendMsg(self):
847845
hexlify(privEncryptionKey))
848846
)
849847

850-
for value in Inventory().by_type_and_tag(1, toTag):
848+
for value in self.inventory.by_type_and_tag(
849+
1, toTag):
851850
# if valid, this function also puts it
852851
# in the pubkeys table.
853852
if protocol.decryptAndCheckPubkeyPayload(
@@ -1303,8 +1302,9 @@ def sendMsg(self):
13031302

13041303
inventoryHash = calculateInventoryHash(encryptedPayload)
13051304
objectType = 2
1306-
Inventory()[inventoryHash] = (
1307-
objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
1305+
self.inventory.put(
1306+
objectType, toStreamNumber, encryptedPayload, embeddedTime)
1307+
13081308
if BMConfigParser().has_section(toaddress) or \
13091309
not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
13101310
queues.UISignalQueue.put((
@@ -1329,7 +1329,6 @@ def sendMsg(self):
13291329
'Broadcasting inv for my msg(within sendmsg function): %s',
13301330
hexlify(inventoryHash)
13311331
)
1332-
queues.invQueue.put((toStreamNumber, inventoryHash))
13331332

13341333
# Update the sent message in the sent table with the
13351334
# necessary information.
@@ -1461,10 +1460,10 @@ def requestPubKey(self, toAddress):
14611460

14621461
inventoryHash = calculateInventoryHash(payload)
14631462
objectType = 1
1464-
Inventory()[inventoryHash] = (
1465-
objectType, streamNumber, payload, embeddedTime, '')
1463+
self.inventory.put(
1464+
objectType, streamNumber, payload, embeddedTime)
1465+
14661466
self.logger.info('sending inv (for the getpubkey message)')
1467-
queues.invQueue.put((streamNumber, inventoryHash))
14681467

14691468
# wait 10% past expiration
14701469
sleeptill = int(time.time() + TTL * 1.1)

src/inventory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import storage.filesystem
55
import storage.sqlite
66
from bmconfigparser import BMConfigParser
7+
# TODO: init with queue
8+
from queues import invQueue
79
from singleton import Singleton
810

911

@@ -39,3 +41,7 @@ def __getattr__(self, attr):
3941
# hint for pylint: this is dictionary like object
4042
def __getitem__(self, key):
4143
return self._realInventory[key]
44+
45+
def put(self, invhash, obj_type, stream, payload, embedded_time, tag=''):
46+
self[invhash] = (obj_type, stream, payload, embedded_time, tag)
47+
invQueue.put((stream, invhash))

0 commit comments

Comments
 (0)