Как читатели, возможно, уже знают, Qt предоставляет несколько многопоточных конструкций (потоки, мьютексы, состояния ожидания и т.д.), а также API более высокого уровня, такие как QThreadPool, Qt Concurrent и другие родственные классы. В этой статье будет рассказано об асинхронных API более высокого уровня и изменениях, внесенных в Qt 6.
API параллелизма более высокого уровня в Qt
Qt Concurrent упрощает многопоточное программирование, устраняя необходимость в низкоуровневой синхронизации (примитивы, такие как мьютексы и блокировки), и управляет несколькими потоками вручную. Он предоставляет алгоритмы сопоставления, фильтрации и сокращения (более известные из функционального программирования) для параллельной обработки повторяемых контейнеров. Кроме того, существуют такие классы, как QFuture, QFutureWatcher и QFutureSynchronizer для доступа и отслеживания результатов асинхронных вычислений. Хотя все они очень полезны, все же были некоторые недостатки, такие как невозможность использовать QFuture вне Qt Concurrent, отсутствие поддержки для объединения нескольких вычислений для более простого и чистого кода, отсутствие гибкости Qt Concurrent API и т. д. Что касается Qt 6, разработчики постарались учесть отзывы, полученные за последние годы, и сделать многопоточное программирование на Qt более приятным и увлекательным!
Присоединение продолжений к QFuture
Распространенным сценарием в многопоточном программировании является выполнение асинхронного вычисления, которое, в свою очередь, должно вызывать другое асинхронное вычисление и передавать ему данные, которые зависят от другого, и так далее. Поскольку каждый этап требует результатов предыдущего, вам нужно либо подождать (путем блокировки или опроса), пока предыдущий этап не завершится и использовать его результаты, либо структурировать свой код в стиле "call-callback" («обратного вызова»). Ни один из этих вариантов не идеален: вы либо тратите ресурсы на ожидание, либо получаете сложный неподдерживаемый код. Добавление новых этапов или логики (для обработки ошибок и т. д.) еще больше увеличивает сложность.
Чтобы лучше понять проблему, рассмотрим следующий пример. Допустим, вы хотите загрузить большое изображение из сети, провести с ним трудную обработку и показать получившееся изображение в вашем приложении. Итак, вы должны сделать следующие шаги:
• сделать сетевой запрос и дождаться получения всех данных;
• создать изображение из необработанных данных;
• обработать изображение;
• показать изображение.
Есть следующие методы для каждого шага, которые нужно вызывать последовательно:
QByteArray download(const QUrl &url); QImage createImage(const QByteArray &data); QImage processImage(const QImage &image); void show(const QImage &image);
Можно использовать QtConcurrent для асинхронного выполнения этих задач и QFutureWatcher для отслеживания прогресса:
void loadImage(const QUrl &url) { QFuture data = QtConcurrent::run(download, url); QFutureWatcher dataWatcher; dataWatcher.setFuture(data); connect(&dataWatcher, &QFutureWatcher ::finished, this, [=] { // handle possible errors // ... QImage image = createImage(data); // Process the image // ... QFuture processedImage = QtConcurrent::run(processImage, image); QFutureWatcher<QImage> imageWatcher; imageWatcher.setFuture(processedImage); connect(&imageWatcher, &QFutureWatcher::finished, this, [=] { // handle possible errors // ... show(processedImage); }); }); }
Нехорошо выглядит, правда? Логика приложения смешана со стандартным кодом, необходимым для объединения компонентов программы вместе. И вы знаете, что чем больше шагов добавляете в цепочку, тем страшнее это становится. QFuture помогает решить эту проблему, добавляя поддержку присоединения продолжений через метод QFuture::then():
auto future = QtConcurrent::run(download, url) .then(createImage) .then(processImage) .then(show);
Это, несомненно, выглядит намного лучше! Но не хватает одного - обработки ошибок. Вы можете сделать что-то вроде:
auto future = QtConcurrent::run(download, url) .then([](QByteArray data) { // handle possible errors from the previous step // ... return createImage(data); }) .then(...) ...
Это будет работать, но код обработки ошибок по-прежнему смешан с логикой программы. Также, вероятно, не стоит запускать всю цепочку, если один из шагов не удался. Это можно решить с помощью метода QFuture::onFailed(), который позволяет прикреплять определенные обработчики ошибок для каждого возможного типа ошибки:
auto future = QtConcurrent::run(download, url) .then(createImage) .then(processImage) .then(show) .onFailed([](QNetworkReply::NetworkError) { // handle network errors }) .onFailed([](ImageProcessingError) { // handle image processing errors }) .onFailed([] { // handle any other error });
Обратите внимание, что для использования .onFailed() необходимо включить исключения. Если какой-либо из шагов завершается неудачно с исключением, цепочка прерывается, и вызывается обработчик ошибок, соответствующий типу сгенерированного исключения.
Подобно .then() и onFailed(), существует также .onCanceled()для прикрепления обработчиков на случай отмены фьючерса.
Создание QFuture из сигналов
Подобно фьючерсам, сигналы также представляют собой то, что станет доступным когда-нибудь в будущем, поэтому кажется естественным иметь возможность работать с ними, как с фьючерсами, продолжениями присоединения, обработчиками сбоев и т. д. Учитывая класс MyObject на основе QObject с сигналом void mySignal(int), вы можете использовать этот сигнал в качестве фьючерса следующим образом:
QFuture intFuture = QtFuture::connect(&object, &MyObject::mySignal);
Теперь вы можете прикрепить обработчики продолжения, сбоя или отмены к полученному фьючерсу.
Обратите внимание, что тип результирующего фьючерса совпадает с типом аргумента сигнала. Если у него нет аргументов, возвращается QFuture
Вернемся к первому шагу (т.е. загрузке) вашего примера обработки изображений, чтобы увидеть, как это может быть полезно на практике. Есть много способов реализовать это, давайте используем QNetworkAccessManager для отправки сетевого запроса и получения данных:
QNetworkAccessManager manager; ... QByteArray download(const QUrl &url) { QNetworkReply *reply = manager.get(QNetworkRequest(url)); QObject::connect(reply, &QNetworkReply::finished, [reply] {...}); // wait until we've received all data // ... return data; }
Но ожидание блокировки выше это не очень хорошо, было бы лучше, если бы можно было избавиться от этого и вместо этого сказать "when QNetworkAccessManager gets the data, create an image, then process it and then show" («когда QNetworkAccessManager получит данные, создайте изображение, затем обработайте его и покажите»). Можно сделать это, подключив сигнал finished() диспетчера сетевого доступа к QFuture:
QNetworkReply *reply = manager.get(QNetworkRequest(url)); auto future = QtFuture::connect(reply, &QNetworkReply::finished) .then([reply] { return reply->readAll(); }) .then(QtFuture::Launch::Async, createImage) .then(processImage) .then(show) ...
Вы можете заметить, что теперь вместо использования QtConcurrent::run() для асинхронной загрузки и возврата данных в новом потоке просто подключаемся к сигналу QNetworkAccessManager::finished(), который запускает цепочку вычислений. Также обратите внимание на дополнительный параметр в следующей строке:
.then(QtFuture::Launch::Async, createImage)
По умолчанию продолжения, прикрепленные с помощью .then(), вызываются в том же потоке, в котором был запущен родительский поток (в вашем случае основной поток). Теперь, когда не используется QtConcurrent::run()для асинхронного запуска цепочки, нужно передать дополнительный параметр QtFuture::Launch::Async, чтобы запустить цепочку продолжений в отдельном потоке и избежать блокировки пользовательского интерфейса.
Создание QFuture
До сих пор единственным «официальным» способом создания и сохранения значения внутри QFuture было использование одного из методов QtConcurrent. Так что вне QtConcurrent, QFuture был не очень полезен. В Qt 6, наконец, появился "setter" ("сеттер") аналог QFuture: QPromise, представленный Андреем Голубевым. Его можно использовать для установки значений, хода выполнения и исключений для асинхронных вычислений, к которым позже можно будет получить доступ через QFuture. Чтобы продемонстрировать, как это работает, давайте еще раз перепишем пример обработки изображения и воспользуемся классом QPromise:
QFuture download(const QUrl &url) { QPromise promise; QFuture future = promise.future(); promise.reportStarted(); // notify that download is started QNetworkReply *reply = manager.get(QNetworkRequest(url)); QObject::connect(reply, &QNetworkReply::finished, [reply, p = std::move(promise)] { p.addResult(reply->readAll()); p.reportFinished(); // notify that download is finished reply->deleteLater(); }); return future; } auto future = download() .then(QtFuture::Launch::Async, createImage) .then(processImage) .then(show) ...
Изменения в QtConcurrent
Благодаря Яреку Кобусу, Мартену Нордхейму, Карстену Хаймриху, Тимуру Почепцову и Виталию Фанаскову, QtConcurrent также получил хорошие обновления. Существующие API получили некоторые улучшения, в частности:
- Теперь вы можете установить собственный пул потоков для всех методов QtConcurrent вместо того, чтобы всегда запускать их в глобальном пуле потоков и потенциально блокировать выполнение других задач.
- Map объекты алгоритмы их сокращения и фильтрации теперь могут принимать начальное значение, поэтому вам не нужно искать обходные пути для типов, у которых нет конструктора по умолчанию.
- QtConcurrent::run был улучшен для работы с переменным количеством аргументов и типами, предназначенными только для перемещения.
Кроме того, были добавлены два новых API в QtConcurrent, чтобы предоставить пользователям большую гибкость. Давайте посмотрим на них более подробно.
QtConcurrent::runWithPromise
Новый метод QtConcurrent::runWithPromise(), разработанный Яреком Кобусом, является еще одним приятным дополнением к фреймворку QtConcurrent. Он очень похож на QtConcurrent::run(), за исключением того, что делает объект QPromise, связанный с данной задачей, доступным для пользователя. Это достигается за счет того, что runnable, переданный в QtConcurrent::runWithPromise(), принимает ссылку на объект QPromise:
auto future = QtConcurrent::runWithPromise( [] (QPromise &promise, /* other arguments may follow */ ...) { // ... for (auto value : listOfValues) { if (promise.isCanceled()) // handle the cancellation // do some processing... promise.addResult(...); promise.setProgressValue(...); } }, /* pass other arguments */ ...);
Как видите, с runWithPromise() пользователи имеют больший контроль над задачей и могут реагировать на запросы отмены или приостановки, делать отчеты о ходе выполнения и т. д., что невозможно с QtConcurrent::run().
QtConcurrent::task
QtConcurrent::task() предоставляет свободный интерфейс для выполнения задачи в отдельном потоке. Это более современная альтернатива QtConcurrent::run(), которая позволяет более удобно настраивать задачи. Вместо использования одной из немногих перезагрузок QtConcurrent::run()для передачи параметров предназначенных для запуска задачи, вы можете указать их в любом порядке, пропустить те, которые не нужны, и так далее. Например:
QFuture future = QtConcurrent::task(doSomething) .withArguments(1, 2, 3) .onThreadPool(pool) .withPriority(10) .spawn();
Обратите внимание, что, в отличие от run(), вы также можете передать приоритет задачи.
Не раскрыт один момент. Как после всей цепочке отдать результат в gui поток? Вот пример из статьи:
Допустим в методе show() мы вставляем изображение в лейбл. Но вот show же выполняется в другом не в gui потоке.
Чтобы show выполнялся в gui потоке, мне надо делать так?