Merge "CCodec: refactor pipeline logic"

gugelfrei
TreeHugger Robot 5 years ago committed by Android (Google) Code Review
commit 5876259a06

@ -52,33 +52,26 @@
namespace android {
class C2SoftAacDec::IntfImpl : public C2InterfaceHelper {
constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
class C2SoftAacDec::IntfImpl : public SimpleInterface<void>::BaseParams {
public:
explicit IntfImpl(const std::shared_ptr<C2ReflectorHelper> &helper)
: C2InterfaceHelper(helper) {
setDerivedInstance(this);
addParameter(
DefineParam(mInputFormat, C2_NAME_INPUT_STREAM_FORMAT_SETTING)
.withConstValue(new C2StreamFormatConfig::input(0u, C2FormatCompressed))
.build());
addParameter(
DefineParam(mOutputFormat, C2_NAME_OUTPUT_STREAM_FORMAT_SETTING)
.withConstValue(new C2StreamFormatConfig::output(0u, C2FormatAudio))
.build());
: SimpleInterface<void>::BaseParams(
helper,
COMPONENT_NAME,
C2Component::KIND_DECODER,
C2Component::DOMAIN_AUDIO,
MEDIA_MIMETYPE_AUDIO_AAC) {
noPrivateBuffers();
noInputReferences();
noOutputReferences();
noInputLatency();
noTimeStretch();
addParameter(
DefineParam(mInputMediaType, C2_NAME_INPUT_PORT_MIME_SETTING)
.withConstValue(AllocSharedString<C2PortMimeConfig::input>(
MEDIA_MIMETYPE_AUDIO_AAC))
.build());
addParameter(
DefineParam(mOutputMediaType, C2_NAME_OUTPUT_PORT_MIME_SETTING)
.withConstValue(AllocSharedString<C2PortMimeConfig::output>(
MEDIA_MIMETYPE_AUDIO_RAW))
DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
.withConstValue(new C2PortActualDelayTuning::output(2u))
.build());
addParameter(
@ -231,8 +224,6 @@ private:
// TODO Add : C2StreamAacSbrModeTuning
};
constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
C2SoftAacDec::C2SoftAacDec(
const char *name,
c2_node_id_t id,

@ -51,6 +51,12 @@ public:
noInputLatency();
noTimeStretch();
// TODO: Proper support for reorder depth.
addParameter(
DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
.withConstValue(new C2PortActualDelayTuning::output(8u))
.build());
// TODO: output latency and reordering
addParameter(
@ -877,6 +883,8 @@ void C2SoftAvcDec::process(
} else if (!hasPicture) {
fillEmptyWork(work);
}
work->input.buffers.clear();
}
c2_status_t C2SoftAvcDec::drainInternal(

@ -51,7 +51,11 @@ public:
noInputLatency();
noTimeStretch();
// TODO: output latency and reordering
// TODO: Proper support for reorder depth.
addParameter(
DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
.withConstValue(new C2PortActualDelayTuning::output(8u))
.build());
addParameter(
DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)

@ -60,7 +60,11 @@ public:
noInputLatency();
noTimeStretch();
// TODO: output latency and reordering
// TODO: Proper support for reorder depth.
addParameter(
DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
.withConstValue(new C2PortActualDelayTuning::output(1u))
.build());
addParameter(
DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)

@ -55,12 +55,10 @@ struct CodecListener : public android::Codec2Client::Listener {
: callBack(fn) {}
virtual void onWorkDone(
const std::weak_ptr<android::Codec2Client::Component>& comp,
std::list<std::unique_ptr<C2Work>>& workItems,
size_t numDiscardedInputBuffers) override {
std::list<std::unique_ptr<C2Work>>& workItems) override {
/* TODO */
ALOGD("onWorkDone called");
(void)comp;
(void)numDiscardedInputBuffers;
if (callBack) callBack(workItems);
}
@ -89,9 +87,10 @@ struct CodecListener : public android::Codec2Client::Listener {
}
virtual void onInputBufferDone(
const std::shared_ptr<C2Buffer>& buffer) override {
uint64_t frameIndex, size_t arrayIndex) override {
/* TODO */
(void)buffer;
(void)frameIndex;
(void)arrayIndex;
}
virtual void onFrameRendered(

@ -344,17 +344,13 @@ struct Codec2Client::Component::HidlListener : public IComponentListener {
return Void();
}
// release input buffers potentially held by the component from queue
size_t numDiscardedInputBuffers = 0;
std::shared_ptr<Codec2Client::Component> strongComponent =
component.lock();
if (strongComponent) {
numDiscardedInputBuffers =
strongComponent->handleOnWorkDone(workItems);
strongComponent->handleOnWorkDone(workItems);
}
if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
listener->onWorkDone(component,
workItems,
numDiscardedInputBuffers);
listener->onWorkDone(component, workItems);
} else {
LOG(DEBUG) << "onWorkDone -- listener died.";
}
@ -418,26 +414,15 @@ struct Codec2Client::Component::HidlListener : public IComponentListener {
LOG(DEBUG) << "onInputBuffersReleased -- listener died.";
return Void();
}
std::shared_ptr<Codec2Client::Component> strongComponent =
component.lock();
if (!strongComponent) {
LOG(DEBUG) << "onInputBuffersReleased -- component died.";
return Void();
}
for (const InputBuffer& inputBuffer : inputBuffers) {
std::shared_ptr<C2Buffer> buffer =
strongComponent->freeInputBuffer(
inputBuffer.frameIndex,
inputBuffer.arrayIndex);
LOG(VERBOSE) << "onInputBuffersReleased --"
" received death notification of"
" input buffer:"
" frameIndex = " << inputBuffer.frameIndex
<< ", bufferIndex = " << inputBuffer.arrayIndex
<< ".";
if (buffer) {
listener->onInputBufferDone(buffer);
}
listener->onInputBufferDone(
inputBuffer.frameIndex, inputBuffer.arrayIndex);
}
return Void();
}
@ -918,43 +903,8 @@ c2_status_t Codec2Client::Component::destroyBlockPool(
return static_cast<c2_status_t>(static_cast<Status>(transResult));
}
size_t Codec2Client::Component::handleOnWorkDone(
void Codec2Client::Component::handleOnWorkDone(
const std::list<std::unique_ptr<C2Work>> &workItems) {
// Input buffers' lifetime management
std::vector<uint64_t> inputDone;
for (const std::unique_ptr<C2Work> &work : workItems) {
if (work) {
if (work->worklets.empty()
|| !work->worklets.back()
|| (work->worklets.back()->output.flags &
C2FrameData::FLAG_INCOMPLETE) == 0) {
// input is complete
inputDone.emplace_back(work->input.ordinal.frameIndex.peeku());
}
}
}
size_t numDiscardedInputBuffers = 0;
{
std::lock_guard<std::mutex> lock(mInputBuffersMutex);
for (uint64_t inputIndex : inputDone) {
auto it = mInputBuffers.find(inputIndex);
if (it == mInputBuffers.end()) {
LOG(VERBOSE) << "onWorkDone -- returned consumed/unknown "
"input frame: index = "
<< inputIndex << ".";
} else {
LOG(VERBOSE) << "onWorkDone -- processed input frame: "
<< inputIndex
<< " (containing " << it->second.size()
<< " buffers).";
mInputBuffers.erase(it);
mInputBufferCount.erase(inputIndex);
++numDiscardedInputBuffers;
}
}
}
// Output bufferqueue-based blocks' lifetime management
mOutputBufferQueueMutex.lock();
sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@ -965,72 +915,10 @@ size_t Codec2Client::Component::handleOnWorkDone(
if (igbp) {
holdBufferQueueBlocks(workItems, igbp, bqId, generation);
}
return numDiscardedInputBuffers;
}
std::shared_ptr<C2Buffer> Codec2Client::Component::freeInputBuffer(
uint64_t frameIndex,
size_t bufferIndex) {
std::shared_ptr<C2Buffer> buffer;
std::lock_guard<std::mutex> lock(mInputBuffersMutex);
auto it = mInputBuffers.find(frameIndex);
if (it == mInputBuffers.end()) {
LOG(INFO) << "freeInputBuffer -- Unrecognized input frame index "
<< frameIndex << ".";
return nullptr;
}
if (bufferIndex >= it->second.size()) {
LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
<< " is not valid in input with frame index " << frameIndex
<< ".";
return nullptr;
}
buffer = it->second[bufferIndex];
if (!buffer) {
LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
<< " in input with frame index " << frameIndex
<< " has already been freed.";
return nullptr;
}
it->second[bufferIndex] = nullptr;
if (--mInputBufferCount[frameIndex] == 0) {
mInputBuffers.erase(it);
mInputBufferCount.erase(frameIndex);
}
return buffer;
}
c2_status_t Codec2Client::Component::queue(
std::list<std::unique_ptr<C2Work>>* const items) {
// remember input buffers queued to hold reference to them
{
std::lock_guard<std::mutex> lock(mInputBuffersMutex);
for (const std::unique_ptr<C2Work> &work : *items) {
if (!work) {
continue;
}
if (work->input.buffers.size() == 0) {
continue;
}
uint64_t inputIndex = work->input.ordinal.frameIndex.peeku();
auto res = mInputBuffers.emplace(inputIndex, work->input.buffers);
if (!res.second) {
// TODO: append? - for now we are replacing
res.first->second = work->input.buffers;
LOG(INFO) << "queue -- duplicate input frame index: "
<< inputIndex
<< ". Discarding the old input frame...";
}
mInputBufferCount[inputIndex] = work->input.buffers.size();
LOG(VERBOSE) << "queue -- queuing input frame: "
<< "index = " << inputIndex
<< ", number of buffers = "
<< work->input.buffers.size()
<< ".";
}
}
WorkBundle workBundle;
if (!objcpy(&workBundle, *items, &mBufferPoolSender)) {
LOG(ERROR) << "queue -- bad input.";
@ -1088,24 +976,6 @@ c2_status_t Codec2Client::Component::flush(
}
}
// Input buffers' lifetime management
for (uint64_t flushedIndex : flushedIndices) {
std::lock_guard<std::mutex> lock(mInputBuffersMutex);
auto it = mInputBuffers.find(flushedIndex);
if (it == mInputBuffers.end()) {
LOG(VERBOSE) << "flush -- returned consumed/unknown input frame: "
"index = " << flushedIndex << ".";
} else {
LOG(VERBOSE) << "flush -- returned unprocessed input frame: "
"index = " << flushedIndex
<< ", number of buffers = "
<< mInputBufferCount[flushedIndex]
<< ".";
mInputBuffers.erase(it);
mInputBufferCount.erase(flushedIndex);
}
}
// Output bufferqueue-based blocks' lifetime management
mOutputBufferQueueMutex.lock();
sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@ -1160,10 +1030,6 @@ c2_status_t Codec2Client::Component::stop() {
if (status != C2_OK) {
LOG(DEBUG) << "stop -- call failed: " << status << ".";
}
mInputBuffersMutex.lock();
mInputBuffers.clear();
mInputBufferCount.clear();
mInputBuffersMutex.unlock();
return status;
}
@ -1178,10 +1044,6 @@ c2_status_t Codec2Client::Component::reset() {
if (status != C2_OK) {
LOG(DEBUG) << "reset -- call failed: " << status << ".";
}
mInputBuffersMutex.lock();
mInputBuffers.clear();
mInputBufferCount.clear();
mInputBuffersMutex.unlock();
return status;
}
@ -1196,10 +1058,6 @@ c2_status_t Codec2Client::Component::release() {
if (status != C2_OK) {
LOG(DEBUG) << "release -- call failed: " << status << ".";
}
mInputBuffersMutex.lock();
mInputBuffers.clear();
mInputBufferCount.clear();
mInputBuffersMutex.unlock();
return status;
}

@ -252,16 +252,9 @@ protected:
struct Codec2Client::Listener {
// This is called when the component produces some output.
//
// numDiscardedInputBuffers is the number of input buffers contained in
// workItems that have just become unused. Note that workItems may contain
// more input buffers than numDiscardedInputBuffers because buffers that
// have been previously reported by onInputBufferDone() are not counted
// towards numDiscardedInputBuffers, but may still show up in workItems.
virtual void onWorkDone(
const std::weak_ptr<Component>& comp,
std::list<std::unique_ptr<C2Work>>& workItems,
size_t numDiscardedInputBuffers) = 0;
std::list<std::unique_ptr<C2Work>>& workItems) = 0;
// This is called when the component goes into a tripped state.
virtual void onTripped(
@ -283,7 +276,7 @@ struct Codec2Client::Listener {
// Input buffers that have been returned by onWorkDone() or flush() will not
// trigger a call to this function.
virtual void onInputBufferDone(
const std::shared_ptr<C2Buffer>& buffer) = 0;
uint64_t frameIndex, size_t arrayIndex) = 0;
// This is called when the component becomes aware of a frame being
// rendered.
@ -385,24 +378,6 @@ struct Codec2Client::Component : public Codec2Client::Configurable {
protected:
sp<Base> mBase;
// Mutex for mInputBuffers and mInputBufferCount.
mutable std::mutex mInputBuffersMutex;
// Map: frameIndex -> vector of bufferIndices
//
// mInputBuffers[frameIndex][bufferIndex] may be null if the buffer in that
// slot has been freed.
mutable std::map<uint64_t, std::vector<std::shared_ptr<C2Buffer>>>
mInputBuffers;
// Map: frameIndex -> number of bufferIndices that have not been freed
//
// mInputBufferCount[frameIndex] keeps track of the number of non-null
// elements in mInputBuffers[frameIndex]. When mInputBufferCount[frameIndex]
// decreases to 0, frameIndex can be removed from both mInputBuffers and
// mInputBufferCount.
mutable std::map<uint64_t, size_t> mInputBufferCount;
::android::hardware::media::c2::V1_0::utils::DefaultBufferPoolSender
mBufferPoolSender;
@ -419,10 +394,7 @@ protected:
friend struct Codec2Client;
struct HidlListener;
// Return the number of input buffers that should be discarded.
size_t handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
// Remove an input buffer from mInputBuffers and return it.
std::shared_ptr<C2Buffer> freeInputBuffer(uint64_t frameIndex, size_t bufferIndex);
void handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
};

@ -8,6 +8,7 @@ cc_library_shared {
"CCodecConfig.cpp",
"Codec2Buffer.cpp",
"Codec2InfoBuilder.cpp",
"PipelineWatcher.cpp",
"ReflectedParamUpdater.cpp",
"SkipCutBuffer.cpp",
],

@ -448,14 +448,13 @@ struct CCodec::ClientListener : public Codec2Client::Listener {
virtual void onWorkDone(
const std::weak_ptr<Codec2Client::Component>& component,
std::list<std::unique_ptr<C2Work>>& workItems,
size_t numDiscardedInputBuffers) override {
std::list<std::unique_ptr<C2Work>>& workItems) override {
(void)component;
sp<CCodec> codec(mCodec.promote());
if (!codec) {
return;
}
codec->onWorkDone(workItems, numDiscardedInputBuffers);
codec->onWorkDone(workItems);
}
virtual void onTripped(
@ -504,10 +503,10 @@ struct CCodec::ClientListener : public Codec2Client::Listener {
}
virtual void onInputBufferDone(
const std::shared_ptr<C2Buffer>& buffer) override {
uint64_t frameIndex, size_t arrayIndex) override {
sp<CCodec> codec(mCodec.promote());
if (codec) {
codec->onInputBufferDone(buffer);
codec->onInputBufferDone(frameIndex, arrayIndex);
}
}
@ -531,10 +530,6 @@ public:
{RenderedFrameInfo(mediaTimeUs, renderTimeNs)});
}
void onWorkQueued(bool eos) override {
mCodec->onWorkQueued(eos);
}
void onOutputBuffersChanged() override {
mCodec->mCallback->onOutputBuffersChanged();
}
@ -546,8 +541,7 @@ private:
// CCodec
CCodec::CCodec()
: mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))),
mQueuedWorkCount(0) {
: mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))) {
}
CCodec::~CCodec() {
@ -1343,7 +1337,6 @@ void CCodec::flush() {
}
mChannel->flush(flushedWork);
subQueuedWorkCount(flushedWork.size());
{
Mutexed<State>::Locked state(mState);
@ -1465,28 +1458,16 @@ void CCodec::signalRequestIDRFrame() {
config->setParameters(comp, params, C2_MAY_BLOCK);
}
void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
size_t numDiscardedInputBuffers) {
void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems) {
if (!workItems.empty()) {
{
Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
mNumDiscardedInputBuffersQueue);
numDiscardedInputBuffersQueue->insert(
numDiscardedInputBuffersQueue->end(),
workItems.size() - 1, 0);
numDiscardedInputBuffersQueue->emplace_back(
numDiscardedInputBuffers);
}
{
Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
queue->splice(queue->end(), workItems);
}
Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
queue->splice(queue->end(), workItems);
}
(new AMessage(kWhatWorkDone, this))->post();
}
void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
mChannel->onInputBufferDone(buffer);
void CCodec::onInputBufferDone(uint64_t frameIndex, size_t arrayIndex) {
mChannel->onInputBufferDone(frameIndex, arrayIndex);
}
void CCodec::onMessageReceived(const sp<AMessage> &msg) {
@ -1512,7 +1493,6 @@ void CCodec::onMessageReceived(const sp<AMessage> &msg) {
case kWhatStart: {
// C2Component::start() should return within 500ms.
setDeadline(now, 550ms, "start");
mQueuedWorkCount = 0;
start();
break;
}
@ -1520,10 +1500,6 @@ void CCodec::onMessageReceived(const sp<AMessage> &msg) {
// C2Component::stop() should return within 500ms.
setDeadline(now, 550ms, "stop");
stop();
mQueuedWorkCount = 0;
Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
deadline->set(TimePoint::max(), "none");
break;
}
case kWhatFlush: {
@ -1549,7 +1525,6 @@ void CCodec::onMessageReceived(const sp<AMessage> &msg) {
}
case kWhatWorkDone: {
std::unique_ptr<C2Work> work;
size_t numDiscardedInputBuffers;
bool shouldPost = false;
{
Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
@ -1560,24 +1535,10 @@ void CCodec::onMessageReceived(const sp<AMessage> &msg) {
queue->pop_front();
shouldPost = !queue->empty();
}
{
Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
mNumDiscardedInputBuffersQueue);
if (numDiscardedInputBuffersQueue->empty()) {
numDiscardedInputBuffers = 0;
} else {
numDiscardedInputBuffers = numDiscardedInputBuffersQueue->front();
numDiscardedInputBuffersQueue->pop_front();
}
}
if (shouldPost) {
(new AMessage(kWhatWorkDone, this))->post();
}
if (work->worklets.empty()
|| !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
subQueuedWorkCount(1);
}
// handle configuration changes in work done
Mutexed<Config>::Locked config(mConfig);
bool changed = false;
@ -1641,8 +1602,7 @@ void CCodec::onMessageReceived(const sp<AMessage> &msg) {
}
mChannel->onWorkDone(
std::move(work), changed ? config->mOutputFormat : nullptr,
initData.hasChanged() ? initData.update().get() : nullptr,
numDiscardedInputBuffers);
initData.hasChanged() ? initData.update().get() : nullptr);
break;
}
case kWhatWatch: {
@ -1669,16 +1629,25 @@ void CCodec::setDeadline(
void CCodec::initiateReleaseIfStuck() {
std::string name;
bool pendingDeadline = false;
for (Mutexed<NamedTimePoint> *deadlinePtr : { &mDeadline, &mQueueDeadline, &mEosDeadline }) {
Mutexed<NamedTimePoint>::Locked deadline(*deadlinePtr);
{
Mutexed<NamedTimePoint>::Locked deadline(mDeadline);
if (deadline->get() < std::chrono::steady_clock::now()) {
name = deadline->getName();
break;
}
if (deadline->get() != TimePoint::max()) {
pendingDeadline = true;
}
}
if (name.empty()) {
constexpr std::chrono::steady_clock::duration kWorkDurationThreshold = 3s;
std::chrono::steady_clock::duration elapsed = mChannel->elapsed();
if (elapsed >= kWorkDurationThreshold) {
name = "queue";
}
if (elapsed > 0s) {
pendingDeadline = true;
}
}
if (name.empty()) {
// We're not stuck.
if (pendingDeadline) {
@ -1694,33 +1663,6 @@ void CCodec::initiateReleaseIfStuck() {
mCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
}
void CCodec::onWorkQueued(bool eos) {
ALOGV("queued work count +1 from %d", mQueuedWorkCount.load());
int32_t count = ++mQueuedWorkCount;
if (eos) {
CCodecWatchdog::getInstance()->watch(this);
Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
deadline->set(std::chrono::steady_clock::now() + 3s, "eos");
}
// TODO: query and use input/pipeline/output delay combined
if (count >= 4) {
CCodecWatchdog::getInstance()->watch(this);
Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
deadline->set(std::chrono::steady_clock::now() + 3s, "queue");
}
}
void CCodec::subQueuedWorkCount(uint32_t count) {
ALOGV("queued work count -%u from %d", count, mQueuedWorkCount.load());
int32_t currentCount = (mQueuedWorkCount -= count);
if (currentCount == 0) {
Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
deadline->set(TimePoint::max(), "none");
}
Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
deadline->set(TimePoint::max(), "none");
}
} // namespace android
extern "C" android::CodecBase *CreateCodec() {

@ -66,9 +66,8 @@ public:
virtual void signalRequestIDRFrame() override;
void initiateReleaseIfStuck();
void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
size_t numDiscardedInputBuffers);
void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems);
void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
protected:
virtual ~CCodec();
@ -76,7 +75,7 @@ protected:
virtual void onMessageReceived(const sp<AMessage> &msg) override;
private:
typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
typedef std::chrono::steady_clock::time_point TimePoint;
status_t tryAndReportOnError(std::function<status_t()> job);
@ -100,9 +99,6 @@ private:
const std::chrono::milliseconds &timeout,
const char *name);
void onWorkQueued(bool eos);
void subQueuedWorkCount(uint32_t count);
enum {
kWhatAllocate,
kWhatConfigure,
@ -167,13 +163,9 @@ private:
struct ClientListener;
Mutexed<NamedTimePoint> mDeadline;
std::atomic_int32_t mQueuedWorkCount;
Mutexed<NamedTimePoint> mQueueDeadline;
Mutexed<NamedTimePoint> mEosDeadline;
typedef CCodecConfig Config;
Mutexed<Config> mConfig;
Mutexed<std::list<std::unique_ptr<C2Work>>> mWorkDoneQueue;
Mutexed<std::list<size_t>> mNumDiscardedInputBuffersQueue;
friend class CCodecCallbackImpl;

@ -152,6 +152,11 @@ public:
*/
virtual std::unique_ptr<InputBuffers> toArrayMode(size_t size) = 0;
/**
* Return number of buffers the client owns.
*/
virtual size_t numClientBuffers() const = 0;
protected:
// Pool to obtain blocks for input buffers.
std::shared_ptr<C2BlockPool> mPool;
@ -508,6 +513,14 @@ public:
mBuffers.clear();
}
size_t numClientBuffers() const {
return std::count_if(
mBuffers.begin(), mBuffers.end(),
[](const Entry &entry) {
return (entry.clientBuffer != nullptr);
});
}
private:
friend class BuffersArrayImpl;
@ -693,6 +706,14 @@ public:
}
}
size_t numClientBuffers() const {
return std::count_if(
mBuffers.begin(), mBuffers.end(),
[](const Entry &entry) {
return entry.ownedByClient;
});
}
private:
std::string mImplName; ///< name for debugging
const char *mName; ///< C-string version of name
@ -756,6 +777,10 @@ public:
mImpl.flush();
}
size_t numClientBuffers() const final {
return mImpl.numClientBuffers();
}
private:
BuffersArrayImpl mImpl;
};
@ -823,6 +848,10 @@ public:
return std::move(array);
}
size_t numClientBuffers() const final {
return mImpl.numClientBuffers();
}
virtual sp<Codec2Buffer> alloc(size_t size) {
C2MemoryUsage usage = { C2MemoryUsage::CPU_READ, C2MemoryUsage::CPU_WRITE };
std::shared_ptr<C2LinearBlock> block;
@ -967,6 +996,10 @@ public:
return std::move(array);
}
size_t numClientBuffers() const final {
return mImpl.numClientBuffers();
}
private:
FlexBuffersImpl mImpl;
std::shared_ptr<C2AllocatorStore> mStore;
@ -1030,6 +1063,10 @@ public:
return std::move(array);
}
size_t numClientBuffers() const final {
return mImpl.numClientBuffers();
}
private:
FlexBuffersImpl mImpl;
std::shared_ptr<LocalBufferPool> mLocalBufferPool;
@ -1065,6 +1102,10 @@ public:
void getArray(Vector<sp<MediaCodecBuffer>> *array) const final {
array->clear();
}
size_t numClientBuffers() const final {
return 0u;
}
};
class OutputBuffersArray : public CCodecBufferChannel::OutputBuffers {
@ -1422,90 +1463,6 @@ void CCodecBufferChannel::QueueSync::stop() {
count->value = -1;
}
// CCodecBufferChannel::PipelineCapacity
CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
: input(0), component(0),
mName("<UNKNOWN COMPONENT>") {
}
void CCodecBufferChannel::PipelineCapacity::initialize(
int newInput,
int newComponent,
const char* newName,
const char* callerTag) {
input.store(newInput, std::memory_order_relaxed);
component.store(newComponent, std::memory_order_relaxed);
mName = newName;
ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
"pipeline availability initialized ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
newInput, newComponent);
}
bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
if (prevInput > 0 && prevComponent > 0) {
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
"pipeline availability -1 all ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput - 1,
prevComponent - 1);
return true;
}
input.fetch_add(1, std::memory_order_relaxed);
component.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
"pipeline availability unchanged ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput,
prevComponent);
return false;
}
void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
int prevInput = input.fetch_add(1, std::memory_order_relaxed);
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::free(): "
"pipeline availability +1 all ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
prevInput + 1,
prevComponent + 1);
}
int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
size_t numDiscardedInputBuffers,
const char* callerTag) {
int prevInput = input.fetch_add(numDiscardedInputBuffers,
std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
"pipeline availability +%zu input ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
numDiscardedInputBuffers,
numDiscardedInputBuffers,
prevInput + static_cast<int>(numDiscardedInputBuffers),
component.load(std::memory_order_relaxed));
return prevInput + static_cast<int>(numDiscardedInputBuffers);
}
int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
const char* callerTag) {
int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
"pipeline availability +1 component ==> "
"input = %d, component = %d",
mName, callerTag ? callerTag : "*",
input.load(std::memory_order_relaxed),
prevComponent + 1);
return prevComponent + 1;
}
// CCodecBufferChannel::ReorderStash
CCodecBufferChannel::ReorderStash::ReorderStash() {
@ -1595,7 +1552,6 @@ CCodecBufferChannel::CCodecBufferChannel(
mFrameIndex(0u),
mFirstValidFrameIndex(0u),
mMetaMode(MODE_NONE),
mAvailablePipelineCapacity(),
mInputMetEos(false) {
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
buffers->reset(new DummyInputBuffers(""));
@ -1658,6 +1614,9 @@ status_t CCodecBufferChannel::queueInputBufferInternal(const sp<MediaCodecBuffer
work->input.ordinal.customOrdinal = timeUs;
work->input.buffers.clear();
uint64_t queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
std::vector<std::shared_ptr<C2Buffer>> queuedBuffers;
if (buffer->size() > 0u) {
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
std::shared_ptr<C2Buffer> c2buffer;
@ -1665,11 +1624,9 @@ status_t CCodecBufferChannel::queueInputBufferInternal(const sp<MediaCodecBuffer
return -ENOENT;
}
work->input.buffers.push_back(c2buffer);
} else {
mAvailablePipelineCapacity.freeInputSlots(1, "queueInputBufferInternal");
if (eos) {
flags |= C2FrameData::FLAG_END_OF_STREAM;
}
queuedBuffers.push_back(c2buffer);
} else if (eos) {
flags |= C2FrameData::FLAG_END_OF_STREAM;
}
work->input.flags = (C2FrameData::flags_t)flags;
// TODO: fill info's
@ -1680,10 +1637,16 @@ status_t CCodecBufferChannel::queueInputBufferInternal(const sp<MediaCodecBuffer
std::list<std::unique_ptr<C2Work>> items;
items.push_back(std::move(work));
mPipelineWatcher.lock()->onWorkQueued(
queuedFrameIndex,
std::move(queuedBuffers),
PipelineWatcher::Clock::now());
c2_status_t err = mComponent->queue(&items);
if (err != C2_OK) {
mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
}
if (err == C2_OK && eos && buffer->size() > 0u) {
mCCodecCallback->onWorkQueued(false);
work.reset(new C2Work);
work->input.ordinal.timestamp = timeUs;
work->input.ordinal.frameIndex = mFrameIndex++;
@ -1693,13 +1656,22 @@ status_t CCodecBufferChannel::queueInputBufferInternal(const sp<MediaCodecBuffer
work->input.flags = C2FrameData::FLAG_END_OF_STREAM;
work->worklets.emplace_back(new C2Worklet);
queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
queuedBuffers.clear();
items.clear();
items.push_back(std::move(work));
mPipelineWatcher.lock()->onWorkQueued(
queuedFrameIndex,
std::move(queuedBuffers),
PipelineWatcher::Clock::now());
err = mComponent->queue(&items);
if (err != C2_OK) {
mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
}
}
if (err == C2_OK) {
mCCodecCallback->onWorkQueued(eos);
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
bool released = (*buffers)->releaseBuffer(buffer, nullptr, true);
ALOGV("[%s] queueInputBuffer: buffer %sreleased", mName, released ? "" : "not ");
@ -1846,14 +1818,16 @@ void CCodecBufferChannel::feedInputBufferIfAvailable() {
void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
while (!mInputMetEos &&
!mReorderStash.lock()->hasPending() &&
mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
!mPipelineWatcher.lock()->pipelineFull()) {
sp<MediaCodecBuffer> inBuffer;
size_t index;
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
if ((*buffers)->numClientBuffers() >= mNumInputSlots) {
return;
}
if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
ALOGV("[%s] no new buffer available", mName);
mAvailablePipelineCapacity.free("feedInputBufferIfAvailable");
break;
}
}
@ -2032,15 +2006,12 @@ status_t CCodecBufferChannel::discardBuffer(const sp<MediaCodecBuffer> &buffer)
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr, true)) {
buffers.unlock();
released = true;
mAvailablePipelineCapacity.freeInputSlots(1, "discardBuffer");
}
}
{
Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
buffers.unlock();
released = true;
}
}
@ -2408,10 +2379,14 @@ status_t CCodecBufferChannel::start(
// about buffers from the previous generation do not interfere with the
// newly initialized pipeline capacity.
mAvailablePipelineCapacity.initialize(
mNumInputSlots,
mNumInputSlots + mNumOutputSlots,
mName);
{
Mutexed<PipelineWatcher>::Locked watcher(mPipelineWatcher);
watcher->inputDelay(inputDelay ? inputDelay.value : 0)
.pipelineDelay(pipelineDelay ? pipelineDelay.value : 0)
.outputDelay(outputDelay ? outputDelay.value : 0)
.smoothnessFactor(kSmoothnessFactor);
watcher->flush();
}
mInputMetEos = false;
mSync.start();
@ -2472,21 +2447,16 @@ status_t CCodecBufferChannel::requestInitialInputBuffers() {
buffer->meta()->setInt64("timeUs", 0);
post = false;
}
if (mAvailablePipelineCapacity.allocate("requestInitialInputBuffers")) {
if (post) {
mCallback->onInputBufferAvailable(index, buffer);
} else {
toBeQueued.emplace_back(buffer);
}
if (post) {
mCallback->onInputBufferAvailable(index, buffer);
} else {
ALOGD("[%s] pipeline is full while requesting %zu-th input buffer",
mName, i);
toBeQueued.emplace_back(buffer);
}
}
}
for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
if (queueInputBufferInternal(buffer) != OK) {
mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
ALOGV("[%s] Error while queueing initial buffers", mName);
}
}
return OK;
@ -2532,28 +2502,25 @@ void CCodecBufferChannel::flush(const std::list<std::unique_ptr<C2Work>> &flushe
(*buffers)->flush(flushedWork);
}
mReorderStash.lock()->flush();
mPipelineWatcher.lock()->flush();
}
void CCodecBufferChannel::onWorkDone(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
const C2StreamInitDataInfo::output *initData,
size_t numDiscardedInputBuffers) {
const C2StreamInitDataInfo::output *initData) {
if (handleWork(std::move(work), outputFormat, initData)) {
mAvailablePipelineCapacity.freeInputSlots(numDiscardedInputBuffers,
"onWorkDone");
feedInputBufferIfAvailable();
}
}
void CCodecBufferChannel::onInputBufferDone(
const std::shared_ptr<C2Buffer>& buffer) {
uint64_t frameIndex, size_t arrayIndex) {
std::shared_ptr<C2Buffer> buffer =
mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
bool newInputSlotAvailable;
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
if (newInputSlotAvailable) {
mAvailablePipelineCapacity.freeInputSlots(1, "onInputBufferDone");
}
}
if (newInputSlotAvailable) {
feedInputBufferIfAvailable();
@ -2573,7 +2540,7 @@ bool CCodecBufferChannel::handleWork(
if (work->worklets.size() != 1u
|| !work->worklets.front()
|| !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
mAvailablePipelineCapacity.freeComponentSlot("handleWork");
mPipelineWatcher.lock()->onWorkDone(work->input.ordinal.frameIndex.peeku());
}
if (work->result == C2_NOT_FOUND) {
@ -2832,6 +2799,10 @@ status_t CCodecBufferChannel::setSurface(const sp<Surface> &newSurface) {
return OK;
}
PipelineWatcher::Clock::duration CCodecBufferChannel::elapsed() {
return mPipelineWatcher.lock()->elapsed(PipelineWatcher::Clock::now());
}
void CCodecBufferChannel::setMetaMode(MetaMode mode) {
mMetaMode = mode;
}

@ -34,6 +34,7 @@
#include <media/ICrypto.h>
#include "InputSurfaceWrapper.h"
#include "PipelineWatcher.h"
namespace android {
@ -44,7 +45,6 @@ public:
virtual ~CCodecCallback() = default;
virtual void onError(status_t err, enum ActionCode actionCode) = 0;
virtual void onOutputFramesRendered(int64_t mediaTimeUs, nsecs_t renderTimeNs) = 0;
virtual void onWorkQueued(bool eos) = 0;
virtual void onOutputBuffersChanged() = 0;
};
@ -128,22 +128,21 @@ public:
* @param workItems finished work item.
* @param outputFormat new output format if it has changed, otherwise nullptr
* @param initData new init data (CSD) if it has changed, otherwise nullptr
* @param numDiscardedInputBuffers the number of input buffers that are
* returned for the first time (not previously returned by
* onInputBufferDone()).
*/
void onWorkDone(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
const C2StreamInitDataInfo::output *initData,
size_t numDiscardedInputBuffers);
const C2StreamInitDataInfo::output *initData);
/**
* Make an input buffer available for the client as it is no longer needed
* by the codec.
*
* @param buffer The buffer that becomes unused.
* @param frameIndex The index of input work
* @param arrayIndex The index of buffer in the input work buffers.
*/
void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
PipelineWatcher::Clock::duration elapsed();
enum MetaMode {
MODE_NONE,
@ -266,79 +265,7 @@ private:
MetaMode mMetaMode;
// PipelineCapacity is used in the input buffer gating logic.
//
// There are three criteria that need to be met before
// onInputBufferAvailable() is called:
// 1. The number of input buffers that have been received by
// CCodecBufferChannel but not returned via onWorkDone() or
// onInputBufferDone() does not exceed a certain limit. (Let us call this
// number the "input" capacity.)
// 2. The number of work items that have been received by
// CCodecBufferChannel whose outputs have not been returned from the
// component (by calling onWorkDone()) does not exceed a certain limit.
// (Let us call this the "component" capacity.)
//
// These three criteria guarantee that a new input buffer that arrives from
// the invocation of onInputBufferAvailable() will not
// 1. overload CCodecBufferChannel's input buffers;
// 2. overload the component; or
//
struct PipelineCapacity {
// The number of available input capacity.
std::atomic_int input;
// The number of available component capacity.
std::atomic_int component;
PipelineCapacity();
// Set the values of #input and #component.
void initialize(int newInput, int newComponent,
const char* newName = "<UNKNOWN COMPONENT>",
const char* callerTag = nullptr);
// Return true and decrease #input and #component by one if
// they are all greater than zero; return false otherwise.
//
// callerTag is used for logging only.
//
// allocate() is called by CCodecBufferChannel to check whether it can
// receive another input buffer. If the return value is true,
// onInputBufferAvailable() and onOutputBufferAvailable() can be called
// afterwards.
bool allocate(const char* callerTag = nullptr);
// Increase #input and #component by one.
//
// callerTag is used for logging only.
//
// free() is called by CCodecBufferChannel after allocate() returns true
// but onInputBufferAvailable() cannot be called for any reasons. It
// essentially undoes an allocate() call.
void free(const char* callerTag = nullptr);
// Increase #input by @p numDiscardedInputBuffers.
//
// callerTag is used for logging only.
//
// freeInputSlots() is called by CCodecBufferChannel when onWorkDone()
// or onInputBufferDone() is called. @p numDiscardedInputBuffers is
// provided in onWorkDone(), and is 1 in onInputBufferDone().
int freeInputSlots(size_t numDiscardedInputBuffers,
const char* callerTag = nullptr);
// Increase #component by one and return the updated value.
//
// callerTag is used for logging only.
//
// freeComponentSlot() is called by CCodecBufferChannel when
// onWorkDone() is called.
int freeComponentSlot(const char* callerTag = nullptr);
private:
// Component name. Used for logging.
const char* mName;
};
PipelineCapacity mAvailablePipelineCapacity;
Mutexed<PipelineWatcher> mPipelineWatcher;
class ReorderStash {
public:

@ -0,0 +1,145 @@
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "PipelineWatcher"
#include <numeric>
#include <log/log.h>
#include "PipelineWatcher.h"
namespace android {
PipelineWatcher &PipelineWatcher::inputDelay(uint32_t value) {
mInputDelay = value;
return *this;
}
PipelineWatcher &PipelineWatcher::pipelineDelay(uint32_t value) {
mPipelineDelay = value;
return *this;
}
PipelineWatcher &PipelineWatcher::outputDelay(uint32_t value) {
mOutputDelay = value;
return *this;
}
PipelineWatcher &PipelineWatcher::smoothnessFactor(uint32_t value) {
mSmoothnessFactor = value;
return *this;
}
void PipelineWatcher::onWorkQueued(
uint64_t frameIndex,
std::vector<std::shared_ptr<C2Buffer>> &&buffers,
const Clock::time_point &queuedAt) {
ALOGV("onWorkQueued(frameIndex=%llu, buffers(size=%zu), queuedAt=%lld)",
(unsigned long long)frameIndex,
buffers.size(),
(long long)queuedAt.time_since_epoch().count());
auto it = mFramesInPipeline.find(frameIndex);
if (it != mFramesInPipeline.end()) {
ALOGD("onWorkQueued: Duplicate frame index (%llu); previous entry removed",
(unsigned long long)frameIndex);
(void)mFramesInPipeline.erase(it);
}
(void)mFramesInPipeline.try_emplace(frameIndex, std::move(buffers), queuedAt);
}
std::shared_ptr<C2Buffer> PipelineWatcher::onInputBufferReleased(
uint64_t frameIndex, size_t arrayIndex) {
ALOGV("onInputBufferReleased(frameIndex=%llu, arrayIndex=%zu)",
(unsigned long long)frameIndex, arrayIndex);
auto it = mFramesInPipeline.find(frameIndex);
if (it == mFramesInPipeline.end()) {
ALOGD("onInputBufferReleased: frameIndex not found (%llu); ignored",
(unsigned long long)frameIndex);
return nullptr;
}
if (it->second.buffers.size() <= arrayIndex) {
ALOGD("onInputBufferReleased: buffers at %llu: size %zu, requested index: %zu",
(unsigned long long)frameIndex, it->second.buffers.size(), arrayIndex);
return nullptr;
}
std::shared_ptr<C2Buffer> buffer(std::move(it->second.buffers[arrayIndex]));
ALOGD_IF(!buffer, "onInputBufferReleased: buffer already released (%llu:%zu)",
(unsigned long long)frameIndex, arrayIndex);
return buffer;
}
void PipelineWatcher::onWorkDone(uint64_t frameIndex) {
ALOGV("onWorkDone(frameIndex=%llu)", (unsigned long long)frameIndex);
auto it = mFramesInPipeline.find(frameIndex);
if (it == mFramesInPipeline.end()) {
ALOGD("onWorkDone: frameIndex not found (%llu); ignored",
(unsigned long long)frameIndex);
return;
}
(void)mFramesInPipeline.erase(it);
}
void PipelineWatcher::flush() {
mFramesInPipeline.clear();
}
bool PipelineWatcher::pipelineFull() const {
if (mFramesInPipeline.size() >=
mInputDelay + mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
ALOGV("pipelineFull: too many frames in pipeline (%zu)", mFramesInPipeline.size());
return true;
}
size_t sizeWithInputReleased = std::count_if(
mFramesInPipeline.begin(),
mFramesInPipeline.end(),
[](const decltype(mFramesInPipeline)::value_type &value) {
for (const std::shared_ptr<C2Buffer> &buffer : value.second.buffers) {
if (buffer) {
return false;
}
}
return true;
});
if (sizeWithInputReleased >=
mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
ALOGV("pipelineFull: too many frames in pipeline, with input released (%zu)",
sizeWithInputReleased);
return true;
}
ALOGV("pipeline has room (total: %zu, input released: %zu)",
mFramesInPipeline.size(), sizeWithInputReleased);
return false;
}
PipelineWatcher::Clock::duration PipelineWatcher::elapsed(
const PipelineWatcher::Clock::time_point &now) const {
return std::accumulate(
mFramesInPipeline.begin(),
mFramesInPipeline.end(),
Clock::duration::zero(),
[&now](const Clock::duration &current,
const decltype(mFramesInPipeline)::value_type &value) {
Clock::duration elapsed = now - value.second.queuedAt;
ALOGV("elapsed: frameIndex = %llu elapsed = %lldms",
(unsigned long long)value.first,
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
return current > elapsed ? current : elapsed;
});
}
} // namespace android

@ -0,0 +1,78 @@
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PIPELINE_WATCHER_H_
#define PIPELINE_WATCHER_H_
#include <chrono>
#include <map>
#include <memory>
#include <C2Work.h>
namespace android {
/**
* PipelineWatcher watches the status of the work.
*/
class PipelineWatcher {
public:
typedef std::chrono::steady_clock Clock;
PipelineWatcher()
: mInputDelay(0),
mPipelineDelay(0),
mOutputDelay(0),
mSmoothnessFactor(0) {}
~PipelineWatcher() = default;
PipelineWatcher &inputDelay(uint32_t value);
PipelineWatcher &pipelineDelay(uint32_t value);
PipelineWatcher &outputDelay(uint32_t value);
PipelineWatcher &smoothnessFactor(uint32_t value);
void onWorkQueued(
uint64_t frameIndex,
std::vector<std::shared_ptr<C2Buffer>> &&buffers,
const Clock::time_point &queuedAt);
std::shared_ptr<C2Buffer> onInputBufferReleased(
uint64_t frameIndex, size_t arrayIndex);
void onWorkDone(uint64_t frameIndex);
void flush();
bool pipelineFull() const;
Clock::duration elapsed(const Clock::time_point &now) const;
private:
uint32_t mInputDelay;
uint32_t mPipelineDelay;
uint32_t mOutputDelay;
uint32_t mSmoothnessFactor;
struct Frame {
Frame(std::vector<std::shared_ptr<C2Buffer>> &&b,
const Clock::time_point &q)
: buffers(b),
queuedAt(q) {}
std::vector<std::shared_ptr<C2Buffer>> buffers;
const Clock::time_point queuedAt;
};
std::map<uint64_t, Frame> mFramesInPipeline;
};
} // namespace android
#endif // PIPELINE_WATCHER_H_
Loading…
Cancel
Save