Skip to content

Commit 9d9d7aa

Browse files
authored
Disallow adding listener to completed jobs [HZ-2254] (#24197)
Protocol changes: hazelcast/hazelcast-client-protocol#463
1 parent 4a03057 commit 9d9d7aa

27 files changed

+445
-140
lines changed

hazelcast/src/main/java/com/hazelcast/client/impl/protocol/codec/JetAddJobStatusListenerCodec.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,15 @@
3737
/**
3838
* Adds a JobStatusListener to the specified job.
3939
*/
40-
@Generated("cc493370802f788c8333a1e7c81a676c")
40+
@Generated("b54281a30cabeb2b34dfee722257fd03")
4141
public final class JetAddJobStatusListenerCodec {
4242
//hex: 0xFE1300
4343
public static final int REQUEST_MESSAGE_TYPE = 16651008;
4444
//hex: 0xFE1301
4545
public static final int RESPONSE_MESSAGE_TYPE = 16651009;
4646
private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
47-
private static final int REQUEST_LOCAL_ONLY_FIELD_OFFSET = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
47+
private static final int REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
48+
private static final int REQUEST_LOCAL_ONLY_FIELD_OFFSET = REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET + UUID_SIZE_IN_BYTES;
4849
private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_LOCAL_ONLY_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
4950
private static final int RESPONSE_RESPONSE_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
5051
private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_RESPONSE_FIELD_OFFSET + UUID_SIZE_IN_BYTES;
@@ -67,20 +68,26 @@ public static class RequestParameters {
6768
*/
6869
public long jobId;
6970

71+
/**
72+
* UUID of the job coordinator for light jobs, null otherwise.
73+
*/
74+
public @Nullable java.util.UUID lightJobCoordinator;
75+
7076
/**
7177
* If true fires events that originated from this node only, otherwise fires all events.
7278
*/
7379
public boolean localOnly;
7480
}
7581

76-
public static ClientMessage encodeRequest(long jobId, boolean localOnly) {
82+
public static ClientMessage encodeRequest(long jobId, @Nullable java.util.UUID lightJobCoordinator, boolean localOnly) {
7783
ClientMessage clientMessage = ClientMessage.createForEncode();
7884
clientMessage.setRetryable(false);
7985
clientMessage.setOperationName("Jet.AddJobStatusListener");
8086
ClientMessage.Frame initialFrame = new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
8187
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
8288
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
8389
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
90+
encodeUUID(initialFrame.content, REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET, lightJobCoordinator);
8491
encodeBoolean(initialFrame.content, REQUEST_LOCAL_ONLY_FIELD_OFFSET, localOnly);
8592
clientMessage.add(initialFrame);
8693
return clientMessage;
@@ -91,6 +98,7 @@ public static JetAddJobStatusListenerCodec.RequestParameters decodeRequest(Clien
9198
RequestParameters request = new RequestParameters();
9299
ClientMessage.Frame initialFrame = iterator.next();
93100
request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
101+
request.lightJobCoordinator = decodeUUID(initialFrame.content, REQUEST_LIGHT_JOB_COORDINATOR_FIELD_OFFSET);
94102
request.localOnly = decodeBoolean(initialFrame.content, REQUEST_LOCAL_ONLY_FIELD_OFFSET);
95103
return request;
96104
}

hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/listener/ClientListenerServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public boolean removeListener(UUID userRegistrationId) {
357357
return false;
358358
}
359359
listenerRegistration.getConnectionRegistrations().forEach((connection, registration) ->
360-
((ClientConnection) connection).removeEventHandler(registration.getCallId()));
360+
connection.removeEventHandler(registration.getCallId()));
361361
return true;
362362
}).get();
363363
} catch (Exception e) {

hazelcast/src/main/java/com/hazelcast/jet/Job.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ default void join() {
164164
*
165165
* @return The registration id
166166
* @throws UnsupportedOperationException if the cluster version is less than 5.3
167+
* @throws IllegalStateException if the job is completed or failed
167168
* @since 5.3
168169
*/
169170
UUID addStatusListener(@Nonnull JobStatusListener listener);

hazelcast/src/main/java/com/hazelcast/jet/JobStatusListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @see Job#removeStatusListener(UUID)
4545
* @since 5.3
4646
*/
47+
@FunctionalInterface
4748
public interface JobStatusListener {
4849
/**
4950
* Invoked upon job status change. <ol>

hazelcast/src/main/java/com/hazelcast/jet/impl/AbstractJobProxy.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,25 @@
2121
import com.hazelcast.core.MemberLeftException;
2222
import com.hazelcast.internal.serialization.SerializationService;
2323
import com.hazelcast.jet.Job;
24+
import com.hazelcast.jet.JobStatusListener;
2425
import com.hazelcast.jet.config.DeltaJobConfig;
2526
import com.hazelcast.jet.config.JobConfig;
2627
import com.hazelcast.jet.core.JobNotFoundException;
2728
import com.hazelcast.jet.core.JobStatus;
2829
import com.hazelcast.jet.impl.exception.CancellationByUserException;
30+
import com.hazelcast.jet.impl.operation.AddJobStatusListenerOperation;
2931
import com.hazelcast.jet.impl.operation.UpdateJobConfigOperation;
3032
import com.hazelcast.jet.impl.util.NonCompletableFuture;
3133
import com.hazelcast.logging.ILogger;
3234
import com.hazelcast.logging.LoggingService;
3335
import com.hazelcast.spi.exception.TargetDisconnectedException;
3436
import com.hazelcast.spi.exception.TargetNotMemberException;
37+
import com.hazelcast.spi.impl.eventservice.impl.Registration;
38+
import com.hazelcast.spi.impl.eventservice.impl.operations.RegistrationOperation;
3539

3640
import javax.annotation.Nonnull;
3741
import javax.annotation.Nullable;
42+
import java.util.UUID;
3843
import java.util.concurrent.CancellationException;
3944
import java.util.concurrent.CompletableFuture;
4045
import java.util.concurrent.ExecutionException;
@@ -43,6 +48,9 @@
4348
import java.util.function.BiConsumer;
4449
import java.util.function.Supplier;
4550

51+
import static com.hazelcast.jet.core.JobStatus.COMPLETED;
52+
import static com.hazelcast.jet.core.JobStatus.FAILED;
53+
import static com.hazelcast.jet.core.JobStatus.RUNNING;
4654
import static com.hazelcast.jet.impl.util.ExceptionUtil.peel;
4755
import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;
4856
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
@@ -190,10 +198,9 @@ public final JobStatus getStatus() {
190198
if (isLightJob()) {
191199
CompletableFuture<Void> f = getFuture();
192200
if (!f.isDone()) {
193-
return JobStatus.RUNNING;
201+
return RUNNING;
194202
}
195-
return f.isCompletedExceptionally()
196-
? JobStatus.FAILED : JobStatus.COMPLETED;
203+
return f.isCompletedExceptionally() ? FAILED : COMPLETED;
197204
} else {
198205
return getStatus0();
199206
}
@@ -294,6 +301,16 @@ private void terminate(TerminationMode mode) {
294301
}
295302
}
296303

304+
@Override
305+
public UUID addStatusListener(@Nonnull JobStatusListener listener) {
306+
try {
307+
return doAddStatusListener(listener);
308+
} catch (JobNotFoundException ignored) {
309+
throw cannotAddStatusListener(
310+
future.isCompletedExceptionally() ? FAILED : COMPLETED);
311+
}
312+
}
313+
297314
@Override
298315
public String toString() {
299316
return "Job{id=" + getIdString()
@@ -329,12 +346,43 @@ public boolean isLightJob() {
329346
protected abstract JobConfig doGetJobConfig();
330347

331348
/**
332-
* Sends an {@link UpdateJobConfigOperation} to the master member. On the master member,
333-
* if the job is SUSPENDED, the job record is updated both locally and {@linkplain
334-
* JobRepository#JOB_RECORDS_MAP_NAME globally} (in order for {@link #getConfig()} to
335-
* reflect the changes); otherwise, the operation fails.
349+
* Applies the specified delta configuration to this job and returns the updated
350+
* configuration. Synchronization with {@link #getConfig()} is handled by {@link
351+
* #updateConfig}.
352+
* @implNote
353+
* Sends an {@link UpdateJobConfigOperation} to the master member. On the master
354+
* member, if the job is SUSPENDED, the job record is updated both locally and
355+
* {@linkplain JobRepository#JOB_RECORDS_MAP_NAME globally} (in order for {@link
356+
* #getConfig()} to reflect the changes); otherwise, the operation fails.
336357
*/
337-
protected abstract JobConfig doUpdateJobConfig(DeltaJobConfig deltaConfig);
358+
protected abstract JobConfig doUpdateJobConfig(@Nonnull DeltaJobConfig deltaConfig);
359+
360+
/**
361+
* Associates the specified listener with this job.
362+
* @throws JobNotFoundException if the job's master context is cleaned up after job
363+
* completion/failure. This is translated to {@link IllegalStateException} by
364+
* {@link #addStatusListener}.
365+
* @implNote
366+
* Listeners added to a job after it completes will not be removed automatically since
367+
* the job has already produced a terminal event. In order to make auto-deregistration
368+
* race-free, it is not allowed to add listeners to completed jobs. Checking the job
369+
* status before the listener registration will not work since they are not atomic. The
370+
* registration should be delegated to the job coordinator, but the {@code listener}
371+
* is local. To overcome this, the following algorithm is used: <ol>
372+
* <li> A {@link Registration} object is created with a unique registration id. The
373+
* {@code listener} is cached locally by the registration id.
374+
* <li> The {@link Registration} object is delivered to the job coordinator via an
375+
* {@link AddJobStatusListenerOperation}. If the job is not completed/failed, the
376+
* coordinator invokes a {@link RegistrationOperation} on the subscriber member
377+
* —or all members if the registration is global. The registration operation is
378+
* guaranteed to be executed earlier than a possible terminal event since the
379+
* operation is executed as an event callback with the same {@code orderKey} as
380+
* job events.
381+
* <li> When the subscriber member receives the {@link RegistrationOperation}, the
382+
* {@link Registration}'s {@code listener} is restored from the cache and the
383+
* registration is completed. </ol>
384+
*/
385+
protected abstract UUID doAddStatusListener(@Nonnull JobStatusListener listener);
338386

339387
/**
340388
* Return the ID of the coordinator - the master member for normal jobs and
@@ -397,6 +445,10 @@ protected void checkNotLightJob(String msg) {
397445
}
398446
}
399447

448+
public static IllegalStateException cannotAddStatusListener(JobStatus status) {
449+
return new IllegalStateException("Cannot add status listener to a " + status + " job");
450+
}
451+
400452
private abstract class CallbackBase implements BiConsumer<Void, Throwable> {
401453
private final NonCompletableFuture future;
402454

hazelcast/src/main/java/com/hazelcast/jet/impl/ClientJobProxy.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ protected JobConfig doGetJobConfig() {
220220
}
221221

222222
@Override
223-
protected JobConfig doUpdateJobConfig(DeltaJobConfig deltaConfig) {
223+
protected JobConfig doUpdateJobConfig(@Nonnull DeltaJobConfig deltaConfig) {
224224
return callAndRetryIfTargetNotFound(() -> {
225225
Data deltaConfigData = serializationService().toData(deltaConfig);
226226
ClientMessage request = JetUpdateJobConfigCodec.encodeRequest(getId(), deltaConfigData);
@@ -256,12 +256,16 @@ protected boolean isRunning() {
256256

257257
@Nonnull
258258
@Override
259-
public UUID addStatusListener(@Nonnull JobStatusListener listener) {
259+
protected UUID doAddStatusListener(@Nonnull JobStatusListener listener) {
260260
requireNonNull(listener, "Listener cannot be null");
261-
ClientJobStatusEventHandler handler = new ClientJobStatusEventHandler(listener);
262-
handler.registrationId = container().getListenerService().registerListener(
263-
createJobStatusListenerCodec(getId()), handler);
264-
return handler.registrationId;
261+
try {
262+
ClientJobStatusEventHandler handler = new ClientJobStatusEventHandler(listener);
263+
handler.registrationId = container().getListenerService()
264+
.registerListener(createJobStatusListenerCodec(getId()), handler);
265+
return handler.registrationId;
266+
} catch (Throwable t) {
267+
throw rethrow(t.getCause());
268+
}
265269
}
266270

267271
@Override
@@ -273,7 +277,7 @@ private ListenerMessageCodec createJobStatusListenerCodec(final long jobId) {
273277
return new ListenerMessageCodec() {
274278
@Override
275279
public ClientMessage encodeAddRequest(boolean localOnly) {
276-
return JetAddJobStatusListenerCodec.encodeRequest(jobId, localOnly);
280+
return JetAddJobStatusListenerCodec.encodeRequest(jobId, lightJobCoordinator, localOnly);
277281
}
278282

279283
@Override

hazelcast/src/main/java/com/hazelcast/jet/impl/JobCoordinationService.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import com.hazelcast.security.SecurityContext;
6767
import com.hazelcast.spi.exception.RetryableHazelcastException;
6868
import com.hazelcast.spi.impl.NodeEngineImpl;
69+
import com.hazelcast.spi.impl.eventservice.impl.Registration;
6970
import com.hazelcast.spi.impl.executionservice.ExecutionService;
7071
import com.hazelcast.spi.properties.HazelcastProperties;
7172
import com.hazelcast.version.Version;
@@ -113,6 +114,7 @@
113114
import static com.hazelcast.jet.core.JobStatus.RUNNING;
114115
import static com.hazelcast.jet.core.JobStatus.SUSPENDED;
115116
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
117+
import static com.hazelcast.jet.impl.AbstractJobProxy.cannotAddStatusListener;
116118
import static com.hazelcast.jet.impl.JobClassLoaderService.JobPhase.COORDINATOR;
117119
import static com.hazelcast.jet.impl.TerminationMode.CANCEL_FORCEFUL;
118120
import static com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.deserializeWithCustomClassLoader;
@@ -841,6 +843,31 @@ public CompletableFuture<JobConfig> updateJobConfig(long jobId, @Nonnull DeltaJo
841843
);
842844
}
843845

846+
/**
847+
* Applies the specified listener registration if the job is not completed/failed.
848+
* Otherwise, an {@link IllegalStateException} is thrown by the returned future.
849+
*/
850+
public CompletableFuture<UUID> addJobStatusListener(long jobId, boolean isLightJob, Registration registration) {
851+
if (isLightJob) {
852+
Object mc = lightMasterContexts.get(jobId);
853+
if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
854+
throw new JobNotFoundException(jobId);
855+
} else {
856+
return completedFuture(((LightMasterContext) mc).addStatusListener(registration));
857+
}
858+
}
859+
return callWithJob(jobId,
860+
masterContext -> masterContext.addStatusListener(registration),
861+
jobResult -> {
862+
throw cannotAddStatusListener(jobResult.getJobStatus());
863+
},
864+
jobRecord -> {
865+
JobEventService jobEventService = nodeEngine.getService(JobEventService.SERVICE_NAME);
866+
return jobEventService.handleAllRegistrations(jobId, registration).getId();
867+
},
868+
null);
869+
}
870+
844871
/**
845872
* Add the given member to shutting down members. This will prevent
846873
* submission of more executions until the member actually leaves the

hazelcast/src/main/java/com/hazelcast/jet/impl/JobEventService.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,33 @@
1616

1717
package com.hazelcast.jet.impl;
1818

19+
import com.hazelcast.cluster.Address;
20+
import com.hazelcast.internal.util.UuidUtil;
1921
import com.hazelcast.jet.JobStatusEvent;
2022
import com.hazelcast.jet.JobStatusListener;
2123
import com.hazelcast.jet.core.JobStatus;
2224
import com.hazelcast.spi.impl.NodeEngine;
2325
import com.hazelcast.spi.impl.eventservice.EventPublishingService;
2426
import com.hazelcast.spi.impl.eventservice.EventRegistration;
25-
import com.hazelcast.spi.impl.eventservice.EventService;
27+
import com.hazelcast.spi.impl.eventservice.impl.EventServiceImpl;
28+
import com.hazelcast.spi.impl.eventservice.impl.Registration;
29+
import com.hazelcast.spi.impl.eventservice.impl.TrueEventFilter;
2630

2731
import java.util.Collection;
2832
import java.util.UUID;
2933
import java.util.concurrent.CompletableFuture;
3034

31-
import static com.hazelcast.internal.util.ConcurrencyUtil.CALLER_RUNS;
3235
import static com.hazelcast.jet.Util.idToString;
3336

3437
public class JobEventService implements EventPublishingService<JobStatusEvent, JobStatusListener> {
3538
public static final String SERVICE_NAME = "hz:impl:jobEventService";
3639

37-
private final EventService eventService;
40+
private final EventServiceImpl eventService;
41+
private final Address address;
3842

3943
/**
 * Creates the service, capturing the event service and the address of the given
 * node engine.
 *
 * @param nodeEngine the node engine; its event service is cast to
 *                   {@link EventServiceImpl}
 */
public JobEventService(NodeEngine nodeEngine) {
    this.eventService = (EventServiceImpl) nodeEngine.getEventService();
    this.address = nodeEngine.getThisAddress();
}
4247

4348
@Override
@@ -54,17 +59,20 @@ public void publishEvent(long jobId, JobStatus oldStatus, JobStatus newStatus,
5459
}
5560
}
5661

57-
public UUID addLocalEventListener(long jobId, JobStatusListener listener) {
58-
return eventService.registerLocalListener(SERVICE_NAME, idToString(jobId), listener).getId();
59-
}
60-
6162
public UUID addEventListener(long jobId, JobStatusListener listener) {
6263
return eventService.registerListener(SERVICE_NAME, idToString(jobId), listener).getId();
6364
}
6465

65-
public CompletableFuture<UUID> addEventListenerAsync(long jobId, JobStatusListener listener) {
66-
return eventService.registerListenerAsync(SERVICE_NAME, idToString(jobId), listener)
67-
.thenApplyAsync(EventRegistration::getId, CALLER_RUNS);
66+
public Registration prepareRegistration(long jobId, JobStatusListener listener, boolean localOnly) {
67+
UUID registrationId = UuidUtil.newUnsecureUUID();
68+
Registration registration = new Registration(registrationId, SERVICE_NAME, idToString(jobId),
69+
TrueEventFilter.INSTANCE, address, listener, localOnly);
70+
eventService.cacheListener(registration);
71+
return registration;
72+
}
73+
74+
public EventRegistration handleAllRegistrations(long jobId, Registration registration) {
75+
return eventService.handleAllRegistrations(registration, (int) jobId);
6876
}
6977

7078
public boolean removeEventListener(long jobId, UUID id) {

0 commit comments

Comments
 (0)