From bbe37b645da9a07fca8820b6d15b1adb595eee96 Mon Sep 17 00:00:00 2001 From: Sungtak Lee Date: Wed, 29 Aug 2018 15:15:48 -0700 Subject: [PATCH] android.hardware.media.bufferpool@2.0 impl Implementation is currently identical to 1.0 impl. The update will be applied afterwards. Bug: 112203066 Change-Id: If19af34121f5c9736ab4e8ccf7b1716d0be05c81 --- media/bufferpool/2.0/Accessor.cpp | 201 +++++ media/bufferpool/2.0/Accessor.h | 188 +++++ media/bufferpool/2.0/AccessorImpl.cpp | 543 ++++++++++++++ media/bufferpool/2.0/AccessorImpl.h | 300 ++++++++ media/bufferpool/2.0/Android.bp | 29 + media/bufferpool/2.0/BufferPoolClient.cpp | 708 ++++++++++++++++++ media/bufferpool/2.0/BufferPoolClient.h | 102 +++ media/bufferpool/2.0/BufferStatus.cpp | 191 +++++ media/bufferpool/2.0/BufferStatus.h | 143 ++++ media/bufferpool/2.0/ClientManager.cpp | 504 +++++++++++++ media/bufferpool/2.0/Connection.cpp | 89 +++ media/bufferpool/2.0/Connection.h | 103 +++ .../2.0/include/bufferpool/BufferPoolTypes.h | 118 +++ .../2.0/include/bufferpool/ClientManager.h | 179 +++++ 14 files changed, 3398 insertions(+) create mode 100644 media/bufferpool/2.0/Accessor.cpp create mode 100644 media/bufferpool/2.0/Accessor.h create mode 100644 media/bufferpool/2.0/AccessorImpl.cpp create mode 100644 media/bufferpool/2.0/AccessorImpl.h create mode 100644 media/bufferpool/2.0/Android.bp create mode 100644 media/bufferpool/2.0/BufferPoolClient.cpp create mode 100644 media/bufferpool/2.0/BufferPoolClient.h create mode 100644 media/bufferpool/2.0/BufferStatus.cpp create mode 100644 media/bufferpool/2.0/BufferStatus.h create mode 100644 media/bufferpool/2.0/ClientManager.cpp create mode 100644 media/bufferpool/2.0/Connection.cpp create mode 100644 media/bufferpool/2.0/Connection.h create mode 100644 media/bufferpool/2.0/include/bufferpool/BufferPoolTypes.h create mode 100644 media/bufferpool/2.0/include/bufferpool/ClientManager.h diff --git a/media/bufferpool/2.0/Accessor.cpp b/media/bufferpool/2.0/Accessor.cpp new file mode 100644 index 0000000000..3fd41f0b91 --- /dev/null +++ b/media/bufferpool/2.0/Accessor.cpp @@ -0,0 +1,201 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define LOG_TAG "BufferPoolConnection" + +#include "Accessor.h" +#include "AccessorImpl.h" +#include "Connection.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +void ConnectionDeathRecipient::add( + int64_t connectionId, + const sp &accessor) { + std::lock_guard lock(mLock); + if (mAccessors.find(connectionId) == mAccessors.end()) { + mAccessors.insert(std::make_pair(connectionId, accessor)); + } +} + +void ConnectionDeathRecipient::remove(int64_t connectionId) { + std::lock_guard lock(mLock); + mAccessors.erase(connectionId); + auto it = mConnectionToCookie.find(connectionId); + if (it != mConnectionToCookie.end()) { + uint64_t cookie = it->second; + mConnectionToCookie.erase(it); + auto cit = mCookieToConnections.find(cookie); + if (cit != mCookieToConnections.end()) { + cit->second.erase(connectionId); + if (cit->second.size() == 0) { + mCookieToConnections.erase(cit); + } + } + } +} + +void ConnectionDeathRecipient::addCookieToConnection( + uint64_t cookie, + int64_t connectionId) { + std::lock_guard lock(mLock); + if (mAccessors.find(connectionId) == mAccessors.end()) { + return; + } + mConnectionToCookie.insert(std::make_pair(connectionId, cookie)); + auto it = mCookieToConnections.find(cookie); + if (it != mCookieToConnections.end()) { + it->second.insert(connectionId); + } else { + mCookieToConnections.insert(std::make_pair( + cookie, std::set{connectionId})); + } +} + +void ConnectionDeathRecipient::serviceDied( + uint64_t cookie, + const wp<::android::hidl::base::V1_0::IBase>& /* who */ + ) { + std::map> connectionsToClose; + { + std::lock_guard lock(mLock); + + auto it = mCookieToConnections.find(cookie); + if (it != mCookieToConnections.end()) { + for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) { + auto accessorIt = mAccessors.find(*conIt); + if (accessorIt != mAccessors.end()) { + connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second)); + mAccessors.erase(accessorIt); + } + mConnectionToCookie.erase(*conIt); + } + mCookieToConnections.erase(it); + } + } + + if (connectionsToClose.size() > 0) { + sp accessor; + for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) { + accessor = it->second.promote(); + + if (accessor) { + accessor->close(it->first); + ALOGD("connection %lld closed on death", (long long)it->first); + } + } + } +} + +namespace { +static sp sConnectionDeathRecipient = + new ConnectionDeathRecipient(); +} + +sp Accessor::getConnectionDeathRecipient() { + return sConnectionDeathRecipient; +} + +// Methods from ::android::hardware::media::bufferpool::V2_0::IAccessor follow. +Return Accessor::connect(connect_cb _hidl_cb) { + sp connection; + ConnectionId connectionId; + const QueueDescriptor* fmqDesc; + + ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false); + if (status == ResultStatus::OK) { + _hidl_cb(status, connection, connectionId, *fmqDesc); + } else { + _hidl_cb(status, nullptr, -1LL, + android::hardware::MQDescriptorSync( + std::vector(), + nullptr /* nhandle */, 0 /* size */)); + } + return Void(); +} + +Accessor::Accessor(const std::shared_ptr &allocator) + : mImpl(new Impl(allocator)) {} + +Accessor::~Accessor() { +} + +bool Accessor::isValid() { + return (bool)mImpl; +} + +ResultStatus Accessor::allocate( + ConnectionId connectionId, + const std::vector ¶ms, + BufferId *bufferId, const native_handle_t** handle) { + if (mImpl) { + return mImpl->allocate(connectionId, params, bufferId, handle); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus Accessor::fetch( + ConnectionId connectionId, TransactionId transactionId, + BufferId bufferId, const native_handle_t** handle) { + if (mImpl) { + return mImpl->fetch(connectionId, transactionId, bufferId, handle); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus Accessor::connect( + sp *connection, ConnectionId *pConnectionId, + const QueueDescriptor** fmqDescPtr, bool local) { + if (mImpl) { + ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr); + if (!local && status == ResultStatus::OK) { + sp accessor(this); + sConnectionDeathRecipient->add(*pConnectionId, accessor); + } + return status; + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus Accessor::close(ConnectionId connectionId) { + if (mImpl) { + ResultStatus status = mImpl->close(connectionId); + sConnectionDeathRecipient->remove(connectionId); + return status; + } + return ResultStatus::CRITICAL_ERROR; +} + +void Accessor::cleanUp(bool clearCache) { + if (mImpl) { + mImpl->cleanUp(clearCache); + } +} + +//IAccessor* HIDL_FETCH_IAccessor(const char* /* name */) { +// return new Accessor(); +//} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/Accessor.h b/media/bufferpool/2.0/Accessor.h new file mode 100644 index 0000000000..4fd8f5b2e2 --- /dev/null +++ b/media/bufferpool/2.0/Accessor.h @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSOR_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSOR_H + +#include +#include +#include +#include +#include "BufferStatus.h" + +#include + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +using ::android::hardware::hidl_array; +using ::android::hardware::hidl_memory; +using ::android::hardware::hidl_string; +using ::android::hardware::hidl_vec; +using ::android::hardware::Return; +using ::android::hardware::Void; +using ::android::sp; + +struct Accessor; +struct Connection; + +/** + * Receives death notifications from remote connections. + * On death notifications, the connections are closed and used resources + * are released. + */ +struct ConnectionDeathRecipient : public hardware::hidl_death_recipient { + /** + * Registers a newly connected connection from remote processes. + */ + void add(int64_t connectionId, const sp &accessor); + + /** + * Removes a connection. + */ + void remove(int64_t connectionId); + + void addCookieToConnection(uint64_t cookie, int64_t connectionId); + + virtual void serviceDied( + uint64_t /* cookie */, + const wp<::android::hidl::base::V1_0::IBase>& /* who */ + ) override; + +private: + std::mutex mLock; + std::map> mCookieToConnections; + std::map mConnectionToCookie; + std::map> mAccessors; +}; + +/** + * A buffer pool accessor which enables a buffer pool to communicate with buffer + * pool clients. 1:1 correspondense holds between a buffer pool and an accessor. + */ +struct Accessor : public IAccessor { + // Methods from ::android::hardware::media::bufferpool::V2_0::IAccessor follow. + Return connect(connect_cb _hidl_cb) override; + + /** + * Creates a buffer pool accessor which uses the specified allocator. + * + * @param allocator buffer allocator. + */ + explicit Accessor(const std::shared_ptr &allocator); + + /** Destructs a buffer pool accessor. */ + ~Accessor(); + + /** Returns whether the accessor is valid. */ + bool isValid(); + + /** Allocates a buffer from a buffer pool. + * + * @param connectionId the connection id of the client. + * @param params the allocation parameters. + * @param bufferId the id of the allocated buffer. + * @param handle the native handle of the allocated buffer. + * + * @return OK when a buffer is successfully allocated. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus allocate( + ConnectionId connectionId, + const std::vector& params, + BufferId *bufferId, + const native_handle_t** handle); + + /** + * Fetches a buffer for the specified transaction. + * + * @param connectionId the id of receiving connection(client). + * @param transactionId the id of the transfer transaction. + * @param bufferId the id of the buffer to be fetched. + * @param handle the native handle of the fetched buffer. + * + * @return OK when a buffer is successfully fetched. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus fetch( + ConnectionId connectionId, + TransactionId transactionId, + BufferId bufferId, + const native_handle_t** handle); + + /** + * Makes a connection to the buffer pool. The buffer pool client uses the + * created connection in order to communicate with the buffer pool. An + * FMQ for buffer status message is also created for the client. + * + * @param connection created connection + * @param pConnectionId the id of the created connection + * @param fmqDescPtr FMQ descriptor for shared buffer status message + * queue between a buffer pool and the client. + * @param local true when a connection request comes from local process, + * false otherwise. + * + * @return OK when a connection is successfully made. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus connect( + sp *connection, ConnectionId *pConnectionId, + const QueueDescriptor** fmqDescPtr, bool local); + + /** + * Closes the specified connection to the client. + * + * @param connectionId the id of the connection. + * + * @return OK when the connection is closed. + * CRITICAL_ERROR otherwise. + */ + ResultStatus close(ConnectionId connectionId); + + /** + * Processes pending buffer status messages and perfoms periodic cache + * cleaning. + * + * @param clearCache if clearCache is true, it frees all buffers waiting + * to be recycled. + */ + void cleanUp(bool clearCache); + + /** + * Gets a hidl_death_recipient for remote connection death. + */ + static sp getConnectionDeathRecipient(); + +private: + class Impl; + std::unique_ptr mImpl; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSOR_H diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp new file mode 100644 index 0000000000..e5cf275ee3 --- /dev/null +++ b/media/bufferpool/2.0/AccessorImpl.cpp @@ -0,0 +1,543 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "BufferPoolAccessor" +//#define LOG_NDEBUG 0 + +#include +#include +#include +#include +#include "AccessorImpl.h" +#include "Connection.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +namespace { + static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec + static constexpr int64_t kLogDurationUs = 5000000; // 5 secs + + static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15; + static constexpr size_t kMinBufferCountForEviction = 40; +} + +// Buffer structure in bufferpool process +struct InternalBuffer { + BufferId mId; + size_t mOwnerCount; + size_t mTransactionCount; + const std::shared_ptr mAllocation; + const size_t mAllocSize; + const std::vector mConfig; + + InternalBuffer( + BufferId id, + const std::shared_ptr &alloc, + const size_t allocSize, + const std::vector &allocConfig) + : mId(id), mOwnerCount(0), mTransactionCount(0), + mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {} + + const native_handle_t *handle() { + return mAllocation->handle(); + } +}; + +struct TransactionStatus { + TransactionId mId; + BufferId mBufferId; + ConnectionId mSender; + ConnectionId mReceiver; + BufferStatus mStatus; + int64_t mTimestampUs; + bool mSenderValidated; + + TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) { + mId = message.transactionId; + mBufferId = message.bufferId; + mStatus = message.newStatus; + mTimestampUs = timestampUs; + if (mStatus == BufferStatus::TRANSFER_TO) { + mSender = message.connectionId; + mReceiver = message.targetConnectionId; + mSenderValidated = true; + } else { + mSender = -1LL; + mReceiver = message.connectionId; + mSenderValidated = false; + } + } +}; + +// Helper template methods for handling map of set. +template +bool insert(std::map> *mapOfSet, T key, U value) { + auto iter = mapOfSet->find(key); + if (iter == mapOfSet->end()) { + std::set valueSet{value}; + mapOfSet->insert(std::make_pair(key, valueSet)); + return true; + } else if (iter->second.find(value) == iter->second.end()) { + iter->second.insert(value); + return true; + } + return false; +} + +template +bool erase(std::map> *mapOfSet, T key, U value) { + bool ret = false; + auto iter = mapOfSet->find(key); + if (iter != mapOfSet->end()) { + if (iter->second.erase(value) > 0) { + ret = true; + } + if (iter->second.size() == 0) { + mapOfSet->erase(iter); + } + } + return ret; +} + +template +bool contains(std::map> *mapOfSet, T key, U value) { + auto iter = mapOfSet->find(key); + if (iter != mapOfSet->end()) { + auto setIter = iter->second.find(value); + return setIter != iter->second.end(); + } + return false; +} + +int32_t Accessor::Impl::sPid = getpid(); +uint32_t Accessor::Impl::sSeqId = time(nullptr); + +Accessor::Impl::Impl( + const std::shared_ptr &allocator) + : mAllocator(allocator) {} + +Accessor::Impl::~Impl() { +} + +ResultStatus Accessor::Impl::connect( + const sp &accessor, sp *connection, + ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) { + sp newConnection = new Connection(); + ResultStatus status = ResultStatus::CRITICAL_ERROR; + { + std::lock_guard lock(mBufferPool.mMutex); + if (newConnection) { + ConnectionId id = (int64_t)sPid << 32 | sSeqId; + status = mBufferPool.mObserver.open(id, fmqDescPtr); + if (status == ResultStatus::OK) { + newConnection->initialize(accessor, id); + *connection = newConnection; + *pConnectionId = id; + ++sSeqId; + } + } + mBufferPool.processStatusMessages(); + mBufferPool.cleanUp(); + } + return status; +} + +ResultStatus Accessor::Impl::close(ConnectionId connectionId) { + std::lock_guard lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + mBufferPool.handleClose(connectionId); + mBufferPool.mObserver.close(connectionId); + // Since close# will be called after all works are finished, it is OK to + // evict unused buffers. + mBufferPool.cleanUp(true); + return ResultStatus::OK; +} + +ResultStatus Accessor::Impl::allocate( + ConnectionId connectionId, const std::vector& params, + BufferId *bufferId, const native_handle_t** handle) { + std::unique_lock lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + ResultStatus status = ResultStatus::OK; + if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) { + lock.unlock(); + std::shared_ptr alloc; + size_t allocSize; + status = mAllocator->allocate(params, &alloc, &allocSize); + lock.lock(); + if (status == ResultStatus::OK) { + status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle); + } + ALOGV("create a buffer %d : %u %p", + status == ResultStatus::OK, *bufferId, *handle); + } + if (status == ResultStatus::OK) { + // TODO: handle ownBuffer failure + mBufferPool.handleOwnBuffer(connectionId, *bufferId); + } + mBufferPool.cleanUp(); + return status; +} + +ResultStatus Accessor::Impl::fetch( + ConnectionId connectionId, TransactionId transactionId, + BufferId bufferId, const native_handle_t** handle) { + std::lock_guard lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + auto found = mBufferPool.mTransactions.find(transactionId); + if (found != mBufferPool.mTransactions.end() && + contains(&mBufferPool.mPendingTransactions, + connectionId, transactionId)) { + if (found->second->mSenderValidated && + found->second->mStatus == BufferStatus::TRANSFER_FROM && + found->second->mBufferId == bufferId) { + found->second->mStatus = BufferStatus::TRANSFER_FETCH; + auto bufferIt = mBufferPool.mBuffers.find(bufferId); + if (bufferIt != mBufferPool.mBuffers.end()) { + mBufferPool.mStats.onBufferFetched(); + *handle = bufferIt->second->handle(); + return ResultStatus::OK; + } + } + } + mBufferPool.cleanUp(); + return ResultStatus::CRITICAL_ERROR; +} + +void Accessor::Impl::cleanUp(bool clearCache) { + // transaction timeout, buffer cacheing TTL handling + std::lock_guard lock(mBufferPool.mMutex); + mBufferPool.processStatusMessages(); + mBufferPool.cleanUp(clearCache); +} + +Accessor::Impl::Impl::BufferPool::BufferPool() + : mTimestampUs(getTimestampNow()), + mLastCleanUpUs(mTimestampUs), + mLastLogUs(mTimestampUs), + mSeq(0) {} + + +// Statistics helper +template +int percentage(T base, S total) { + return int(total ? 0.5 + 100. * static_cast(base) / total : 0); +} + +Accessor::Impl::Impl::BufferPool::~BufferPool() { + std::lock_guard lock(mMutex); + ALOGD("Destruction - bufferpool %p " + "cached: %zu/%zuM, %zu/%d%% in use; " + "allocs: %zu, %d%% recycled; " + "transfers: %zu, %d%% unfetced", + this, mStats.mBuffersCached, mStats.mSizeCached >> 20, + mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached), + mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations), + mStats.mTotalTransfers, + percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); +} + +bool Accessor::Impl::BufferPool::handleOwnBuffer( + ConnectionId connectionId, BufferId bufferId) { + + bool added = insert(&mUsingBuffers, connectionId, bufferId); + if (added) { + auto iter = mBuffers.find(bufferId); + iter->second->mOwnerCount++; + } + insert(&mUsingConnections, bufferId, connectionId); + return added; +} + +bool Accessor::Impl::BufferPool::handleReleaseBuffer( + ConnectionId connectionId, BufferId bufferId) { + bool deleted = erase(&mUsingBuffers, connectionId, bufferId); + if (deleted) { + auto iter = mBuffers.find(bufferId); + iter->second->mOwnerCount--; + if (iter->second->mOwnerCount == 0 && + iter->second->mTransactionCount == 0) { + mStats.onBufferUnused(iter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } + } + erase(&mUsingConnections, bufferId, connectionId); + ALOGV("release buffer %u : %d", bufferId, deleted); + return deleted; +} + +bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) { + auto completed = mCompletedTransactions.find( + message.transactionId); + if (completed != mCompletedTransactions.end()) { + // already completed + mCompletedTransactions.erase(completed); + return true; + } + // the buffer should exist and be owned. + auto bufferIter = mBuffers.find(message.bufferId); + if (bufferIter == mBuffers.end() || + !contains(&mUsingBuffers, message.connectionId, message.bufferId)) { + return false; + } + auto found = mTransactions.find(message.transactionId); + if (found != mTransactions.end()) { + // transfer_from was received earlier. + found->second->mSender = message.connectionId; + found->second->mSenderValidated = true; + return true; + } + // TODO: verify there is target connection Id + mStats.onBufferSent(); + mTransactions.insert(std::make_pair( + message.transactionId, + std::make_unique(message, mTimestampUs))); + insert(&mPendingTransactions, message.targetConnectionId, + message.transactionId); + bufferIter->second->mTransactionCount++; + return true; +} + +bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) { + auto found = mTransactions.find(message.transactionId); + if (found == mTransactions.end()) { + // TODO: is it feasible to check ownership here? + mStats.onBufferSent(); + mTransactions.insert(std::make_pair( + message.transactionId, + std::make_unique(message, mTimestampUs))); + insert(&mPendingTransactions, message.connectionId, + message.transactionId); + auto bufferIter = mBuffers.find(message.bufferId); + bufferIter->second->mTransactionCount++; + } else { + if (message.connectionId == found->second->mReceiver) { + found->second->mStatus = BufferStatus::TRANSFER_FROM; + } + } + return true; +} + +bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) { + auto found = mTransactions.find(message.transactionId); + if (found != mTransactions.end()) { + bool deleted = erase(&mPendingTransactions, message.connectionId, + message.transactionId); + if (deleted) { + if (!found->second->mSenderValidated) { + mCompletedTransactions.insert(message.transactionId); + } + auto bufferIter = mBuffers.find(message.bufferId); + if (message.newStatus == BufferStatus::TRANSFER_OK) { + handleOwnBuffer(message.connectionId, message.bufferId); + } + bufferIter->second->mTransactionCount--; + if (bufferIter->second->mOwnerCount == 0 + && bufferIter->second->mTransactionCount == 0) { + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(message.bufferId); + } + mTransactions.erase(found); + } + ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId, + message.bufferId, deleted); + return deleted; + } + ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId, + message.bufferId); + return false; +} + +void Accessor::Impl::BufferPool::processStatusMessages() { + std::vector messages; + mObserver.getBufferStatusChanges(messages); + mTimestampUs = getTimestampNow(); + for (BufferStatusMessage& message: messages) { + bool ret = false; + switch (message.newStatus) { + case BufferStatus::NOT_USED: + ret = handleReleaseBuffer( + message.connectionId, message.bufferId); + break; + case BufferStatus::USED: + // not happening + break; + case BufferStatus::TRANSFER_TO: + ret = handleTransferTo(message); + break; + case BufferStatus::TRANSFER_FROM: + ret = handleTransferFrom(message); + break; + case BufferStatus::TRANSFER_TIMEOUT: + // TODO + break; + case BufferStatus::TRANSFER_LOST: + // TODO + break; + case BufferStatus::TRANSFER_FETCH: + // not happening + break; + case BufferStatus::TRANSFER_OK: + case BufferStatus::TRANSFER_ERROR: + ret = handleTransferResult(message); + break; + } + if (ret == false) { + ALOGW("buffer status message processing failure - message : %d connection : %lld", + message.newStatus, (long long)message.connectionId); + } + } + messages.clear(); +} + +bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { + // Cleaning buffers + auto buffers = mUsingBuffers.find(connectionId); + if (buffers != mUsingBuffers.end()) { + for (const BufferId& bufferId : buffers->second) { + bool deleted = erase(&mUsingConnections, bufferId, connectionId); + if (deleted) { + auto bufferIter = mBuffers.find(bufferId); + bufferIter->second->mOwnerCount--; + if (bufferIter->second->mOwnerCount == 0 && + bufferIter->second->mTransactionCount == 0) { + // TODO: handle freebuffer insert fail + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } + } + } + mUsingBuffers.erase(buffers); + } + + // Cleaning transactions + auto pending = mPendingTransactions.find(connectionId); + if (pending != mPendingTransactions.end()) { + for (const TransactionId& transactionId : pending->second) { + auto iter = mTransactions.find(transactionId); + if (iter != mTransactions.end()) { + if (!iter->second->mSenderValidated) { + mCompletedTransactions.insert(transactionId); + } + BufferId bufferId = iter->second->mBufferId; + auto bufferIter = mBuffers.find(bufferId); + bufferIter->second->mTransactionCount--; + if (bufferIter->second->mOwnerCount == 0 && + bufferIter->second->mTransactionCount == 0) { + // TODO: handle freebuffer insert fail + mStats.onBufferUnused(bufferIter->second->mAllocSize); + mFreeBuffers.insert(bufferId); + } + mTransactions.erase(iter); + } + } + } + return true; +} + +bool Accessor::Impl::BufferPool::getFreeBuffer( + const std::shared_ptr &allocator, + const std::vector ¶ms, BufferId *pId, + const native_handle_t** handle) { + auto bufferIt = mFreeBuffers.begin(); + for (;bufferIt != mFreeBuffers.end(); ++bufferIt) { + BufferId bufferId = *bufferIt; + if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) { + break; + } + } + if (bufferIt != mFreeBuffers.end()) { + BufferId id = *bufferIt; + mFreeBuffers.erase(bufferIt); + mStats.onBufferRecycled(mBuffers[id]->mAllocSize); + *handle = mBuffers[id]->handle(); + *pId = id; + ALOGV("recycle a buffer %u %p", id, *handle); + return true; + } + return false; +} + +ResultStatus Accessor::Impl::BufferPool::addNewBuffer( + const std::shared_ptr &alloc, + const size_t allocSize, + const std::vector ¶ms, + BufferId *pId, + const native_handle_t** handle) { + + BufferId bufferId = mSeq++; + if (mSeq == Connection::SYNC_BUFFERID) { + mSeq = 0; + } + std::unique_ptr buffer = + std::make_unique( + bufferId, alloc, allocSize, params); + if (buffer) { + auto res = mBuffers.insert(std::make_pair( + bufferId, std::move(buffer))); + if (res.second) { + mStats.onBufferAllocated(allocSize); + *handle = alloc->handle(); + *pId = bufferId; + return ResultStatus::OK; + } + } + return ResultStatus::NO_MEMORY; +} + +void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { + if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) { + mLastCleanUpUs = mTimestampUs; + if (mTimestampUs > mLastLogUs + kLogDurationUs) { + mLastLogUs = mTimestampUs; + ALOGD("bufferpool %p : %zu(%zu size) total buffers - " + "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - " + "%zu/%zu (fetch/transfer)", + this, mStats.mBuffersCached, mStats.mSizeCached, + mStats.mBuffersInUse, mStats.mSizeInUse, + mStats.mTotalRecycles, mStats.mTotalAllocations, + mStats.mTotalFetches, mStats.mTotalTransfers); + } + for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { + if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction + && mBuffers.size() < kMinBufferCountForEviction) { + break; + } + auto it = mBuffers.find(*freeIt); + if (it != mBuffers.end() && + it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { + mStats.onBufferEvicted(it->second->mAllocSize); + mBuffers.erase(it); + freeIt = mFreeBuffers.erase(freeIt); + } else { + ++freeIt; + ALOGW("bufferpool inconsistent!"); + } + } + } +} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h new file mode 100644 index 0000000000..404394071c --- /dev/null +++ b/media/bufferpool/2.0/AccessorImpl.h @@ -0,0 +1,300 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSORIMPL_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSORIMPL_H + +#include +#include +#include "Accessor.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +struct InternalBuffer; +struct TransactionStatus; + +/** + * An implementation of a buffer pool accessor(or a buffer pool implementation.) */ +class Accessor::Impl { +public: + Impl(const std::shared_ptr &allocator); + + ~Impl(); + + ResultStatus connect( + const sp &accessor, sp *connection, + ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr); + + ResultStatus close(ConnectionId connectionId); + + ResultStatus allocate(ConnectionId connectionId, + const std::vector& params, + BufferId *bufferId, + const native_handle_t** handle); + + ResultStatus fetch(ConnectionId connectionId, + TransactionId transactionId, + BufferId bufferId, + const native_handle_t** handle); + + void cleanUp(bool clearCache); + +private: + // ConnectionId = pid : (timestamp_created + seqId) + // in order to guarantee uniqueness for each connection + static uint32_t sSeqId; + static int32_t sPid; + + const std::shared_ptr mAllocator; + + /** + * Buffer pool implementation. + * + * Handles buffer status messages. Handles buffer allocation/recycling. + * Handles buffer transfer between buffer pool clients. + */ + struct BufferPool { + private: + std::mutex mMutex; + int64_t mTimestampUs; + int64_t mLastCleanUpUs; + int64_t mLastLogUs; + BufferId mSeq; + BufferStatusObserver mObserver; + + std::map> mUsingBuffers; + std::map> mUsingConnections; + + std::map> mPendingTransactions; + // Transactions completed before TRANSFER_TO message arrival. + // Fetch does not occur for the transactions. + // Only transaction id is kept for the transactions in short duration. + std::set mCompletedTransactions; + // Currently active(pending) transations' status & information. + std::map> + mTransactions; + + std::map> mBuffers; + std::set mFreeBuffers; + + /// Buffer pool statistics which tracks allocation and transfer statistics. + struct Stats { + /// Total size of allocations which are used or available to use. + /// (bytes or pixels) + size_t mSizeCached; + /// # of cached buffers which are used or available to use. + size_t mBuffersCached; + /// Total size of allocations which are currently used. (bytes or pixels) + size_t mSizeInUse; + /// # of currently used buffers + size_t mBuffersInUse; + + /// # of allocations called on bufferpool. (# of fetched from BlockPool) + size_t mTotalAllocations; + /// # of allocations that were served from the cache. + /// (# of allocator alloc prevented) + size_t mTotalRecycles; + /// # of buffer transfers initiated. + size_t mTotalTransfers; + /// # of transfers that had to be fetched. + size_t mTotalFetches; + + Stats() + : mSizeCached(0), mBuffersCached(0), mSizeInUse(0), mBuffersInUse(0), + mTotalAllocations(0), mTotalRecycles(0), mTotalTransfers(0), mTotalFetches(0) {} + + /// A new buffer is allocated on an allocation request. + void onBufferAllocated(size_t allocSize) { + mSizeCached += allocSize; + mBuffersCached++; + + mSizeInUse += allocSize; + mBuffersInUse++; + + mTotalAllocations++; + } + + /// A buffer is evicted and destroyed. + void onBufferEvicted(size_t allocSize) { + mSizeCached -= allocSize; + mBuffersCached--; + } + + /// A buffer is recycled on an allocation request. + void onBufferRecycled(size_t allocSize) { + mSizeInUse += allocSize; + mBuffersInUse++; + + mTotalAllocations++; + mTotalRecycles++; + } + + /// A buffer is available to be recycled. + void onBufferUnused(size_t allocSize) { + mSizeInUse -= allocSize; + mBuffersInUse--; + } + + /// A buffer transfer is initiated. + void onBufferSent() { + mTotalTransfers++; + } + + /// A buffer fetch is invoked by a buffer transfer. + void onBufferFetched() { + mTotalFetches++; + } + } mStats; + + public: + /** Creates a buffer pool. */ + BufferPool(); + + /** Destroys a buffer pool. */ + ~BufferPool(); + + /** + * Processes all pending buffer status messages, and returns the result. + * Each status message is handled by methods with 'handle' prefix. + */ + void processStatusMessages(); + + /** + * Handles a buffer being owned by a connection. + * + * @param connectionId the id of the buffer owning connection. + * @param bufferId the id of the buffer. + * + * @return {@code true} when the buffer is owned, + * {@code false} otherwise. + */ + bool handleOwnBuffer(ConnectionId connectionId, BufferId bufferId); + + /** + * Handles a buffer being released by a connection. + * + * @param connectionId the id of the buffer owning connection. + * @param bufferId the id of the buffer. + * + * @return {@code true} when the buffer ownership is released, + * {@code false} otherwise. + */ + bool handleReleaseBuffer(ConnectionId connectionId, BufferId bufferId); + + /** + * Handles a transfer transaction start message from the sender. + * + * @param message a buffer status message for the transaction. + * + * @result {@code true} when transfer_to message is acknowledged, + * {@code false} otherwise. + */ + bool handleTransferTo(const BufferStatusMessage &message); + + /** + * Handles a transfer transaction being acked by the receiver. + * + * @param message a buffer status message for the transaction. + * + * @result {@code true} when transfer_from message is acknowledged, + * {@code false} otherwise. + */ + bool handleTransferFrom(const BufferStatusMessage &message); + + /** + * Handles a transfer transaction result message from the receiver. + * + * @param message a buffer status message for the transaction. + * + * @result {@code true} when the exisitng transaction is finished, + * {@code false} otherwise. + */ + bool handleTransferResult(const BufferStatusMessage &message); + + /** + * Handles a connection being closed, and returns the result. All the + * buffers and transactions owned by the connection will be cleaned up. + * The related FMQ will be cleaned up too. + * + * @param connectionId the id of the connection. + * + * @result {@code true} when the connection existed, + * {@code false} otherwise. + */ + bool handleClose(ConnectionId connectionId); + + /** + * Recycles a existing free buffer if it is possible. + * + * @param allocator the buffer allocator + * @param params the allocation parameters. + * @param pId the id of the recycled buffer. + * @param handle the native handle of the recycled buffer. + * + * @return {@code true} when a buffer is recycled, {@code false} + * otherwise. + */ + bool getFreeBuffer( + const std::shared_ptr &allocator, + const std::vector ¶ms, + BufferId *pId, const native_handle_t **handle); + + /** + * Adds a newly allocated buffer to bufferpool. + * + * @param alloc the newly allocated buffer. + * @param allocSize the size of the newly allocated buffer. + * @param params the allocation parameters. + * @param pId the buffer id for the newly allocated buffer. + * @param handle the native handle for the newly allocated buffer. + * + * @return OK when an allocation is successfully allocated. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus addNewBuffer( + const std::shared_ptr &alloc, + const size_t allocSize, + const std::vector ¶ms, + BufferId *pId, + const native_handle_t **handle); + + /** + * Processes pending buffer status messages and performs periodic cache + * cleaning. + * + * @param clearCache if clearCache is true, it frees all buffers + * waiting to be recycled. + */ + void cleanUp(bool clearCache = false); + + friend class Accessor::Impl; + } mBufferPool; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace ufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_ACCESSORIMPL_H diff --git a/media/bufferpool/2.0/Android.bp b/media/bufferpool/2.0/Android.bp new file mode 100644 index 0000000000..413125a571 --- /dev/null +++ b/media/bufferpool/2.0/Android.bp @@ -0,0 +1,29 @@ +cc_library { + name: "libstagefright_bufferpool@2.0", + vendor_available: true, + srcs: [ + "Accessor.cpp", + "AccessorImpl.cpp", + "BufferPoolClient.cpp", + "BufferStatus.cpp", + "ClientManager.cpp", + "Connection.cpp", + ], + export_include_dirs: [ + "include", + ], + shared_libs: [ + "libcutils", + "libfmq", + "libhidlbase", + "libhwbinder", + "libhidltransport", + "liblog", + "libutils", + "android.hardware.media.bufferpool@2.0", + ], + export_shared_lib_headers: [ + "libfmq", + "android.hardware.media.bufferpool@2.0", + ], +} diff --git a/media/bufferpool/2.0/BufferPoolClient.cpp b/media/bufferpool/2.0/BufferPoolClient.cpp new file mode 100644 index 0000000000..10158c375c --- /dev/null +++ b/media/bufferpool/2.0/BufferPoolClient.cpp @@ -0,0 +1,708 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "BufferPoolClient" +//#define LOG_NDEBUG 0 + +#include +#include +#include "BufferPoolClient.h" +#include "Connection.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +static constexpr int64_t kReceiveTimeoutUs = 1000000; // 100ms +static constexpr int kPostMaxRetry = 3; +static constexpr int kCacheTtlUs = 1000000; // TODO: tune + +class BufferPoolClient::Impl + : public std::enable_shared_from_this { +public: + explicit Impl(const sp &accessor); + + explicit Impl(const sp &accessor); + + bool isValid() { + return mValid; + } + + bool isLocal() { + return mValid && mLocal; + } + + ConnectionId getConnectionId() { + return mConnectionId; + } + + sp &getAccessor() { + return mAccessor; + } + + bool isActive(int64_t *lastTransactionUs, bool clearCache); + + ResultStatus allocate(const std::vector ¶ms, + native_handle_t **handle, + std::shared_ptr *buffer); + + ResultStatus receive( + TransactionId transactionId, BufferId bufferId, + int64_t timestampUs, + native_handle_t **handle, std::shared_ptr *buffer); + + void postBufferRelease(BufferId bufferId); + + bool postSend( + BufferId bufferId, ConnectionId receiver, + TransactionId *transactionId, int64_t *timestampUs); +private: + + bool postReceive( + BufferId bufferId, TransactionId transactionId, + int64_t timestampUs); + + bool postReceiveResult( + BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync); + + void trySyncFromRemote(); + + bool syncReleased(); + + void evictCaches(bool clearCache = false); + + ResultStatus allocateBufferHandle( + const std::vector& params, BufferId *bufferId, + native_handle_t **handle); + + ResultStatus fetchBufferHandle( + TransactionId transactionId, BufferId bufferId, + native_handle_t **handle); + + struct BlockPoolDataDtor; + struct ClientBuffer; + + bool mLocal; + bool mValid; + sp mAccessor; + sp mLocalConnection; + sp mRemoteConnection; + uint32_t mSeqId; + ConnectionId mConnectionId; + int64_t mLastEvictCacheUs; + + // CachedBuffers + struct BufferCache { + std::mutex mLock; + bool mCreating; + std::condition_variable mCreateCv; + std::map> mBuffers; + int mActive; + int64_t mLastChangeUs; + + BufferCache() : mCreating(false), mActive(0), mLastChangeUs(getTimestampNow()) {} + + void incActive_l() { + ++mActive; + mLastChangeUs = getTimestampNow(); + } + + void decActive_l() { + --mActive; + mLastChangeUs = getTimestampNow(); + } + } mCache; + + // FMQ - release notifier + struct { + std::mutex mLock; + // TODO: use only one list?(using one list may dealy sending messages?) + std::list mReleasingIds; + std::list mReleasedIds; + std::unique_ptr mStatusChannel; + } mReleasing; + + // This lock is held during synchronization from remote side. + // In order to minimize remote calls and locking durtaion, this lock is held + // by best effort approach using try_lock(). + std::mutex mRemoteSyncLock; +}; + +struct BufferPoolClient::Impl::BlockPoolDataDtor { + BlockPoolDataDtor(const std::shared_ptr &impl) + : mImpl(impl) {} + + void operator()(BufferPoolData *buffer) { + BufferId id = buffer->mId; + delete buffer; + + auto impl = mImpl.lock(); + if (impl && impl->isValid()) { + impl->postBufferRelease(id); + } + } + const std::weak_ptr mImpl; +}; + +struct BufferPoolClient::Impl::ClientBuffer { +private: + bool mInvalidated; // TODO: implement + int64_t mExpireUs; + bool mHasCache; + ConnectionId mConnectionId; + BufferId mId; + native_handle_t *mHandle; + std::weak_ptr mCache; + + void updateExpire() { + mExpireUs = getTimestampNow() + kCacheTtlUs; + } + +public: + ClientBuffer( + ConnectionId connectionId, BufferId id, native_handle_t *handle) + : mInvalidated(false), mHasCache(false), + mConnectionId(connectionId), mId(id), mHandle(handle) { + (void)mInvalidated; + mExpireUs = getTimestampNow() + kCacheTtlUs; + } + + ~ClientBuffer() { + if (mHandle) { + native_handle_close(mHandle); + native_handle_delete(mHandle); + } + } + + bool expire() const { + int64_t now = getTimestampNow(); + return now >= mExpireUs; + } + + bool hasCache() const { + return mHasCache; + } + + std::shared_ptr fetchCache(native_handle_t **pHandle) { + if (mHasCache) { + std::shared_ptr cache = mCache.lock(); + if (cache) { + *pHandle = mHandle; + } + return cache; + } + return nullptr; + } + + std::shared_ptr createCache( + const std::shared_ptr &impl, + native_handle_t **pHandle) { + if (!mHasCache) { + // Allocates a raw ptr in order to avoid sending #postBufferRelease + // from deleter, in case of native_handle_clone failure. + BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId); + if (ptr) { + std::shared_ptr cache(ptr, BlockPoolDataDtor(impl)); + if (cache) { + mCache = cache; + mHasCache = true; + *pHandle = mHandle; + return cache; + } + } + if (ptr) { + delete ptr; + } + } + return nullptr; + } + + bool onCacheRelease() { + if (mHasCache) { + // TODO: verify mCache is not valid; + updateExpire(); + mHasCache = false; + return true; + } + return false; + } +}; + +BufferPoolClient::Impl::Impl(const sp &accessor) + : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0), + mLastEvictCacheUs(getTimestampNow()) { + const QueueDescriptor *fmqDesc; + ResultStatus status = accessor->connect( + &mLocalConnection, &mConnectionId, &fmqDesc, true); + if (status == ResultStatus::OK) { + mReleasing.mStatusChannel = + std::make_unique(*fmqDesc); + mValid = mReleasing.mStatusChannel && + mReleasing.mStatusChannel->isValid(); + } +} + +BufferPoolClient::Impl::Impl(const sp &accessor) + : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0), + mLastEvictCacheUs(getTimestampNow()) { + bool valid = false; + sp& outConnection = mRemoteConnection; + ConnectionId& id = mConnectionId; + std::unique_ptr& outChannel = + mReleasing.mStatusChannel; + Return transResult = accessor->connect( + [&valid, &outConnection, &id, &outChannel] + (ResultStatus status, sp connection, + ConnectionId connectionId, const QueueDescriptor& desc) { + if (status == ResultStatus::OK) { + outConnection = connection; + id = connectionId; + outChannel = std::make_unique(desc); + if (outChannel && outChannel->isValid()) { + valid = true; + } + } + }); + mValid = transResult.isOk() && valid; +} + +bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCache) { + bool active = false; + { + std::lock_guard lock(mCache.mLock); + syncReleased(); + evictCaches(clearCache); + *lastTransactionUs = mCache.mLastChangeUs; + active = mCache.mActive > 0; + } + if (mValid && mLocal && mLocalConnection) { + mLocalConnection->cleanUp(clearCache); + return true; + } + return active; +} + +ResultStatus BufferPoolClient::Impl::allocate( + const std::vector ¶ms, + native_handle_t **pHandle, + std::shared_ptr *buffer) { + if (!mLocal || !mLocalConnection || !mValid) { + return ResultStatus::CRITICAL_ERROR; + } + BufferId bufferId; + native_handle_t *handle = nullptr; + buffer->reset(); + ResultStatus status = allocateBufferHandle(params, &bufferId, &handle); + if (status == ResultStatus::OK) { + if (handle) { + std::unique_lock lock(mCache.mLock); + syncReleased(); + evictCaches(); + auto cacheIt = mCache.mBuffers.find(bufferId); + if (cacheIt != mCache.mBuffers.end()) { + // TODO: verify it is recycled. (not having active ref) + mCache.mBuffers.erase(cacheIt); + } + auto clientBuffer = std::make_unique( + mConnectionId, bufferId, handle); + if (clientBuffer) { + auto result = mCache.mBuffers.insert(std::make_pair( + bufferId, std::move(clientBuffer))); + if (result.second) { + *buffer = result.first->second->createCache( + shared_from_this(), pHandle); + if (*buffer) { + mCache.incActive_l(); + } + } + } + } + if (!*buffer) { + ALOGV("client cache creation failure %d: %lld", + handle != nullptr, (long long)mConnectionId); + status = ResultStatus::NO_MEMORY; + postBufferRelease(bufferId); + } + } + return status; +} + +ResultStatus BufferPoolClient::Impl::receive( + TransactionId transactionId, BufferId bufferId, int64_t timestampUs, + native_handle_t **pHandle, + std::shared_ptr *buffer) { + if (!mValid) { + return ResultStatus::CRITICAL_ERROR; + } + if (timestampUs != 0) { + timestampUs += kReceiveTimeoutUs; + } + if (!postReceive(bufferId, transactionId, timestampUs)) { + return ResultStatus::CRITICAL_ERROR; + } + ResultStatus status = ResultStatus::CRITICAL_ERROR; + buffer->reset(); + while(1) { + std::unique_lock lock(mCache.mLock); + syncReleased(); + evictCaches(); + auto cacheIt = mCache.mBuffers.find(bufferId); + if (cacheIt != mCache.mBuffers.end()) { + if (cacheIt->second->hasCache()) { + *buffer = cacheIt->second->fetchCache(pHandle); + if (!*buffer) { + // check transfer time_out + lock.unlock(); + std::this_thread::yield(); + continue; + } + ALOGV("client receive from reference %lld", (long long)mConnectionId); + break; + } else { + *buffer = cacheIt->second->createCache(shared_from_this(), pHandle); + if (*buffer) { + mCache.incActive_l(); + } + ALOGV("client receive from cache %lld", (long long)mConnectionId); + break; + } + } else { + if (!mCache.mCreating) { + mCache.mCreating = true; + lock.unlock(); + native_handle_t* handle = nullptr; + status = fetchBufferHandle(transactionId, bufferId, &handle); + lock.lock(); + if (status == ResultStatus::OK) { + if (handle) { + auto clientBuffer = std::make_unique( + mConnectionId, bufferId, handle); + if (clientBuffer) { + auto result = mCache.mBuffers.insert( + std::make_pair(bufferId, std::move( + clientBuffer))); + if (result.second) { + *buffer = result.first->second->createCache( + shared_from_this(), pHandle); + if (*buffer) { + mCache.incActive_l(); + } + } + } + } + if (!*buffer) { + status = ResultStatus::NO_MEMORY; + } + } + mCache.mCreating = false; + lock.unlock(); + mCache.mCreateCv.notify_all(); + break; + } + mCache.mCreateCv.wait(lock); + } + } + bool needsSync = false; + bool posted = postReceiveResult(bufferId, transactionId, + *buffer ? true : false, &needsSync); + ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId, + *buffer ? "ok" : "fail", posted); + if (mValid && mLocal && mLocalConnection) { + mLocalConnection->cleanUp(false); + } + if (needsSync && mRemoteConnection) { + trySyncFromRemote(); + } + if (*buffer) { + if (!posted) { + buffer->reset(); + return ResultStatus::CRITICAL_ERROR; + } + return ResultStatus::OK; + } + return status; +} + + +void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) { + std::lock_guard lock(mReleasing.mLock); + mReleasing.mReleasingIds.push_back(bufferId); + mReleasing.mStatusChannel->postBufferRelease( + mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds); +} + +// TODO: revise ad-hoc posting data structure +bool BufferPoolClient::Impl::postSend( + BufferId bufferId, ConnectionId receiver, + TransactionId *transactionId, int64_t *timestampUs) { + bool ret = false; + bool needsSync = false; + { + std::lock_guard lock(mReleasing.mLock); + *timestampUs = getTimestampNow(); + *transactionId = (mConnectionId << 32) | mSeqId++; + // TODO: retry, add timeout, target? + ret = mReleasing.mStatusChannel->postBufferStatusMessage( + *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId, + receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds); + needsSync = !mLocal && mReleasing.mStatusChannel->needsSync(); + } + if (mValid && mLocal && mLocalConnection) { + mLocalConnection->cleanUp(false); + } + if (needsSync && mRemoteConnection) { + trySyncFromRemote(); + } + return ret; +} + +bool BufferPoolClient::Impl::postReceive( + BufferId bufferId, TransactionId transactionId, int64_t timestampUs) { + for (int i = 0; i < kPostMaxRetry; ++i) { + std::unique_lock lock(mReleasing.mLock); + int64_t now = getTimestampNow(); + if (timestampUs == 0 || now < timestampUs) { + bool result = mReleasing.mStatusChannel->postBufferStatusMessage( + transactionId, bufferId, BufferStatus::TRANSFER_FROM, + mConnectionId, -1, mReleasing.mReleasingIds, + mReleasing.mReleasedIds); + if (result) { + return true; + } + lock.unlock(); + std::this_thread::yield(); + } else { + mReleasing.mStatusChannel->postBufferStatusMessage( + transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT, + mConnectionId, -1, mReleasing.mReleasingIds, + mReleasing.mReleasedIds); + return false; + } + } + return false; +} + +bool BufferPoolClient::Impl::postReceiveResult( + BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) { + std::lock_guard lock(mReleasing.mLock); + // TODO: retry, add timeout + bool ret = mReleasing.mStatusChannel->postBufferStatusMessage( + transactionId, bufferId, + result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR, + mConnectionId, -1, mReleasing.mReleasingIds, + mReleasing.mReleasedIds); + *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync(); + return ret; +} + +void BufferPoolClient::Impl::trySyncFromRemote() { + if (mRemoteSyncLock.try_lock()) { + bool needsSync = false; + { + std::lock_guard lock(mReleasing.mLock); + needsSync = mReleasing.mStatusChannel->needsSync(); + } + if (needsSync) { + TransactionId transactionId = (mConnectionId << 32); + BufferId bufferId = Connection::SYNC_BUFFERID; + Return transResult = mRemoteConnection->fetch( + transactionId, bufferId, + [] + (ResultStatus outStatus, Buffer outBuffer) { + (void) outStatus; + (void) outBuffer; + }); + } + mRemoteSyncLock.unlock(); + } +} + +// should have mCache.mLock +bool BufferPoolClient::Impl::syncReleased() { + std::lock_guard lock(mReleasing.mLock); + if (mReleasing.mReleasingIds.size() > 0) { + mReleasing.mStatusChannel->postBufferRelease( + mConnectionId, mReleasing.mReleasingIds, + mReleasing.mReleasedIds); + } + if (mReleasing.mReleasedIds.size() > 0) { + for (BufferId& id: mReleasing.mReleasedIds) { + ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id); + auto found = mCache.mBuffers.find(id); + if (found != mCache.mBuffers.end()) { + if (found->second->onCacheRelease()) { + mCache.decActive_l(); + } else { + // should not happen! + ALOGW("client %lld cache release status inconsitent!", + (long long)mConnectionId); + } + } else { + // should not happen! + ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId); + } + } + mReleasing.mReleasedIds.clear(); + return true; + } + return false; +} + +// should have mCache.mLock +void BufferPoolClient::Impl::evictCaches(bool clearCache) { + int64_t now = getTimestampNow(); + if (now >= mLastEvictCacheUs + kCacheTtlUs || clearCache) { + size_t evicted = 0; + for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) { + if (!it->second->hasCache() && (it->second->expire() || clearCache)) { + it = mCache.mBuffers.erase(it); + ++evicted; + } else { + ++it; + } + } + ALOGV("cache count %lld : total %zu, active %d, evicted %zu", + (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted); + mLastEvictCacheUs = now; + } +} + +ResultStatus BufferPoolClient::Impl::allocateBufferHandle( + const std::vector& params, BufferId *bufferId, + native_handle_t** handle) { + if (mLocalConnection) { + const native_handle_t* allocHandle = nullptr; + ResultStatus status = mLocalConnection->allocate( + params, bufferId, &allocHandle); + if (status == ResultStatus::OK) { + *handle = native_handle_clone(allocHandle); + } + ALOGV("client allocate result %lld %d : %u clone %p", + (long long)mConnectionId, status == ResultStatus::OK, + *handle ? *bufferId : 0 , *handle); + return status; + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus BufferPoolClient::Impl::fetchBufferHandle( + TransactionId transactionId, BufferId bufferId, + native_handle_t **handle) { + sp connection; + if (mLocal) { + connection = mLocalConnection; + } else { + connection = mRemoteConnection; + } + ResultStatus status; + Return transResult = connection->fetch( + transactionId, bufferId, + [&status, &handle] + (ResultStatus outStatus, Buffer outBuffer) { + status = outStatus; + if (status == ResultStatus::OK) { + *handle = native_handle_clone( + outBuffer.buffer.getNativeHandle()); + } + }); + return transResult.isOk() ? status : ResultStatus::CRITICAL_ERROR; +} + + +BufferPoolClient::BufferPoolClient(const sp &accessor) { + mImpl = std::make_shared(accessor); +} + +BufferPoolClient::BufferPoolClient(const sp &accessor) { + mImpl = std::make_shared(accessor); +} + +BufferPoolClient::~BufferPoolClient() { + // TODO: how to handle orphaned buffers? +} + +bool BufferPoolClient::isValid() { + return mImpl && mImpl->isValid(); +} + +bool BufferPoolClient::isLocal() { + return mImpl && mImpl->isLocal(); +} + +bool BufferPoolClient::isActive(int64_t *lastTransactionUs, bool clearCache) { + if (!isValid()) { + *lastTransactionUs = 0; + return false; + } + return mImpl->isActive(lastTransactionUs, clearCache); +} + +ConnectionId BufferPoolClient::getConnectionId() { + if (isValid()) { + return mImpl->getConnectionId(); + } + return -1; +} + +ResultStatus BufferPoolClient::getAccessor(sp *accessor) { + if (isValid()) { + *accessor = mImpl->getAccessor(); + return ResultStatus::OK; + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus BufferPoolClient::allocate( + const std::vector ¶ms, + native_handle_t **handle, + std::shared_ptr *buffer) { + if (isValid()) { + return mImpl->allocate(params, handle, buffer); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus BufferPoolClient::receive( + TransactionId transactionId, BufferId bufferId, int64_t timestampUs, + native_handle_t **handle, std::shared_ptr *buffer) { + if (isValid()) { + return mImpl->receive(transactionId, bufferId, timestampUs, handle, buffer); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus BufferPoolClient::postSend( + ConnectionId receiverId, + const std::shared_ptr &buffer, + TransactionId *transactionId, + int64_t *timestampUs) { + if (isValid()) { + bool result = mImpl->postSend( + buffer->mId, receiverId, transactionId, timestampUs); + return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR; + } + return ResultStatus::CRITICAL_ERROR; +} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/BufferPoolClient.h b/media/bufferpool/2.0/BufferPoolClient.h new file mode 100644 index 0000000000..00d6839d31 --- /dev/null +++ b/media/bufferpool/2.0/BufferPoolClient.h @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLCLIENT_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLCLIENT_H + +#include +#include +#include +#include +#include +#include "Accessor.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +using ::android::hardware::media::bufferpool::V2_0::IAccessor; +using ::android::hardware::media::bufferpool::V2_0::IConnection; +using ::android::hardware::media::bufferpool::V2_0::ResultStatus; +using ::android::sp; + +/** + * A buffer pool client for a buffer pool. For a specific buffer pool, at most + * one buffer pool client exists per process. This class will not be exposed + * outside. A buffer pool client will be used via ClientManager. + */ +class BufferPoolClient { +public: + /** + * Creates a buffer pool client from a local buffer pool + * (via ClientManager#create). + */ + explicit BufferPoolClient(const sp &accessor); + + /** + * Creates a buffer pool client from a remote buffer pool + * (via ClientManager#registerSender). + * Note: A buffer pool client created with remote buffer pool cannot + * allocate a buffer. + */ + explicit BufferPoolClient(const sp &accessor); + + /** Destructs a buffer pool client. */ + ~BufferPoolClient(); + +private: + bool isValid(); + + bool isLocal(); + + bool isActive(int64_t *lastTransactionUs, bool clearCache); + + ConnectionId getConnectionId(); + + ResultStatus getAccessor(sp *accessor); + + ResultStatus allocate(const std::vector ¶ms, + native_handle_t **handle, + std::shared_ptr *buffer); + + ResultStatus receive(TransactionId transactionId, + BufferId bufferId, + int64_t timestampUs, + native_handle_t **handle, + std::shared_ptr *buffer); + + ResultStatus postSend(ConnectionId receiver, + const std::shared_ptr &buffer, + TransactionId *transactionId, + int64_t *timestampUs); + + class Impl; + std::shared_ptr mImpl; + + friend struct ClientManager; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLCLIENT_H diff --git a/media/bufferpool/2.0/BufferStatus.cpp b/media/bufferpool/2.0/BufferStatus.cpp new file mode 100644 index 0000000000..3379e21502 --- /dev/null +++ b/media/bufferpool/2.0/BufferStatus.cpp @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "BufferPoolStatus" +//#define LOG_NDEBUG 0 + +#include +#include "BufferStatus.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +int64_t getTimestampNow() { + int64_t stamp; + struct timespec ts; + // TODO: CLOCK_MONOTONIC_COARSE? + clock_gettime(CLOCK_MONOTONIC, &ts); + stamp = ts.tv_nsec / 1000; + stamp += (ts.tv_sec * 1000000LL); + return stamp; +} + +static constexpr int kNumElementsInQueue = 1024*16; +static constexpr int kMinElementsToSyncInQueue = 128; + +ResultStatus BufferStatusObserver::open( + ConnectionId id, const QueueDescriptor** fmqDescPtr) { + if (mBufferStatusQueues.find(id) != mBufferStatusQueues.end()) { + // TODO: id collision log? + return ResultStatus::CRITICAL_ERROR; + } + std::unique_ptr queue = + std::make_unique(kNumElementsInQueue); + if (!queue || queue->isValid() == false) { + *fmqDescPtr = nullptr; + return ResultStatus::NO_MEMORY; + } else { + *fmqDescPtr = queue->getDesc(); + } + auto result = mBufferStatusQueues.insert( + std::make_pair(id, std::move(queue))); + if (!result.second) { + *fmqDescPtr = nullptr; + return ResultStatus::NO_MEMORY; + } + return ResultStatus::OK; +} + +ResultStatus BufferStatusObserver::close(ConnectionId id) { + if (mBufferStatusQueues.find(id) == mBufferStatusQueues.end()) { + return ResultStatus::CRITICAL_ERROR; + } + mBufferStatusQueues.erase(id); + return ResultStatus::OK; +} + +void BufferStatusObserver::getBufferStatusChanges(std::vector &messages) { + for (auto it = mBufferStatusQueues.begin(); it != mBufferStatusQueues.end(); ++it) { + BufferStatusMessage message; + size_t avail = it->second->availableToRead(); + while (avail > 0) { + if (!it->second->read(&message, 1)) { + // Since avaliable # of reads are already confirmed, + // this should not happen. + // TODO: error handling (spurious client?) + ALOGW("FMQ message cannot be read from %lld", (long long)it->first); + return; + } + message.connectionId = it->first; + messages.push_back(message); + --avail; + } + } +} + +BufferStatusChannel::BufferStatusChannel( + const QueueDescriptor &fmqDesc) { + std::unique_ptr queue = + std::make_unique(fmqDesc); + if (!queue || queue->isValid() == false) { + mValid = false; + return; + } + mValid = true; + mBufferStatusQueue = std::move(queue); +} + +bool BufferStatusChannel::isValid() { + return mValid; +} + +bool BufferStatusChannel::needsSync() { + if (mValid) { + size_t avail = mBufferStatusQueue->availableToWrite(); + return avail + kMinElementsToSyncInQueue < kNumElementsInQueue; + } + return false; +} + +void BufferStatusChannel::postBufferRelease( + ConnectionId connectionId, + std::list &pending, std::list &posted) { + if (mValid && pending.size() > 0) { + size_t avail = mBufferStatusQueue->availableToWrite(); + avail = std::min(avail, pending.size()); + BufferStatusMessage message; + for (size_t i = 0 ; i < avail; ++i) { + BufferId id = pending.front(); + message.newStatus = BufferStatus::NOT_USED; + message.bufferId = id; + message.connectionId = connectionId; + if (!mBufferStatusQueue->write(&message, 1)) { + // Since avaliable # of writes are already confirmed, + // this should not happen. + // TODO: error handing? + ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); + return; + } + pending.pop_front(); + posted.push_back(id); + } + } +} + +bool BufferStatusChannel::postBufferStatusMessage( + TransactionId transactionId, BufferId bufferId, + BufferStatus status, ConnectionId connectionId, ConnectionId targetId, + std::list &pending, std::list &posted) { + if (mValid) { + size_t avail = mBufferStatusQueue->availableToWrite(); + size_t numPending = pending.size(); + if (avail >= numPending + 1) { + BufferStatusMessage release, message; + for (size_t i = 0; i < numPending; ++i) { + BufferId id = pending.front(); + release.newStatus = BufferStatus::NOT_USED; + release.bufferId = id; + release.connectionId = connectionId; + if (!mBufferStatusQueue->write(&release, 1)) { + // Since avaliable # of writes are already confirmed, + // this should not happen. + // TODO: error handling? + ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); + return false; + } + pending.pop_front(); + posted.push_back(id); + } + message.transactionId = transactionId; + message.bufferId = bufferId; + message.newStatus = status; + message.connectionId = connectionId; + message.targetConnectionId = targetId; + // TODO : timesatamp + message.timestampUs = 0; + if (!mBufferStatusQueue->write(&message, 1)) { + // Since avaliable # of writes are already confirmed, + // this should not happen. + ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId); + return false; + } + return true; + } + } + return false; +} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + diff --git a/media/bufferpool/2.0/BufferStatus.h b/media/bufferpool/2.0/BufferStatus.h new file mode 100644 index 0000000000..a74f0a5d32 --- /dev/null +++ b/media/bufferpool/2.0/BufferStatus.h @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERSTATUS_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERSTATUS_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +/** Returns monotonic timestamp in Us since fixed point in time. */ +int64_t getTimestampNow(); + +/** + * A collection of FMQ for a buffer pool. buffer ownership/status change + * messages are sent via the FMQs from the clients. + */ +class BufferStatusObserver { +private: + std::map> + mBufferStatusQueues; + +public: + /** Creates an FMQ for the specified connection(client). + * + * @param connectionId connection Id of the specified client. + * @param fmqDescPtr double ptr of created FMQ's descriptor. + * + * @return OK if FMQ is created successfully. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus open(ConnectionId id, const QueueDescriptor** fmqDescPtr); + + /** Closes an FMQ for the specified connection(client). + * + * @param connectionId connection Id of the specified client. + * + * @return OK if the specified connection is closed successfully. + * CRITICAL_ERROR otherwise. + */ + ResultStatus close(ConnectionId id); + + /** Retrieves all pending FMQ buffer status messages from clients. + * + * @param messages retrieved pending messages. + */ + void getBufferStatusChanges(std::vector &messages); +}; + +/** + * An FMQ for a buffer pool client. Buffer ownership/status change messages + * are sent via the fmq to the buffer pool. + */ +class BufferStatusChannel { +private: + bool mValid; + std::unique_ptr mBufferStatusQueue; + +public: + /** + * Connects to an FMQ from a descriptor of the created FMQ. + * + * @param fmqDesc Descriptor of the created FMQ. + */ + BufferStatusChannel(const QueueDescriptor &fmqDesc); + + /** Returns whether the FMQ is connected successfully. */ + bool isValid(); + + /** Returns whether the FMQ needs to be synced from the buffer pool */ + bool needsSync(); + + /** + * Posts a buffer release message to the buffer pool. + * + * @param connectionId connection Id of the client. + * @param pending currently pending buffer release messages. + * @param posted posted buffer release messages. + */ + void postBufferRelease( + ConnectionId connectionId, + std::list &pending, std::list &posted); + + /** + * Posts a buffer status message regarding the specified buffer + * transfer transaction. + * + * @param transactionId Id of the specified transaction. + * @param bufferId buffer Id of the specified transaction. + * @param status new status of the buffer. + * @param connectionId connection Id of the client. + * @param targetId connection Id of the receiver(only when the sender + * posts a status message). + * @param pending currently pending buffer release messages. + * @param posted posted buffer release messages. + * + * @return {@code true} when the specified message is posted, + * {@code false} otherwise. + */ + bool postBufferStatusMessage( + TransactionId transactionId, + BufferId bufferId, + BufferStatus status, + ConnectionId connectionId, + ConnectionId targetId, + std::list &pending, std::list &posted); +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERSTATUS_H diff --git a/media/bufferpool/2.0/ClientManager.cpp b/media/bufferpool/2.0/ClientManager.cpp new file mode 100644 index 0000000000..eeaf093a40 --- /dev/null +++ b/media/bufferpool/2.0/ClientManager.cpp @@ -0,0 +1,504 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define LOG_TAG "BufferPoolManager" +//#define LOG_NDEBUG 0 + +#include +#include +#include +#include +#include +#include +#include "BufferPoolClient.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec +static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune +static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune + +/** + * The holder of the cookie of remote IClientManager. + * The cookie is process locally unique for each IClientManager. + * (The cookie is used to notify death of clients to bufferpool process.) + */ +class ClientManagerCookieHolder { +public: + /** + * Creates a cookie holder for remote IClientManager(s). + */ + ClientManagerCookieHolder(); + + /** + * Gets a cookie for a remote IClientManager. + * + * @param manager the specified remote IClientManager. + * @param added true when the specified remote IClientManager is added + * newly, false otherwise. + * + * @return the process locally unique cookie for the specified IClientManager. + */ + uint64_t getCookie(const sp &manager, bool *added); + +private: + uint64_t mSeqId; + std::mutex mLock; + std::list, uint64_t>> mManagers; +}; + +ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){} + +uint64_t ClientManagerCookieHolder::getCookie( + const sp &manager, + bool *added) { + std::lock_guard lock(mLock); + for (auto it = mManagers.begin(); it != mManagers.end();) { + const sp key = it->first.promote(); + if (key) { + if (interfacesEqual(key, manager)) { + *added = false; + return it->second; + } + ++it; + } else { + it = mManagers.erase(it); + } + } + uint64_t id = mSeqId++; + *added = true; + mManagers.push_back(std::make_pair(manager, id)); + return id; +} + +class ClientManager::Impl { +public: + Impl(); + + // BnRegisterSender + ResultStatus registerSender(const sp &accessor, + ConnectionId *pConnectionId); + + // BpRegisterSender + ResultStatus registerSender(const sp &receiver, + ConnectionId senderId, + ConnectionId *receiverId); + + ResultStatus create(const std::shared_ptr &allocator, + ConnectionId *pConnectionId); + + ResultStatus close(ConnectionId connectionId); + + ResultStatus allocate(ConnectionId connectionId, + const std::vector ¶ms, + native_handle_t **handle, + std::shared_ptr *buffer); + + ResultStatus receive(ConnectionId connectionId, + TransactionId transactionId, + BufferId bufferId, + int64_t timestampUs, + native_handle_t **handle, + std::shared_ptr *buffer); + + ResultStatus postSend(ConnectionId receiverId, + const std::shared_ptr &buffer, + TransactionId *transactionId, + int64_t *timestampUs); + + ResultStatus getAccessor(ConnectionId connectionId, + sp *accessor); + + void cleanUp(bool clearCache = false); + +private: + // In order to prevent deadlock between multiple locks, + // always lock ClientCache.lock before locking ActiveClients.lock. + struct ClientCache { + // This lock is held for brief duration. + // Blocking operation is not performed while holding the lock. + std::mutex mMutex; + std::list, const std::weak_ptr>> + mClients; + std::condition_variable mConnectCv; + bool mConnecting; + int64_t mLastCleanUpUs; + + ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {} + } mCache; + + // Active clients which can be retrieved via ConnectionId + struct ActiveClients { + // This lock is held for brief duration. + // Blocking operation is not performed holding the lock. + std::mutex mMutex; + std::map> + mClients; + } mActive; + + ClientManagerCookieHolder mRemoteClientCookies; +}; + +ClientManager::Impl::Impl() {} + +ResultStatus ClientManager::Impl::registerSender( + const sp &accessor, ConnectionId *pConnectionId) { + cleanUp(); + int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs; + do { + std::unique_lock lock(mCache.mMutex); + for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) { + sp sAccessor = it->first.promote(); + if (sAccessor && interfacesEqual(sAccessor, accessor)) { + const std::shared_ptr client = it->second.lock(); + if (client) { + std::lock_guard lock(mActive.mMutex); + *pConnectionId = client->getConnectionId(); + if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) { + ALOGV("register existing connection %lld", (long long)*pConnectionId); + return ResultStatus::ALREADY_EXISTS; + } + } + mCache.mClients.erase(it); + break; + } + } + if (!mCache.mConnecting) { + mCache.mConnecting = true; + lock.unlock(); + ResultStatus result = ResultStatus::OK; + const std::shared_ptr client = + std::make_shared(accessor); + lock.lock(); + if (!client) { + result = ResultStatus::NO_MEMORY; + } else if (!client->isValid()) { + result = ResultStatus::CRITICAL_ERROR; + } + if (result == ResultStatus::OK) { + // TODO: handle insert fail. (malloc fail) + const std::weak_ptr wclient = client; + mCache.mClients.push_back(std::make_pair(accessor, wclient)); + ConnectionId conId = client->getConnectionId(); + { + std::lock_guard lock(mActive.mMutex); + mActive.mClients.insert(std::make_pair(conId, client)); + } + *pConnectionId = conId; + ALOGV("register new connection %lld", (long long)*pConnectionId); + } + mCache.mConnecting = false; + lock.unlock(); + mCache.mConnectCv.notify_all(); + return result; + } + mCache.mConnectCv.wait_for( + lock, std::chrono::microseconds(kRegisterTimeoutUs)); + } while (getTimestampNow() < timeoutUs); + // TODO: return timeout error + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::Impl::registerSender( + const sp &receiver, + ConnectionId senderId, + ConnectionId *receiverId) { + sp accessor; + bool local = false; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(senderId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + it->second->getAccessor(&accessor); + local = it->second->isLocal(); + } + ResultStatus rs = ResultStatus::CRITICAL_ERROR; + if (accessor) { + Return transResult = receiver->registerSender( + accessor, + [&rs, receiverId]( + ResultStatus status, + int64_t connectionId) { + rs = status; + *receiverId = connectionId; + }); + if (!transResult.isOk()) { + return ResultStatus::CRITICAL_ERROR; + } else if (local && rs == ResultStatus::OK) { + sp recipient = Accessor::getConnectionDeathRecipient(); + if (recipient) { + ALOGV("client death recipient registered %lld", (long long)*receiverId); + bool added; + uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added); + recipient->addCookieToConnection(cookie, *receiverId); + if (added) { + Return transResult = receiver->linkToDeath(recipient, cookie); + } + } + } + } + return rs; +} + +ResultStatus ClientManager::Impl::create( + const std::shared_ptr &allocator, + ConnectionId *pConnectionId) { + const sp accessor = new Accessor(allocator); + if (!accessor || !accessor->isValid()) { + return ResultStatus::CRITICAL_ERROR; + } + std::shared_ptr client = + std::make_shared(accessor); + if (!client || !client->isValid()) { + return ResultStatus::CRITICAL_ERROR; + } + // Since a new bufferpool is created, evict memories which are used by + // existing bufferpools and clients. + cleanUp(true); + { + // TODO: handle insert fail. (malloc fail) + std::lock_guard lock(mCache.mMutex); + const std::weak_ptr wclient = client; + mCache.mClients.push_back(std::make_pair(accessor, wclient)); + ConnectionId conId = client->getConnectionId(); + { + std::lock_guard lock(mActive.mMutex); + mActive.mClients.insert(std::make_pair(conId, client)); + } + *pConnectionId = conId; + ALOGV("create new connection %lld", (long long)*pConnectionId); + } + return ResultStatus::OK; +} + +ResultStatus ClientManager::Impl::close(ConnectionId connectionId) { + std::lock_guard lock1(mCache.mMutex); + std::lock_guard lock2(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it != mActive.mClients.end()) { + sp accessor; + it->second->getAccessor(&accessor); + mActive.mClients.erase(connectionId); + for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) { + // clean up dead client caches + sp cAccessor = cit->first.promote(); + if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) { + cit = mCache.mClients.erase(cit); + } else { + cit++; + } + } + return ResultStatus::OK; + } + return ResultStatus::NOT_FOUND; +} + +ResultStatus ClientManager::Impl::allocate( + ConnectionId connectionId, const std::vector ¶ms, + native_handle_t **handle, std::shared_ptr *buffer) { + std::shared_ptr client; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + client = it->second; + } + return client->allocate(params, handle, buffer); +} + +ResultStatus ClientManager::Impl::receive( + ConnectionId connectionId, TransactionId transactionId, + BufferId bufferId, int64_t timestampUs, + native_handle_t **handle, std::shared_ptr *buffer) { + std::shared_ptr client; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + client = it->second; + } + return client->receive(transactionId, bufferId, timestampUs, handle, buffer); +} + +ResultStatus ClientManager::Impl::postSend( + ConnectionId receiverId, const std::shared_ptr &buffer, + TransactionId *transactionId, int64_t *timestampUs) { + ConnectionId connectionId = buffer->mConnectionId; + std::shared_ptr client; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + client = it->second; + } + return client->postSend(receiverId, buffer, transactionId, timestampUs); +} + +ResultStatus ClientManager::Impl::getAccessor( + ConnectionId connectionId, sp *accessor) { + std::shared_ptr client; + { + std::lock_guard lock(mActive.mMutex); + auto it = mActive.mClients.find(connectionId); + if (it == mActive.mClients.end()) { + return ResultStatus::NOT_FOUND; + } + client = it->second; + } + return client->getAccessor(accessor); +} + +void ClientManager::Impl::cleanUp(bool clearCache) { + int64_t now = getTimestampNow(); + int64_t lastTransactionUs; + std::lock_guard lock1(mCache.mMutex); + if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) { + std::lock_guard lock2(mActive.mMutex); + int cleaned = 0; + for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) { + if (!it->second->isActive(&lastTransactionUs, clearCache)) { + if (lastTransactionUs + kClientTimeoutUs < now) { + sp accessor; + it->second->getAccessor(&accessor); + it = mActive.mClients.erase(it); + ++cleaned; + continue; + } + } + ++it; + } + for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) { + // clean up dead client caches + sp cAccessor = cit->first.promote(); + if (!cAccessor) { + cit = mCache.mClients.erase(cit); + } else { + ++cit; + } + } + ALOGV("# of cleaned connections: %d", cleaned); + mCache.mLastCleanUpUs = now; + } +} + +// Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow. +Return ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) { + if (mImpl) { + ConnectionId connectionId = -1; + ResultStatus status = mImpl->registerSender(bufferPool, &connectionId); + _hidl_cb(status, connectionId); + } else { + _hidl_cb(ResultStatus::CRITICAL_ERROR, -1); + } + return Void(); +} + +// Methods for local use. +sp ClientManager::sInstance; +std::mutex ClientManager::sInstanceLock; + +sp ClientManager::getInstance() { + std::lock_guard lock(sInstanceLock); + if (!sInstance) { + sInstance = new ClientManager(); + } + return sInstance; +} + +ClientManager::ClientManager() : mImpl(new Impl()) {} + +ClientManager::~ClientManager() { +} + +ResultStatus ClientManager::create( + const std::shared_ptr &allocator, + ConnectionId *pConnectionId) { + if (mImpl) { + return mImpl->create(allocator, pConnectionId); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::registerSender( + const sp &receiver, + ConnectionId senderId, + ConnectionId *receiverId) { + if (mImpl) { + return mImpl->registerSender(receiver, senderId, receiverId); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::close(ConnectionId connectionId) { + if (mImpl) { + return mImpl->close(connectionId); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::allocate( + ConnectionId connectionId, const std::vector ¶ms, + native_handle_t **handle, std::shared_ptr *buffer) { + if (mImpl) { + return mImpl->allocate(connectionId, params, handle, buffer); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::receive( + ConnectionId connectionId, TransactionId transactionId, + BufferId bufferId, int64_t timestampUs, + native_handle_t **handle, std::shared_ptr *buffer) { + if (mImpl) { + return mImpl->receive(connectionId, transactionId, bufferId, + timestampUs, handle, buffer); + } + return ResultStatus::CRITICAL_ERROR; +} + +ResultStatus ClientManager::postSend( + ConnectionId receiverId, const std::shared_ptr &buffer, + TransactionId *transactionId, int64_t* timestampUs) { + if (mImpl && buffer) { + return mImpl->postSend(receiverId, buffer, transactionId, timestampUs); + } + return ResultStatus::CRITICAL_ERROR; +} + +void ClientManager::cleanUp() { + if (mImpl) { + mImpl->cleanUp(true); + } +} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/Connection.cpp b/media/bufferpool/2.0/Connection.cpp new file mode 100644 index 0000000000..cd837a19dc --- /dev/null +++ b/media/bufferpool/2.0/Connection.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Connection.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +// Methods from ::android::hardware::media::bufferpool::V2_0::IConnection follow. +Return Connection::fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) { + ResultStatus status = ResultStatus::CRITICAL_ERROR; + if (mInitialized && mAccessor) { + if (bufferId != SYNC_BUFFERID) { + const native_handle_t *handle = nullptr; + status = mAccessor->fetch( + mConnectionId, transactionId, bufferId, &handle); + if (status == ResultStatus::OK) { + _hidl_cb(status, Buffer{bufferId, handle}); + return Void(); + } + } else { + mAccessor->cleanUp(false); + } + } + _hidl_cb(status, Buffer{0, nullptr}); + return Void(); +} + +Connection::Connection() : mInitialized(false), mConnectionId(-1LL) {} + +Connection::~Connection() { + if (mInitialized && mAccessor) { + mAccessor->close(mConnectionId); + } +} + +void Connection::initialize( + const sp& accessor, ConnectionId connectionId) { + if (!mInitialized) { + mAccessor = accessor; + mConnectionId = connectionId; + mInitialized = true; + } +} + +ResultStatus Connection::allocate( + const std::vector ¶ms, BufferId *bufferId, + const native_handle_t **handle) { + if (mInitialized && mAccessor) { + return mAccessor->allocate(mConnectionId, params, bufferId, handle); + } + return ResultStatus::CRITICAL_ERROR; +} + +void Connection::cleanUp(bool clearCache) { + if (mInitialized && mAccessor) { + mAccessor->cleanUp(clearCache); + } +} + +// Methods from ::android::hidl::base::V1_0::IBase follow. + +//IConnection* HIDL_FETCH_IConnection(const char* /* name */) { +// return new Connection(); +//} + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android diff --git a/media/bufferpool/2.0/Connection.h b/media/bufferpool/2.0/Connection.h new file mode 100644 index 0000000000..e2b47f1dcb --- /dev/null +++ b/media/bufferpool/2.0/Connection.h @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CONNECTION_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CONNECTION_H + +#include +#include +#include +#include +#include "Accessor.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +using ::android::hardware::hidl_array; +using ::android::hardware::hidl_memory; +using ::android::hardware::hidl_string; +using ::android::hardware::hidl_vec; +using ::android::hardware::media::bufferpool::V2_0::implementation::Accessor; +using ::android::hardware::Return; +using ::android::hardware::Void; +using ::android::sp; + +struct Connection : public IConnection { + // Methods from ::android::hardware::media::bufferpool::V2_0::IConnection follow. + Return fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) override; + + /** + * Allocates a buffer using the specified parameters. Recycles a buffer if + * it is possible. The returned buffer can be transferred to other remote + * clients(Connection). + * + * @param params allocation parameters. + * @param bufferId Id of the allocated buffer. + * @param handle native handle of the allocated buffer. + * + * @return OK if a buffer is successfully allocated. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus allocate(const std::vector ¶ms, + BufferId *bufferId, const native_handle_t **handle); + + /** + * Processes pending buffer status messages and performs periodic cache cleaning + * from bufferpool. + * + * @param clearCache if clearCache is true, bufferpool frees all buffers + * waiting to be recycled. + */ + void cleanUp(bool clearCache); + + /** Destructs a connection. */ + ~Connection(); + + /** Creates a connection. */ + Connection(); + + /** + * Initializes with the specified buffer pool and the connection id. + * The connection id should be unique in the whole system. + * + * @param accessor the specified buffer pool. + * @param connectionId Id. + */ + void initialize(const sp &accessor, ConnectionId connectionId); + + enum : uint32_t { + SYNC_BUFFERID = UINT32_MAX, + }; + +private: + bool mInitialized; + sp mAccessor; + ConnectionId mConnectionId; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CONNECTION_H diff --git a/media/bufferpool/2.0/include/bufferpool/BufferPoolTypes.h b/media/bufferpool/2.0/include/bufferpool/BufferPoolTypes.h new file mode 100644 index 0000000000..d2de62842c --- /dev/null +++ b/media/bufferpool/2.0/include/bufferpool/BufferPoolTypes.h @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLTYPES_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLTYPES_H + +#include +#include +#include +#include +#include + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { + +struct BufferPoolData { + // For local use, to specify a bufferpool (client connection) for buffers. + // Return value from connect#IAccessor(android.hardware.media.bufferpool@2.0). + int64_t mConnectionId; + // BufferId + uint32_t mId; + + BufferPoolData() : mConnectionId(0), mId(0) {} + + BufferPoolData( + int64_t connectionId, uint32_t id) + : mConnectionId(connectionId), mId(id) {} + + ~BufferPoolData() {} +}; + +namespace V2_0 { +namespace implementation { + +using ::android::hardware::kSynchronizedReadWrite; + +typedef uint32_t BufferId; +typedef uint64_t TransactionId; +typedef int64_t ConnectionId; + +enum : ConnectionId { + INVALID_CONNECTIONID = 0, +}; + +typedef android::hardware::MessageQueue BufferStatusQueue; +typedef BufferStatusQueue::Descriptor QueueDescriptor; + +/** + * Allocation wrapper class for buffer pool. + */ +struct BufferPoolAllocation { + const native_handle_t *mHandle; + + const native_handle_t *handle() { + return mHandle; + } + + BufferPoolAllocation(const native_handle_t *handle) : mHandle(handle) {} + + ~BufferPoolAllocation() {}; +}; + +/** + * Allocator wrapper class for buffer pool. + */ +class BufferPoolAllocator { +public: + + /** + * Allocate an allocation(buffer) for buffer pool. + * + * @param params allocation parameters + * @param alloc created allocation + * @param allocSize size of created allocation + * + * @return OK when an allocation is created successfully. + */ + virtual ResultStatus allocate( + const std::vector ¶ms, + std::shared_ptr *alloc, + size_t *allocSize) = 0; + + /** + * Returns whether allocation parameters of an old allocation are + * compatible with new allocation parameters. + */ + virtual bool compatible(const std::vector &newParams, + const std::vector &oldParams) = 0; + +protected: + BufferPoolAllocator() = default; + + virtual ~BufferPoolAllocator() = default; +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_BUFFERPOOLTYPES_H diff --git a/media/bufferpool/2.0/include/bufferpool/ClientManager.h b/media/bufferpool/2.0/include/bufferpool/ClientManager.h new file mode 100644 index 0000000000..cfc3bc3bcb --- /dev/null +++ b/media/bufferpool/2.0/include/bufferpool/ClientManager.h @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CLIENTMANAGER_H +#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CLIENTMANAGER_H + +#include +#include +#include +#include +#include "BufferPoolTypes.h" + +namespace android { +namespace hardware { +namespace media { +namespace bufferpool { +namespace V2_0 { +namespace implementation { + +using ::android::hardware::hidl_array; +using ::android::hardware::hidl_memory; +using ::android::hardware::hidl_string; +using ::android::hardware::hidl_vec; +using ::android::hardware::media::bufferpool::V2_0::IAccessor; +using ::android::hardware::media::bufferpool::V2_0::ResultStatus; +using ::android::hardware::Return; +using ::android::hardware::Void; +using ::android::sp; + +struct ClientManager : public IClientManager { + // Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow. + Return registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) override; + + /** Gets an instance. */ + static sp getInstance(); + + /** + * Creates a local connection with a newly created buffer pool. + * + * @param allocator for new buffer allocation. + * @param pConnectionId Id of the created connection. This is + * system-wide unique. + * + * @return OK when a buffer pool and a local connection is successfully + * created. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus create(const std::shared_ptr &allocator, + ConnectionId *pConnectionId); + + /** + * Register a created connection as sender for remote process. + * + * @param receiver The remote receiving process. + * @param senderId A local connection which will send buffers to. + * @param receiverId Id of the created receiving connection on the receiver + * process. + * + * @return OK when the receiving connection is successfully created on the + * receiver process. + * NOT_FOUND when the sender connection was not found. + * ALREADY_EXISTS the receiving connection is already made. + * CRITICAL_ERROR otherwise. + */ + ResultStatus registerSender(const sp &receiver, + ConnectionId senderId, + ConnectionId *receiverId); + + /** + * Closes the specified connection. + * + * @param connectionId The id of the connection. + * + * @return OK when the connection is closed. + * NOT_FOUND when the specified connection was not found. + * CRITICAL_ERROR otherwise. + */ + ResultStatus close(ConnectionId connectionId); + + /** + * Allocates a buffer from the specified connection. + * + * @param connectionId The id of the connection. + * @param params The allocation parameters. + * @param handle The native handle to the allocated buffer. handle + * should be cloned before use. + * @param buffer The allocated buffer. + * + * @return OK when a buffer was allocated successfully. + * NOT_FOUND when the specified connection was not found. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus allocate(ConnectionId connectionId, + const std::vector ¶ms, + native_handle_t **handle, + std::shared_ptr *buffer); + + /** + * Receives a buffer for the transaction. + * + * @param connectionId The id of the receiving connection. + * @param transactionId The id for the transaction. + * @param bufferId The id for the buffer. + * @param timestampUs The timestamp of the buffer is being sent. + * @param handle The native handle to the allocated buffer. handle + * should be cloned before use. + * @param buffer The received buffer. + * + * @return OK when a buffer was received successfully. + * NOT_FOUND when the specified connection was not found. + * NO_MEMORY when there is no memory. + * CRITICAL_ERROR otherwise. + */ + ResultStatus receive(ConnectionId connectionId, + TransactionId transactionId, + BufferId bufferId, + int64_t timestampUs, + native_handle_t **handle, + std::shared_ptr *buffer); + + /** + * Posts a buffer transfer transaction to the buffer pool. Sends a buffer + * to other remote clients(connection) after this call has been succeeded. + * + * @param receiverId The id of the receiving connection. + * @param buffer to transfer + * @param transactionId Id of the transfer transaction. + * @param timestampUs The timestamp of the buffer transaction is being + * posted. + * + * @return OK when a buffer transaction was posted successfully. + * NOT_FOUND when the sending connection was not found. + * CRITICAL_ERROR otherwise. + */ + ResultStatus postSend(ConnectionId receiverId, + const std::shared_ptr &buffer, + TransactionId *transactionId, + int64_t *timestampUs); + + /** + * Time out inactive lingering connections and close. + */ + void cleanUp(); + + /** Destructs the manager of buffer pool clients. */ + ~ClientManager(); +private: + static sp sInstance; + static std::mutex sInstanceLock; + + class Impl; + const std::unique_ptr mImpl; + + ClientManager(); +}; + +} // namespace implementation +} // namespace V2_0 +} // namespace bufferpool +} // namespace media +} // namespace hardware +} // namespace android + +#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_CLIENTMANAGER_H