Skip to content

Commit 3d56f6d

Browse files
author
Maksym Kotliar
committed
Improvements
- show if batch message is not processed - add env var to disable async mode - improve reply message
1 parent 006b809 commit 3d56f6d

File tree

3 files changed

+42
-15
lines changed

3 files changed

+42
-15
lines changed

Async/ElasticaPopulateProcessor.php

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,32 +32,39 @@ public function process(Message $message, Context $context)
3232
}
3333

3434
if ($message->isRedelivered()) {
35-
$replyMessage = $context->createMessage(false);
36-
$replyQueue = $context->createQueue($message->getReplyTo());
37-
$context->createProducer()->send($replyQueue, $replyMessage);
35+
$this->sendReply($context, $message, false);
3836

3937
return self::REJECT;
4038
}
4139

42-
$options = JSON::decode($message->getBody());
40+
try {
41+
$options = JSON::decode($message->getBody());
4342

44-
$provider = $this->providerRegistry->getProvider($options['indexName'], $options['typeName']);
45-
$provider->populate(null, $options);
43+
$provider = $this->providerRegistry->getProvider($options['indexName'], $options['typeName']);
44+
$provider->populate(null, $options);
4645

47-
$this->sendReply($context, $message->getReplyTo(), true);
46+
$this->sendReply($context, $message, true);
4847

49-
return self::ACK;
48+
return self::ACK;
49+
} catch (\Exception $e) {
50+
$this->sendReply($context, $message, false);
51+
52+
return self::REJECT;
53+
}
5054
}
5155

5256
/**
5357
* @param Context $context
54-
* @param string $replyTo
55-
* @param bool $message
58+
* @param Message $message
59+
* @param bool $successful
5660
*/
57-
private function sendReply(Context $context, $replyTo, $message)
61+
private function sendReply(Context $context, Message $message, $successful)
5862
{
59-
$replyMessage = $context->createMessage($message);
60-
$replyQueue = $context->createQueue($replyTo);
63+
$replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
64+
$replyMessage->setProperty('fos-populate-successful', (int) $successful);
65+
66+
$replyQueue = $context->createQueue($message->getReplyTo());
67+
6168
$context->createProducer()->send($replyQueue, $replyMessage);
6269
}
6370
}

Elastica/AsyncDoctrineOrmProvider.php

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public function setContext(Context $context)
2727
*/
2828
protected function doPopulate($options, \Closure $loggerClosure = null)
2929
{
30+
if (getenv('ENQUEUE_ELASTICA_DISABLE_ASYNC')) {
31+
return parent::doPopulate($options, $loggerClosure);
32+
}
33+
3034
$this->batchSize = null;
3135
if ($options['real_populate']) {
3236
$this->batchSize = $options['offset'] + $options['batch_size'];
@@ -58,8 +62,18 @@ protected function doPopulate($options, \Closure $loggerClosure = null)
5862
$limitTime = time() + 180;
5963
while ($nbMessages) {
6064
if ($message = $consumer->receive(20000)) {
61-
if (null !== $loggerClosure) {
62-
$loggerClosure($options['batch_size'], $nbObjects);
65+
$errorMessage = null;
66+
67+
$errorMessage = null;
68+
if (false == $message->getProperty('fos-populate-successful', false)) {
69+
$errorMessage = sprintf(
70+
'<error>Batch failed: </error> <comment>Failed to process message %s</comment>',
71+
$message->getBody()
72+
);
73+
}
74+
75+
if ($loggerClosure) {
76+
$loggerClosure($options['batch_size'], $nbObjects, $errorMessage);
6377
}
6478

6579
$consumer->acknowledge($message);

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ So you have to run as usual
8181
$ ./bin/console fos:elastica:populate
8282
```
8383

84+
If you want to disable this behavior you can un register the bundle or use env var
85+
86+
```bash
87+
$ ENQUEUE_ELASTICA_DISABLE_ASYNC=1 ./bin/console fos:elastica:populate
88+
```
89+
8490
and have pull of consumer commands run somewhere, run them as many as you'd like
8591

8692
```bash

0 commit comments

Comments
 (0)