Qt
Internal/Contributor docs for the Qt SDK. <b>Note:</b> These are NOT official API docs; those are found <a href='https://doc.qt.io/'>here</a>.
Loading...
Searching...
No Matches
qctfserver.cpp
Go to the documentation of this file.
1// Copyright (C) 2023 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include <qloggingcategory.h>
5#include "qctfserver_p.h"
6
7#if QT_CONFIG(zstd)
8#include <zstd.h>
9#endif
10
11using namespace Qt::Literals::StringLiterals;
12
13Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg)
14
15#if QT_CONFIG(zstd)
16static QByteArray zstdCompress(ZSTD_CCtx *&context, const QByteArray &data, int compression)
17{
18 if (context == nullptr)
19 context = ZSTD_createCCtx();
20 qsizetype size = data.size();
21 size = ZSTD_COMPRESSBOUND(size);
23 char *dst = compressed.data();
24 size_t n = ZSTD_compressCCtx(context, dst, size,
25 data.constData(), data.size(),
26 compression);
27 if (ZSTD_isError(n)) {
28 qCWarning(lcCtfInfoTrace) << "Compression with zstd failed: " << QString::fromUtf8(ZSTD_getErrorName(n));
29 return {};
30 }
32 return compressed;
33}
34#endif
35
37 : QThread(parent)
38{
39 m_keySet << "cliendId"_L1
40 << "clientVersion"_L1
41 << "sessionName"_L1
42 << "sessionTracepoints"_L1
43 << "flags"_L1
44 << "bufferSize"_L1
45 << "compressionScheme"_L1;
46}
47
49{
50#if QT_CONFIG(zstd)
51 ZSTD_freeCCtx(m_zstdCCtx);
52#endif
53}
54
56{
57 m_address = address;
58}
59
61{
62 m_port = port;
63}
64
66{
67 m_cb = cb;
68}
69
71{
72 return m_req.sessionName;
73}
74
79
81{
82 return m_bufferOnIdle;
83}
84
86{
87 return m_status;
88}
89
90void QCtfServer::setStatusAndNotify(ServerStatus status)
91{
92 m_status = status;
94}
95
96void QCtfServer::bytesWritten(qint64 size)
97{
98 m_writtenSize += size;
99 if (m_writtenSize >= m_waitWriteSize && m_eventLoop)
100 m_eventLoop->exit();
101}
102
103void QCtfServer::initWrite()
104{
105 m_waitWriteSize = 0;
106 m_writtenSize = 0;
107}
108
109bool QCtfServer::waitSocket()
110{
111 if (m_eventLoop)
112 m_eventLoop->exec();
113 return m_socket->state() == QTcpSocket::ConnectedState;
114}
115
116void QCtfServer::handleString(QCborStreamReader &cbor)
117{
118 const auto readString = [](QCborStreamReader &cbor) -> QString {
120 auto r = cbor.readString();
121 while (r.status == QCborStreamReader::Ok) {
122 result += r.data;
123 r = cbor.readString();
124 }
125
126 if (r.status == QCborStreamReader::Error) {
127 // handle error condition
128 result.clear();
129 }
130 return result;
131 };
132 do {
133 if (m_currentKey.isEmpty()) {
134 m_currentKey = readString(cbor);
135 } else {
136 switch (m_keySet.indexOf(m_currentKey)) {
138 m_req.sessionName = readString(cbor);
139 break;
141 m_req.sessionTracepoints = readString(cbor);
142 break;
144 m_requestedCompressionScheme = readString(cbor);
145 break;
146 default:
147 // handle error
148 break;
149 }
150 m_currentKey.clear();
151 }
152 if (cbor.lastError() == QCborError::EndOfFile) {
153 if (!waitSocket())
154 return;
155 cbor.reparse();
156 }
157 } while (cbor.lastError() == QCborError::EndOfFile);
158}
159
160void QCtfServer::handleFixedWidth(QCborStreamReader &cbor)
161{
162 switch (m_keySet.indexOf(m_currentKey)) {
163 case RequestClientId:
164 if (!cbor.isUnsignedInteger())
165 return;
166 m_req.clientId = cbor.toUnsignedInteger();
167 break;
169 if (!cbor.isUnsignedInteger())
170 return;
171 m_req.clientVersion = cbor.toUnsignedInteger();
172 break;
173 case RequestFlags:
174 if (!cbor.isUnsignedInteger())
175 return;
176 m_req.flags = cbor.toUnsignedInteger();
177 break;
179 if (!cbor.isUnsignedInteger())
180 return;
181 m_req.bufferSize = cbor.toUnsignedInteger();
182 break;
183 default:
184 // handle error
185 break;
186 }
187 m_currentKey.clear();
188}
189
190void QCtfServer::readCbor(QCborStreamReader &cbor)
191{
192 switch (cbor.type()) {
199 handleFixedWidth(cbor);
200 cbor.next();
201 break;
204 handleString(cbor);
205 break;
208 cbor.enterContainer();
209 while (cbor.lastError() == QCborError::NoError && cbor.hasNext())
210 readCbor(cbor);
211 if (cbor.lastError() == QCborError::NoError)
212 cbor.leaveContainer();
213 default:
214 break;
215 }
216}
217
218void QCtfServer::writePacket(TracePacket &packet, QCborStreamWriter &cbor)
219{
220 cbor.startMap(4);
221 cbor.append("magic"_L1);
222 cbor.append(packet.PacketMagicNumber);
223 cbor.append("name"_L1);
225 cbor.append("flags"_L1);
226 cbor.append(packet.flags);
227
228 cbor.append("data"_L1);
229 if (m_compression > 0) {
231#if QT_CONFIG(zstd)
232 if (m_requestedCompressionScheme == QStringLiteral("zstd"))
233 compressed = zstdCompress(m_zstdCCtx, packet.stream_data, m_compression);
234 else
235#endif
236 compressed = qCompress(packet.stream_data, m_compression);
237
238 cbor.append(compressed);
239 } else {
240 cbor.append(packet.stream_data);
241 }
242
243 cbor.endMap();
244}
245
246bool QCtfServer::recognizedCompressionScheme() const
247{
248 if (m_requestedCompressionScheme.isEmpty())
249 return true;
250#if QT_CONFIG(zstd)
251 if (m_requestedCompressionScheme == QStringLiteral("zstd"))
252 return true;
253#endif
254 if (m_requestedCompressionScheme == QStringLiteral("zlib"))
255 return true;
256 return false;
257}
258
260{
261 m_server = new QTcpServer();
263 if (m_address.isEmpty())
265 else
266 addr = QHostAddress(m_address);
267
268 qCInfo(lcCtfInfoTrace) << "Starting CTF server: " << m_address << ", port: " << m_port;
269
270 while (m_stopping == 0) {
271 if (!m_server->isListening()) {
272 if (!m_server->listen(addr, m_port)) {
273 qCInfo(lcCtfInfoTrace) << "Unable to start server";
274 m_stopping = 1;
275 setStatusAndNotify(Error);
276 }
277 }
278 setStatusAndNotify(Idle);
279 if (m_server->waitForNewConnection(-1)) {
280 qCInfo(lcCtfInfoTrace) << "client connection";
281 m_eventLoop = new QEventLoop();
282 m_socket = m_server->nextPendingConnection();
283
284 QObject::connect(m_socket, &QTcpSocket::readyRead, [&](){
285 if (m_eventLoop) m_eventLoop->exit();
286 });
287 QObject::connect(m_socket, &QTcpSocket::bytesWritten, this, &QCtfServer::bytesWritten);
289 if (m_eventLoop) m_eventLoop->exit();
290 });
291
292 m_server->close(); // Do not wait for more connections
293 setStatusAndNotify(Connected);
294
295 if (waitSocket())
296 {
297 QCborStreamReader cbor(m_socket);
298
299 m_req = {};
300 while (cbor.hasNext() && cbor.lastError() == QCborError::NoError)
301 readCbor(cbor);
302
303 if (!m_req.isValid()) {
304 qCInfo(lcCtfInfoTrace) << "Invalid trace request.";
305 m_socket->close();
306 } else {
307 m_compression = m_req.flags & CompressionMask;
308#if QT_CONFIG(zstd)
309 m_compression = qMin(m_compression, ZSTD_maxCLevel());
310#else
311 m_compression = qMin(m_compression, 9);
312#endif
313 m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
314
315 m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
316
317 if (!recognizedCompressionScheme()) {
318 qCWarning(lcCtfInfoTrace) << "Client requested unrecognized compression scheme: " << m_requestedCompressionScheme;
319 m_requestedCompressionScheme.clear();
320 m_compression = 0;
321 }
322
323 qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints;
324
325 m_cb->handleSessionChange();
326 {
327 TraceResponse resp;
328 resp.serverId = ServerId;
329 resp.serverVersion = 1;
330 resp.serverName = QStringLiteral("Ctf Server");
331
332 QCborStreamWriter cbor(m_socket);
333 cbor.startMap(m_compression ? 4 : 3);
334 cbor.append("serverId"_L1);
335 cbor.append(resp.serverId);
336 cbor.append("serverVersion"_L1);
337 cbor.append(resp.serverVersion);
338 cbor.append("serverName"_L1);
339 cbor.append(resp.serverName);
340 if (m_compression) {
341 cbor.append("compressionScheme"_L1);
342 cbor.append(m_requestedCompressionScheme);
343 }
344 cbor.endMap();
345 }
346
347 qCInfo(lcCtfInfoTrace) << "response sent, sending data";
348 if (waitSocket()) {
349 while (m_socket->state() == QTcpSocket::ConnectedState) {
350 QList<TracePacket> packets;
351 {
352 QMutexLocker lock(&m_mutex);
353 while (m_packets.size() == 0)
354 m_bufferHasData.wait(&m_mutex);
355 packets = std::exchange(m_packets, {});
356 }
357
358 {
359 QCborStreamWriter cbor(m_socket);
360 for (TracePacket &packet : packets) {
361 writePacket(packet, cbor);
362 if (!waitSocket())
363 break;
364 }
365 }
366 qCInfo(lcCtfInfoTrace) << packets.size() << " packets written";
367 }
368 }
369
370 qCInfo(lcCtfInfoTrace) << "client connection closed";
371 }
372 }
373 delete m_eventLoop;
374 m_eventLoop = nullptr;
375 } else {
376 qCInfo(lcCtfInfoTrace) << "error: " << m_server->errorString();
377 m_stopping = 1;
378 setStatusAndNotify(Error);
379 }
380 }
381}
382
384{
385 start();
386}
388{
389 this->m_stopping = 1;
390 wait();
391}
392
394{
395 QMutexLocker lock(&m_mutex);
396 TracePacket packet;
397 packet.stream_name = stream.toUtf8();
398 packet.stream_data = data;
399 packet.flags = flags;
400 m_packets.append(packet);
401 if (m_packets.size() > m_maxPackets)
402 m_packets.pop_front();
403 m_bufferHasData.wakeOne();
404}
void close() override
Closes the I/O device for the socket and calls disconnectFromHost() to close the socket's connection.
SocketState state() const
Returns the state of the socket.
void disconnected()
This signal is emitted when the socket has been disconnected.
\inmodule QtCore
Definition qbytearray.h:57
char * data()
\macro QT_NO_CAST_FROM_BYTEARRAY
Definition qbytearray.h:611
void truncate(qsizetype pos)
Truncates the byte array at index position pos.
\inmodule QtCore\reentrant
QCborError lastError() const
Returns the last error in decoding the stream, if any.
bool next(int maxRecursion=10000)
Advance the CBOR stream decoding one element.
Type type() const
\variable QCborStreamReader::StringResult::data
bool enterContainer()
Enters the array or map that is the current item and prepares for iterating the elements contained in...
bool isUnsignedInteger() const
Returns true if the type of the current element is an unsigned integer (that is if type() returns QCb...
void reparse()
Reparses the current element.
StringResult< QString > readString()
Decodes one string chunk from the CBOR string and returns it.
bool hasNext() const noexcept Q_DECL_PURE_FUNCTION
Returns true if there are more items to be decoded in the current container or false if we've reached...
quint64 toUnsignedInteger() const
Returns the unsigned integer value of the current element.
bool leaveContainer()
Leaves the array or map whose items were being processed and positions the decoder at the next item a...
\inmodule QtCore\reentrant
bool endMap()
Terminates the map started by either overload of startMap() and returns true if the correct number of...
void startMap()
Starts a CBOR Map with indeterminate length in the CBOR stream.
void append(quint64 u)
This is an overloaded member function, provided for convenience. It differs from the above function o...
void setHost(const QString &address)
QString sessionTracepoints() const
QCtfServer(QObject *parent=nullptr)
@ RequestSessionTracepoints
@ RequestCompressionScheme
bool bufferOnIdle() const
void setCallback(ServerCallback *cb)
void startServer()
void stopServer()
void bufferData(const QString &stream, const QByteArray &data, quint32 flags)
void setPort(int port)
void run() override
ServerStatus status() const
QString sessionName() const
\inmodule QtCore
Definition qeventloop.h:16
int exec(ProcessEventsFlags flags=AllEvents)
Enters the main event loop and waits until exit() is called.
void exit(int returnCode=0)
Tells the event loop to exit with a return code.
The QHostAddress class provides an IP address.
void readyRead()
This signal is emitted once every time new data is available for reading from the device's current re...
void bytesWritten(qint64 bytes)
This signal is emitted every time a payload of data has been written to the device's current write ch...
qsizetype size() const noexcept
Definition qlist.h:397
void pop_front() noexcept
Definition qlist.h:680
void append(parameter_type t)
Definition qlist.h:458
\inmodule QtCore
Definition qmutex.h:313
\inmodule QtCore
Definition qobject.h:103
static QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *member, Qt::ConnectionType=Qt::AutoConnection)
\threadsafe
Definition qobject.cpp:2960
\macro QT_RESTRICTED_CAST_FROM_ASCII
Definition qstring.h:129
bool isEmpty() const noexcept
Returns true if the string has no characters; otherwise returns false.
Definition qstring.h:192
void clear()
Clears the contents of the string and makes it null.
Definition qstring.h:1252
static QString fromUtf8(QByteArrayView utf8)
This is an overloaded member function, provided for convenience. It differs from the above function o...
Definition qstring.cpp:6018
The QTcpServer class provides a TCP-based server.
Definition qtcpserver.h:22
QString errorString() const
Returns a human readable description of the last error that occurred.
virtual QTcpSocket * nextPendingConnection()
Returns the next pending connection as a connected QTcpSocket object.
bool listen(const QHostAddress &address=QHostAddress::Any, quint16 port=0)
Tells the server to listen for incoming connections on address address and port port.
bool isListening() const
Returns true if the server is currently listening for incoming connections; otherwise returns false.
bool waitForNewConnection(int msec=0, bool *timedOut=nullptr)
Waits for at most msec milliseconds or until an incoming connection is available.
void close()
Closes the server.
bool wait(QDeadlineTimer deadline=QDeadlineTimer(QDeadlineTimer::Forever))
Definition qthread.cpp:1023
bool wait(QMutex *, QDeadlineTimer=QDeadlineTimer(QDeadlineTimer::Forever))
constexpr Initialization Uninitialized
static void * context
QByteArray qCompress(const uchar *data, qsizetype nbytes, int compressionLevel)
struct ZSTD_CCtx_s ZSTD_CCtx
EGLStreamKHR stream
EGLOutputPortEXT port
@ QtWarningMsg
Definition qlogging.h:31
#define Q_LOGGING_CATEGORY(name,...)
#define qCInfo(category,...)
#define qCWarning(category,...)
constexpr const T & qMin(const T &a, const T &b)
Definition qminmax.h:40
constexpr const T & qMax(const T &a, const T &b)
Definition qminmax.h:42
GLenum GLuint GLintptr GLsizeiptr size
[1]
GLboolean r
[2]
GLint GLsizei GLsizei GLenum GLenum GLsizei void * data
GLenum GLenum dst
GLbitfield flags
GLuint start
GLfloat n
GLenum const void * addr
GLuint GLuint64EXT address
GLuint64EXT * result
[6]
SSL_CTX int(* cb)(SSL *ssl, unsigned char **out, unsigned char *outlen, const unsigned char *in, unsigned int inlen, void *arg)
#define QStringLiteral(str)
unsigned int quint32
Definition qtypes.h:50
ptrdiff_t qsizetype
Definition qtypes.h:165
long long qint64
Definition qtypes.h:60
QByteArray compressed
QReadWriteLock lock
[0]
virtual void handleSessionChange()=0
virtual void handleStatusChange(ServerStatus status)=0
qsizetype indexOf(const AT &t, qsizetype from=0) const noexcept
Definition qlist.h:962
static constexpr quint32 PacketSize
quint32 flags
static constexpr quint32 PacketMagicNumber
QByteArray stream_data
QByteArray stream_name
QString sessionName
quint32 clientVersion
quint32 bufferSize
quint32 clientId
quint32 flags
bool isValid() const
QString sessionTracepoints