Implement unix:// sockets support

Add the support for the UNIX sockets to QGrpcHttp2Channel. The channel
only supports the non-encrypted communicatoin.

Pick-to: 6.7
Task-number: QTBUG-119074
Change-Id: Ie404999d7511a783e4d1504a0ac51e5fc97f23ba
Reviewed-by: Qt CI Bot <qt_ci_bot@qt-project.org>
Reviewed-by: Tatiana Borisova <tatiana.borisova@qt.io>
This commit is contained in:
Alexey Edelev 2023-11-19 10:49:38 +01:00
parent 316de69d70
commit ad6c1128c0
4 changed files with 61 additions and 14 deletions

View File

@ -16,6 +16,7 @@
#include <QtCore/qqueue.h>
#include <QtNetwork/private/hpack_p.h>
#include <QtNetwork/private/qhttp2connection_p.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtNetwork/qtcpsocket.h>
#if QT_CONFIG(ssl)
# include <QtNetwork/qsslsocket.h>
@ -149,6 +150,22 @@ private:
void channelOperationAsyncError(QGrpcChannelOperation *channelOperation,
const QGrpcStatus &status);
template <typename T>
void connectErrorHandler(T *socket, QGrpcChannelOperation *channelOperation)
{
QObject::connect(socket, &T::errorOccurred, channelOperation,
[channelOperationPtr = QPointer(channelOperation)](auto error) {
emit channelOperationPtr
->errorOccurred({ QGrpcStatus::StatusCode::Unavailable,
QString("Network error occurred %1"_L1)
.arg(error) });
// The errorOccured signal can remove the last channelOperation holder,
// and in the same time the last finished signal listener, so we need
// to make sure that channelOperationPtr is still valid before
if (!channelOperationPtr.isNull())
emit channelOperationPtr->finished();
});
}
void sendRequest(QGrpcChannelOperation *channelOperation);
void sendPendingRequests();
@ -167,10 +184,11 @@ private:
}
QGrpcChannelOptions m_channelOptions;
std::unique_ptr<QAbstractSocket> m_socket = nullptr;
std::unique_ptr<QIODevice> m_socket = nullptr;
QHttp2Connection *m_connection = nullptr;
std::vector<std::shared_ptr<QGrpcChannelOperation>> m_operations;
QList<Http2Handler *> m_activeHandlers;
bool m_isLocalSocket = false;
};
QGrpcHttp2ChannelPrivate::Http2Handler::Http2Handler(QHttp2Stream *_stream)
@ -256,6 +274,21 @@ QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QGrpcChannelOptions &op
: m_channelOptions(options)
{
QUrl url = m_channelOptions.host();
if (url.scheme() == "unix"_L1) {
auto *localSocket = initSocket<QLocalSocket>();
m_isLocalSocket = true;
QObject::connect(localSocket, &QLocalSocket::connected, this,
&QGrpcHttp2ChannelPrivate::createHttp2Connection);
QObject::connect(localSocket, &QLocalSocket::errorOccurred, this,
[this, url](QLocalSocket::LocalSocketError error) {
qGrpcDebug()
<< "Error occurred(" << error << "):"
<< static_cast<QLocalSocket *>(m_socket.get())->errorString()
<< url;
});
localSocket->connectToServer(url.host() + url.path());
} else
#if QT_CONFIG(ssl)
if (url.scheme() == "https"_L1 || options.sslConfiguration()) {
auto *sslSocket = initSocket<QSslSocket>();
@ -308,18 +341,26 @@ void QGrpcHttp2ChannelPrivate::processOperation(std::shared_ptr<QGrpcChannelOper
auto *channelOperationPtr = channelOperation.get();
Q_ASSERT_X(channelOperationPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation",
"Channel operation is nullptr.");
if (!m_socket->isWritable()) {
channelOperationAsyncError(channelOperationPtr,
{ QGrpcStatus::StatusCode::Unavailable,
m_socket->errorString() });
return;
}
if (m_isLocalSocket) {
connectErrorHandler<QLocalSocket>(static_cast<QLocalSocket *>(m_socket.get()),
channelOperationPtr);
} else {
connectErrorHandler<QAbstractSocket>(static_cast<QAbstractSocket *>(m_socket.get()),
channelOperationPtr);
}
if (m_connection == nullptr)
m_operations.emplace_back(channelOperation);
else
sendRequest(channelOperationPtr);
QObject::connect(m_socket.get(), &QAbstractSocket::errorOccurred, channelOperationPtr,
[channelOperationPtr](QAbstractSocket::SocketError error) {
emit channelOperationPtr
->errorOccurred({ QGrpcStatus::StatusCode::Unavailable,
QString("Network error occurred %1"_L1)
.arg(error) });
});
}
void QGrpcHttp2ChannelPrivate::createHttp2Connection()
@ -455,9 +496,9 @@ void QGrpcHttp2ChannelPrivate::sendRequest(QGrpcChannelOperation *channelOperati
{ ":authority"_ba, m_channelOptions.host().host().toLatin1() },
{ ":method"_ba, "POST"_ba },
{ ":path"_ba, QByteArray('/' + service + '/' + method) },
{ ":scheme"_ba, m_channelOptions.host().scheme().toLatin1() },
{ ":scheme"_ba, m_isLocalSocket ? "http"_ba : m_channelOptions.host().scheme().toLatin1() },
{ ContentTypeHeader.toByteArray(), "application/grpc"_ba },
{ GrpcServiceNameHeader.toByteArray(), service },
{ GrpcServiceNameHeader.toByteArray(), { service } },
{ GrpcAcceptEncodingHeader.toByteArray(), "identity,deflate,gzip"_ba },
{ AcceptEncodingHeader.toByteArray(), "identity,gzip"_ba },
{ TEHeader.toByteArray(), "trailers"_ba },

View File

@ -39,9 +39,9 @@ void QtGrpcClientBidirStreamTest::Valid()
auto stream = client()->streamTestMethodBiStream(request);
QString fullResponse;
int i = 0;
QObject::connect(stream.get(), &QGrpcBidirStream::messageReceived, this,
[stream, &request, &fullResponse]() {
static int i = 0;
[stream, &request, &fullResponse, &i]() {
SimpleStringMessage rsp = stream->read<SimpleStringMessage>();
fullResponse += rsp.testFieldString() + QString::number(++i);
stream->sendMessage(request);

View File

@ -38,9 +38,9 @@ void QtGrpcClientClientStreamTest::Valid()
auto stream = client()->streamTestMethodClientStream(request);
int i = 0;
QTimer sendTimer;
QObject::connect(&sendTimer, &QTimer::timeout, this, [&]() {
static int i = 0;
stream->sendMessage(request);
if (++i == ExpectedMessageCount)
sendTimer.stop();

View File

@ -17,6 +17,12 @@ void GrpcClientTestBase::initTestCase_data()
<< QFlags{ Channel::Qt }
<< std::shared_ptr<QAbstractGrpcChannel>(new QGrpcHttp2Channel(QGrpcChannelOptions{
QUrl("http://localhost:50051", QUrl::StrictMode) }));
#ifndef Q_OS_WINDOWS
QTest::newRow("Http2ClientUnix")
<< QFlags{ Channel::Qt }
<< std::shared_ptr<QAbstractGrpcChannel>(new QGrpcHttp2Channel(QGrpcChannelOptions{
QUrl("unix:///tmp/qtgrpc_test.sock", QUrl::StrictMode) }));
#endif
}
#if QT_CONFIG(native_grpc)