Merge "Codec2Utils: Cache all connections in DefaultBufferPoolSender"

gugelfrei
TreeHugger Robot 4 years ago committed by Android (Google) Code Review
commit 33873dec8b

@ -206,10 +206,23 @@ private:
std::mutex mMutex;
sp<ClientManager> mSenderManager;
sp<IClientManager> mReceiverManager;
int64_t mReceiverConnectionId;
int64_t mSourceConnectionId;
std::chrono::steady_clock::time_point mLastSent;
std::chrono::steady_clock::duration mRefreshInterval;
struct Connection {
int64_t receiverConnectionId;
std::chrono::steady_clock::time_point lastSent;
Connection(int64_t receiverConnectionId,
std::chrono::steady_clock::time_point lastSent)
: receiverConnectionId(receiverConnectionId),
lastSent(lastSent) {
}
};
// Map of connections.
//
// The key is the connection id. One sender-receiver pair may have multiple
// connections.
std::map<int64_t, Connection> mConnections;
};
// std::list<std::unique_ptr<C2Work>> -> WorkBundle

@ -969,8 +969,6 @@ DefaultBufferPoolSender::DefaultBufferPoolSender(
const sp<IClientManager>& receiverManager,
std::chrono::steady_clock::duration refreshInterval)
: mReceiverManager(receiverManager),
mSourceConnectionId(0),
mLastSent(std::chrono::steady_clock::now()),
mRefreshInterval(refreshInterval) {
}
@ -980,6 +978,7 @@ void DefaultBufferPoolSender::setReceiver(
std::lock_guard<std::mutex> lock(mMutex);
if (mReceiverManager != receiverManager) {
mReceiverManager = receiverManager;
mConnections.clear();
}
mRefreshInterval = refreshInterval;
}
@ -987,12 +986,16 @@ void DefaultBufferPoolSender::setReceiver(
ResultStatus DefaultBufferPoolSender::send(
const std::shared_ptr<BufferPoolData>& bpData,
BufferStatusMessage* bpMessage) {
int64_t connectionId = bpData->mConnectionId;
if (connectionId == 0) {
LOG(WARNING) << "registerSender -- invalid sender connection id (0).";
return ResultStatus::CRITICAL_ERROR;
}
std::lock_guard<std::mutex> lock(mMutex);
if (!mReceiverManager) {
LOG(ERROR) << "No access to receiver's BufferPool.";
return ResultStatus::NOT_FOUND;
}
ResultStatus rs;
std::lock_guard<std::mutex> lock(mMutex);
if (!mSenderManager) {
mSenderManager = ClientManager::getInstance();
if (!mSenderManager) {
@ -1000,52 +1003,61 @@ ResultStatus DefaultBufferPoolSender::send(
return ResultStatus::CRITICAL_ERROR;
}
}
int64_t connectionId = bpData->mConnectionId;
int64_t receiverConnectionId{0};
auto foundConnection = mConnections.find(connectionId);
bool isNewConnection = foundConnection == mConnections.end();
std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
std::chrono::steady_clock::duration interval = now - mLastSent;
if (mSourceConnectionId == 0 ||
mSourceConnectionId != connectionId ||
interval > mRefreshInterval) {
if (isNewConnection ||
(now - foundConnection->second.lastSent > mRefreshInterval)) {
// Initialize the bufferpool connection.
mSourceConnectionId = connectionId;
if (mSourceConnectionId == 0) {
return ResultStatus::CRITICAL_ERROR;
}
int64_t receiverConnectionId;
rs = mSenderManager->registerSender(mReceiverManager,
connectionId,
&receiverConnectionId);
ResultStatus rs =
mSenderManager->registerSender(mReceiverManager,
connectionId,
&receiverConnectionId);
if ((rs != ResultStatus::OK) && (rs != ResultStatus::ALREADY_EXISTS)) {
LOG(WARNING) << "registerSender -- returned error: "
<< static_cast<int32_t>(rs)
<< ".";
return rs;
} else if (receiverConnectionId == 0) {
LOG(WARNING) << "registerSender -- "
"invalid receiver connection id (0).";
return ResultStatus::CRITICAL_ERROR;
} else {
mReceiverConnectionId = receiverConnectionId;
if (isNewConnection) {
foundConnection = mConnections.try_emplace(
connectionId, receiverConnectionId, now).first;
} else {
foundConnection->second.receiverConnectionId = receiverConnectionId;
}
}
} else {
receiverConnectionId = foundConnection->second.receiverConnectionId;
}
uint64_t transactionId;
int64_t timestampUs;
rs = mSenderManager->postSend(
mReceiverConnectionId, bpData, &transactionId, &timestampUs);
ResultStatus rs = mSenderManager->postSend(
receiverConnectionId, bpData, &transactionId, &timestampUs);
if (rs != ResultStatus::OK) {
LOG(ERROR) << "ClientManager::postSend -- returned error: "
<< static_cast<int32_t>(rs)
<< ".";
mConnections.erase(foundConnection);
return rs;
}
if (!bpMessage) {
LOG(ERROR) << "Null output parameter for BufferStatusMessage.";
mConnections.erase(foundConnection);
return ResultStatus::CRITICAL_ERROR;
}
bpMessage->connectionId = mReceiverConnectionId;
bpMessage->connectionId = receiverConnectionId;
bpMessage->bufferId = bpData->mId;
bpMessage->transactionId = transactionId;
bpMessage->timestampUs = timestampUs;
mLastSent = now;
foundConnection->second.lastSent = now;
return rs;
}

Loading…
Cancel
Save