// Copyright (C) 2022 The Qt Company Ltd. // Copyright (C) 2019 Alexey Edelev // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only #include #include #include #include #include #include #include #include #include #include using namespace Qt::Literals::StringLiterals; using namespace qtgrpc::tests; class QtGrpcClientServerStreamTest : public GrpcClientTestBase { Q_OBJECT public: QtGrpcClientServerStreamTest() : GrpcClientTestBase( Channels{ GrpcClientTestBase::Channel::Qt }) { } private Q_SLOTS: void valid(); void cancel(); void deferredCancel(); void hugeBlob(); void getAsyncReply(); void multipleStreams(); void multipleStreamsCancel(); void inThread(); void cancelWhileErrorTimeout(); void deadline_data(); void deadline(); }; void QtGrpcClientServerStreamTest::valid() { const int ExpectedMessageCount = 4; SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); QSignalSpy messageReceivedSpy(stream.get(), &QGrpcServerStream::messageReceived); QVERIFY(messageReceivedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&result, stream] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); }); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamFinishedSpy.count(), 1, MessageLatencyWithThreshold * ExpectedMessageCount); QCOMPARE(streamErrorSpy.count(), 0); QCOMPARE(messageReceivedSpy.count(), ExpectedMessageCount); QCOMPARE_EQ(result.testFieldString(), "Stream1Stream2Stream3Stream4"); } void QtGrpcClientServerStreamTest::cancel() { const int ExpectedMessageCount = 2; SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); int i = 0; QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); if (++i == ExpectedMessageCount) stream->cancel(); }); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamErrorSpy.count(), 1, MessageLatencyWithThreshold * ExpectedMessageCount); QCOMPARE(streamFinishedSpy.count(), 0); QCOMPARE(i, 2); QCOMPARE_EQ(result.testFieldString(), "Stream1Stream2"); } void QtGrpcClientServerStreamTest::deferredCancel() { const int ExpectedMessageCount = 3; SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QSignalSpy messageReceivedSpy(stream.get(), &QGrpcServerStream::messageReceived); QVERIFY(messageReceivedSpy.isValid()); int i = 0; QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); if (++i == ExpectedMessageCount) QTimer::singleShot(MessageLatencyThreshold, stream.get(), &QGrpcServerStream::cancel); }); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamErrorSpy.count(), 1, MessageLatencyWithThreshold * ExpectedMessageCount); QCOMPARE(streamFinishedSpy.count(), 0); QCOMPARE(messageReceivedSpy.count(), ExpectedMessageCount); QCOMPARE_EQ(result.testFieldString(), "Stream1Stream2Stream3"); } void QtGrpcClientServerStreamTest::hugeBlob() { BlobMessage result; BlobMessage request; QFile testFile(":/assets/testfile"); QVERIFY(testFile.open(QFile::ReadOnly)); request.setTestBytes(testFile.readAll()); QByteArray dataHash = QCryptographicHash::hash(request.testBytes(), QCryptographicHash::Sha256); auto stream = client()->testMethodBlobServerStream(request); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&result, stream] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestBytes(ret->testBytes()); }); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamFinishedSpy.count(), 1, MessageLatencyWithThreshold); QCOMPARE_EQ(streamErrorSpy.count(), 0); QVERIFY(!result.testBytes().isEmpty()); QByteArray returnDataHash = QCryptographicHash::hash(result.testBytes(), QCryptographicHash::Sha256); QCOMPARE_EQ(returnDataHash, dataHash); } void QtGrpcClientServerStreamTest::getAsyncReply() { SimpleStringMessage request; request.setTestFieldString("Some status message"); QGrpcStatus::StatusCode asyncStatus; QString statusMessage; auto reply = client()->testMethodStatusMessage(request); bool finishedCalled = false; reply->subscribe( this, [&finishedCalled] { finishedCalled = true; }, [&asyncStatus, &statusMessage](const QGrpcStatus &status) { asyncStatus = status.code(); statusMessage = status.message(); }); QTRY_COMPARE_WITH_TIMEOUT(statusMessage, request.testFieldString(), MessageLatencyWithThreshold); QVERIFY(finishedCalled); SimpleStringMessage result; request.setTestFieldString("Hello Qt!"); reply = client()->testMethod(request); reply->subscribe(this, [reply, &result] { const auto ret = reply->read(); QVERIFY(ret.has_value()); result = *ret; }); QTRY_COMPARE_WITH_TIMEOUT(result.testFieldString(), request.testFieldString(), MessageLatencyWithThreshold); result.setTestFieldString(""); request.setTestFieldString("Hello Qt1!"); reply = client()->testMethod(request); reply->subscribe(this, [reply, &result] { const auto ret = reply->read(); QVERIFY(ret.has_value()); result = *ret; }, [] { QVERIFY(false); } ); QTRY_COMPARE_WITH_TIMEOUT(result.testFieldString(), request.testFieldString(), MessageLatencyWithThreshold); } void QtGrpcClientServerStreamTest::multipleStreams() { const int ExpectedMessageCount = 4; SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); // Ensure we're not reusing streams QCOMPARE_NE(stream, client()->testMethodServerStream(request)); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QSignalSpy steamMessageRecievedSpy(stream.get(), &QGrpcServerStream::messageReceived); QVERIFY(steamMessageRecievedSpy.isValid()); QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&result, stream] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); }); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamFinishedSpy.count(), 1, MessageLatencyWithThreshold * ExpectedMessageCount); QCOMPARE_EQ(streamErrorSpy.count(), 0); QCOMPARE_EQ(steamMessageRecievedSpy.count(), ExpectedMessageCount); QCOMPARE_EQ(result.testFieldString(), "Stream1Stream2Stream3Stream4"); } void QtGrpcClientServerStreamTest::multipleStreamsCancel() { SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); auto streamNext = client()->testMethodServerStream(request); QCOMPARE_NE(stream, streamNext); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QSignalSpy streamNextFinishedSpy(streamNext.get(), &QGrpcServerStream::finished); QVERIFY(streamNextFinishedSpy.isValid()); QSignalSpy streamNextErrorSpy(streamNext.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamNextErrorSpy.isValid()); streamNext->cancel(); QCOMPARE(streamFinishedSpy.count(), 0); QCOMPARE(streamErrorSpy.count(), 0); QCOMPARE(streamNextFinishedSpy.count(), 0); QCOMPARE(streamNextErrorSpy.count(), 1); stream = client()->testMethodServerStream(request); QCOMPARE_NE(stream, streamNext); streamNext = client()->testMethodServerStream(request); QCOMPARE_NE(stream, streamNext); QSignalSpy otherStreamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy otherStreamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(otherStreamErrorSpy.isValid()); QSignalSpy otherStreamNextFinishedSpy(streamNext.get(), &QGrpcServerStream::finished); QVERIFY(streamNextFinishedSpy.isValid()); QSignalSpy otherStreamNextErrorSpy(streamNext.get(), &QGrpcServerStream::errorOccurred); QVERIFY(otherStreamNextErrorSpy.isValid()); stream->cancel(); QCOMPARE(otherStreamFinishedSpy.count(), 0); QCOMPARE_EQ(otherStreamErrorSpy.count(), 1); QCOMPARE(otherStreamNextFinishedSpy.count(), 0); QCOMPARE_EQ(otherStreamNextErrorSpy.count(), 0); } void QtGrpcClientServerStreamTest::inThread() { SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); QSignalSpy clientErrorSpy(client().get(), &TestService::Client::errorOccurred); QVERIFY(clientErrorSpy.isValid()); int i = 0; const std::unique_ptr thread(QThread::create([&] { QEventLoop waiter; auto stream = client()->testMethodServerStream(request); QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, &waiter, [&result, &i, &waiter, stream] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); if (++i == 4) waiter.quit(); }); waiter.exec(); })); thread->start(); QTRY_COMPARE_EQ_WITH_TIMEOUT(clientErrorSpy.count(), 1, FailTimeout); QTRY_VERIFY(result.testFieldString().isEmpty()); QTRY_VERIFY( qvariant_cast(clientErrorSpy.at(0).first()) .message() .startsWith( "QGrpcClientBase::startStream is called from a " "different thread.")); } void QtGrpcClientServerStreamTest::cancelWhileErrorTimeout() { SimpleStringMessage result; SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); stream->cancel(); stream.reset(); QTRY_COMPARE_EQ_WITH_TIMEOUT(streamErrorSpy.count(), 1, MessageLatencyWithThreshold); QCOMPARE_EQ(streamFinishedSpy.count(), 0); } void QtGrpcClientServerStreamTest::deadline_data() { const int ExpectedMessageCount = 4; QTest::addColumn("timeout"); QTest::addColumn("ExpectedMessageCount"); constexpr std::array messageLatencyFractions{ 0.7, 0.9, 1.0, 1.3 }; for (const auto &fraction : messageLatencyFractions) QTest::newRow(QString("MessageLatency * ExpectedMessageCount * %1") .arg(fraction) .toStdString() .c_str()) << QGrpcDuration( static_cast((MessageLatency * fraction * ExpectedMessageCount))) << ExpectedMessageCount; } void QtGrpcClientServerStreamTest::deadline() { QFETCH(const QGrpcDuration, timeout); QFETCH(const int, ExpectedMessageCount); QGrpcCallOptions opt; opt.setDeadline(timeout); SimpleStringMessage request; request.setTestFieldString("Stream"); auto stream = client()->testMethodServerStream(request, opt); QSignalSpy streamErrorSpy(stream.get(), &QGrpcServerStream::errorOccurred); QVERIFY(streamErrorSpy.isValid()); QSignalSpy streamFinishedSpy(stream.get(), &QGrpcServerStream::finished); QVERIFY(streamFinishedSpy.isValid()); SimpleStringMessage result; QObject::connect(stream.get(), &QGrpcServerStream::messageReceived, this, [&result, stream] { const auto ret = stream->read(); QVERIFY(ret.has_value()); result.setTestFieldString(result.testFieldString() + ret->testFieldString()); }); if (timeout.count() < MessageLatency * ExpectedMessageCount) { QTRY_COMPARE_EQ_WITH_TIMEOUT(streamErrorSpy.count(), 1, FailTimeout); const auto code = qvariant_cast(streamErrorSpy.at(0).first()).code(); // Really low timeout can trigger before service becomes available QVERIFY(code == QGrpcStatus::StatusCode::Cancelled || code == QGrpcStatus::StatusCode::Unavailable); } else if (timeout.count() >= MessageLatencyWithThreshold * ExpectedMessageCount) { QTRY_COMPARE_EQ_WITH_TIMEOUT(streamFinishedSpy.count(), 1, MessageLatencyWithThreshold * ExpectedMessageCount); QCOMPARE(streamErrorSpy.count(), 0); QCOMPARE_EQ(result.testFieldString(), "Stream1Stream2Stream3Stream4"); } else { // Because we're can't be sure about the result, // cancel the stream, that might affect other tests. stream->cancel(); } } QTEST_MAIN(QtGrpcClientServerStreamTest) #include "tst_grpc_client_serverstream.moc"