Qt
Internal/Contributor docs for the Qt SDK. <b>Note:</b> These are NOT official API docs; those are found <a href='https://doc.qt.io/'>here</a>.
Loading...
Searching...
No Matches
qfutureinterface.cpp
Go to the documentation of this file.
1// Copyright (C) 2020 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4// qfutureinterface.h included from qfuture.h
5#include "qfuture.h"
7
8#include <QtCore/qatomic.h>
9#include <QtCore/qcoreapplication.h>
10#include <QtCore/qthread.h>
11#include <QtCore/qvarlengtharray.h>
12#include <private/qthreadpool_p.h>
13#include <private/qobject_p.h>
14
15// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
16// reason
17// warning: ‘unsigned int __atomic_or_fetch_4(volatile void*, unsigned int, int)’ writing 4 bytes into a region of size 0 overflows the destination [-Wstringop-overflow=]
18QT_WARNING_DISABLE_GCC("-Wstringop-overflow")
19
21
22enum {
24};
25
26namespace {
27class ThreadPoolThreadReleaser {
28 QThreadPool *m_pool;
29public:
31 explicit ThreadPoolThreadReleaser(QThreadPool *pool)
32 : m_pool(pool)
33 { if (pool) pool->releaseThread(); }
34 ~ThreadPoolThreadReleaser()
35 { if (m_pool) m_pool->reserveThread(); }
36};
37
38const auto suspendingOrSuspended =
40
41} // unnamed namespace
42
44{
46public:
49 {
50 }
51
53 void run();
54};
55
58{
60 Q_ASSERT(slotObj);
61
62 auto slot = SlotObjUniquePtr(slotObj);
63
65 watcher->moveToThread(context->thread());
66
67 // We need to protect access to the watcher. The context object (and in turn, the watcher)
68 // could be destroyed while the continuation that emits the signal is running. We have to
69 // prevent that.
70 // The mutex has to be recursive, because the continuation itself could delete the context
71 // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
72 auto watcherMutex = std::make_shared<QRecursiveMutex>();
73 const auto destroyWatcher = [watcherMutex, watcher]() mutable {
74 QMutexLocker lock(watcherMutex.get());
75 delete watcher;
76 };
77
78 // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
80 // for the following, cf. QMetaObject::invokeMethodImpl():
81 // we know `slot` is a lambda returning `void`, so we can just
82 // `call()` with `obj` and `args[0]` set to `nullptr`:
83 context, [slot = std::move(slot)] {
84 void *args[] = { nullptr }; // for `void` return value
85 slot->call(nullptr, args);
86 });
89
90 fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
91 (const QFutureInterfaceBase &parentData)
92 {
93 Q_UNUSED(parentData);
94 QMutexLocker lock(watcherMutex.get());
95 if (watcher)
96 emit watcher->run();
97 });
98}
99
101 = default;
102
104
106 : d(new QFutureInterfaceBasePrivate(initialState))
107{ }
108
114
116{
117 if (d && !d->refCount.deref())
118 delete d;
119}
120
121static inline int switch_on(QAtomicInt &a, int which)
122{
123 return a.fetchAndOrRelaxed(which) | which;
124}
125
126static inline int switch_off(QAtomicInt &a, int which)
127{
128 return a.fetchAndAndRelaxed(~which) & ~which;
129}
130
131static inline int switch_from_to(QAtomicInt &a, int from, int to)
132{
133 const auto adjusted = [&](int old) { return (old & ~from) | to; };
134 int value = a.loadRelaxed();
135 while (!a.testAndSetRelaxed(value, adjusted(value), value))
136 qYieldCpu();
137 return value;
138}
139
144
146{
147 QMutexLocker locker(&d->m_mutex);
148
149 const auto oldState = d->state.loadRelaxed();
150
151 switch (mode) {
153 if ((oldState & Finished) && (oldState & Canceled))
154 return;
155 switch_from_to(d->state, suspendingOrSuspended | Running, Canceled | Finished);
156 break;
158 if (oldState & Canceled)
159 return;
160 switch_from_to(d->state, suspendingOrSuspended, Canceled);
161 break;
162 }
163
164 // Cancel the continuations chain
166 while (next) {
167 next->continuationState = QFutureInterfaceBasePrivate::Canceled;
168 next = next->continuationData;
169 }
170
173
174 if (!(oldState & Canceled))
176 if (mode == CancelMode::CancelAndFinish && !(oldState & Finished))
178
179 d->isValid = false;
180}
181
194
196{
197 QMutexLocker locker(&d->m_mutex);
198 if (d->state.loadRelaxed() & suspendingOrSuspended) {
199 switch_off(d->state, suspendingOrSuspended);
202 } else {
205 }
206}
207
209{
210 // Needs to be called when pause is in effect,
211 // i.e. no more events will be reported.
212
213 QMutexLocker locker(&d->m_mutex);
214 const int state = d->state.loadRelaxed();
215 if (!(state & Suspending) || (state & Suspended))
216 return;
217
220}
221
223{
225 if (enable) {
227 } else {
229 if (!(d->state.loadRelaxed() & suspendingOrSuspended))
231 }
232}
233
234
236{
237 return queryState(Running);
238}
239
241{
242 return queryState(Started);
243}
244
246{
247 return queryState(Canceled);
248}
249
251{
252 return queryState(Finished);
253}
254
256{
257 return queryState(Suspending);
258}
259
260#if QT_DEPRECATED_SINCE(6, 0)
261bool QFutureInterfaceBase::isPaused() const
262{
263 return queryState(static_cast<State>(suspendingOrSuspended));
264}
265#endif
266
268{
269 return queryState(Suspended);
270}
271
273{
274 return queryState(Throttled);
275}
276
282
284{
285 const QMutexLocker lock(&d->m_mutex);
286 return d->isValid;
287}
288
290{
291 return queryState(static_cast<State>(Running | Pending));
292}
293
299
301{
302 // return early if possible to avoid taking the mutex lock.
303 {
304 const int state = d->state.loadRelaxed();
305 if (!(state & suspendingOrSuspended) || (state & Canceled))
306 return;
307 }
308
310 const int state = d->state.loadRelaxed();
311 if (!(state & suspendingOrSuspended) || (state & Canceled))
312 return;
313
314 // decrease active thread count since this thread will wait.
315 const ThreadPoolThreadReleaser releaser(d->pool());
316
318}
319
321{
322 const auto canSuspend = [] (int state) {
323 // can suspend only if 1) in any suspend-related state; 2) not canceled
324 return (state & suspendingOrSuspended) && !(state & Canceled);
325 };
326
327 // return early if possible to avoid taking the mutex lock.
328 {
329 const int state = d->state.loadRelaxed();
330 if (!canSuspend(state))
331 return;
332 }
333
335 const int state = d->state.loadRelaxed();
336 if (!canSuspend(state))
337 return;
338
339 // Note: expecting that Suspending and Suspended are mutually exclusive
340 if (!(state & Suspended)) {
341 // switch state in case this is the first invocation
344 }
345
346 // decrease active thread count since this thread will wait.
347 const ThreadPoolThreadReleaser releaser(d->pool());
349}
350
352{
353 const QMutexLocker lock(&d->m_mutex);
354 return d->m_progressValue;
355}
356
358{
359 const QMutexLocker lock(&d->m_mutex);
360 return d->m_progress ? d->m_progress->minimum : 0;
361}
362
364{
365 const QMutexLocker lock(&d->m_mutex);
366 return d->m_progress ? d->m_progress->maximum : 0;
367}
368
370{
372 return d->internal_resultCount();
373}
374
376{
377 QMutexLocker locker(&d->m_mutex);
378 return d->m_progress ? d->m_progress->text : QString();
379}
380
382{
383 QMutexLocker locker(&d->m_mutex);
384 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
385}
386
396
401
402#ifndef QT_NO_EXCEPTIONS
404{
405 try {
406 exception.raise();
407 } catch (...) {
408 reportException(std::current_exception());
409 }
410}
411
412#if QT_VERSION < QT_VERSION_CHECK(7, 0, 0)
413void QFutureInterfaceBase::reportException(std::exception_ptr exception)
414#else
415void QFutureInterfaceBase::reportException(const std::exception_ptr &exception)
416#endif
417{
418 QMutexLocker locker(&d->m_mutex);
419 if (d->state.loadRelaxed() & (Canceled|Finished))
420 return;
421
422 d->hasException = true;
423 d->data.setException(exception);
424 switch_on(d->state, Canceled);
425 d->waitCondition.wakeAll();
426 d->pausedWaitCondition.wakeAll();
428}
429#endif
430
440
447
452
454{
455 return d->state.loadRelaxed() & state;
456}
457
459{
460 // Used from ~QPromise, so this check is needed
461 if (!d)
463 return d->state.loadRelaxed();
464}
465
467{
468 if (d->hasException)
470
472 if (!isRunningOrPending())
473 return;
474 lock.unlock();
475
476 // To avoid deadlocks and reduce the number of threads used, try to
477 // run the runnable in the current thread.
478 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
479
480 lock.relock();
481
482 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
483 while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
484 d->waitCondition.wait(&d->m_mutex);
485
486 if (d->hasException)
488}
489
491{
493 const bool alreadyFinished = isFinished();
494 lock.unlock();
495
496 if (!alreadyFinished) {
497 d->pool()->d_func()->stealAndRunRunnable(d->runnable);
498
499 lock.relock();
500
501 while (!isFinished())
502 d->waitCondition.wait(&d->m_mutex);
503 }
504
505 if (d->hasException)
507}
508
509void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
510{
511 if (beginIndex == endIndex || (d->state.loadRelaxed() & (Canceled|Finished)))
512 return;
513
515
516 if (!d->m_progress) {
517 if (d->internal_updateProgressValue(d->m_progressValue + endIndex - beginIndex) == false) {
519 beginIndex,
520 endIndex));
521 return;
522 }
523
526 QString()),
528 beginIndex,
529 endIndex));
530 return;
531 }
533}
534
536{
537 d->runnable = runnable;
538}
539
544
546{
547 return d->m_pool;
548}
549
556
573void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
574{
575 QMutexLocker locker(&d->m_mutex);
576 if (!d->m_progress)
578 d->m_progress->minimum = minimum;
579 d->m_progress->maximum = qMax(minimum, maximum);
581 d->m_progressValue = minimum;
582}
583
588
596 const QString &progressText)
597{
598 QMutexLocker locker(&d->m_mutex);
599 if (!d->m_progress)
601
602 const bool useProgressRange = (d->m_progress->maximum != 0) || (d->m_progress->minimum != 0);
603 if (useProgressRange
604 && ((progressValue < d->m_progress->minimum) || (progressValue > d->m_progress->maximum))) {
605 return;
606 }
607
609 return;
610
611 if (d->state.loadRelaxed() & (Canceled|Finished))
612 return;
613
617 d->m_progress->text));
618 }
619}
620
622{
623 return d->m_mutex;
624}
625
627{
628 return d->hasException;
629}
630
636
642
648
655
656// ### Qt 7: inline
661
662bool QFutureInterfaceBase::refT() const noexcept
663{
664 return d->refCount.refT();
665}
666
667bool QFutureInterfaceBase::derefT() const noexcept
668{
669 // Called from ~QFutureInterface
670 return !d || d->refCount.derefT();
671}
672
674{
675 d->m_progressValue = 0;
676 d->m_progress.reset();
678 d->isValid = false;
679}
680
686
692
694{
695 if (hasException)
696 data.m_exceptionStore.~ExceptionStore();
697 else
698 data.m_results.~ResultStoreBase();
699}
700
702{
703 return hasException ? 0 : data.m_results.count(); // ### subtract canceled results.
704}
705
707{
708 return hasException ? false : (data.m_results.contains(index));
709}
710
712{
713 if (hasException)
714 return false;
715
716 if (data.m_results.hasNextResult())
717 return true;
718
720 && data.m_results.hasNextResult() == false)
722
724 && data.m_results.hasNextResult();
725}
726
728{
729 if (m_progressValue >= progress)
730 return false;
731
732 m_progressValue = progress;
733
734 if (progressTime.isValid() && m_progressValue != 0) // make sure the first and last steps are emitted.
736 return false;
737
739 return true;
740
741}
742
744 const QString &progressText)
745{
746 if (m_progressValue >= progress)
747 return false;
748
750
751 m_progressValue = progress;
752 m_progress->text = progressText;
753
754 if (progressTime.isValid() && m_progressValue != m_progress->maximum) // make sure the first and last steps are emitted.
756 return false;
757
759 return true;
760}
761
763{
764 // bail out if we are not changing the state
767 return;
768
769 // change the state
770 if (enable) {
772 } else {
774 if (!(state.loadRelaxed() & suspendingOrSuspended))
776 }
777}
778
780{
782 return;
783
784 for (int i = 0; i < outputConnections.size(); ++i)
785 outputConnections.at(i)->postCallOutEvent(callOutEvent);
786}
787
789 const QFutureCallOutEvent &callOutEvent2)
790{
792 return;
793
794 for (int i = 0; i < outputConnections.size(); ++i) {
796 iface->postCallOutEvent(callOutEvent1);
797 iface->postCallOutEvent(callOutEvent2);
798 }
799}
800
801// This function connects an output interface (for example a QFutureWatcher)
802// to this future. While holding the lock we check the state and ready results
803// and add the appropriate callouts to the queue.
805{
806 QMutexLocker locker(&m_mutex);
807
808 const auto currentState = state.loadRelaxed();
809 if (currentState & QFutureInterfaceBase::Started) {
810 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
811 if (m_progress) {
813 m_progress->minimum,
814 m_progress->maximum));
817 m_progress->text));
818 } else {
820 0,
821 0));
824 QString()));
825 }
826 }
827
828 if (!hasException) {
830 while (it != data.m_results.end()) {
831 const int begin = it.resultIndex();
832 const int end = begin + it.batchSize();
834 begin,
835 end));
836 it.batchedAdvance();
837 }
838 }
839
840 if (currentState & QFutureInterfaceBase::Suspended)
841 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
842 else if (currentState & QFutureInterfaceBase::Suspending)
844
845 if (currentState & QFutureInterfaceBase::Canceled)
846 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
847
848 if (currentState & QFutureInterfaceBase::Finished)
849 iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
850
852}
853
855{
858 if (index == -1)
859 return;
861
862 iface->callOutInterfaceDisconnected();
863}
864
869
871{
872 setContinuation(std::move(func), nullptr);
873}
874
876 QFutureInterfaceBasePrivate *continuationFutureData)
877{
879
880 // If the state is ready, run continuation immediately,
881 // otherwise save it for later.
882 if (isFinished()) {
883 lock.unlock();
884 func(*this);
885 lock.relock();
886 }
887 // Unless the continuation has been cleaned earlier, we have to
888 // store the move-only continuation, to guarantee that the associated
889 // future's data stays alive.
891 if (d->continuation) {
892 qWarning() << "Adding a continuation to a future which already has a continuation. "
893 "The existing continuation is overwritten.";
894 }
895 d->continuation = std::move(func);
896 d->continuationData = continuationFutureData;
897 }
898}
899
901{
902 if (!d)
903 return;
904
906 d->continuation = nullptr;
908 d->continuationData = nullptr;
909}
910
912{
914 if (d->continuation) {
915 // Save the continuation in a local function, to avoid calling
916 // a null std::function below, in case cleanContinuation() is
917 // called from some other thread right after unlock() below.
918 auto fn = std::move(d->continuation);
919 lock.unlock();
920 fn(*this);
921
922 lock.relock();
923 // Unless the continuation has been cleaned earlier, we have to
924 // store the move-only continuation, to guarantee that the associated
925 // future's data stays alive.
927 d->continuation = std::move(fn);
928 }
929}
930
935
940
942{
943 return d->launchAsync;
944}
945
946namespace QtFuture {
947
948QFuture<void> makeReadyVoidFuture()
949{
950 QFutureInterface<void> promise;
951 promise.reportStarted();
952 promise.reportFinished();
953
954 return promise.future();
955}
956
957} // namespace QtFuture
958
960
961#include "qfutureinterface.moc"
\inmodule QtCore
Definition qatomic.h:112
void storeRelaxed(T newValue) noexcept
T loadRelaxed() const noexcept
void invalidate() noexcept
Marks this QElapsedTimer object as invalid.
qint64 elapsed() const noexcept
Returns the number of milliseconds since this QElapsedTimer was last started.
void start() noexcept
\typealias QElapsedTimer::Duration Synonym for std::chrono::nanoseconds.
bool isValid() const noexcept
Returns false if the timer has never been started or invalidated by a call to invalidate().
\inmodule QtCore
Definition qexception.h:22
virtual void raise() const
In your QException subclass, reimplement raise() like this:
virtual ~QFutureCallOutInterface()
virtual void postCallOutEvent(const QFutureCallOutEvent &)=0
std::atomic< ContinuationState > continuationState
bool internal_updateProgress(int progress, const QString &progressText=QString())
void sendCallOuts(const QFutureCallOutEvent &callOut1, const QFutureCallOutEvent &callOut2)
void sendCallOut(const QFutureCallOutEvent &callOut)
void setState(QFutureInterfaceBase::State state)
QList< QFutureCallOutInterface * > outputConnections
void disconnectOutputInterface(QFutureCallOutInterface *iface)
bool internal_updateProgressValue(int progress)
bool internal_isResultReadyAt(int index) const
std::function< void(const QFutureInterfaceBase &) continuation)
void connectOutputInterface(QFutureCallOutInterface *iface)
void internal_setThrottled(bool enable)
QFutureInterfaceBasePrivate * continuationData
QScopedPointer< ProgressData > m_progress
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
QtPrivate::ExceptionStore & exceptionStore()
QThreadPool * threadPool() const
QtPrivate::ResultStoreBase & resultStoreBase()
void waitForResult(int resultIndex)
bool queryState(State state) const
bool isProgressUpdateNeeded() const
void setThreadPool(QThreadPool *pool)
void setExpectedResultCount(int resultCount)
QFutureInterfaceBase(State initialState=NoState)
void reportResultsReady(int beginIndex, int endIndex)
void setContinuation(std::function< void(const QFutureInterfaceBase &)> func)
void setLaunchAsync(bool value)
bool derefT() const noexcept
void setThrottled(bool enable)
void setSuspended(bool suspend)
void setProgressValue(int progressValue)
void setProgressRange(int minimum, int maximum)
QFutureInterfaceBase & operator=(const QFutureInterfaceBase &other)
void swap(QFutureInterfaceBase &other) noexcept
bool refT() const noexcept
void reportException(const QException &e)
QString progressText() const
void setProgressValueAndText(int progressValue, const QString &progressText)
void setRunnable(QRunnable *runnable)
void setFilterMode(bool enable)
bool isResultReadyAt(int index) const
qsizetype size() const noexcept
Definition qlist.h:397
bool isEmpty() const noexcept
Definition qlist.h:401
void removeAt(qsizetype i)
Definition qlist.h:590
const_reference at(qsizetype i) const noexcept
Definition qlist.h:446
void append(parameter_type t)
Definition qlist.h:458
\inmodule QtCore
Definition qmutex.h:313
\inmodule QtCore
Definition qmutex.h:281
QObjectContinuationWrapper(QObject *parent=nullptr)
\inmodule QtCore
Definition qobject.h:103
QObject * parent() const
Returns a pointer to the parent object.
Definition qobject.h:346
static QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *member, Qt::ConnectionType=Qt::AutoConnection)
\threadsafe
Definition qobject.cpp:2960
void destroyed(QObject *=nullptr)
This signal is emitted immediately before the object obj is destroyed, after any instances of QPointe...
void deleteLater()
\threadsafe
Definition qobject.cpp:2435
\inmodule QtCore
Definition qrunnable.h:18
iterator begin()
Definition qset.h:136
iterator end()
Definition qset.h:140
\macro QT_RESTRICTED_CAST_FROM_ASCII
Definition qstring.h:129
\inmodule QtCore
Definition qthreadpool.h:22
void releaseThread()
Releases a thread previously reserved by a call to reserveThread().
bool wait(QMutex *, QDeadlineTimer=QDeadlineTimer(QDeadlineTimer::Forever))
Q_NORETURN void rethrowException() const
void setFilterMode(bool enable)
QSet< QString >::iterator it
else opt state
[0]
void newState(QList< State > &states, const char *token, const char *lexem, bool pre)
short next
Definition keywords.cpp:445
Combined button and popup list for selecting options.
Q_CORE_EXPORT QFuture< void > makeReadyVoidFuture()
void Q_CORE_EXPORT watchContinuationImpl(const QObject *context, QtPrivate::QSlotObjectBase *slotObj, QFutureInterfaceBase &fi)
std::unique_ptr< QSlotObjectBase, QSlotObjectBase::Deleter > SlotObjUniquePtr
static void * context
static jboolean copy(JNIEnv *, jobject)
#define Q_NODISCARD_CTOR
#define QT_WARNING_DISABLE_GCC(text)
#define Q_IMPL_EVENT_COMMON(Class)
Definition qcoreevent.h:31
EGLOutputLayerEXT EGLint EGLAttrib value
[5]
@ MaxProgressEmitsPerSecond
static int switch_from_to(QAtomicInt &a, int from, int to)
static int switch_off(QAtomicInt &a, int which)
static int switch_on(QAtomicInt &a, int which)
#define qWarning
Definition qlogging.h:166
constexpr const T & qMax(const T &a, const T &b)
Definition qminmax.h:42
GLenum mode
GLboolean GLboolean GLboolean GLboolean a
[7]
GLuint index
[2]
GLuint GLuint end
GLint GLsizei GLsizei GLenum GLenum GLsizei void * data
GLboolean enable
GLenum func
Definition qopenglext.h:663
#define Q_ASSERT(cond)
Definition qrandom.cpp:47
QtPrivate::QRegularExpressionMatchIteratorRangeBasedForIterator begin(const QRegularExpressionMatchIterator &iterator)
QT_BEGIN_NAMESPACE constexpr void qSwap(T &value1, T &value2) noexcept(std::is_nothrow_swappable_v< T >)
Definition qswap.h:20
#define Q_OBJECT
#define signals
#define emit
#define Q_UNUSED(x)
ptrdiff_t qsizetype
Definition qtypes.h:165
QT_BEGIN_NAMESPACE Q_ALWAYS_INLINE void qYieldCpu(void) Q_DECL_NOEXCEPT
Definition qyieldcpu.h:29
future suspend()
QFutureWatcher< int > watcher
QReadWriteLock lock
[0]
const QSemaphoreReleaser releaser(sem)
[4]
QSharedPointer< T > other(t)
[5]
QJSValueList args
qsizetype indexOf(const AT &t, qsizetype from=0) const noexcept
Definition qlist.h:962
QtPrivate::ExceptionStore m_exceptionStore
QtPrivate::ResultStoreBase m_results