2022-10-20 11:49:27 +00:00
|
|
|
// Copyright (C) 2022 The Qt Company Ltd.
|
|
|
|
|
// Copyright (C) 2019 Giulio Girardi <giulio.girardi@protechgroup.it>
|
|
|
|
|
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
|
|
|
|
|
|
|
|
|
|
#include <QtCore/QFuture>
|
|
|
|
|
#include <QtCore/QList>
|
|
|
|
|
#include <QtCore/QThread>
|
|
|
|
|
#include <QtCore/qloggingcategory.h>
|
|
|
|
|
#include <QtGrpc/qabstractgrpcclient.h>
|
|
|
|
|
#include <QtGrpc/qabstractgrpccredentials.h>
|
|
|
|
|
#include <QtGrpc/qgrpccredentials.h>
|
|
|
|
|
#include <QtProtobuf/QProtobufSerializer>
|
|
|
|
|
#include <qtgrpcglobal_p.h>
|
|
|
|
|
|
|
|
|
|
#include <grpcpp/channel.h>
|
|
|
|
|
#include <grpcpp/create_channel.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/byte_buffer.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/client_unary_call.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/rpc_method.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/slice.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/status.h>
|
|
|
|
|
#include <grpcpp/impl/codegen/sync_stream.h>
|
|
|
|
|
#include <grpcpp/security/credentials.h>
|
|
|
|
|
|
|
|
|
|
#include <unordered_map>
|
|
|
|
|
|
|
|
|
|
#include "qgrpcchannel.h"
|
|
|
|
|
#include "qgrpcchannel_p.h"
|
|
|
|
|
|
|
|
|
|
QT_BEGIN_NAMESPACE
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
\class QGrpcChannel
|
2022-12-13 11:28:02 +00:00
|
|
|
\inmodule QtGRPC
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
\brief The QGrpcChannel class is a gRPC-cpp native api implementation of
|
2022-12-08 12:53:08 +00:00
|
|
|
QAbstractGrpcChannel interface.
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
QGrpcChannel accepts the same grpc::ChannelCredentials type that is required
|
|
|
|
|
by native-api grpc::CreateChannel.
|
2023-01-31 08:00:43 +00:00
|
|
|
\sa{https://grpc.github.io/grpc/cpp/classgrpc_1_1_channel_credentials.html}{gRPC ChannelCredentials}.
|
2022-12-08 12:53:08 +00:00
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
static grpc::Status parseByteBuffer(const grpc::ByteBuffer &buffer, QByteArray &data)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
|
std::vector<grpc::Slice> slices;
|
|
|
|
|
auto status = buffer.Dump(&slices);
|
|
|
|
|
|
|
|
|
|
if (!status.ok())
|
|
|
|
|
return status;
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
for (const auto &slice : slices)
|
|
|
|
|
data.append((const char *)slice.begin(), slice.size());
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
return grpc::Status::OK;
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
static grpc::ByteBuffer parseQByteArray(QByteArrayView bytearray)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
|
grpc::ByteBuffer buffer;
|
|
|
|
|
grpc::Slice slice(bytearray.data(), bytearray.size());
|
|
|
|
|
grpc::ByteBuffer tmp(&slice, 1);
|
|
|
|
|
buffer.Swap(&tmp);
|
|
|
|
|
return buffer;
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
static std::string toStdString(QLatin1StringView view)
|
|
|
|
|
{
|
|
|
|
|
return std::string(view.data(), view.size());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QGrpcChannelStream::QGrpcChannelStream(grpc::Channel *channel, QLatin1StringView method,
|
|
|
|
|
QByteArrayView data, QObject *parent)
|
2022-10-20 11:49:27 +00:00
|
|
|
: QObject(parent)
|
|
|
|
|
{
|
|
|
|
|
grpc::ByteBuffer request = parseQByteArray(data);
|
|
|
|
|
|
|
|
|
|
reader = grpc::internal::ClientReaderFactory<grpc::ByteBuffer>::Create(
|
|
|
|
|
channel,
|
2022-12-16 14:03:33 +00:00
|
|
|
grpc::internal::RpcMethod(toStdString(method).c_str(),
|
2022-10-20 11:49:27 +00:00
|
|
|
grpc::internal::RpcMethod::SERVER_STREAMING),
|
|
|
|
|
&context, request);
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
thread = QThread::create([this] {
|
2022-10-20 11:49:27 +00:00
|
|
|
grpc::ByteBuffer response;
|
2022-12-09 12:03:04 +00:00
|
|
|
grpc::Status parseStatus;
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
while (reader->Read(&response)) {
|
|
|
|
|
QByteArray data;
|
2022-12-09 12:03:04 +00:00
|
|
|
parseStatus = parseByteBuffer(response, data);
|
|
|
|
|
if (!parseStatus.ok()) {
|
|
|
|
|
status = { static_cast<QGrpcStatus::StatusCode>(parseStatus.error_code()),
|
|
|
|
|
QString::fromStdString(parseStatus.error_message()) };
|
2022-10-20 11:49:27 +00:00
|
|
|
return; // exit thread
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
emit dataReady(data);
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
parseStatus = reader->Finish();
|
|
|
|
|
status = { static_cast<QGrpcStatus::StatusCode>(parseStatus.error_code()),
|
|
|
|
|
QString::fromStdString(parseStatus.error_message()) };
|
2022-10-20 11:49:27 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
|
|
connect(thread, &QThread::finished, this, &QGrpcChannelStream::finished);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void QGrpcChannelStream::start()
|
|
|
|
|
{
|
|
|
|
|
thread->start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QGrpcChannelStream::~QGrpcChannelStream()
|
|
|
|
|
{
|
|
|
|
|
cancel();
|
|
|
|
|
thread->wait();
|
|
|
|
|
thread->deleteLater();
|
|
|
|
|
delete reader;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void QGrpcChannelStream::cancel()
|
|
|
|
|
{
|
|
|
|
|
// TODO: check thread safety
|
|
|
|
|
context.TryCancel();
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
QGrpcChannelCall::QGrpcChannelCall(grpc::Channel *channel, QLatin1StringView method,
|
|
|
|
|
QByteArrayView data, QObject *parent)
|
2022-10-20 11:49:27 +00:00
|
|
|
: QObject(parent)
|
|
|
|
|
{
|
|
|
|
|
grpc::ByteBuffer request = parseQByteArray(data);
|
2022-12-16 14:03:33 +00:00
|
|
|
thread = QThread::create([this, request, channel, method = toStdString(method)] {
|
2022-12-09 12:03:04 +00:00
|
|
|
grpc::ByteBuffer callResponse;
|
2022-10-20 11:49:27 +00:00
|
|
|
QByteArray data;
|
2022-12-09 12:03:04 +00:00
|
|
|
grpc::Status callStatus;
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
callStatus = grpc::internal::BlockingUnaryCall(
|
2022-10-20 11:49:27 +00:00
|
|
|
channel,
|
2022-12-16 14:03:33 +00:00
|
|
|
grpc::internal::RpcMethod(method.c_str(), grpc::internal::RpcMethod::NORMAL_RPC),
|
2022-12-09 12:03:04 +00:00
|
|
|
&context, request, &callResponse);
|
|
|
|
|
if (!callStatus.ok()) {
|
|
|
|
|
status = { static_cast<QGrpcStatus::StatusCode>(callStatus.error_code()),
|
|
|
|
|
QString::fromStdString(callStatus.error_message()) };
|
2022-10-20 11:49:27 +00:00
|
|
|
return; // exit thread
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
callStatus = parseByteBuffer(callResponse, response);
|
|
|
|
|
status = { static_cast<QGrpcStatus::StatusCode>(callStatus.error_code()),
|
|
|
|
|
QString::fromStdString(callStatus.error_message()) };
|
2022-10-20 11:49:27 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
|
|
connect(thread, &QThread::finished, this, &QGrpcChannelCall::finished);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void QGrpcChannelCall::start()
|
|
|
|
|
{
|
|
|
|
|
thread->start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QGrpcChannelCall::~QGrpcChannelCall()
|
|
|
|
|
{
|
|
|
|
|
cancel();
|
|
|
|
|
thread->wait();
|
|
|
|
|
thread->deleteLater();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void QGrpcChannelCall::cancel()
|
|
|
|
|
{
|
|
|
|
|
// TODO: check thread safety
|
|
|
|
|
context.TryCancel();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void QGrpcChannelCall::waitForFinished(const QDeadlineTimer &deadline)
|
|
|
|
|
{
|
|
|
|
|
thread->wait(deadline);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QGrpcChannelPrivate::QGrpcChannelPrivate(const QUrl &url,
|
|
|
|
|
QGrpcChannel::NativeGrpcChannelCredentials credentialsType,
|
|
|
|
|
const QStringList &credentialsList)
|
|
|
|
|
{
|
|
|
|
|
switch (credentialsType) {
|
|
|
|
|
case QGrpcChannel::InsecureChannelCredentials:
|
|
|
|
|
m_credentials = grpc::InsecureChannelCredentials();
|
|
|
|
|
m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials);
|
|
|
|
|
break;
|
|
|
|
|
case QGrpcChannel::GoogleDefaultCredentials:
|
|
|
|
|
m_credentials = grpc::GoogleDefaultCredentials();
|
|
|
|
|
m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials);
|
|
|
|
|
break;
|
2022-12-08 12:53:08 +00:00
|
|
|
case QGrpcChannel::SslDefaultCredentials:
|
2022-10-20 11:49:27 +00:00
|
|
|
if (credentialsList.size() < 3) {
|
|
|
|
|
m_credentials = grpc::SslCredentials(grpc::SslCredentialsOptions());
|
|
|
|
|
} else {
|
|
|
|
|
grpc::SslCredentialsOptions options;
|
|
|
|
|
options.pem_root_certs = credentialsList[0].toStdString();
|
|
|
|
|
options.pem_private_key = credentialsList[1].toStdString();
|
|
|
|
|
options.pem_cert_chain = credentialsList[2].toStdString();
|
|
|
|
|
m_credentials = grpc::SslCredentials(options);
|
|
|
|
|
}
|
|
|
|
|
m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials);
|
2022-12-08 12:53:08 +00:00
|
|
|
break;
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
QGrpcChannelPrivate::~QGrpcChannelPrivate() = default;
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
void QGrpcChannelPrivate::call(QLatin1StringView method, QLatin1StringView service,
|
|
|
|
|
QByteArrayView args, QGrpcCallReply *reply)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2022-12-16 14:03:33 +00:00
|
|
|
const QByteArray rpcName(QLatin1StringView("/%1/%2").arg(service, method).toLatin1());
|
|
|
|
|
QSharedPointer<QGrpcChannelCall> call(new QGrpcChannelCall(m_channel.get(),
|
|
|
|
|
QLatin1StringView(rpcName), args,
|
2022-10-20 11:49:27 +00:00
|
|
|
reply),
|
|
|
|
|
&QGrpcChannelCall::deleteLater);
|
2022-12-09 13:03:47 +00:00
|
|
|
auto connection = std::make_shared<QMetaObject::Connection>();
|
|
|
|
|
auto abortConnection = std::make_shared<QMetaObject::Connection>();
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
*connection = QObject::connect(call.get(), &QGrpcChannelCall::finished, reply,
|
2022-12-09 12:03:04 +00:00
|
|
|
[call, reply, connection, abortConnection] {
|
2022-10-20 11:49:27 +00:00
|
|
|
if (call->status.code() == QGrpcStatus::Ok) {
|
|
|
|
|
reply->setData(call->response);
|
|
|
|
|
reply->finished();
|
|
|
|
|
} else {
|
|
|
|
|
reply->setData({});
|
|
|
|
|
reply->errorOccurred(call->status);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QObject::disconnect(*connection);
|
|
|
|
|
QObject::disconnect(*abortConnection);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
*abortConnection = QObject::connect(reply, &QGrpcCallReply::errorOccurred, call.get(),
|
|
|
|
|
[call, connection,
|
|
|
|
|
abortConnection](const QGrpcStatus &status) {
|
|
|
|
|
if (status.code() == QGrpcStatus::Aborted) {
|
|
|
|
|
QObject::disconnect(*connection);
|
|
|
|
|
QObject::disconnect(*abortConnection);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
call->start();
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
QGrpcStatus QGrpcChannelPrivate::call(QLatin1StringView method, QLatin1StringView service,
|
|
|
|
|
QByteArrayView args, QByteArray &ret)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2022-12-16 14:03:33 +00:00
|
|
|
const QByteArray rpcName(QLatin1StringView("/%1/%2").arg(service, method).toLatin1());
|
|
|
|
|
QGrpcChannelCall call(m_channel.get(), QLatin1StringView(rpcName), args);
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
call.start();
|
|
|
|
|
call.waitForFinished();
|
|
|
|
|
|
|
|
|
|
ret = call.response;
|
|
|
|
|
return call.status;
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
void QGrpcChannelPrivate::stream(QGrpcStream *stream, QLatin1StringView service)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
|
Q_ASSERT(stream != nullptr);
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
const QByteArray rpcName(QLatin1StringView("/%1/%2").arg(service, stream->method()).toLatin1());
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
QSharedPointer<QGrpcChannelStream> sub(new QGrpcChannelStream(m_channel.get(),
|
|
|
|
|
QLatin1StringView(rpcName),
|
2022-10-20 11:49:27 +00:00
|
|
|
stream->arg(), stream),
|
|
|
|
|
&QGrpcChannelStream::deleteLater);
|
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
auto abortConnection = std::make_shared<QMetaObject::Connection>();
|
|
|
|
|
auto readConnection = std::make_shared<QMetaObject::Connection>();
|
|
|
|
|
auto connection = std::make_shared<QMetaObject::Connection>();
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2023-01-03 14:47:29 +00:00
|
|
|
auto disconnectAllConnections = [abortConnection, readConnection, connection]() {
|
2022-12-09 12:03:04 +00:00
|
|
|
QObject::disconnect(*connection);
|
|
|
|
|
QObject::disconnect(*readConnection);
|
|
|
|
|
QObject::disconnect(*abortConnection);
|
|
|
|
|
};
|
|
|
|
|
|
2022-10-20 11:49:27 +00:00
|
|
|
*readConnection = QObject::connect(sub.get(), &QGrpcChannelStream::dataReady, stream,
|
2022-12-09 12:03:04 +00:00
|
|
|
[stream](QByteArrayView data) {
|
|
|
|
|
stream->handler(data.toByteArray());
|
|
|
|
|
});
|
2022-10-20 11:49:27 +00:00
|
|
|
|
|
|
|
|
*connection = QObject::connect(sub.get(), &QGrpcChannelStream::finished, stream,
|
2022-12-09 12:03:04 +00:00
|
|
|
[disconnectAllConnections, sub, stream] {
|
2022-10-20 11:49:27 +00:00
|
|
|
qGrpcDebug()
|
|
|
|
|
<< "Stream ended with server closing connection";
|
2022-12-09 12:03:04 +00:00
|
|
|
disconnectAllConnections();
|
2022-10-20 11:49:27 +00:00
|
|
|
|
2022-12-08 12:53:08 +00:00
|
|
|
if (sub->status.code() != QGrpcStatus::Ok)
|
2022-10-20 11:49:27 +00:00
|
|
|
stream->errorOccurred(sub->status);
|
2022-11-04 18:31:32 +00:00
|
|
|
stream->finished();
|
2022-10-20 11:49:27 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
|
|
*abortConnection = QObject::connect(stream, &QGrpcStream::finished, sub.get(),
|
2022-12-09 12:03:04 +00:00
|
|
|
[disconnectAllConnections, sub] {
|
2022-10-20 11:49:27 +00:00
|
|
|
qGrpcDebug() << "Stream client was finished";
|
2022-12-09 12:03:04 +00:00
|
|
|
disconnectAllConnections();
|
2022-10-20 11:49:27 +00:00
|
|
|
sub->cancel();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
sub->start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
2022-12-08 12:53:08 +00:00
|
|
|
QGrpcChannel constructs QGrpcChannel with \a url, \a credentialsType
|
|
|
|
|
and \a credentialsList object.
|
|
|
|
|
*/
|
2022-10-20 11:49:27 +00:00
|
|
|
QGrpcChannel::QGrpcChannel(const QUrl &url, NativeGrpcChannelCredentials credentialsType,
|
|
|
|
|
const QStringList &credentialsList)
|
|
|
|
|
: QAbstractGrpcChannel(),
|
|
|
|
|
dPtr(std::make_unique<QGrpcChannelPrivate>(url, credentialsType, credentialsList))
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-09 12:03:04 +00:00
|
|
|
QGrpcChannel::QGrpcChannel(const QUrl &url, NativeGrpcChannelCredentials credentialsType)
|
|
|
|
|
: QGrpcChannel(url, credentialsType, QStringList())
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-20 11:49:27 +00:00
|
|
|
QGrpcChannel::~QGrpcChannel() = default;
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
QGrpcStatus QGrpcChannel::call(QLatin1StringView method, QLatin1StringView service,
|
|
|
|
|
QByteArrayView args, QByteArray &ret)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
|
return dPtr->call(method, service, args, ret);
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
void QGrpcChannel::call(QLatin1StringView method, QLatin1StringView service, QByteArrayView args,
|
2022-10-20 11:49:27 +00:00
|
|
|
QGrpcCallReply *reply)
|
|
|
|
|
{
|
|
|
|
|
dPtr->call(method, service, args, reply);
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-16 14:03:33 +00:00
|
|
|
void QGrpcChannel::stream(QGrpcStream *stream, QLatin1StringView service)
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
2023-01-03 14:47:29 +00:00
|
|
|
dPtr->stream(stream, service);
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
|
|
2022-12-09 13:03:47 +00:00
|
|
|
std::shared_ptr<QAbstractProtobufSerializer> QGrpcChannel::serializer() const
|
2022-10-20 11:49:27 +00:00
|
|
|
{
|
|
|
|
|
// TODO: make selection based on credentials or channel settings
|
2022-12-09 13:03:47 +00:00
|
|
|
return std::make_shared<QProtobufSerializer>();
|
2022-10-20 11:49:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
QT_END_NAMESPACE
|