bufferpool2: handle transfer to closed connection

Handle transfer to closed connection properly. Also avoid spinlocking
during invalidation.

Bug: 139506073
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Merged-In: I9037ab0bde48a51b2b97158ecd0086dac84f8b26
Change-Id: I9037ab0bde48a51b2b97158ecd0086dac84f8b26
Author: Sungtak Lee
Committer: Wonsik Kim
parent 847eaf44dd
commit ccc32cb955
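
The gist of the first fix: the pool now keeps a set of live connection IDs,
adds an ID in connect(), checks membership in handleTransferTo() before
recording a transaction, and erases the ID in handleClose(). Below is a
minimal standalone sketch of that pattern; ConnectionRegistry and its method
names are illustrative only, not the actual bufferpool classes, and the real
patch keeps the std::set<ConnectionId> inside BufferPool next to its other
lock-protected bookkeeping.

    #include <cstdint>
    #include <mutex>
    #include <set>

    using ConnectionId = int64_t;

    // Illustrative registry of live connections.
    class ConnectionRegistry {
    public:
        void onConnect(ConnectionId id) {
            std::lock_guard<std::mutex> lock(mMutex);
            mLive.insert(id);                 // mirrors the insert added to connect()
        }
        void onClose(ConnectionId id) {
            std::lock_guard<std::mutex> lock(mMutex);
            mLive.erase(id);                  // mirrors the erase added to handleClose()
        }
        // A transfer to an unknown or already-closed receiver is rejected up
        // front instead of creating a transaction that can never complete.
        bool canTransferTo(ConnectionId target) {
            std::lock_guard<std::mutex> lock(mMutex);
            return mLive.find(target) != mLive.end();
        }
    private:
        std::mutex mMutex;
        std::set<ConnectionId> mLive;
    };

Keeping the set next to the state the pool already locks avoids a second
synchronization scheme, and the std::set lookup keeps the per-transfer cost
at O(log n) in the number of open connections.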

@@ -151,6 +151,7 @@ ResultStatus Accessor::Impl::connect(
                 newConnection->initialize(accessor, id);
                 *connection = newConnection;
                 *pConnectionId = id;
+                mBufferPool.mConnectionIds.insert(id);
                 ++sSeqId;
             }
         }
@@ -305,7 +306,12 @@ bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
         found->second->mSenderValidated = true;
         return true;
     }
-    // TODO: verify there is target connection Id
+    if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
+        // N.B: it could be fake or receive connection already closed.
+        ALOGD("bufferpool %p receiver connection %lld is no longer valid",
+              this, (long long)message.targetConnectionId);
+        return false;
+    }
     mStats.onBufferSent();
     mTransactions.insert(std::make_pair(
             message.transactionId,
@@ -450,6 +456,7 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
             }
         }
     }
+    mConnectionIds.erase(connectionId);
     return true;
 }

@@ -94,6 +94,7 @@ private:
         std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
         std::set<BufferId> mFreeBuffers;
+        std::set<ConnectionId> mConnectionIds;
         /// Buffer pool statistics which tracks allocation and transfer statistics.
         struct Stats {
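
A side note on the ALOGD line in handleTransferTo above: ConnectionId is a
64-bit value, and int64_t maps to long on some platforms and long long on
others, so the explicit (long long) cast is what keeps the %lld format
specifier well-defined everywhere. A tiny standalone illustration (plain
printf here, since ALOGD needs Android's liblog; the value is arbitrary):

    #include <cstdint>
    #include <cstdio>

    int main() {
        int64_t targetConnectionId = 0x123456789LL;  // arbitrary demo value
        // The cast matches the argument type to %lld regardless of how the
        // platform defines int64_t.
        std::printf("receiver connection %lld is no longer valid\n",
                    (long long)targetConnectionId);
        return 0;
    }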

@@ -163,6 +163,7 @@ ResultStatus Accessor::Impl::connect(
                 *connection = newConnection;
                 *pConnectionId = id;
                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+                mBufferPool.mConnectionIds.insert(id);
                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                 mBufferPool.mInvalidation.onConnect(id, observer);
                 ++sSeqId;
@@ -474,7 +475,12 @@ bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
         found->second->mSenderValidated = true;
         return true;
     }
-    // TODO: verify there is target connection Id
+    if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
+        // N.B: it could be fake or receive connection already closed.
+        ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
+              this, (long long)message.targetConnectionId);
+        return false;
+    }
     mStats.onBufferSent();
     mTransactions.insert(std::make_pair(
             message.transactionId,
@@ -644,6 +650,7 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
             }
         }
     }
+    mConnectionIds.erase(connectionId);
     return true;
 }
@@ -774,11 +781,19 @@ void Accessor::Impl::invalidatorThread(
             std::mutex &mutex,
             std::condition_variable &cv,
             bool &ready) {
+    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
+    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
+    constexpr useconds_t MAX_SLEEP_US = 10000;
+    uint32_t numSpin = 0;
+    useconds_t sleepUs = 1;
+
     while(true) {
         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
         {
             std::unique_lock<std::mutex> lock(mutex);
             if (!ready) {
+                numSpin = 0;
+                sleepUs = 1;
                 cv.wait(lock);
             }
             copied.insert(accessors.begin(), accessors.end());
@@ -800,9 +815,20 @@
             if (accessors.size() == 0) {
                 ready = false;
             } else {
-                // prevent draining cpu.
+                // TODO Use an efficient way to wait over FMQ.
+                // N.B. Since there is not a efficient way to wait over FMQ,
+                // polling over the FMQ is the current way to prevent draining
+                // CPU.
                 lock.unlock();
-                std::this_thread::yield();
+                ++numSpin;
+                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
+                        sleepUs < MAX_SLEEP_US) {
+                    sleepUs *= 10;
+                }
+                if (numSpin % NUM_SPIN_TO_LOG == 0) {
+                    ALOGW("invalidator thread spinning");
+                }
+                ::usleep(sleepUs);
             }
         }
     }
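
The second fix replaces std::this_thread::yield() with a bounded,
geometrically growing sleep. Below is a standalone sketch of the same backoff
policy; pollOnce is a stand-in for draining the FMQ and is an assumption of
this sketch, not the bufferpool API, and the reset condition is simplified
(the patch resets numSpin/sleepUs when the thread parks on the condition
variable because no accessors remain).

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <unistd.h>     // usleep, useconds_t

    // Backoff polling loop with the same constants as the patch: sleep starts
    // at 1 us, grows 10x every 1024 idle iterations, and caps at 10 ms.
    void backoffPollLoop(const std::function<bool()> &pollOnce) {
        constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
        constexpr uint32_t NUM_SPIN_TO_LOG = 1024 * 8;
        constexpr useconds_t MAX_SLEEP_US = 10000;
        uint32_t numSpin = 0;
        useconds_t sleepUs = 1;

        while (true) {
            if (pollOnce()) {               // work processed: poll at full speed again
                numSpin = 0;
                sleepUs = 1;
                continue;
            }
            ++numSpin;
            if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 && sleepUs < MAX_SLEEP_US) {
                sleepUs *= 10;              // 1 us -> 10 us -> 100 us -> 1 ms -> 10 ms
            }
            if (numSpin % NUM_SPIN_TO_LOG == 0) {
                std::fprintf(stderr, "poll loop still idle\n");   // ALOGW in the patch
            }
            ::usleep(sleepUs);
        }
    }

Once the cap is reached, the idle thread wakes at most 100 times per second
instead of being rescheduled as fast as the kernel allows, which is what the
commit message means by avoiding spinlocking during invalidation.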

@@ -111,6 +111,7 @@ private:
         std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
         std::set<BufferId> mFreeBuffers;
+        std::set<ConnectionId> mConnectionIds;
         struct Invalidation {
             static std::atomic<std::uint32_t> sInvSeqId;
