Merge "codec2 components: merge WorkQueue and PendingWork" into qt-dev

gugelfrei
Wonsik Kim 5 years ago committed by Android (Google) Code Review
commit 10ba310fea

@ -272,12 +272,9 @@ c2_status_t SimpleC2Component::flush_sm(
flushedWork->push_back(std::move(work));
}
}
}
{
Mutexed<PendingWork>::Locked pending(mPendingWork);
while (!pending->empty()) {
flushedWork->push_back(std::move(pending->begin()->second));
pending->erase(pending->begin());
while (!queue->pending().empty()) {
flushedWork->push_back(std::move(queue->pending().begin()->second));
queue->pending().erase(queue->pending().begin());
}
}
@ -342,10 +339,7 @@ c2_status_t SimpleC2Component::stop() {
{
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
queue->clear();
}
{
Mutexed<PendingWork>::Locked pending(mPendingWork);
pending->clear();
queue->pending().clear();
}
sp<AMessage> reply;
(new AMessage(WorkHandler::kWhatStop, mHandler))->postAndAwaitResponse(&reply);
@ -366,10 +360,7 @@ c2_status_t SimpleC2Component::reset() {
{
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
queue->clear();
}
{
Mutexed<PendingWork>::Locked pending(mPendingWork);
pending->clear();
queue->pending().clear();
}
sp<AMessage> reply;
(new AMessage(WorkHandler::kWhatReset, mHandler))->postAndAwaitResponse(&reply);
@ -401,13 +392,13 @@ void SimpleC2Component::finish(
uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
std::unique_ptr<C2Work> work;
{
Mutexed<PendingWork>::Locked pending(mPendingWork);
if (pending->count(frameIndex) == 0) {
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
if (queue->pending().count(frameIndex) == 0) {
ALOGW("unknown frame index: %" PRIu64, frameIndex);
return;
}
work = std::move(pending->at(frameIndex));
pending->erase(frameIndex);
work = std::move(queue->pending().at(frameIndex));
queue->pending().erase(frameIndex);
}
if (work) {
fillWork(work);
@ -426,13 +417,13 @@ void SimpleC2Component::cloneAndSend(
work->input.flags = currentWork->input.flags;
work->input.ordinal = currentWork->input.ordinal;
} else {
Mutexed<PendingWork>::Locked pending(mPendingWork);
if (pending->count(frameIndex) == 0) {
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
if (queue->pending().count(frameIndex) == 0) {
ALOGW("unknown frame index: %" PRIu64, frameIndex);
return;
}
work->input.flags = pending->at(frameIndex)->input.flags;
work->input.ordinal = pending->at(frameIndex)->input.ordinal;
work->input.flags = queue->pending().at(frameIndex)->input.flags;
work->input.ordinal = queue->pending().at(frameIndex)->input.ordinal;
}
work->worklets.emplace_back(new C2Worklet);
if (work) {
@ -552,24 +543,21 @@ bool SimpleC2Component::processQueue() {
}
process(work, mOutputBlockPool);
ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
{
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
if (queue->generation() != generation) {
ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
queue->generation(), generation);
work->result = C2_NOT_FOUND;
queue.unlock();
{
Mutexed<ExecState>::Locked state(mExecState);
std::shared_ptr<C2Component::Listener> listener = state->mListener;
state.unlock();
listener->onWorkDone_nb(shared_from_this(), vec(work));
}
queue.lock();
return hasQueuedWork;
}
Mutexed<WorkQueue>::Locked queue(mWorkQueue);
if (queue->generation() != generation) {
ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
queue->generation(), generation);
work->result = C2_NOT_FOUND;
queue.unlock();
Mutexed<ExecState>::Locked state(mExecState);
std::shared_ptr<C2Component::Listener> listener = state->mListener;
state.unlock();
listener->onWorkDone_nb(shared_from_this(), vec(work));
return hasQueuedWork;
}
if (work->workletsProcessed != 0u) {
queue.unlock();
Mutexed<ExecState>::Locked state(mExecState);
ALOGV("returning this work");
std::shared_ptr<C2Component::Listener> listener = state->mListener;
@ -579,15 +567,15 @@ bool SimpleC2Component::processQueue() {
ALOGV("queue pending work");
work->input.buffers.clear();
std::unique_ptr<C2Work> unexpected;
{
Mutexed<PendingWork>::Locked pending(mPendingWork);
uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
if (pending->count(frameIndex) != 0) {
unexpected = std::move(pending->at(frameIndex));
pending->erase(frameIndex);
}
(void)pending->insert({ frameIndex, std::move(work) });
uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
if (queue->pending().count(frameIndex) != 0) {
unexpected = std::move(queue->pending().at(frameIndex));
queue->pending().erase(frameIndex);
}
(void)queue->pending().insert({ frameIndex, std::move(work) });
queue.unlock();
if (unexpected) {
ALOGD("unexpected pending work");
unexpected->result = C2_CORRUPTED;

@ -202,6 +202,8 @@ private:
class WorkQueue {
public:
typedef std::unordered_map<uint64_t, std::unique_ptr<C2Work>> PendingWork;
inline WorkQueue() : mFlush(false), mGeneration(0ul) {}
inline uint64_t generation() const { return mGeneration; }
@ -218,6 +220,7 @@ private:
return flush;
}
void clear();
PendingWork &pending() { return mPendingWork; }
private:
struct Entry {
@ -228,12 +231,10 @@ private:
bool mFlush;
uint64_t mGeneration;
std::list<Entry> mQueue;
PendingWork mPendingWork;
};
Mutexed<WorkQueue> mWorkQueue;
typedef std::unordered_map<uint64_t, std::unique_ptr<C2Work>> PendingWork;
Mutexed<PendingWork> mPendingWork;
class BlockingBlockPool;
std::shared_ptr<BlockingBlockPool> mOutputBlockPool;

Loading…
Cancel
Save