Skip to content

[Concurrency] Custom executors with move-only Job #63569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class alignas(2 * alignof(void*)) Job :
return Flags.getPriority();
}

uint32_t getJobId() const {
return Id;
}

/// Given that we've fully established the job context in the current
/// thread, actually start running this job. To establish the context
/// correctly, call swift_job_run or runJobInExecutorContext.
Expand Down
3 changes: 3 additions & 0 deletions include/swift/AST/Decl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3856,6 +3856,9 @@ class NominalTypeDecl : public GenericTypeDecl, public IterableDeclContext {
/// Find the 'RemoteCallArgument(label:name:value:)' initializer function.
ConstructorDecl *getDistributedRemoteCallArgumentInitFunction() const;

/// Get the move-only `enqueue(Job)` protocol requirement function on the `Executor` protocol.
AbstractFunctionDecl *getExecutorOwnedEnqueueFunction() const;

/// Collect the set of protocols to which this type should implicitly
/// conform, such as AnyObject (for classes).
void getImplicitProtocols(SmallVectorImpl<ProtocolDecl *> &protocols);
Expand Down
5 changes: 5 additions & 0 deletions include/swift/AST/DiagnosticsSema.def
Original file line number Diff line number Diff line change
Expand Up @@ -6408,6 +6408,11 @@ WARNING(hashvalue_implementation,Deprecation,
"conform type %0 to 'Hashable' by implementing 'hash(into:)' instead",
(Type))

WARNING(executor_enqueue_unowned_implementation,Deprecation,
"'Executor.enqueue(UnownedJob)' is deprecated as a protocol requirement; "
"conform type %0 to 'Executor' by implementing 'func enqueue(Job)' instead",
(Type))

//------------------------------------------------------------------------------
// MARK: property wrapper diagnostics
//------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions include/swift/AST/KnownProtocols.def
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ PROTOCOL(SIMDScalar)
PROTOCOL(BinaryInteger)
PROTOCOL(FixedWidthInteger)
PROTOCOL(RangeReplaceableCollection)
PROTOCOL(Executor)
PROTOCOL(SerialExecutor)
PROTOCOL(GlobalActor)

Expand Down
4 changes: 4 additions & 0 deletions include/swift/AST/KnownSDKTypes.def
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ KNOWN_SDK_TYPE_DECL(ObjectiveC, ObjCBool, StructDecl, 0)
// standardized
KNOWN_SDK_TYPE_DECL(Concurrency, UnsafeContinuation, NominalTypeDecl, 2)
KNOWN_SDK_TYPE_DECL(Concurrency, MainActor, NominalTypeDecl, 0)
KNOWN_SDK_TYPE_DECL(Concurrency, Job, StructDecl, 0)
KNOWN_SDK_TYPE_DECL(Concurrency, UnownedJob, StructDecl, 0)
KNOWN_SDK_TYPE_DECL(Concurrency, Executor, NominalTypeDecl, 0)
KNOWN_SDK_TYPE_DECL(Concurrency, SerialExecutor, NominalTypeDecl, 0)
KNOWN_SDK_TYPE_DECL(Concurrency, UnownedSerialExecutor, NominalTypeDecl, 0)

KNOWN_SDK_TYPE_DECL(Concurrency, TaskLocal, ClassDecl, 1)
Expand Down
5 changes: 5 additions & 0 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ bool swift_task_isOnExecutor(
const Metadata *selfType,
const SerialExecutorWitnessTable *wtable);

/// Return the 64bit TaskID (if the job is an AsyncTask),
/// or the 32bits of the job Id otherwise.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
uint64_t swift_task_getJobTaskId(Job *job);

#if SWIFT_CONCURRENCY_ENABLE_DISPATCH

/// Enqueue the given job on the main executor.
Expand Down
1 change: 1 addition & 0 deletions lib/AST/ASTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ ProtocolDecl *ASTContext::getProtocol(KnownProtocolKind kind) const {
case KnownProtocolKind::GlobalActor:
case KnownProtocolKind::AsyncSequence:
case KnownProtocolKind::AsyncIteratorProtocol:
case KnownProtocolKind::Executor:
case KnownProtocolKind::SerialExecutor:
M = getLoadedModule(Id_Concurrency);
break;
Expand Down
33 changes: 33 additions & 0 deletions lib/AST/Decl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5247,6 +5247,39 @@ VarDecl *NominalTypeDecl::getGlobalActorInstance() const {
nullptr);
}

AbstractFunctionDecl *
NominalTypeDecl::getExecutorOwnedEnqueueFunction() const {
auto &C = getASTContext();

auto proto = dyn_cast<ProtocolDecl>(this);
if (!proto)
return nullptr;

llvm::SmallVector<ValueDecl *, 2> results;
lookupQualified(getSelfNominalTypeDecl(),
DeclNameRef(C.Id_enqueue),
NL_ProtocolMembers,
results);

for (auto candidate: results) {
// we're specifically looking for the Executor protocol requirement
if (!isa<ProtocolDecl>(candidate->getDeclContext()))
continue;

if (auto *funcDecl = dyn_cast<AbstractFunctionDecl>(candidate)) {
if (funcDecl->getParameters()->size() != 1)
continue;

auto params = funcDecl->getParameters();
if (params->get(0)->getSpecifier() == ParamSpecifier::LegacyOwned) { // TODO: make this Consuming
return funcDecl;
}
}
}

return nullptr;
}

ClassDecl::ClassDecl(SourceLoc ClassLoc, Identifier Name, SourceLoc NameLoc,
ArrayRef<InheritedEntry> Inherited,
GenericParamList *GenericParams, DeclContext *Parent,
Expand Down
1 change: 1 addition & 0 deletions lib/IRGen/GenMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6178,6 +6178,7 @@ SpecialProtocol irgen::getSpecialProtocolID(ProtocolDecl *P) {
case KnownProtocolKind::CxxSequence:
case KnownProtocolKind::UnsafeCxxInputIterator:
case KnownProtocolKind::UnsafeCxxRandomAccessIterator:
case KnownProtocolKind::Executor:
case KnownProtocolKind::SerialExecutor:
case KnownProtocolKind::Sendable:
case KnownProtocolKind::UnsafeSendable:
Expand Down
73 changes: 73 additions & 0 deletions lib/Sema/TypeCheckConcurrency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,79 @@ void swift::diagnoseMissingExplicitSendable(NominalTypeDecl *nominal) {
}
}

void swift::tryDiagnoseExecutorConformance(ASTContext &C,
const NominalTypeDecl *nominal,
ProtocolDecl *proto) {
assert(proto->isSpecificProtocol(KnownProtocolKind::Executor) ||
proto->isSpecificProtocol(KnownProtocolKind::SerialExecutor));

auto &diags = C.Diags;
auto module = nominal->getParentModule();
Type nominalTy = nominal->getDeclaredInterfaceType();

// enqueue(_: UnownedJob)
auto enqueueDeclName = DeclName(C, DeclBaseName(C.Id_enqueue), { Identifier() });

FuncDecl *unownedEnqueueRequirement = nullptr;
FuncDecl *moveOnlyEnqueueRequirement = nullptr;
for (auto req: proto->getProtocolRequirements()) {
auto *funcDecl = dyn_cast<FuncDecl>(req);
if (!funcDecl)
continue;

if (funcDecl->getName() != enqueueDeclName)
continue;


// look for the first parameter being a Job or UnownedJob
if (funcDecl->getParameters()->size() != 1)
continue;
if (auto param = funcDecl->getParameters()->front()) {
if (param->getType()->isEqual(C.getJobDecl()->getDeclaredInterfaceType())) {
assert(moveOnlyEnqueueRequirement == nullptr);
moveOnlyEnqueueRequirement = funcDecl;
} else if (param->getType()->isEqual(C.getUnownedJobDecl()->getDeclaredInterfaceType())) {
assert(unownedEnqueueRequirement == nullptr);
unownedEnqueueRequirement = funcDecl;
}
}

// if we found both, we're done here and break out of the loop
if (unownedEnqueueRequirement && moveOnlyEnqueueRequirement)
break; // we're done looking for the requirements
}


auto conformance = module->lookupConformance(nominalTy, proto);
auto concreteConformance = conformance.getConcrete();
auto unownedEnqueueWitness = concreteConformance->getWitnessDeclRef(unownedEnqueueRequirement);
auto moveOnlyEnqueueWitness = concreteConformance->getWitnessDeclRef(moveOnlyEnqueueRequirement);

if (auto enqueueUnownedDecl = unownedEnqueueWitness.getDecl()) {
// Old UnownedJob based impl is present, warn about it suggesting the new protocol requirement.
if (enqueueUnownedDecl->getLoc().isValid()) {
diags.diagnose(enqueueUnownedDecl->getLoc(), diag::executor_enqueue_unowned_implementation, nominalTy);
}
}

if (auto unownedEnqueueDecl = unownedEnqueueWitness.getDecl()) {
if (auto moveOnlyEnqueueDecl = moveOnlyEnqueueWitness.getDecl()) {
if (unownedEnqueueDecl && unownedEnqueueDecl->getLoc().isInvalid() &&
moveOnlyEnqueueDecl && moveOnlyEnqueueDecl->getLoc().isInvalid()) {
// Neither old nor new implementation have been found, but we provide default impls for them
// that are mutually recursive, so we must error and suggest implementing the right requirement.
auto ownedRequirement = C.getExecutorDecl()->getExecutorOwnedEnqueueFunction();
nominal->diagnose(diag::type_does_not_conform, nominalTy, proto->getDeclaredInterfaceType());
ownedRequirement->diagnose(diag::no_witnesses,
getProtocolRequirementKind(ownedRequirement),
ownedRequirement->getName(),
proto->getDeclaredInterfaceType(),
/*AddFixIt=*/true);
}
}
}
}

/// Determine whether this is the main actor type.
static bool isMainActor(Type type) {
if (auto nominal = type->getAnyNominal())
Expand Down
3 changes: 3 additions & 0 deletions lib/Sema/TypeCheckConcurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ void diagnoseMissingSendableConformance(
/// state whether it conforms to Sendable, provide a diagnostic.
void diagnoseMissingExplicitSendable(NominalTypeDecl *nominal);

/// Warn about deprecated `Executor.enqueue` implementations.
void tryDiagnoseExecutorConformance(ASTContext &C, const NominalTypeDecl *nominal, ProtocolDecl *proto);

/// How the Sendable check should be performed.
enum class SendableCheck {
/// Sendable conformance was explicitly stated and should be
Expand Down
2 changes: 2 additions & 0 deletions lib/Sema/TypeCheckProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6496,6 +6496,8 @@ void TypeChecker::checkConformancesInContext(IterableDeclContext *idc) {
} else if (proto->isSpecificProtocol(
KnownProtocolKind::UnsafeSendable)) {
sendableConformanceIsUnchecked = true;
} else if (proto->isSpecificProtocol(KnownProtocolKind::Executor)) {
tryDiagnoseExecutorConformance(Context, nominal, proto);
}
}

Expand Down
1 change: 1 addition & 0 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ static bool swift_task_isCurrentExecutorImpl(ExecutorRef executor) {
return currentTracking->getActiveExecutor() == executor;
}

// TODO(ktoso): checking the "is main thread" is not correct, main executor can be not main thread, relates to rdar://106188692
return executor.isMainExecutor() && isExecutingOnMainThread();
}

Expand Down
88 changes: 78 additions & 10 deletions stdlib/public/Concurrency/Executor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@ import Swift
/// A service that can execute jobs.
@available(SwiftStdlib 5.1, *)
public protocol Executor: AnyObject, Sendable {

#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
@available(macOS, introduced: 10.15, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(iOS, introduced: 13.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(watchOS, introduced: 6.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(tvOS, introduced: 13.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
func enqueue(_ job: UnownedJob)

#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
@available(SwiftStdlib 5.9, *)
func enqueue(_ job: __owned Job)
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
}

/// A service that executes jobs.
Expand All @@ -26,13 +38,51 @@ public protocol SerialExecutor: Executor {
// avoid drilling down to the base conformance just for the basic
// work-scheduling operation.
@_nonoverride
#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
@available(macOS, introduced: 10.15, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(iOS, introduced: 13.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(watchOS, introduced: 6.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
@available(tvOS, introduced: 13.0, deprecated: 9999, message: "Implement 'enqueue(_: __owned Job)' instead")
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
func enqueue(_ job: UnownedJob)

#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
// This requirement is repeated here as a non-override so that we
// get a redundant witness-table entry for it. This allows us to
// avoid drilling down to the base conformance just for the basic
// work-scheduling operation.
@_nonoverride
@available(SwiftStdlib 5.9, *)
func enqueue(_ job: __owned Job)
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY

/// Convert this executor value to the optimized form of borrowed
/// executor references.
@available(SwiftStdlib 5.9, *)
func asUnownedSerialExecutor() -> UnownedSerialExecutor
}

#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
@available(SwiftStdlib 5.9, *)
extension Executor {
public func enqueue(_ job: UnownedJob) {
self.enqueue(Job(job))
}

public func enqueue(_ job: __owned Job) {
self.enqueue(UnownedJob(job))
}
}
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY

@available(SwiftStdlib 5.9, *)
extension SerialExecutor {
@available(SwiftStdlib 5.9, *)
public func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(ordinary: self)
}
}

/// An unowned reference to a serial executor (a `SerialExecutor`
/// value).
///
Expand Down Expand Up @@ -88,14 +138,6 @@ public struct UnownedSerialExecutor: Sendable {
@_silgen_name("swift_task_isOnExecutor")
public func _taskIsOnExecutor<Executor: SerialExecutor>(_ executor: Executor) -> Bool

// Used by the concurrency runtime
@available(SwiftStdlib 5.1, *)
@_silgen_name("_swift_task_enqueueOnExecutor")
internal func _enqueueOnExecutor<E>(job: UnownedJob, executor: E)
where E: SerialExecutor {
executor.enqueue(job)
}

@available(SwiftStdlib 5.1, *)
@_transparent
public // COMPILER_INTRINSIC
Expand All @@ -109,7 +151,33 @@ func _checkExpectedExecutor(_filenameStart: Builtin.RawPointer,
}

_reportUnexpectedExecutor(
_filenameStart, _filenameLength, _filenameIsASCII, _line, _executor)
_filenameStart, _filenameLength, _filenameIsASCII, _line, _executor)
}

/// Primarily a debug utility.
///
/// If the passed in Job is a Task, returns the complete 64bit TaskId,
/// otherwise returns only the job's 32bit Id.
///
/// - Returns: the Id stored in this Job or Task, for purposes of debug printing
@available(SwiftStdlib 5.9, *)
@_silgen_name("swift_task_getJobTaskId")
internal func _getJobTaskId(_ job: UnownedJob) -> UInt64

// Used by the concurrency runtime
@available(SwiftStdlib 5.1, *)
@_silgen_name("_swift_task_enqueueOnExecutor")
internal func _enqueueOnExecutor<E>(job unownedJob: UnownedJob, executor: E)
where E: SerialExecutor {
#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
if #available(SwiftStdlib 5.9, *) {
executor.enqueue(Job(context: unownedJob._context))
} else {
executor.enqueue(unownedJob)
}
#else // SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
executor.enqueue(unownedJob)
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
}

#if !SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
Expand Down Expand Up @@ -139,4 +207,4 @@ internal final class DispatchQueueShim: @unchecked Sendable, SerialExecutor {
return UnownedSerialExecutor(ordinary: self)
}
}
#endif
#endif // SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY
10 changes: 5 additions & 5 deletions stdlib/public/Concurrency/ExecutorAssertions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import SwiftShims
/// programming error.
///
/// - Parameter executor: the expected current executor
@available(SwiftStdlib 5.9, *) // FIXME: use @backDeploy(before: SwiftStdlib 5.9)
@available(SwiftStdlib 5.9, *)
public
func preconditionTaskOnExecutor(
_ executor: some SerialExecutor,
Expand Down Expand Up @@ -71,7 +71,7 @@ func preconditionTaskOnExecutor(
/// programming error.
///
/// - Parameter actor: the actor whose serial executor we expect to be the current executor
@available(SwiftStdlib 5.9, *) // FIXME: use @backDeploy(before: SwiftStdlib 5.9)
@available(SwiftStdlib 5.9, *)
public
func preconditionTaskOnActorExecutor(
_ actor: some Actor,
Expand Down Expand Up @@ -108,7 +108,7 @@ func preconditionTaskOnActorExecutor(
/// assumption is a serious programming error.
///
/// - Parameter executor: the expected current executor
@available(SwiftStdlib 5.9, *) // FIXME: use @backDeploy(before: SwiftStdlib 5.9)
@available(SwiftStdlib 5.9, *)
public
func assertTaskOnExecutor(
_ executor: some SerialExecutor,
Expand Down Expand Up @@ -143,7 +143,7 @@ func assertTaskOnExecutor(
///
///
/// - Parameter actor: the actor whose serial executor we expect to be the current executor
@available(SwiftStdlib 5.9, *) // FIXME: use @backDeploy(before: SwiftStdlib 5.9)
@available(SwiftStdlib 5.9, *)
public
func assertTaskOnActorExecutor(
_ actor: some Actor,
Expand Down Expand Up @@ -180,7 +180,7 @@ func assertTaskOnActorExecutor(
/// if another actor uses the same serial executor--by using ``MainActor/sharedUnownedExecutor``
/// as its own ``Actor/unownedExecutor``--this check will succeed, as from a concurrency safety
/// perspective, the serial executor guarantees mutual exclusion of those two actors.
@available(SwiftStdlib 5.9, *) // FIXME: use @backDeploy(before: SwiftStdlib 5.9)
@available(SwiftStdlib 5.9, *)
@_unavailableFromAsync(message: "await the call to the @MainActor closure directly")
public
func assumeOnMainActorExecutor<T>(
Expand Down
Loading