Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 171480d

Browse files
authored
Replication fix amendment to #778 (#881)
* fixes on replication * trying to fix #778 #issuecomment-907319726 * code spacing fixes * codestyle fixes * trigger workflow locally
1 parent 491d164 commit 171480d

File tree

3 files changed

+130
-54
lines changed

3 files changed

+130
-54
lines changed

src/ChannelManagers/LocalChannelManager.php

+42-20
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ public function getLocalConnectionsCount($appId, string $channelName = null): Pr
272272
return $channel->getName() === $channelName;
273273
});
274274
})
275-
->flatMap(function (Channel $channel) {
276-
return collect($channel->getConnections())->pluck('socketId');
277-
})
278-
->unique()->count();
275+
->flatMap(function (Channel $channel) {
276+
return collect($channel->getConnections())->pluck('socketId');
277+
})
278+
->unique()->count();
279279
});
280280
}
281281

@@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
429429
*/
430430
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
431431
{
432-
$connection->lastPongedAt = Carbon::now();
433-
434-
return $this->updateConnectionInChannels($connection);
432+
return $this->pongConnectionInChannels($connection);
435433
}
436434

437435
/**
@@ -441,23 +439,47 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
441439
*/
442440
public function removeObsoleteConnections(): PromiseInterface
443441
{
444-
if (! $this->lock()->acquire()) {
445-
return Helpers::createFulfilledPromise(false);
446-
}
442+
$lock = $this->lock();
443+
try {
444+
if (! $lock->acquire()) {
445+
return Helpers::createFulfilledPromise(false);
446+
}
447447

448-
$this->getLocalConnections()->then(function ($connections) {
449-
foreach ($connections as $connection) {
450-
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
448+
$this->getLocalConnections()->then(function ($connections) {
449+
foreach ($connections as $connection) {
450+
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
451451

452-
if ($differenceInSeconds > 120) {
453-
$this->unsubscribeFromAllChannels($connection);
452+
if ($differenceInSeconds > 120) {
453+
$this->unsubscribeFromAllChannels($connection);
454+
}
454455
}
455-
}
456-
});
456+
});
457457

458-
return Helpers::createFulfilledPromise(
459-
$this->lock()->forceRelease()
460-
);
458+
return Helpers::createFulfilledPromise(true);
459+
} finally {
460+
optional($lock)->forceRelease();
461+
}
462+
}
463+
464+
/**
465+
* Pong connection in channels.
466+
*
467+
* @param ConnectionInterface $connection
468+
* @return PromiseInterface[bool]
469+
*/
470+
public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
471+
{
472+
return $this->getLocalChannels($connection->app->id)
473+
->then(function ($channels) use ($connection) {
474+
foreach ($channels as $channel) {
475+
if ($conn = $channel->getConnection($connection->socketId)) {
476+
$conn->lastPongedAt = Carbon::now();
477+
$channel->saveConnection($conn);
478+
}
479+
}
480+
481+
return true;
482+
});
461483
}
462484

463485
/**

src/ChannelManagers/RedisChannelManager.php

+74-33
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace BeyondCode\LaravelWebSockets\ChannelManagers;
44

5-
use BeyondCode\LaravelWebSockets\Channels\Channel;
65
use BeyondCode\LaravelWebSockets\DashboardLogger;
76
use BeyondCode\LaravelWebSockets\Helpers;
87
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
@@ -145,31 +144,18 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
145144
*/
146145
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
147146
{
148-
return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
147+
return parent::unsubscribeFromChannel($connection, $channelName, $payload)
148+
->then(function () use ($connection, $channelName) {
149+
return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
150+
})
149151
->then(function ($count) use ($connection, $channelName) {
150-
if ($count === 0) {
151-
// Make sure to not stay subscribed to the PubSub topic
152-
// if there are no connections.
152+
$this->removeConnectionFromSet($connection);
153+
// If the total connections count gets to 0 after unsubscribe,
154+
// try again to check & unsubscribe from the PubSub topic if needed.
155+
if ($count < 1) {
156+
$this->removeChannelFromSet($connection->app->id, $channelName);
153157
$this->unsubscribeFromTopic($connection->app->id, $channelName);
154158
}
155-
156-
$this->decrementSubscriptionsCount($connection->app->id, $channelName)
157-
->then(function ($count) use ($connection, $channelName) {
158-
// If the total connections count gets to 0 after unsubscribe,
159-
// try again to check & unsubscribe from the PubSub topic if needed.
160-
if ($count < 1) {
161-
$this->unsubscribeFromTopic($connection->app->id, $channelName);
162-
}
163-
});
164-
})
165-
->then(function () use ($connection, $channelName) {
166-
return $this->removeChannelFromSet($connection->app->id, $channelName);
167-
})
168-
->then(function () use ($connection) {
169-
return $this->removeConnectionFromSet($connection);
170-
})
171-
->then(function () use ($connection, $channelName, $payload) {
172-
return parent::unsubscribeFromChannel($connection, $channelName, $payload);
173159
});
174160
}
175161

@@ -363,6 +349,16 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
363349
{
364350
// This will update the score with the current timestamp.
365351
return $this->addConnectionToSet($connection, Carbon::now())
352+
->then(function () use ($connection) {
353+
$payload = [
354+
'socketId' => $connection->socketId,
355+
'appId' => $connection->app->id,
356+
'serverId' => $this->getServerId(),
357+
];
358+
359+
return $this->publishClient
360+
->publish($this->getPongRedisHash($connection->app->id), json_encode($payload));
361+
})
366362
->then(function () use ($connection) {
367363
return parent::connectionPonged($connection);
368364
});
@@ -375,18 +371,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
375371
*/
376372
public function removeObsoleteConnections(): PromiseInterface
377373
{
378-
$this->lock()->get(function () {
379-
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
380-
->then(function ($connections) {
381-
foreach ($connections as $socketId => $appId) {
382-
$connection = $this->fakeConnectionForApp($appId, $socketId);
374+
$lock = $this->lock();
375+
try {
376+
$lock->get(function () {
377+
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
378+
->then(function ($connections) {
379+
foreach ($connections as $socketId => $appId) {
380+
$connection = $this->fakeConnectionForApp($appId, $socketId);
383381

384-
$this->unsubscribeFromAllChannels($connection);
385-
}
386-
});
387-
});
382+
$this->unsubscribeFromAllChannels($connection);
383+
}
384+
});
385+
});
388386

389-
return parent::removeObsoleteConnections();
387+
return parent::removeObsoleteConnections();
388+
} finally {
389+
optional($lock)->forceRelease();
390+
}
390391
}
391392

392393
/**
@@ -404,6 +405,12 @@ public function onMessage(string $redisChannel, string $payload)
404405
return;
405406
}
406407

408+
if ($redisChannel == $this->getPongRedisHash($payload->appId)) {
409+
$connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);
410+
411+
return parent::connectionPonged($connection);
412+
}
413+
407414
$payload->channel = Str::after($redisChannel, "{$payload->appId}:");
408415

409416
if (! $channel = $this->find($payload->appId, $payload->channel)) {
@@ -429,6 +436,16 @@ public function onMessage(string $redisChannel, string $payload)
429436
$channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
430437
}
431438

439+
public function find($appId, string $channel)
440+
{
441+
if (! $channelInstance = parent::find($appId, $channel)) {
442+
$class = $this->getChannelClassName($channel);
443+
$this->channels[$appId][$channel] = new $class($channel);
444+
}
445+
446+
return parent::find($appId, $channel);
447+
}
448+
432449
/**
433450
* Build the Redis connection URL from Laravel database config.
434451
*
@@ -601,6 +618,20 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface
601618
);
602619
}
603620

621+
/**
622+
* Check if channel is on the list.
623+
*
624+
* @param string|int $appId
625+
* @param string $channel
626+
* @return PromiseInterface
627+
*/
628+
public function isChannelInSet($appId, string $channel): PromiseInterface
629+
{
630+
return $this->publishClient->sismember(
631+
$this->getChannelsRedisHash($appId), $channel
632+
);
633+
}
634+
604635
/**
605636
* Set data for a topic. Might be used for the presence channels.
606637
*
@@ -729,6 +760,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
729760
return $hash;
730761
}
731762

763+
/**
764+
* Get the pong Redis hash.
765+
*
766+
* @param string|int $appId
767+
*/
768+
public function getPongRedisHash($appId): string
769+
{
770+
return $this->getRedisKey($appId, null, ['pong']);
771+
}
772+
732773
/**
733774
* Get the statistics Redis hash.
734775
*

src/Channels/Channel.php

+14-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ public function getConnections()
5959
return $this->connections;
6060
}
6161

62+
/**
63+
* Get connection by socketId.
64+
*
65+
* @param string socketId
66+
* @return ?ConnectionInterface
67+
*/
68+
public function getConnection(string $socketId): ?ConnectionInterface
69+
{
70+
return $this->connections[$socketId] ?? null;
71+
}
72+
6273
/**
6374
* Check if the channel has connections.
6475
*
@@ -159,6 +170,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo
159170
collect($this->getConnections())
160171
->each(function ($connection) use ($payload) {
161172
$connection->send(json_encode($payload));
173+
$this->channelManager->connectionPonged($connection);
162174
});
163175

164176
if ($replicate) {
@@ -196,12 +208,13 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId,
196208
}
197209

198210
if (is_null($socketId)) {
199-
return $this->broadcast($appId, $payload, $replicate);
211+
return $this->broadcast($appId, $payload, false);
200212
}
201213

202214
collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) {
203215
if ($connection->socketId !== $socketId) {
204216
$connection->send(json_encode($payload));
217+
$this->channelManager->connectionPonged($connection);
205218
}
206219
});
207220

0 commit comments

Comments
 (0)