// Copyright (C) 2022 The Qt Company Ltd. // Copyright (C) 2019 Giulio Girardi // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "qgrpcchannel.h" #include "qgrpcchannel_p.h" QT_BEGIN_NAMESPACE using namespace Qt::StringLiterals; /*! \class QGrpcChannel \inmodule QtGRPC \brief The QGrpcChannel class is a gRPC-cpp native api implementation of QAbstractGrpcChannel interface. QGrpcChannel accepts the same grpc::ChannelCredentials type that is required by native-api grpc::CreateChannel. \sa{https://grpc.github.io/grpc/cpp/classgrpc_1_1_channel_credentials.html}{gRPC ChannelCredentials}. */ static grpc::Status parseByteBuffer(const grpc::ByteBuffer &buffer, QByteArray &data) { std::vector slices; auto status = buffer.Dump(&slices); if (!status.ok()) return status; for (const auto &slice : slices) data.append((const char *)slice.begin(), slice.size()); return grpc::Status::OK; } static grpc::ByteBuffer parseQByteArray(QByteArrayView bytearray) { grpc::ByteBuffer buffer; grpc::Slice slice(bytearray.data(), bytearray.size()); grpc::ByteBuffer tmp(&slice, 1); buffer.Swap(&tmp); return buffer; } static std::string toStdString(QLatin1StringView view) { return std::string(view.data(), view.size()); } static QByteArray buildRpcName(QLatin1StringView service, QLatin1StringView method) { return '/' % QByteArrayView(service) % '/' % QByteArrayView(method); } QGrpcChannelStream::QGrpcChannelStream(grpc::Channel *channel, QLatin1StringView method, QByteArrayView data, QObject *parent) : QObject(parent) { grpc::ByteBuffer request = parseQByteArray(data); reader = grpc::internal::ClientReaderFactory::Create( channel, grpc::internal::RpcMethod(toStdString(method).c_str(), grpc::internal::RpcMethod::SERVER_STREAMING), &context, request); thread = QThread::create([this] { grpc::ByteBuffer response; grpc::Status parseStatus; while (reader->Read(&response)) { QByteArray data; parseStatus = parseByteBuffer(response, data); if (!parseStatus.ok()) { status = { static_cast(parseStatus.error_code()), QString::fromStdString(parseStatus.error_message()) }; return; // exit thread } emit dataReady(data); } parseStatus = reader->Finish(); status = { static_cast(parseStatus.error_code()), QString::fromStdString(parseStatus.error_message()) }; }); connect(thread, &QThread::finished, this, &QGrpcChannelStream::finished); } void QGrpcChannelStream::start() { thread->start(); } QGrpcChannelStream::~QGrpcChannelStream() { cancel(); thread->wait(); thread->deleteLater(); delete reader; } void QGrpcChannelStream::cancel() { // TODO: check thread safety context.TryCancel(); } QGrpcChannelCall::QGrpcChannelCall(grpc::Channel *channel, QLatin1StringView method, QByteArrayView data, QObject *parent) : QObject(parent) { grpc::ByteBuffer request = parseQByteArray(data); thread = QThread::create([this, request, channel, method = toStdString(method)] { grpc::ByteBuffer callResponse; QByteArray data; grpc::Status callStatus; callStatus = grpc::internal::BlockingUnaryCall( channel, grpc::internal::RpcMethod(method.c_str(), grpc::internal::RpcMethod::NORMAL_RPC), &context, request, &callResponse); if (!callStatus.ok()) { status = { static_cast(callStatus.error_code()), QString::fromStdString(callStatus.error_message()) }; return; // exit thread } callStatus = parseByteBuffer(callResponse, response); status = { static_cast(callStatus.error_code()), QString::fromStdString(callStatus.error_message()) }; }); connect(thread, &QThread::finished, this, &QGrpcChannelCall::finished); } void QGrpcChannelCall::start() { thread->start(); } QGrpcChannelCall::~QGrpcChannelCall() { cancel(); thread->wait(); thread->deleteLater(); } void QGrpcChannelCall::cancel() { // TODO: check thread safety context.TryCancel(); } void QGrpcChannelCall::waitForFinished(const QDeadlineTimer &deadline) { thread->wait(deadline); } QGrpcChannelPrivate::QGrpcChannelPrivate(const QUrl &url, QGrpcChannel::NativeGrpcChannelCredentials credentialsType, const QStringList &credentialsList) { switch (credentialsType) { case QGrpcChannel::InsecureChannelCredentials: m_credentials = grpc::InsecureChannelCredentials(); m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials); break; case QGrpcChannel::GoogleDefaultCredentials: m_credentials = grpc::GoogleDefaultCredentials(); m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials); break; case QGrpcChannel::SslDefaultCredentials: if (credentialsList.size() < 3) { m_credentials = grpc::SslCredentials(grpc::SslCredentialsOptions()); } else { grpc::SslCredentialsOptions options; options.pem_root_certs = credentialsList[0].toStdString(); options.pem_private_key = credentialsList[1].toStdString(); options.pem_cert_chain = credentialsList[2].toStdString(); m_credentials = grpc::SslCredentials(options); } m_channel = grpc::CreateChannel(url.toString().toStdString(), m_credentials); break; } } QGrpcChannelPrivate::~QGrpcChannelPrivate() = default; std::shared_ptr QGrpcChannelPrivate::call(QAbstractGrpcClient *client, QLatin1StringView method, QLatin1StringView service, QByteArrayView args) { const QByteArray rpcName = buildRpcName(service, method); std::shared_ptr reply(new QGrpcCallReply(client), [](QGrpcCallReply *reply) { reply->deleteLater(); }); QSharedPointer call(new QGrpcChannelCall(m_channel.get(), QLatin1StringView(rpcName), args, reply.get()), &QGrpcChannelCall::deleteLater); auto connection = std::make_shared(); auto abortConnection = std::make_shared(); *connection = QObject::connect(call.get(), &QGrpcChannelCall::finished, reply.get(), [call, reply, connection, abortConnection] { if (call->status.code() == QGrpcStatus::Ok) { reply->setData(call->response); reply->finished(); } else { reply->setData({}); reply->errorOccurred(call->status); } QObject::disconnect(*connection); QObject::disconnect(*abortConnection); }); *abortConnection = QObject::connect(reply.get(), &QGrpcCallReply::errorOccurred, call.get(), [call, connection, abortConnection](const QGrpcStatus &status) { if (status.code() == QGrpcStatus::Aborted) { QObject::disconnect(*connection); QObject::disconnect(*abortConnection); } }); call->start(); return reply; } QGrpcStatus QGrpcChannelPrivate::call(QLatin1StringView method, QLatin1StringView service, QByteArrayView args, QByteArray &ret) { const QByteArray rpcName = buildRpcName(service, method); QGrpcChannelCall call(m_channel.get(), QLatin1StringView(rpcName), args); call.start(); call.waitForFinished(); ret = call.response; return call.status; } void QGrpcChannelPrivate::stream(QGrpcStream *stream, QLatin1StringView service) { Q_ASSERT(stream != nullptr); const QByteArray rpcName = buildRpcName(service, stream->method()); QSharedPointer sub(new QGrpcChannelStream(m_channel.get(), QLatin1StringView(rpcName), stream->arg(), stream), &QGrpcChannelStream::deleteLater); auto abortConnection = std::make_shared(); auto readConnection = std::make_shared(); auto connection = std::make_shared(); auto disconnectAllConnections = [abortConnection, readConnection, connection]() { QObject::disconnect(*connection); QObject::disconnect(*readConnection); QObject::disconnect(*abortConnection); }; *readConnection = QObject::connect(sub.get(), &QGrpcChannelStream::dataReady, stream, [stream](QByteArrayView data) { stream->handler(data.toByteArray()); }); *connection = QObject::connect(sub.get(), &QGrpcChannelStream::finished, stream, [disconnectAllConnections, sub, stream] { qGrpcDebug() << "Stream ended with server closing connection"; disconnectAllConnections(); if (sub->status.code() != QGrpcStatus::Ok) stream->errorOccurred(sub->status); stream->finished(); }); *abortConnection = QObject::connect(stream, &QGrpcStream::finished, sub.get(), [disconnectAllConnections, sub] { qGrpcDebug() << "Stream client was finished"; disconnectAllConnections(); sub->cancel(); }); sub->start(); } /*! QGrpcChannel constructs QGrpcChannel with \a url, \a credentialsType and \a credentialsList object. */ QGrpcChannel::QGrpcChannel(const QUrl &url, NativeGrpcChannelCredentials credentialsType, const QStringList &credentialsList) : QAbstractGrpcChannel(), dPtr(std::make_unique(url, credentialsType, credentialsList)) { } QGrpcChannel::QGrpcChannel(const QUrl &url, NativeGrpcChannelCredentials credentialsType) : QGrpcChannel(std::move(url), credentialsType, QStringList()) { } QGrpcChannel::~QGrpcChannel() = default; QGrpcStatus QGrpcChannel::call(QLatin1StringView method, QLatin1StringView service, QByteArrayView args, QByteArray &ret) { return dPtr->call(method, service, args, ret); } std::shared_ptr QGrpcChannel::call(QAbstractGrpcClient *client, QLatin1StringView method, QLatin1StringView service, QByteArrayView args) { return dPtr->call(client, method, service, args); } void QGrpcChannel::stream(QGrpcStream *stream, QLatin1StringView service) { dPtr->stream(stream, service); } std::shared_ptr QGrpcChannel::serializer() const { // TODO: make selection based on credentials or channel settings return std::make_shared(); } QT_END_NAMESPACE