Qt
Internal/Contributor docs for the Qt SDK. <b>Note:</b> These are NOT official API docs; those are found <a href='https://doc.qt.io/'>here</a>.
Loading...
Searching...
No Matches
qwindowspipereader.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4
6#include <qcoreapplication.h>
7#include <QMutexLocker>
8#include <QPointer>
9
11
12using namespace Qt::StringLiterals;
13
14static const DWORD minReadBufferSize = 4096;
15
17 : QObject(parent),
18 handle(INVALID_HANDLE_VALUE),
19 eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
20 syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
21 waitObject(NULL),
22 readBufferMaxSize(0),
23 actualReadBufferSize(0),
24 pendingReadBytes(0),
25 lastError(ERROR_SUCCESS),
26 state(Stopped),
27 readSequenceStarted(false),
28 pipeBroken(true),
29 readyReadPending(false),
30 winEventActPosted(false)
31{
32 ZeroMemory(&overlapped, sizeof(OVERLAPPED));
33 overlapped.hEvent = eventHandle;
34 waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
35 if (waitObject == NULL)
36 qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
37}
38
40{
41 stop();
42
43 // Wait for thread pool callback to complete, as it can be still
44 // executing some completion code.
45 WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
46 CloseThreadpoolWait(waitObject);
47 CloseHandle(eventHandle);
48 CloseHandle(syncHandle);
49}
50
56{
57 readBuffer.clear();
58 actualReadBufferSize = 0;
59 readyReadPending = false;
60 pendingReadBytes = 0;
61 handle = hPipeReadEnd;
62 pipeBroken = false;
63 lastError = ERROR_SUCCESS;
64}
65
71{
72 cancelAsyncRead(Stopped);
73 pipeBroken = true;
74}
75
81{
82 cancelAsyncRead(Draining);
83 pipeBroken = true;
84}
85
91{
92 cancelAsyncRead(Stopped);
93 readBuffer.clear();
94 actualReadBufferSize = 0;
95 // QLocalSocket is supposed to write data in the 'Closing'
96 // state, so we don't set 'pipeBroken' flag here. Also, avoid
97 // setting this flag in checkForReadyRead().
98 lastError = ERROR_SUCCESS;
99}
100
104void QWindowsPipeReader::cancelAsyncRead(State newState)
105{
106 if (state != Running)
107 return;
108
109 mutex.lock();
110 state = newState;
111 if (readSequenceStarted) {
112 // This can legitimately fail due to the GetOverlappedResult()
113 // in the callback not being locked. We ignore ERROR_NOT_FOUND
114 // in this case.
115 if (!CancelIoEx(handle, &overlapped)) {
116 const DWORD dwError = GetLastError();
117 if (dwError != ERROR_NOT_FOUND) {
118 qErrnoWarning(dwError, "QWindowsPipeReader: CancelIoEx on handle %p failed.",
119 handle);
120 }
121 }
122
123 // Wait for callback to complete.
124 do {
125 mutex.unlock();
126 waitForNotification();
127 mutex.lock();
128 } while (readSequenceStarted);
129 }
130 mutex.unlock();
131
132 // Finish reading to keep the class state consistent. Note that
133 // signals are not emitted in the call below, as the caller is
134 // expected to do that synchronously.
135 consumePending();
136}
137
142{
143 QMutexLocker locker(&mutex);
144 readBufferMaxSize = size;
145}
146
152{
153 QMutexLocker locker(&mutex);
154 return readSequenceStarted || readyReadPending
155 || (lastError != ERROR_SUCCESS && !pipeBroken);
156}
157
162{
163 return actualReadBufferSize;
164}
165
170{
171 QMutexLocker locker(&mutex);
172 qint64 readSoFar;
173
174 // If startAsyncRead() has read data, copy it to its destination.
175 if (maxlen == 1 && actualReadBufferSize > 0) {
176 *data = readBuffer.getChar();
177 actualReadBufferSize--;
178 readSoFar = 1;
179 } else {
180 readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
181 actualReadBufferSize -= readSoFar;
182 }
183
184 if (!pipeBroken) {
185 startAsyncReadHelper(&locker);
186 if (readSoFar == 0)
187 return -2; // signal EWOULDBLOCK
188 }
189
190 return readSoFar;
191}
192
199{
200 QMutexLocker locker(&mutex);
201 qint64 readSoFar = 0;
202
203 if (actualReadBufferSize > 0) {
204 readSoFar = readBuffer.readLine(data, qMin(actualReadBufferSize + 1, maxlen));
205 actualReadBufferSize -= readSoFar;
206 }
207
208 if (!pipeBroken) {
209 startAsyncReadHelper(&locker);
210 if (readSoFar == 0)
211 return -2; // signal EWOULDBLOCK
212 }
213
214 return readSoFar;
215}
216
221{
222 QMutexLocker locker(&mutex);
223
224 const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen));
225 actualReadBufferSize -= skippedSoFar;
226
227 if (!pipeBroken) {
228 startAsyncReadHelper(&locker);
229 if (skippedSoFar == 0)
230 return -2; // signal EWOULDBLOCK
231 }
232
233 return skippedSoFar;
234}
235
240{
241 QMutexLocker locker(&mutex);
242 return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
243}
244
249{
250 QMutexLocker locker(&mutex);
251 startAsyncReadHelper(&locker);
252}
253
254void QWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker)
255{
256 if (readSequenceStarted || lastError != ERROR_SUCCESS)
257 return;
258
259 state = Running;
260 startAsyncReadLocked();
261
262 // Do not post the event, if the read operation will be completed asynchronously.
263 if (!readyReadPending && lastError == ERROR_SUCCESS)
264 return;
265
266 if (!winEventActPosted) {
267 winEventActPosted = true;
268 locker->unlock();
270 } else {
271 locker->unlock();
272 }
273
274 SetEvent(syncHandle);
275}
276
281void QWindowsPipeReader::startAsyncReadLocked()
282{
283 // Determine the number of bytes to read.
284 qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
285
286 // This can happen only while draining; just do nothing in this case.
287 if (bytesToRead == 0)
288 return;
289
290 while (lastError == ERROR_SUCCESS) {
291 if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
292 bytesToRead = readBufferMaxSize - readBuffer.size();
293 if (bytesToRead <= 0) {
294 // Buffer is full. User must read data from the buffer
295 // before we can read more from the pipe.
296 return;
297 }
298 }
299
300 char *ptr = readBuffer.reserve(bytesToRead);
301
302 // ReadFile() returns true, if the read operation completes synchronously.
303 // We don't need to call GetOverlappedResult() additionally, because
304 // 'numberOfBytesRead' is valid in this case.
305 DWORD numberOfBytesRead;
306 DWORD errorCode = ERROR_SUCCESS;
307 if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
308 errorCode = GetLastError();
309 if (errorCode == ERROR_IO_PENDING) {
310 Q_ASSERT(state == Running);
311 // Operation has been queued and will complete in the future.
312 readSequenceStarted = true;
313 SetThreadpoolWait(waitObject, eventHandle, NULL);
314 return;
315 }
316 }
317
318 if (!readCompleted(errorCode, numberOfBytesRead))
319 return;
320
321 // In the 'Draining' state, we have to get all the data with one call
322 // to ReadFile(). Note that message mode pipes are not supported here.
323 if (state == Draining) {
324 Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
325 return;
326 }
327
328 // We need to loop until all pending data has been read and an
329 // operation is queued for asynchronous completion.
330 // If the pipe is configured to work in message mode, we read
331 // the data in chunks.
332 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
333 }
334}
335
341void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
342 PTP_WAIT wait, TP_WAIT_RESULT waitResult)
343{
344 Q_UNUSED(instance);
345 Q_UNUSED(wait);
346 Q_UNUSED(waitResult);
347 QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
348
349 // Get the result of the asynchronous operation.
350 DWORD numberOfBytesTransfered = 0;
351 DWORD errorCode = ERROR_SUCCESS;
352 if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
353 &numberOfBytesTransfered, FALSE))
354 errorCode = GetLastError();
355
356 pipeReader->mutex.lock();
357
358 pipeReader->readSequenceStarted = false;
359
360 // Do not overwrite error code, if error has been detected by
361 // checkPipeState() in waitForPipeClosed(). Also, if the reader was
362 // stopped, the only reason why this function can be called is the
363 // completion of a cancellation. No signals should be emitted, and
364 // no new read sequence should be started in this case.
365 if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
366 // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
367 // specifically for flushing the pipe.
368 if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
369 errorCode = ERROR_SUCCESS;
370
371 if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
372 pipeReader->startAsyncReadLocked();
373
374 if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
375 pipeReader->winEventActPosted = true;
376 pipeReader->mutex.unlock();
378 } else {
379 pipeReader->mutex.unlock();
380 }
381 } else {
382 pipeReader->mutex.unlock();
383 }
384
385 // We set the event only after unlocking to avoid additional context
386 // switches due to the released thread immediately running into the lock.
387 SetEvent(pipeReader->syncHandle);
388}
389
394bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
395{
396 // ERROR_MORE_DATA is not an error. We're connected to a message mode
397 // pipe and the message didn't fit into the pipe's system
398 // buffer. We will read the remaining data in the next call.
399 if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
400 readyReadPending = true;
401 pendingReadBytes += numberOfBytesRead;
402 readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
403 return true;
404 }
405
406 lastError = errorCode;
407 return false;
408}
409
414{
415 if (e->type() == QEvent::WinEventAct) {
416 consumePendingAndEmit(true);
417 return true;
418 }
419 return QObject::event(e);
420}
421
426bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
427{
428 ResetEvent(syncHandle);
429 mutex.lock();
430
431 // Enable QEvent::WinEventAct posting.
432 if (allowWinActPosting)
433 winEventActPosted = false;
434
435 const bool emitReadyRead = consumePending();
436 const DWORD dwError = lastError;
437
438 mutex.unlock();
439
440 // Trigger 'pipeBroken' only once. This flag must be updated before
441 // emitting the readyRead() signal. Otherwise, the read sequence will
442 // be considered not finished, and we may hang if a slot connected
443 // to readyRead() calls waitForReadyRead().
444 const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken);
445 if (emitPipeClosed)
446 pipeBroken = true;
447
448 // Disable any further processing, if the pipe was stopped.
449 // We are not allowed to emit signals in either 'Stopped'
450 // or 'Draining' state.
451 if (state != Running)
452 return false;
453
454 if (!emitPipeClosed) {
455 if (emitReadyRead)
456 emit readyRead();
457 } else {
458 QPointer<QWindowsPipeReader> alive(this);
459 if (emitReadyRead)
460 emit readyRead();
461 if (alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
462 emit winError(dwError, "QWindowsPipeReader::consumePendingAndEmit"_L1);
463 if (alive)
465 }
466
467 return emitReadyRead;
468}
469
474bool QWindowsPipeReader::consumePending()
475{
476 if (readyReadPending) {
477 readyReadPending = false;
478 actualReadBufferSize += pendingReadBytes;
479 pendingReadBytes = 0;
480 return true;
481 }
482
483 return false;
484}
485
490{
491 DWORD bytes;
492 if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
493 return bytes;
494
495 lastError = GetLastError();
496 return 0;
497}
498
499bool QWindowsPipeReader::waitForNotification()
500{
501 DWORD waitRet;
502 do {
503 waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);
504 if (waitRet == WAIT_OBJECT_0)
505 return true;
506
507 // Some I/O completion routine was called. Wait some more.
508 } while (waitRet == WAIT_IO_COMPLETION);
509
510 return false;
511}
512
514
515#include "moc_qwindowspipereader_p.cpp"
static void postEvent(QObject *receiver, QEvent *event, int priority=Qt::NormalEventPriority)
\inmodule QtCore
Definition qcoreevent.h:45
@ WinEventAct
Definition qcoreevent.h:99
Type type() const
Returns the event type.
Definition qcoreevent.h:304
\inmodule QtCore
Definition qmutex.h:313
void unlock() noexcept
Unlocks the mutex.
Definition qmutex.h:289
void lock() noexcept
Locks the mutex.
Definition qmutex.h:286
\inmodule QtCore
Definition qobject.h:103
virtual bool event(QEvent *event)
This virtual function receives events to an object and should return true if the event e was recognized and processed.
Definition qobject.cpp:1389
void stop()
Stops the asynchronous read sequence.
bool canReadLine() const
Returns true if a complete line of data can be read from the buffer.
qint64 bytesAvailable() const
Returns the number of bytes we've read so far.
DWORD checkPipeState()
Returns the number of available bytes in the pipe.
bool isReadOperationActive() const
Returns true if an async operation is in progress, there is pending data to read, or a read error is pending.
qint64 skip(qint64 maxlen)
Skips up to {maxlen} bytes from the internal read buffer.
QWindowsPipeReader(QObject *parent=nullptr)
void startAsyncRead()
Starts an asynchronous read sequence on the pipe.
void stopAndClear()
Stops the asynchronous read sequence.
void drainAndStop()
Stops the asynchronous read sequence.
qint64 read(char *data, qint64 maxlen)
Copies at most {maxlen} bytes from the internal read buffer to {data}.
qint64 readLine(char *data, qint64 maxlen)
Reads a line from the internal buffer, but no more than {maxlen} characters.
void winError(ulong, const QString &)
void setHandle(HANDLE hPipeReadEnd)
Sets the handle to read from.
void setMaxReadBufferSize(qint64 size)
Sets the size of internal read buffer.
bool event(QEvent *e) override
Receives notification that the read operation has completed.
else opt state
[0]
void newState(QList< State > &states, const char *token, const char *lexem, bool pre)
void qErrnoWarning(const char *msg,...)
Combined button and popup list for selecting options.
void * HANDLE
static void * context
static ControlElement< T > * ptr(QWidget *widget)
constexpr const T & qMin(const T &a, const T &b)
Definition qminmax.h:40
constexpr const T & qMax(const T &a, const T &b)
Definition qminmax.h:42
GLuint64 GLenum void * handle
GLenum GLuint GLintptr GLsizeiptr size
[1]
GLint GLsizei GLsizei GLenum GLenum GLsizei void * data
GLbitfield GLuint readBuffer
#define Q_ASSERT(cond)
Definition qrandom.cpp:47
#define emit
#define Q_UNUSED(x)
long long qint64
Definition qtypes.h:60
static const DWORD minReadBufferSize