From c7f9e2c21af39156e7111d6d2404f01a56587810 Mon Sep 17 00:00:00 2001 From: Sungtak Lee Date: Fri, 14 Sep 2018 16:23:40 -0700 Subject: [PATCH] bufferpool2.0: Implement buffer invalidation Change-Id: If7a4a38004f50b4d43a2fae4781f541fe322c249 --- media/bufferpool/2.0/Accessor.cpp | 32 +- media/bufferpool/2.0/Accessor.h | 21 +- media/bufferpool/2.0/AccessorImpl.cpp | 298 +++++++++++++++++- media/bufferpool/2.0/AccessorImpl.h | 101 +++++- media/bufferpool/2.0/Android.bp | 1 + media/bufferpool/2.0/BufferPoolClient.cpp | 238 +++++++++++--- media/bufferpool/2.0/BufferPoolClient.h | 11 +- media/bufferpool/2.0/BufferStatus.cpp | 113 +++++++ media/bufferpool/2.0/BufferStatus.h | 101 +++++- media/bufferpool/2.0/ClientManager.cpp | 43 ++- media/bufferpool/2.0/Connection.cpp | 7 + media/bufferpool/2.0/Connection.h | 5 + media/bufferpool/2.0/Observer.cpp | 73 +++++ media/bufferpool/2.0/Observer.h | 67 ++++ .../2.0/include/bufferpool/ClientManager.h | 12 + 15 files changed, 1032 insertions(+), 91 deletions(-) create mode 100644 media/bufferpool/2.0/Observer.cpp create mode 100644 media/bufferpool/2.0/Observer.h diff --git a/media/bufferpool/2.0/Accessor.cpp b/media/bufferpool/2.0/Accessor.cpp index 3eaea7c64f..f26450153a 100644 --- a/media/bufferpool/2.0/Accessor.cpp +++ b/media/bufferpool/2.0/Accessor.cpp @@ -117,19 +117,18 @@ sp Accessor::getConnectionDeathRecipient() { Return Accessor::connect( const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer, connect_cb _hidl_cb) { - (void)observer; sp connection; ConnectionId connectionId; + uint32_t msgId; const StatusDescriptor* fmqDesc; + const InvalidationDescriptor* invDesc; - ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false); + ResultStatus status = connect( + observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc); if (status == ResultStatus::OK) { - _hidl_cb(status, connection, connectionId, *fmqDesc, - android::hardware::MQDescriptorUnsync( - std::vector(), - nullptr /* nhandle */, 0 /* size */)); + _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc); } else { - _hidl_cb(status, nullptr, -1LL, + _hidl_cb(status, nullptr, -1LL, 0, android::hardware::MQDescriptorSync( std::vector(), nullptr /* nhandle */, 0 /* size */), @@ -147,7 +146,15 @@ Accessor::~Accessor() { } bool Accessor::isValid() { - return (bool)mImpl; + return (bool)mImpl && mImpl->isValid(); +} + +ResultStatus Accessor::flush() { + if (mImpl) { + mImpl->flush(); + return ResultStatus::OK; + } + return ResultStatus::CRITICAL_ERROR; } ResultStatus Accessor::allocate( @@ -170,10 +177,15 @@ ResultStatus Accessor::fetch( } ResultStatus Accessor::connect( + const sp &observer, bool local, sp *connection, ConnectionId *pConnectionId, - const StatusDescriptor** fmqDescPtr, bool local) { + uint32_t *pMsgId, + const StatusDescriptor** statusDescPtr, + const InvalidationDescriptor** invDescPtr) { if (mImpl) { - ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr); + ResultStatus status = mImpl->connect( + this, observer, connection, pConnectionId, pMsgId, + statusDescPtr, invDescPtr); if (!local && status == ResultStatus::OK) { sp accessor(this); sConnectionDeathRecipient->add(*pConnectionId, accessor); diff --git a/media/bufferpool/2.0/Accessor.h b/media/bufferpool/2.0/Accessor.h index a718da1db9..4b5b17a2c8 100644 --- a/media/bufferpool/2.0/Accessor.h +++ b/media/bufferpool/2.0/Accessor.h @@ -95,6 +95,9 @@ struct Accessor : public IAccessor { /** Returns whether the accessor is valid. 
*/ bool isValid(); + /** Invalidates all buffers which are owned by bufferpool */ + ResultStatus flush(); + /** Allocates a buffer from a buffer pool. * * @param connectionId the connection id of the client. @@ -135,20 +138,28 @@ struct Accessor : public IAccessor { * created connection in order to communicate with the buffer pool. An * FMQ for buffer status message is also created for the client. * + * @param observer client observer for buffer invalidation + * @param local true when a connection request comes from local process, + * false otherwise. * @param connection created connection * @param pConnectionId the id of the created connection - * @param fmqDescPtr FMQ descriptor for shared buffer status message + * @param pMsgId the id of the recent buffer pool message + * @param statusDescPtr FMQ descriptor for shared buffer status message * queue between a buffer pool and the client. - * @param local true when a connection request comes from local process, - * false otherwise. + * @param invDescPtr FMQ descriptor for buffer invalidation message + * queue from a buffer pool to the client. * * @return OK when a connection is successfully made. * NO_MEMORY when there is no memory. * CRITICAL_ERROR otherwise. */ ResultStatus connect( + const sp& observer, + bool local, sp *connection, ConnectionId *pConnectionId, - const StatusDescriptor** fmqDescPtr, bool local); + uint32_t *pMsgId, + const StatusDescriptor** statusDescPtr, + const InvalidationDescriptor** invDescPtr); /** * Closes the specified connection to the client. @@ -176,7 +187,7 @@ struct Accessor : public IAccessor { private: class Impl; - std::unique_ptr mImpl; + std::shared_ptr mImpl; }; } // namespace implementation diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp index 0ba6600e18..4cc8abc623 100644 --- a/media/bufferpool/2.0/AccessorImpl.cpp +++ b/media/bufferpool/2.0/AccessorImpl.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "AccessorImpl.h" #include "Connection.h" @@ -47,6 +48,7 @@ struct InternalBuffer { const std::shared_ptr mAllocation; const size_t mAllocSize; const std::vector mConfig; + bool mInvalidated; InternalBuffer( BufferId id, @@ -54,11 +56,16 @@ struct InternalBuffer { const size_t allocSize, const std::vector &allocConfig) : mId(id), mOwnerCount(0), mTransactionCount(0), - mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {} + mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig), + mInvalidated(false) {} const native_handle_t *handle() { return mAllocation->handle(); } + + void invalidate() { + mInvalidated = true; + } }; struct TransactionStatus { @@ -138,21 +145,29 @@ Accessor::Impl::~Impl() { } ResultStatus Accessor::Impl::connect( - const sp &accessor, sp *connection, - ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) { + const sp &accessor, const sp &observer, + sp *connection, + ConnectionId *pConnectionId, + uint32_t *pMsgId, + const StatusDescriptor** statusDescPtr, + const InvalidationDescriptor** invDescPtr) { sp newConnection = new Connection(); ResultStatus status = ResultStatus::CRITICAL_ERROR; { std::lock_guard lock(mBufferPool.mMutex); if (newConnection) { ConnectionId id = (int64_t)sPid << 32 | sSeqId; - status = mBufferPool.mObserver.open(id, fmqDescPtr); + status = mBufferPool.mObserver.open(id, statusDescPtr); if (status == ResultStatus::OK) { newConnection->initialize(accessor, id); *connection = newConnection; *pConnectionId = id; + *pMsgId = 
mBufferPool.mInvalidation.mInvalidationId; + mBufferPool.mInvalidationChannel.getDesc(invDescPtr); + mBufferPool.mInvalidation.onConnect(id, observer); ++sSeqId; } + } mBufferPool.processStatusMessages(); mBufferPool.cleanUp(); @@ -165,6 +180,7 @@ ResultStatus Accessor::Impl::close(ConnectionId connectionId) { mBufferPool.processStatusMessages(); mBufferPool.handleClose(connectionId); mBufferPool.mObserver.close(connectionId); + mBufferPool.mInvalidation.onClose(connectionId); // Since close# will be called after all works are finished, it is OK to // evict unused buffers. mBufferPool.cleanUp(true); @@ -229,11 +245,30 @@ void Accessor::Impl::cleanUp(bool clearCache) { mBufferPool.cleanUp(clearCache); } +void Accessor::Impl::flush() { + std::lock_guard lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + mBufferPool.flush(shared_from_this()); +} + +void Accessor::Impl::handleInvalidateAck() { + std::lock_guard lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + mBufferPool.mInvalidation.onHandleAck(); +} + +bool Accessor::Impl::isValid() { + return mBufferPool.isValid(); +} + Accessor::Impl::Impl::BufferPool::BufferPool() : mTimestampUs(getTimestampNow()), mLastCleanUpUs(mTimestampUs), mLastLogUs(mTimestampUs), - mSeq(0) {} + mSeq(0), + mStartSeq(0) { + mValid = mInvalidationChannel.isValid(); +} // Statistics helper @@ -242,6 +277,8 @@ int percentage(T base, S total) { return int(total ? 0.5 + 100. * static_cast(base) / total : 0); } +std::atomic Accessor::Impl::BufferPool::Invalidation::sSeqId(0); + Accessor::Impl::Impl::BufferPool::~BufferPool() { std::lock_guard lock(mMutex); ALOGD("Destruction - bufferpool %p " @@ -255,6 +292,96 @@ Accessor::Impl::Impl::BufferPool::~BufferPool() { percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); } +void Accessor::Impl::BufferPool::Invalidation::onConnect( + ConnectionId conId, const sp& observer) { + mAcks[conId] = mInvalidationId; // starts from current invalidationId + mObservers.insert(std::make_pair(conId, observer)); +} + +void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) { + mAcks.erase(conId); + mObservers.erase(conId); +} + +void Accessor::Impl::BufferPool::Invalidation::onAck( + ConnectionId conId, + uint32_t msgId) { + auto it = mAcks.find(conId); + if (it == mAcks.end() || isMessageLater(msgId, it->second)) { + mAcks[conId] = msgId; + } +} + +void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated( + BufferId bufferId, + BufferInvalidationChannel &channel) { + for (auto it = mPendings.begin(); it != mPendings.end();) { + if (it->invalidate(bufferId)) { + it = mPendings.erase(it); + uint32_t msgId = 0; + if (it->mNeedsAck) { + msgId = ++mInvalidationId; + if (msgId == 0) { + // wrap happens + msgId = ++mInvalidationId; + } + } + channel.postInvalidation(msgId, it->mFrom, it->mTo); + sInvalidator.addAccessor(mId, it->mImpl); + continue; + } + ++it; + } +} + +void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest( + bool needsAck, + uint32_t from, + uint32_t to, + size_t left, + BufferInvalidationChannel &channel, + const std::shared_ptr &impl) { + if (left == 0) { + uint32_t msgId = 0; + if (needsAck) { + msgId = ++mInvalidationId; + if (msgId == 0) { + // wrap happens + msgId = ++mInvalidationId; + } + } + channel.postInvalidation(msgId, from, to); + sInvalidator.addAccessor(mId, impl); + } else { + // TODO: sending hint message? 
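The id handling above pairs with isMessageLater() in BufferStatus.cpp later in this patch: id 0 is reserved to mean "no ack requested", so the counter is bumped a second time when it wraps to 0, and ordering between ids is decided modulo 2^32. A minimal standalone sketch of that arithmetic follows; it is not part of the patch, and nextInvalidationId(), laterThan() and the values in main() are illustrative only.

#include <cassert>
#include <cstdint>

// Allocate the next invalidation message id, skipping the reserved value 0.
static uint32_t nextInvalidationId(uint32_t &counter) {
    uint32_t msgId = ++counter;
    if (msgId == 0) {          // wrap happened; 0 means "no ack requested"
        msgId = ++counter;
    }
    return msgId;
}

// Same ordering rule as isMessageLater() in BufferStatus.cpp.
static bool laterThan(uint32_t cur, uint32_t prev) {
    return cur != prev && cur - prev < prev - cur;
}

int main() {
    uint32_t counter = 0xFFFFFFFEu;           // hypothetical value near wrap
    uint32_t a = nextInvalidationId(counter); // yields 0xFFFFFFFF
    uint32_t b = nextInvalidationId(counter); // wraps past 0, yields 1
    assert(b == 1 && laterThan(b, a));        // 1 is "later" than 0xFFFFFFFF
    return 0;
}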
+ Pending pending(needsAck, from, to, left, impl); + mPendings.push_back(pending); + } +} + +void Accessor::Impl::BufferPool::Invalidation::onHandleAck() { + if (mInvalidationId != 0) { + std::set deads; + for (auto it = mAcks.begin(); it != mAcks.end(); ++it) { + if (it->second != mInvalidationId) { + const sp observer = mObservers[it->first].promote(); + if (observer) { + observer->onMessage(it->first, mInvalidationId); + } else { + deads.insert(it->first); + } + } + } + if (deads.size() > 0) { + for (auto it = deads.begin(); it != deads.end(); ++it) { + onClose(*it); + } + } + } + // All invalidation Ids are synced. + sInvalidator.delAccessor(mId); +} + bool Accessor::Impl::BufferPool::handleOwnBuffer( ConnectionId connectionId, BufferId bufferId) { @@ -275,8 +402,15 @@ bool Accessor::Impl::BufferPool::handleReleaseBuffer( iter->second->mOwnerCount--; if (iter->second->mOwnerCount == 0 && iter->second->mTransactionCount == 0) { - mStats.onBufferUnused(iter->second->mAllocSize); - mFreeBuffers.insert(bufferId); + if (!iter->second->mInvalidated) { + mStats.onBufferUnused(iter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } else { + mStats.onBufferUnused(iter->second->mAllocSize); + mStats.onBufferEvicted(iter->second->mAllocSize); + mBuffers.erase(iter); + mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); + } } } erase(&mUsingConnections, bufferId, connectionId); @@ -352,8 +486,15 @@ bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage bufferIter->second->mTransactionCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { - mStats.onBufferUnused(bufferIter->second->mAllocSize); - mFreeBuffers.insert(message.bufferId); + if (!bufferIter->second->mInvalidated) { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(message.bufferId); + } else { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mStats.onBufferEvicted(bufferIter->second->mAllocSize); + mBuffers.erase(bufferIter); + mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel); + } } mTransactions.erase(found); } @@ -400,7 +541,7 @@ void Accessor::Impl::BufferPool::processStatusMessages() { ret = handleTransferResult(message); break; case BufferStatus::INVALIDATION_ACK: - // TODO + mInvalidation.onAck(message.connectionId, message.bufferId); break; } if (ret == false) { @@ -423,8 +564,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail - mStats.onBufferUnused(bufferIter->second->mAllocSize); - mFreeBuffers.insert(bufferId); + if (!bufferIter->second->mInvalidated) { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } else { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mStats.onBufferEvicted(bufferIter->second->mAllocSize); + mBuffers.erase(bufferIter); + mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); + } } } } @@ -446,8 +594,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail - mStats.onBufferUnused(bufferIter->second->mAllocSize); - mFreeBuffers.insert(bufferId); + if (!bufferIter->second->mInvalidated) { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } 
else { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mStats.onBufferEvicted(bufferIter->second->mAllocSize); + mBuffers.erase(bufferIter); + mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); + } } mTransactions.erase(iter); } @@ -538,6 +693,121 @@ void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { } } +void Accessor::Impl::BufferPool::invalidate( + bool needsAck, BufferId from, BufferId to, + const std::shared_ptr &impl) { + for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { + if (isBufferInRange(from, to, *freeIt)) { + auto it = mBuffers.find(*freeIt); + if (it != mBuffers.end() && + it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { + mStats.onBufferEvicted(it->second->mAllocSize); + mBuffers.erase(it); + freeIt = mFreeBuffers.erase(freeIt); + continue; + } else { + ALOGW("bufferpool inconsistent!"); + } + } + ++freeIt; + } + + size_t left = 0; + for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) { + if (isBufferInRange(from, to, it->first)) { + it->second->invalidate(); + ++left; + } + } + mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl); +} + +void Accessor::Impl::BufferPool::flush(const std::shared_ptr &impl) { + BufferId from = mStartSeq; + BufferId to = mSeq; + mStartSeq = mSeq; + // TODO: needsAck params + if (from != to) { + invalidate(true, from, to, impl); + } +} + +void Accessor::Impl::invalidatorThread( + std::map> &accessors, + std::mutex &mutex, + std::condition_variable &cv, + bool &ready) { + while(true) { + std::map> copied; + { + std::unique_lock lock(mutex); + if (!ready) { + cv.wait(lock); + } + copied.insert(accessors.begin(), accessors.end()); + } + std::list erased; + for (auto it = copied.begin(); it != copied.end(); ++it) { + const std::shared_ptr impl = it->second.lock(); + if (!impl) { + erased.push_back(it->first); + } else { + impl->handleInvalidateAck(); + } + } + { + std::unique_lock lock(mutex); + for (auto it = erased.begin(); it != erased.end(); ++it) { + accessors.erase(*it); + } + if (accessors.size() == 0) { + ready = false; + } else { + // prevent draining cpu. 
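The loop above sleeps on the condition variable while no accessor is registered and yields between polling rounds otherwise. A simplified standalone sketch of that wake/sleep discipline follows; Poller and its std::function work items are hypothetical stand-ins for AccessorInvalidator and handleInvalidateAck(), not part of the patch.

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

struct Poller {
    std::map<uint32_t, std::function<void()>> work;  // hypothetical work items
    std::mutex mutex;
    std::condition_variable cv;
    bool ready = false;

    void run() {
        for (;;) {
            std::map<uint32_t, std::function<void()>> copied;
            {
                std::unique_lock<std::mutex> lock(mutex);
                cv.wait(lock, [this] { return ready; });  // sleep while idle
                copied = work;
            }
            for (auto &entry : copied) {
                entry.second();                // poll each entry outside the lock
            }
            {
                std::lock_guard<std::mutex> lock(mutex);
                if (work.empty()) {
                    ready = false;             // nothing left; go back to sleep
                    continue;
                }
            }
            std::this_thread::yield();         // still busy, but don't spin hard
        }
    }

    void add(uint32_t id, std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            work[id] = std::move(fn);
            ready = true;
        }
        cv.notify_one();                       // wake the polling thread
    }
};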
+ lock.unlock(); + std::this_thread::yield(); + } + } + } +} + +Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) { + std::thread invalidator( + invalidatorThread, + std::ref(mAccessors), + std::ref(mMutex), + std::ref(mCv), + std::ref(mReady)); + invalidator.detach(); +} + +void Accessor::Impl::AccessorInvalidator::addAccessor( + uint32_t accessorId, const std::weak_ptr &impl) { + bool notify = false; + std::unique_lock lock(mMutex); + if (mAccessors.find(accessorId) == mAccessors.end()) { + if (!mReady) { + mReady = true; + notify = true; + } + mAccessors.insert(std::make_pair(accessorId, impl)); + } + lock.unlock(); + if (notify) { + mCv.notify_one(); + } +} + +void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) { + std::lock_guard lock(mMutex); + mAccessors.erase(accessorId); + if (mAccessors.size() == 0) { + mReady = false; + } +} + +Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator; + } // namespace implementation } // namespace V2_0 } // namespace bufferpool diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h index 1d33880e2a..6b03494621 100644 --- a/media/bufferpool/2.0/AccessorImpl.h +++ b/media/bufferpool/2.0/AccessorImpl.h @@ -19,6 +19,7 @@ #include #include +#include #include "Accessor.h" namespace android { @@ -33,15 +34,20 @@ struct TransactionStatus; /** * An implementation of a buffer pool accessor(or a buffer pool implementation.) */ -class Accessor::Impl { +class Accessor::Impl + : public std::enable_shared_from_this { public: Impl(const std::shared_ptr &allocator); ~Impl(); ResultStatus connect( - const sp &accessor, sp *connection, - ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr); + const sp &accessor, const sp &observer, + sp *connection, + ConnectionId *pConnectionId, + uint32_t *pMsgId, + const StatusDescriptor** statusDescPtr, + const InvalidationDescriptor** invDescPtr); ResultStatus close(ConnectionId connectionId); @@ -55,8 +61,14 @@ public: BufferId bufferId, const native_handle_t** handle); + void flush(); + void cleanUp(bool clearCache); + bool isValid(); + + void handleInvalidateAck(); + private: // ConnectionId = pid : (timestamp_created + seqId) // in order to guarantee uniqueness for each connection @@ -78,7 +90,10 @@ private: int64_t mLastCleanUpUs; int64_t mLastLogUs; BufferId mSeq; + BufferId mStartSeq; + bool mValid; BufferStatusObserver mObserver; + BufferInvalidationChannel mInvalidationChannel; std::map> mUsingBuffers; std::map> mUsingConnections; @@ -95,6 +110,54 @@ private: std::map> mBuffers; std::set mFreeBuffers; + struct Invalidation { + static std::atomic sSeqId; + + struct Pending { + bool mNeedsAck; + uint32_t mFrom; + uint32_t mTo; + size_t mLeft; + const std::weak_ptr mImpl; + Pending(bool needsAck, uint32_t from, uint32_t to, size_t left, + const std::shared_ptr &impl) + : mNeedsAck(needsAck), + mFrom(from), + mTo(to), + mLeft(left), + mImpl(impl) + {} + + bool invalidate(uint32_t bufferId) { + return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0; + } + }; + + std::list mPendings; + std::map mAcks; + std::map> mObservers; + uint32_t mInvalidationId; + uint32_t mId; + + Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {} + + void onConnect(ConnectionId conId, const sp &observer); + + void onClose(ConnectionId conId); + + void onAck(ConnectionId conId, uint32_t msgId); + + void onBufferInvalidated( + BufferId bufferId, + BufferInvalidationChannel &channel); + + void onInvalidationRequest( + bool 
needsAck, uint32_t from, uint32_t to, size_t left, + BufferInvalidationChannel &channel, + const std::shared_ptr &impl); + + void onHandleAck(); + } mInvalidation; /// Buffer pool statistics which tracks allocation and transfer statistics. struct Stats { /// Total size of allocations which are used or available to use. @@ -164,6 +227,13 @@ private: } } mStats; + bool isValid() { + return mValid; + } + + void invalidate(bool needsAck, BufferId from, BufferId to, + const std::shared_ptr &impl); + public: /** Creates a buffer pool. */ BufferPool(); @@ -286,8 +356,33 @@ private: */ void cleanUp(bool clearCache = false); + /** + * Processes pending buffer status messages and invalidate all current + * free buffers. Active buffers are invalidated after being inactive. + */ + void flush(const std::shared_ptr &impl); + friend class Accessor::Impl; } mBufferPool; + + struct AccessorInvalidator { + std::map> mAccessors; + std::mutex mMutex; + std::condition_variable mCv; + bool mReady; + + AccessorInvalidator(); + void addAccessor(uint32_t accessorId, const std::weak_ptr &impl); + void delAccessor(uint32_t accessorId); + }; + + static AccessorInvalidator sInvalidator; + + static void invalidatorThread( + std::map> &accessors, + std::mutex &mutex, + std::condition_variable &cv, + bool &ready); }; } // namespace implementation diff --git a/media/bufferpool/2.0/Android.bp b/media/bufferpool/2.0/Android.bp index 413125a571..cd4e06e8c3 100644 --- a/media/bufferpool/2.0/Android.bp +++ b/media/bufferpool/2.0/Android.bp @@ -8,6 +8,7 @@ cc_library { "BufferStatus.cpp", "ClientManager.cpp", "Connection.cpp", + "Observer.cpp", ], export_include_dirs: [ "include", diff --git a/media/bufferpool/2.0/BufferPoolClient.cpp b/media/bufferpool/2.0/BufferPoolClient.cpp index 0f763f7e28..c80beff693 100644 --- a/media/bufferpool/2.0/BufferPoolClient.cpp +++ b/media/bufferpool/2.0/BufferPoolClient.cpp @@ -36,9 +36,9 @@ static constexpr int kCacheTtlUs = 1000000; // TODO: tune class BufferPoolClient::Impl : public std::enable_shared_from_this { public: - explicit Impl(const sp &accessor); + explicit Impl(const sp &accessor, const sp &observer); - explicit Impl(const sp &accessor); + explicit Impl(const sp &accessor, const sp &observer); bool isValid() { return mValid; @@ -58,6 +58,10 @@ public: bool isActive(int64_t *lastTransactionUs, bool clearCache); + void receiveInvalidation(uint32_t msgID); + + ResultStatus flush(); + ResultStatus allocate(const std::vector ¶ms, native_handle_t **handle, std::shared_ptr *buffer); @@ -83,10 +87,14 @@ private: void trySyncFromRemote(); - bool syncReleased(); + bool syncReleased(uint32_t msgId = 0); void evictCaches(bool clearCache = false); + void invalidateBuffer(BufferId id); + + void invalidateRange(BufferId from, BufferId to); + ResultStatus allocateBufferHandle( const std::vector& params, BufferId *bufferId, native_handle_t **handle); @@ -106,6 +114,7 @@ private: uint32_t mSeqId; ConnectionId mConnectionId; int64_t mLastEvictCacheUs; + std::unique_ptr mInvalidationListener; // CachedBuffers struct BufferCache { @@ -130,12 +139,16 @@ private: } mCache; // FMQ - release notifier - struct { + struct ReleaseCache { std::mutex mLock; // TODO: use only one list?(using one list may dealy sending messages?) 
std::list mReleasingIds; std::list mReleasedIds; + uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool + bool mInvalidateAck; std::unique_ptr mStatusChannel; + + ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {} } mReleasing; // This lock is held during synchronization from remote side. @@ -162,7 +175,6 @@ struct BufferPoolClient::Impl::BlockPoolDataDtor { struct BufferPoolClient::Impl::ClientBuffer { private: - bool mInvalidated; // TODO: implement int64_t mExpireUs; bool mHasCache; ConnectionId mConnectionId; @@ -177,9 +189,8 @@ private: public: ClientBuffer( ConnectionId connectionId, BufferId id, native_handle_t *handle) - : mInvalidated(false), mHasCache(false), - mConnectionId(connectionId), mId(id), mHandle(handle) { - (void)mInvalidated; + : mHasCache(false), mConnectionId(connectionId), + mId(id), mHandle(handle) { mExpireUs = getTimestampNow() + kCacheTtlUs; } @@ -190,6 +201,10 @@ public: } } + BufferId id() const { + return mId; + } + bool expire() const { int64_t now = getTimestampNow(); return now >= mExpireUs; @@ -244,41 +259,53 @@ public: } }; -BufferPoolClient::Impl::Impl(const sp &accessor) +BufferPoolClient::Impl::Impl(const sp &accessor, const sp &observer) : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0), mLastEvictCacheUs(getTimestampNow()) { - const StatusDescriptor *fmqDesc; + const StatusDescriptor *statusDesc; + const InvalidationDescriptor *invDesc; ResultStatus status = accessor->connect( - &mLocalConnection, &mConnectionId, &fmqDesc, true); + observer, true, + &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId, + &statusDesc, &invDesc); if (status == ResultStatus::OK) { mReleasing.mStatusChannel = - std::make_unique(*fmqDesc); + std::make_unique(*statusDesc); + mInvalidationListener = + std::make_unique(*invDesc); mValid = mReleasing.mStatusChannel && - mReleasing.mStatusChannel->isValid(); + mReleasing.mStatusChannel->isValid() && + mInvalidationListener && + mInvalidationListener->isValid(); } } -BufferPoolClient::Impl::Impl(const sp &accessor) +BufferPoolClient::Impl::Impl(const sp &accessor, const sp &observer) : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0), mLastEvictCacheUs(getTimestampNow()) { bool valid = false; - sp observer; // TODO sp& outConnection = mRemoteConnection; ConnectionId& id = mConnectionId; + uint32_t& outMsgId = mReleasing.mInvalidateId; std::unique_ptr& outChannel = mReleasing.mStatusChannel; + std::unique_ptr& outObserver = + mInvalidationListener; Return transResult = accessor->connect( observer, - [&valid, &outConnection, &id, &outChannel] + [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver] (ResultStatus status, sp connection, - ConnectionId connectionId, const StatusDescriptor& desc, + ConnectionId connectionId, uint32_t msgId, + const StatusDescriptor& statusDesc, const InvalidationDescriptor& invDesc) { - (void) invDesc; if (status == ResultStatus::OK) { outConnection = connection; id = connectionId; - outChannel = std::make_unique(desc); - if (outChannel && outChannel->isValid()) { + outMsgId = msgId; + outChannel = std::make_unique(statusDesc); + outObserver = std::make_unique(invDesc); + if (outChannel && outChannel->isValid() && + outObserver && outObserver->isValid()) { valid = true; } } @@ -302,6 +329,24 @@ bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCach return active; } +void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) { + std::lock_guard lock(mCache.mLock); + syncReleased(messageId); + // TODO: 
evict cache required? +} + +ResultStatus BufferPoolClient::Impl::flush() { + if (!mLocal || !mLocalConnection || !mValid) { + return ResultStatus::CRITICAL_ERROR; + } + { + std::unique_lock lock(mCache.mLock); + syncReleased(); + evictCaches(); + return mLocalConnection->flush(); + } +} + ResultStatus BufferPoolClient::Impl::allocate( const std::vector ¶ms, native_handle_t **pHandle, @@ -455,6 +500,11 @@ void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) { bool BufferPoolClient::Impl::postSend( BufferId bufferId, ConnectionId receiver, TransactionId *transactionId, int64_t *timestampUs) { + { + // TODO: don't need to call syncReleased every time + std::lock_guard lock(mCache.mLock); + syncReleased(); + } bool ret = false; bool needsSync = false; { @@ -538,34 +588,74 @@ void BufferPoolClient::Impl::trySyncFromRemote() { } // should have mCache.mLock -bool BufferPoolClient::Impl::syncReleased() { - std::lock_guard lock(mReleasing.mLock); - if (mReleasing.mReleasingIds.size() > 0) { - mReleasing.mStatusChannel->postBufferRelease( - mConnectionId, mReleasing.mReleasingIds, - mReleasing.mReleasedIds); - } - if (mReleasing.mReleasedIds.size() > 0) { - for (BufferId& id: mReleasing.mReleasedIds) { - ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id); - auto found = mCache.mBuffers.find(id); - if (found != mCache.mBuffers.end()) { - if (found->second->onCacheRelease()) { - mCache.decActive_l(); +bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) { + bool cleared = false; + { + std::lock_guard lock(mReleasing.mLock); + if (mReleasing.mReleasingIds.size() > 0) { + mReleasing.mStatusChannel->postBufferRelease( + mConnectionId, mReleasing.mReleasingIds, + mReleasing.mReleasedIds); + } + if (mReleasing.mReleasedIds.size() > 0) { + for (BufferId& id: mReleasing.mReleasedIds) { + ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id); + auto found = mCache.mBuffers.find(id); + if (found != mCache.mBuffers.end()) { + if (found->second->onCacheRelease()) { + mCache.decActive_l(); + } else { + // should not happen! + ALOGW("client %lld cache release status inconsitent!", + (long long)mConnectionId); + } } else { // should not happen! - ALOGW("client %lld cache release status inconsitent!", - (long long)mConnectionId); + ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId); } + } + mReleasing.mReleasedIds.clear(); + cleared = true; + } + } + std::vector invalidations; + mInvalidationListener->getInvalidations(invalidations); + uint32_t lastMsgId = 0; + if (invalidations.size() > 0) { + for (auto it = invalidations.begin(); it != invalidations.end(); ++it) { + if (it->messageId != 0) { + lastMsgId = it->messageId; + } + if (it->fromBufferId == it->toBufferId) { + // TODO: handle fromBufferId = UINT32_MAX + invalidateBuffer(it->fromBufferId); } else { - // should not happen! - ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId); + invalidateRange(it->fromBufferId, it->toBufferId); } } - mReleasing.mReleasedIds.clear(); - return true; } - return false; + { + std::lock_guard lock(mReleasing.mLock); + if (lastMsgId != 0) { + if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) { + mReleasing.mInvalidateId = lastMsgId; + mReleasing.mInvalidateAck = false; + } + } else if (messageId != 0) { + // messages are drained. 
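The surrounding logic keeps one pending invalidation id per client: the newest id read from the invalidation FMQ, or reported through IObserver::onMessage(), is remembered, and exactly one INVALIDATION_ACK is posted for it via postBufferInvalidateAck(). A reduced standalone sketch of that bookkeeping follows; AckState, later() and onInvalidationsDrained() are illustrative names, not part of the patch.

#include <cstdint>

struct AckState {
    uint32_t invalidateId = 0;  // newest invalidation id observed
    bool acked = true;          // becomes true once the ack has been queued
};

static bool later(uint32_t cur, uint32_t prev) {  // same rule as isMessageLater()
    return cur != prev && cur - prev < prev - cur;
}

// Called after draining the invalidation FMQ; lastMsgId is the newest non-zero
// message id that was read (0 if none). Returns true when an ack must be sent.
static bool onInvalidationsDrained(AckState &s, uint32_t lastMsgId) {
    if (lastMsgId != 0 && later(lastMsgId, s.invalidateId)) {
        s.invalidateId = lastMsgId;
        s.acked = false;              // a newer id needs acknowledging
    }
    return !s.acked;                  // caller posts the ack and marks it sent
}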
+ if (isMessageLater(messageId, mReleasing.mInvalidateId)) { + mReleasing.mInvalidateId = lastMsgId; + mReleasing.mInvalidateAck = true; + } + } + if (!mReleasing.mInvalidateAck) { + // post ACK + mReleasing.mStatusChannel->postBufferInvalidateAck( + mConnectionId, + mReleasing.mInvalidateId, &mReleasing.mInvalidateAck); + } + } + return cleared; } // should have mCache.mLock @@ -587,6 +677,49 @@ void BufferPoolClient::Impl::evictCaches(bool clearCache) { } } +// should have mCache.mLock +void BufferPoolClient::Impl::invalidateBuffer(BufferId id) { + for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) { + if (id == it->second->id()) { + if (!it->second->hasCache()) { + mCache.mBuffers.erase(it); + ALOGV("cache invalidated %lld : buffer %u", + (long long)mConnectionId, id); + } else { + ALOGW("Inconsitent invalidation %lld : activer buffer!! %u", + (long long)mConnectionId, (unsigned int)id); + } + break; + } + } +} + +// should have mCache.mLock +void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) { + size_t invalidated = 0; + for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) { + if (!it->second->hasCache()) { + BufferId bid = it->second->id(); + if (from < to) { + if (from <= bid && bid < to) { + ++invalidated; + it = mCache.mBuffers.erase(it); + continue; + } + } else { + if (from <= bid || bid < to) { + ++invalidated; + it = mCache.mBuffers.erase(it); + continue; + } + } + } + ++it; + } + ALOGV("cache invalidated %lld : # of invalidated %zu", + (long long)mConnectionId, invalidated); +} + ResultStatus BufferPoolClient::Impl::allocateBufferHandle( const std::vector& params, BufferId *bufferId, native_handle_t** handle) { @@ -629,12 +762,14 @@ ResultStatus BufferPoolClient::Impl::fetchBufferHandle( } -BufferPoolClient::BufferPoolClient(const sp &accessor) { - mImpl = std::make_shared(accessor); +BufferPoolClient::BufferPoolClient(const sp &accessor, + const sp &observer) { + mImpl = std::make_shared(accessor, observer); } -BufferPoolClient::BufferPoolClient(const sp &accessor) { - mImpl = std::make_shared(accessor); +BufferPoolClient::BufferPoolClient(const sp &accessor, + const sp &observer) { + mImpl = std::make_shared(accessor, observer); } BufferPoolClient::~BufferPoolClient() { @@ -672,6 +807,19 @@ ResultStatus BufferPoolClient::getAccessor(sp *accessor) { return ResultStatus::CRITICAL_ERROR; } +void BufferPoolClient::receiveInvalidation(uint32_t msgId) { + if (isValid()) { + mImpl->receiveInvalidation(msgId); + } +} + +ResultStatus BufferPoolClient::flush() { + if (isValid()) { + return mImpl->flush(); + } + return ResultStatus::CRITICAL_ERROR; +} + ResultStatus BufferPoolClient::allocate( const std::vector ¶ms, native_handle_t **handle, diff --git a/media/bufferpool/2.0/BufferPoolClient.h b/media/bufferpool/2.0/BufferPoolClient.h index 1889ea3ce1..e8d9ae6703 100644 --- a/media/bufferpool/2.0/BufferPoolClient.h +++ b/media/bufferpool/2.0/BufferPoolClient.h @@ -49,7 +49,8 @@ public: * Creates a buffer pool client from a local buffer pool * (via ClientManager#create). */ - explicit BufferPoolClient(const sp &accessor); + explicit BufferPoolClient(const sp &accessor, + const sp &observer); /** * Creates a buffer pool client from a remote buffer pool @@ -57,7 +58,8 @@ public: * Note: A buffer pool client created with remote buffer pool cannot * allocate a buffer. */ - explicit BufferPoolClient(const sp &accessor); + explicit BufferPoolClient(const sp &accessor, + const sp &observer); /** Destructs a buffer pool client. 
*/ ~BufferPoolClient(); @@ -73,6 +75,10 @@ private: ResultStatus getAccessor(sp *accessor); + void receiveInvalidation(uint32_t msgId); + + ResultStatus flush(); + ResultStatus allocate(const std::vector ¶ms, native_handle_t **handle, std::shared_ptr *buffer); @@ -92,6 +98,7 @@ private: std::shared_ptr mImpl; friend struct ClientManager; + friend struct Observer; }; } // namespace implementation diff --git a/media/bufferpool/2.0/BufferStatus.cpp b/media/bufferpool/2.0/BufferStatus.cpp index 0d3f5a3f54..693726086e 100644 --- a/media/bufferpool/2.0/BufferStatus.cpp +++ b/media/bufferpool/2.0/BufferStatus.cpp @@ -17,6 +17,7 @@ #define LOG_TAG "BufferPoolStatus" //#define LOG_NDEBUG 0 +#include #include #include "BufferStatus.h" @@ -37,6 +38,18 @@ int64_t getTimestampNow() { return stamp; } +bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) { + return curMsgId != prevMsgId && curMsgId - prevMsgId < prevMsgId - curMsgId; +} + +bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) { + if (from < to) { + return from <= bufferId && bufferId < to; + } else { // wrap happens + return from <= bufferId || bufferId < to; + } +} + static constexpr int kNumElementsInQueue = 1024*16; static constexpr int kMinElementsToSyncInQueue = 128; @@ -139,6 +152,29 @@ void BufferStatusChannel::postBufferRelease( } } +void BufferStatusChannel::postBufferInvalidateAck( + ConnectionId connectionId, + uint32_t invalidateId, + bool *invalidated) { + if (mValid && !*invalidated) { + size_t avail = mBufferStatusQueue->availableToWrite(); + if (avail > 0) { + BufferStatusMessage message; + message.newStatus = BufferStatus::INVALIDATION_ACK; + message.bufferId = invalidateId; + message.connectionId = connectionId; + if (!mBufferStatusQueue->write(&message, 1)) { + // Since avaliable # of writes are already confirmed, + // this should not happen. + // TODO: error handing? + ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); + return; + } + *invalidated = true; + } + } +} + bool BufferStatusChannel::postBufferStatusMessage( TransactionId transactionId, BufferId bufferId, BufferStatus status, ConnectionId connectionId, ConnectionId targetId, @@ -182,6 +218,83 @@ bool BufferStatusChannel::postBufferStatusMessage( return false; } +BufferInvalidationListener::BufferInvalidationListener( + const InvalidationDescriptor &fmqDesc) { + std::unique_ptr queue = + std::make_unique(fmqDesc); + if (!queue || queue->isValid() == false) { + mValid = false; + return; + } + mValid = true; + mBufferInvalidationQueue = std::move(queue); + // drain previous messages + size_t avail = std::min( + mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue); + std::vector temp(avail); + if (avail > 0) { + mBufferInvalidationQueue->read(temp.data(), avail); + } +} + +void BufferInvalidationListener::getInvalidations( + std::vector &messages) { + // Try twice in case of overflow. + // TODO: handling overflow though it may not happen. 
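getInvalidations() below reads one batch from the invalidation FMQ and retries the read once, since a batch read can in principle fail if the writer races the availableToRead() check. A standalone sketch of that pattern follows; FakeQueue is a std::deque stand-in for the FMQ and, like getPendingMessages(), is not part of the patch.

#include <algorithm>
#include <cstddef>
#include <deque>
#include <vector>

struct Message { unsigned id; };

struct FakeQueue {                      // hypothetical stand-in for the FMQ
    std::deque<Message> items;
    size_t availableToRead() const { return items.size(); }
    bool read(Message *dst, size_t count) {
        if (count > items.size()) return false;      // reader lost the race
        std::copy(items.begin(), items.begin() + count, dst);
        items.erase(items.begin(), items.begin() + count);
        return true;
    }
};

static void getPendingMessages(FakeQueue &queue, std::vector<Message> &messages,
                               size_t maxBatch) {
    for (int attempt = 0; attempt < 2; ++attempt) {   // retry once on failure
        size_t avail = std::min(queue.availableToRead(), maxBatch);
        if (avail == 0) {
            return;                                   // nothing pending
        }
        std::vector<Message> temp(avail);
        if (queue.read(temp.data(), avail)) {
            messages.insert(messages.end(), temp.begin(), temp.end());
            break;                                    // one successful batch is enough
        }
    }
}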
+ for (int i = 0; i < 2; ++i) { + size_t avail = std::min( + mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue); + if (avail > 0) { + std::vector temp(avail); + if (mBufferInvalidationQueue->read(temp.data(), avail)) { + messages.reserve(messages.size() + avail); + for (auto it = temp.begin(); it != temp.end(); ++it) { + messages.push_back(*it); + } + break; + } + } else { + return; + } + } +} + +bool BufferInvalidationListener::isValid() { + return mValid; +} + +BufferInvalidationChannel::BufferInvalidationChannel() + : mValid(true), + mBufferInvalidationQueue( + std::make_unique(kNumElementsInQueue, true)) { + if (!mBufferInvalidationQueue || mBufferInvalidationQueue->isValid() == false) { + mValid = false; + } +} + +bool BufferInvalidationChannel::isValid() { + return mValid; +} + +void BufferInvalidationChannel::getDesc(const InvalidationDescriptor **fmqDescPtr) { + if (mValid) { + *fmqDescPtr = mBufferInvalidationQueue->getDesc(); + } else { + *fmqDescPtr = nullptr; + } +} + +void BufferInvalidationChannel::postInvalidation( + uint32_t msgId, BufferId fromId, BufferId toId) { + BufferInvalidationMessage message; + + message.messageId = msgId; + message.fromBufferId = fromId; + message.toBufferId = toId; + // TODO: handle failure (it does not happen normally.) + mBufferInvalidationQueue->write(&message); +} + } // namespace implementation } // namespace V2_0 } // namespace bufferpool diff --git a/media/bufferpool/2.0/BufferStatus.h b/media/bufferpool/2.0/BufferStatus.h index 777a3200f8..fa658382a3 100644 --- a/media/bufferpool/2.0/BufferStatus.h +++ b/media/bufferpool/2.0/BufferStatus.h @@ -37,9 +37,13 @@ namespace implementation { /** Returns monotonic timestamp in Us since fixed point in time. */ int64_t getTimestampNow(); +bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId); + +bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId); + /** - * A collection of FMQ for a buffer pool. buffer ownership/status change - * messages are sent via the FMQs from the clients. + * A collection of buffer status message FMQ for a buffer pool. buffer + * ownership/status change messages are sent via the FMQs from the clients. */ class BufferStatusObserver { private: @@ -47,7 +51,8 @@ private: mBufferStatusQueues; public: - /** Creates an FMQ for the specified connection(client). + /** Creates a buffer status message FMQ for the specified + * connection(client). * * @param connectionId connection Id of the specified client. * @param fmqDescPtr double ptr of created FMQ's descriptor. @@ -58,7 +63,8 @@ public: */ ResultStatus open(ConnectionId id, const StatusDescriptor** fmqDescPtr); - /** Closes an FMQ for the specified connection(client). + /** Closes a buffer status message FMQ for the specified + * connection(client). * * @param connectionId connection Id of the specified client. * @@ -75,8 +81,8 @@ public: }; /** - * An FMQ for a buffer pool client. Buffer ownership/status change messages - * are sent via the fmq to the buffer pool. + * A buffer status message FMQ for a buffer pool client. Buffer ownership/status + * change messages are sent via the fmq to the buffer pool. */ class BufferStatusChannel { private: @@ -85,7 +91,8 @@ private: public: /** - * Connects to an FMQ from a descriptor of the created FMQ. + * Connects to a buffer status message FMQ from a descriptor of + * the created FMQ. * * @param fmqDesc Descriptor of the created FMQ. 
*/ @@ -131,6 +138,86 @@ public: ConnectionId connectionId, ConnectionId targetId, std::list &pending, std::list &posted); + + /** + * Posts a buffer invaliadation messge to the buffer pool. + * + * @param connectionId connection Id of the client. + * @param invalidateId invalidation ack to the buffer pool. + * if invalidation id is zero, the ack will not be + * posted. + * @param invalidated sets {@code true} only when the invalidation ack is + * posted. + */ + void postBufferInvalidateAck( + ConnectionId connectionId, + uint32_t invalidateId, + bool *invalidated); +}; + +/** + * A buffer invalidation FMQ for a buffer pool client. Buffer invalidation + * messages are received via the fmq from the buffer pool. Buffer invalidation + * messages are handled as soon as possible. + */ +class BufferInvalidationListener { +private: + bool mValid; + std::unique_ptr mBufferInvalidationQueue; + +public: + /** + * Connects to a buffer invalidation FMQ from a descriptor of the created FMQ. + * + * @param fmqDesc Descriptor of the created FMQ. + */ + BufferInvalidationListener(const InvalidationDescriptor &fmqDesc); + + /** Retrieves all pending buffer invalidation messages from the buffer pool. + * + * @param messages retrieved pending messages. + */ + void getInvalidations(std::vector &messages); + + /** Returns whether the FMQ is connected succesfully. */ + bool isValid(); +}; + +/** + * A buffer invalidation FMQ for a buffer pool. A buffer pool will send buffer + * invalidation messages to the clients via the FMQ. The FMQ is shared among + * buffer pool clients. + */ +class BufferInvalidationChannel { +private: + bool mValid; + std::unique_ptr mBufferInvalidationQueue; + +public: + /** + * Creates a buffer invalidation FMQ for a buffer pool. + */ + BufferInvalidationChannel(); + + /** Returns whether the FMQ is connected succesfully. */ + bool isValid(); + + /** + * Retrieves the descriptor of a buffer invalidation FMQ. the descriptor may + * be passed to the client for buffer invalidation handling. + * + * @param fmqDescPtr double ptr of created FMQ's descriptor. + */ + void getDesc(const InvalidationDescriptor **fmqDescPtr); + + /** Posts a buffer invalidation for invalidated buffers. 
+ * + * @param msgId Invalidation message id which is used when clients send + * acks back via BufferStatusMessage + * @param fromId The start bufferid of the invalidated buffers(inclusive) + * @param toId The end bufferId of the invalidated buffers(inclusive) + */ + void postInvalidation(uint32_t msgId, BufferId fromId, BufferId toId); }; } // namespace implementation diff --git a/media/bufferpool/2.0/ClientManager.cpp b/media/bufferpool/2.0/ClientManager.cpp index eeaf093a40..f8531d812c 100644 --- a/media/bufferpool/2.0/ClientManager.cpp +++ b/media/bufferpool/2.0/ClientManager.cpp @@ -23,6 +23,7 @@ #include #include #include "BufferPoolClient.h" +#include "Observer.h" namespace android { namespace hardware { @@ -106,6 +107,8 @@ public: ResultStatus close(ConnectionId connectionId); + ResultStatus flush(ConnectionId connectionId); + ResultStatus allocate(ConnectionId connectionId, const std::vector ¶ms, native_handle_t **handle, @@ -153,10 +156,13 @@ private: mClients; } mActive; + sp mObserver; + ClientManagerCookieHolder mRemoteClientCookies; }; -ClientManager::Impl::Impl() {} +ClientManager::Impl::Impl() + : mObserver(new Observer()) {} ResultStatus ClientManager::Impl::registerSender( const sp &accessor, ConnectionId *pConnectionId) { @@ -185,7 +191,7 @@ ResultStatus ClientManager::Impl::registerSender( lock.unlock(); ResultStatus result = ResultStatus::OK; const std::shared_ptr client = - std::make_shared(accessor); + std::make_shared(accessor, mObserver); lock.lock(); if (!client) { result = ResultStatus::NO_MEMORY; @@ -197,6 +203,7 @@ ResultStatus ClientManager::Impl::registerSender( const std::weak_ptr wclient = client; mCache.mClients.push_back(std::make_pair(accessor, wclient)); ConnectionId conId = client->getConnectionId(); + mObserver->addClient(conId, wclient); { std::lock_guard lock(mActive.mMutex); mActive.mClients.insert(std::make_pair(conId, client)); @@ -266,8 +273,9 @@ ResultStatus ClientManager::Impl::create( if (!accessor || !accessor->isValid()) { return ResultStatus::CRITICAL_ERROR; } + // TODO: observer is local. use direct call instead of hidl call. 
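The Observer created here is shared by every local client: it holds the clients by weak_ptr keyed on connection id, prunes dead entries when a new client registers, and forwards onMessage() to BufferPoolClient::receiveInvalidation(). A standalone sketch of that fan-out pattern follows; Registry and Client are illustrative stand-ins, not part of the patch.

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

struct Client {
    void receiveInvalidation(uint32_t msgId) { (void)msgId; /* sync local caches */ }
};

class Registry {
public:
    void add(int64_t connectionId, const std::weak_ptr<Client> &wclient) {
        std::lock_guard<std::mutex> lock(mLock);
        for (auto it = mClients.begin(); it != mClients.end();) {
            // Drop dead entries and any stale entry reusing this id.
            if (!it->second.lock() || it->first == connectionId) {
                it = mClients.erase(it);
            } else {
                ++it;
            }
        }
        mClients.emplace(connectionId, wclient);
    }

    void onMessage(int64_t connectionId, uint32_t msgId) {
        std::shared_ptr<Client> client;
        {
            std::lock_guard<std::mutex> lock(mLock);
            auto it = mClients.find(connectionId);
            if (it != mClients.end()) {
                client = it->second.lock();
                if (!client) {
                    mClients.erase(it);          // owner already went away
                }
            }
        }
        if (client) {
            client->receiveInvalidation(msgId);  // call back outside the lock
        }
    }

private:
    std::mutex mLock;
    std::map<int64_t, std::weak_ptr<Client>> mClients;
};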
std::shared_ptr client = - std::make_shared(accessor); + std::make_shared(accessor, mObserver); if (!client || !client->isValid()) { return ResultStatus::CRITICAL_ERROR; } @@ -280,6 +288,7 @@ ResultStatus ClientManager::Impl::create( const std::weak_ptr wclient = client; mCache.mClients.push_back(std::make_pair(accessor, wclient)); ConnectionId conId = client->getConnectionId(); + mObserver->addClient(conId, wclient); { std::lock_guard lock(mActive.mMutex); mActive.mClients.insert(std::make_pair(conId, client)); @@ -291,12 +300,13 @@ ResultStatus ClientManager::Impl::create( } ResultStatus ClientManager::Impl::close(ConnectionId connectionId) { - std::lock_guard lock1(mCache.mMutex); - std::lock_guard lock2(mActive.mMutex); + std::unique_lock lock1(mCache.mMutex); + std::unique_lock lock2(mActive.mMutex); auto it = mActive.mClients.find(connectionId); if (it != mActive.mClients.end()) { sp accessor; it->second->getAccessor(&accessor); + std::shared_ptr closing = it->second; mActive.mClients.erase(connectionId); for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) { // clean up dead client caches @@ -307,11 +317,27 @@ ResultStatus ClientManager::Impl::close(ConnectionId connectionId) { cit++; } } + lock2.unlock(); + lock1.unlock(); + closing->flush(); return ResultStatus::OK; } return ResultStatus::NOT_FOUND; } +ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) { + std::shared_ptr client; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + client = it->second; + } + return client->flush(); +} + ResultStatus ClientManager::Impl::allocate( ConnectionId connectionId, const std::vector ¶ms, native_handle_t **handle, std::shared_ptr *buffer) { @@ -461,6 +487,13 @@ ResultStatus ClientManager::close(ConnectionId connectionId) { return ResultStatus::CRITICAL_ERROR; } +ResultStatus ClientManager::flush(ConnectionId connectionId) { + if (mImpl) { + return mImpl->flush(connectionId); + } + return ResultStatus::CRITICAL_ERROR; +} + ResultStatus ClientManager::allocate( ConnectionId connectionId, const std::vector ¶ms, native_handle_t **handle, std::shared_ptr *buffer) { diff --git a/media/bufferpool/2.0/Connection.cpp b/media/bufferpool/2.0/Connection.cpp index cd837a19dc..6bd0e7956f 100644 --- a/media/bufferpool/2.0/Connection.cpp +++ b/media/bufferpool/2.0/Connection.cpp @@ -60,6 +60,13 @@ void Connection::initialize( } } +ResultStatus Connection::flush() { + if (mInitialized && mAccessor) { + return mAccessor->flush(); + } + return ResultStatus::CRITICAL_ERROR; +} + ResultStatus Connection::allocate( const std::vector ¶ms, BufferId *bufferId, const native_handle_t **handle) { diff --git a/media/bufferpool/2.0/Connection.h b/media/bufferpool/2.0/Connection.h index e2b47f1dcb..8507749849 100644 --- a/media/bufferpool/2.0/Connection.h +++ b/media/bufferpool/2.0/Connection.h @@ -43,6 +43,11 @@ struct Connection : public IConnection { // Methods from ::android::hardware::media::bufferpool::V2_0::IConnection follow. Return fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) override; + /** + * Invalidates all buffers which are active and/or are ready to be recycled. + */ + ResultStatus flush(); + /** * Allocates a buffer using the specified parameters. Recycles a buffer if * it is possible. 
The returned buffer can be transferred to other remote diff --git a/media/bufferpool/2.0/Observer.cpp b/media/bufferpool/2.0/Observer.cpp new file mode 100644 index 0000000000..5b23160463 --- /dev/null +++ b/media/bufferpool/2.0/Observer.cpp @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Observer.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +Observer::Observer() { +} + +Observer::~Observer() { +} + +// Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow. +Return Observer::onMessage(int64_t connectionId, uint32_t msgId) { + std::unique_lock lock(mLock); + auto it = mClients.find(connectionId); + if (it != mClients.end()) { + const std::shared_ptr client = it->second.lock(); + if (!client) { + mClients.erase(it); + } else { + lock.unlock(); + client->receiveInvalidation(msgId); + } + } + return Void(); +} + +void Observer::addClient(ConnectionId connectionId, + const std::weak_ptr &wclient) { + std::lock_guard lock(mLock); + for (auto it = mClients.begin(); it != mClients.end();) { + if (!it->second.lock() || it->first == connectionId) { + it = mClients.erase(it); + } else { + ++it; + } + } + mClients.insert(std::make_pair(connectionId, wclient)); + +} + +void Observer::delClient(ConnectionId connectionId) { + std::lock_guard lock(mLock); + mClients.erase(connectionId); +} + + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/Observer.h b/media/bufferpool/2.0/Observer.h new file mode 100644 index 0000000000..42bd7c1325 --- /dev/null +++ b/media/bufferpool/2.0/Observer.h @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H + +#include +#include +#include +#include "BufferPoolClient.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +using ::android::hardware::hidl_array; +using ::android::hardware::hidl_memory; +using ::android::hardware::hidl_string; +using ::android::hardware::hidl_vec; +using ::android::hardware::Return; +using ::android::hardware::Void; +using ::android::sp; + +struct Observer : public IObserver { + // Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow. + Return onMessage(int64_t connectionId, uint32_t msgId) override; + + ~Observer(); + + void addClient(ConnectionId connectionId, + const std::weak_ptr &wclient); + + void delClient(ConnectionId connectionId); + +private: + Observer(); + + friend struct ClientManager; + + std::mutex mLock; + std::map> mClients; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H diff --git a/media/bufferpool/2.0/include/bufferpool/ClientManager.h b/media/bufferpool/2.0/include/bufferpool/ClientManager.h index cfc3bc3bcb..953c3049a4 100644 --- a/media/bufferpool/2.0/include/bufferpool/ClientManager.h +++ b/media/bufferpool/2.0/include/bufferpool/ClientManager.h @@ -91,6 +91,18 @@ struct ClientManager : public IClientManager { */ ResultStatus close(ConnectionId connectionId); + /** + * Evicts cached allocations. If it's local connection, release the + * previous allocations and do not recycle current active allocations. + * + * @param connectionId The id of the connection. + * + * @return OK when the connection is resetted. + * NOT_FOUND when the specified connection was not found. + * CRITICAL_ERROR otherwise. + */ + ResultStatus flush(ConnectionId connectionId); + /** * Allocates a buffer from the specified connection. *