// Copyright (C) 2022 The Qt Company Ltd. // Copyright (C) 2019 Giulio Girardi // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only #include "qgrpcchannel.h" #include "qgrpcchannel_p.h" #include "qgrpcchanneloperation.h" #include "qabstractgrpcclient.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if QT_CONFIG(ssl) # include #endif QT_BEGIN_NAMESPACE using namespace Qt::StringLiterals; /*! \class QGrpcChannel \inmodule QtGrpc \brief The QGrpcHttp2Channel class is an HTTP/2 implementation of QAbstractGrpcChannel, based on the reference gRPC C++ API. QGrpcChannel accepts the same grpc::ChannelCredentials type that is required by native-api grpc::CreateChannel. \sa{https://grpc.github.io/grpc/cpp/classgrpc_1_1_channel_credentials.html}{gRPC ChannelCredentials}. */ static grpc::Status parseByteBuffer(const grpc::ByteBuffer &buffer, QByteArray &data) { std::vector slices; auto status = buffer.Dump(&slices); if (!status.ok()) return status; for (const auto &slice : slices) data.append((const char *)slice.begin(), slice.size()); return grpc::Status::OK; } static grpc::ByteBuffer parseQByteArray(QByteArrayView bytearray) { grpc::ByteBuffer buffer; grpc::Slice slice(bytearray.data(), bytearray.size()); grpc::ByteBuffer tmp(&slice, 1); buffer.Swap(&tmp); return buffer; } static std::string toStdString(QLatin1StringView view) { return std::string(view.data(), view.size()); } static QByteArray buildRpcName(QLatin1StringView service, QLatin1StringView method) { return '/' % QByteArrayView(service) % '/' % QByteArrayView(method); } QGrpcChannelStream::QGrpcChannelStream(grpc::Channel *channel, QLatin1StringView method, QByteArrayView data) { grpc::ByteBuffer request = parseQByteArray(data); reader = grpc::internal::ClientReaderFactory::Create( channel, grpc::internal::RpcMethod(toStdString(method).c_str(), grpc::internal::RpcMethod::SERVER_STREAMING), &context, request); thread = QThread::create([this] { grpc::ByteBuffer response; grpc::Status parseStatus; while (reader->Read(&response)) { QByteArray data; parseStatus = parseByteBuffer(response, data); if (!parseStatus.ok()) { status = { static_cast(parseStatus.error_code()), QString::fromStdString(parseStatus.error_message()) }; return; // exit thread } emit dataReady(data); } parseStatus = reader->Finish(); status = { static_cast(parseStatus.error_code()), QString::fromStdString(parseStatus.error_message()) }; }); connect(thread, &QThread::finished, this, &QGrpcChannelStream::finished); } void QGrpcChannelStream::start() { thread->start(); } QGrpcChannelStream::~QGrpcChannelStream() { cancel(); thread->wait(); thread->deleteLater(); delete reader; } void QGrpcChannelStream::cancel() { // TODO: check thread safety context.TryCancel(); } QGrpcChannelCall::QGrpcChannelCall(grpc::Channel *channel, QLatin1StringView method, QByteArrayView data) { grpc::ByteBuffer request = parseQByteArray(data); thread = QThread::create([this, request, channel, method = toStdString(method)] { grpc::ByteBuffer callResponse; grpc::Status callStatus; callStatus = grpc::internal::BlockingUnaryCall( channel, grpc::internal::RpcMethod(method.c_str(), grpc::internal::RpcMethod::NORMAL_RPC), &context, request, &callResponse); if (!callStatus.ok()) { status = { static_cast(callStatus.error_code()), QString::fromStdString(callStatus.error_message()) }; return; // exit thread } callStatus = parseByteBuffer(callResponse, response); status = { static_cast(callStatus.error_code()), QString::fromStdString(callStatus.error_message()) }; }); connect(thread, &QThread::finished, this, &QGrpcChannelCall::finished); } void QGrpcChannelCall::start() { thread->start(); } QGrpcChannelCall::~QGrpcChannelCall() { cancel(); thread->wait(); thread->deleteLater(); } void QGrpcChannelCall::cancel() { // TODO: check thread safety context.TryCancel(); } void QGrpcChannelCall::waitForFinished(const QDeadlineTimer &deadline) { thread->wait(deadline); } QGrpcChannelPrivate::QGrpcChannelPrivate(const QGrpcChannelOptions &channelOptions, QGrpcChannel::NativeGrpcChannelCredentials credentialsType) { switch (credentialsType) { case QGrpcChannel::InsecureChannelCredentials: m_credentials = grpc::InsecureChannelCredentials(); m_channel = grpc::CreateChannel(channelOptions.host().toString().toStdString(), m_credentials); break; case QGrpcChannel::GoogleDefaultCredentials: m_credentials = grpc::GoogleDefaultCredentials(); m_channel = grpc::CreateChannel(channelOptions.host().toString().toStdString(), m_credentials); break; case QGrpcChannel::SslDefaultCredentials: #if QT_CONFIG(ssl) if (auto maybeSslConfig = channelOptions.sslConfiguration()) { grpc::SslCredentialsOptions options; auto accumulateSslCert = [](const std::string &lhs, const QSslCertificate &rhs) { return lhs + rhs.toPem().toStdString(); }; options.pem_root_certs = std::accumulate(maybeSslConfig->peerCertificateChain().begin(), maybeSslConfig->peerCertificateChain().end(), options.pem_root_certs, accumulateSslCert); options.pem_cert_chain = std::accumulate(maybeSslConfig->localCertificateChain().begin(), maybeSslConfig->localCertificateChain().end(), options.pem_cert_chain, accumulateSslCert); options.pem_private_key = maybeSslConfig->privateKey().toPem(); m_credentials = grpc::SslCredentials(options); } else { m_credentials = grpc::SslCredentials(grpc::SslCredentialsOptions()); } #else m_credentials = grpc::SslCredentials(grpc::SslCredentialsOptions()); #endif m_channel = grpc::CreateChannel(channelOptions.host().toString().toStdString(), m_credentials); break; } } QGrpcChannelPrivate::~QGrpcChannelPrivate() = default; void QGrpcChannelPrivate::call(std::shared_ptr channelOperation) { const QByteArray rpcName = buildRpcName(channelOperation->service(), channelOperation->method()); QSharedPointer call(new QGrpcChannelCall(m_channel.get(), QLatin1StringView(rpcName), channelOperation->argument())); auto connection = std::make_shared(); auto abortConnection = std::make_shared(); *connection = QObject::connect(call.get(), &QGrpcChannelCall::finished, channelOperation.get(), [call, channelOperation, connection, abortConnection] { QObject::disconnect(*connection); QObject::disconnect(*abortConnection); if (call->status == QGrpcStatus::Ok) { channelOperation->dataReady(call->response); } else { emit channelOperation->errorOccurred(call->status); } emit channelOperation->finished(); }); *abortConnection = QObject::connect(channelOperation.get(), &QGrpcChannelOperation::cancelled, call.get(), [connection, abortConnection]() { QObject::disconnect(*connection); QObject::disconnect(*abortConnection); }); call->start(); } void QGrpcChannelPrivate::startServerStream(std::shared_ptr channelOperation) { const QByteArray rpcName = buildRpcName(channelOperation->service(), channelOperation->method()); QSharedPointer sub(new QGrpcChannelStream(m_channel.get(), QLatin1StringView(rpcName), channelOperation->argument())); auto abortConnection = std::make_shared(); auto readConnection = std::make_shared(); auto connection = std::make_shared(); auto disconnectAllConnections = [abortConnection, readConnection, connection]() { QObject::disconnect(*connection); QObject::disconnect(*readConnection); QObject::disconnect(*abortConnection); }; *readConnection = QObject::connect(sub.get(), &QGrpcChannelStream::dataReady, channelOperation.get(), [channelOperation](QByteArrayView data) { channelOperation->dataReady(data.toByteArray()); }); *connection = QObject::connect(sub.get(), &QGrpcChannelStream::finished, channelOperation.get(), [disconnectAllConnections, sub, channelOperation] { qGrpcDebug() << "Stream ended with server closing connection"; disconnectAllConnections(); if (sub->status != QGrpcStatus::Ok) emit channelOperation->errorOccurred(sub->status); emit channelOperation->finished(); }); *abortConnection = QObject::connect(channelOperation.get(), &QGrpcChannelOperation::cancelled, sub.get(), [disconnectAllConnections, sub, channelOperation] { qGrpcDebug() << "Server stream was cancelled by client"; disconnectAllConnections(); sub->cancel(); }); sub->start(); } std::shared_ptr QGrpcChannelPrivate::serializer() const { // TODO: make selection based on credentials or channel settings return std::make_shared(); } /*! Constructs a gRPC channel, with \a options and \a credentialsType. */ QGrpcChannel::QGrpcChannel(const QGrpcChannelOptions &options, NativeGrpcChannelCredentials credentialsType) : QAbstractGrpcChannel(options), dPtr(std::make_unique(options, credentialsType)) { } /*! Destroys the QGrpcChannel object. */ QGrpcChannel::~QGrpcChannel() = default; /*! \internal Implementation of unary gRPC call based on the reference gRPC C++ API. */ void QGrpcChannel::call(std::shared_ptr channelOperation) { dPtr->call(std::move(channelOperation)); } /*! \internal Implementation of server-side gRPC stream based on the reference gRPC C++ API. */ void QGrpcChannel::startServerStream(std::shared_ptr channelOperation) { dPtr->startServerStream(std::move(channelOperation)); } /*! \internal Implementation of client-side gRPC stream based on the reference gRPC C++ API. */ void QGrpcChannel::startClientStream(std::shared_ptr channelOperation) { QTimer::singleShot(0, channelOperation.get(), [channelOperation] { emit channelOperation->errorOccurred( { QGrpcStatus::Unknown, "Client-side streaming support is not implemented in QGrpcChannel"_L1 }); }); } /*! \internal Implementation of bidirectional gRPC stream based on the reference gRPC C++ API. */ void QGrpcChannel::startBidirStream(std::shared_ptr channelOperation) { QTimer::singleShot(0, channelOperation.get(), [channelOperation] { emit channelOperation->errorOccurred( { QGrpcStatus::Unknown, "Bidirectional streaming support is not implemented in QGrpcChannel"_L1 }); }); } /*! Returns the newly created QProtobufSerializer shared pointer. */ std::shared_ptr QGrpcChannel::serializer() const { return dPtr->serializer(); } QT_END_NAMESPACE