sbRequestThreadQueue.cpp
Go to the documentation of this file.
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set sw=2 :miv */
3 /*
4  *=BEGIN SONGBIRD GPL
5  *
6  * This file is part of the Songbird web player.
7  *
8  * Copyright(c) 2005-2011 POTI, Inc.
9  * http://www.songbirdnest.com
10  *
11  * This file may be licensed under the terms of of the
12  * GNU General Public License Version 2 (the ``GPL'').
13  *
14  * Software distributed under the License is distributed
15  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
16  * express or implied. See the GPL for the specific language
17  * governing rights and limitations.
18  *
19  * You should have received a copy of the GPL along with this
20  * program. If not, go to http://www.gnu.org/licenses/gpl.html
21  * or write to the Free Software Foundation, Inc.,
22  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
23  *
24  *=END SONGBIRD GPL
25  */
26 
27 #include "sbRequestThreadQueue.h"
28 
29 // Platform includes
30 #include <algorithm>
31 
32 // Mozilla includes
33 #include <nsArrayUtils.h>
34 #include <nsAutoLock.h>
35 #include <nsComponentManagerUtils.h>
36 #include <nsIProgrammingLanguage.h>
37 #include <nsISupportsPrimitives.h>
38 #include <nsThreadUtils.h>
39 
40 // Mozilla interfaces
41 #include <nsIMutableArray.h>
42 
43 // Songbird includes
44 #include <sbDebugUtils.h>
45 #include <sbThreadUtils.h>
46 
47 // Local includes
48 #include "sbRequestItem.h"
49 
// Runnable that drives request processing for an sbRequestThreadQueue.
// sbRequestThreadQueue::ProcessRequest dispatches an instance of this class to
// the queue's thread; Run() then pops batches from the queue and processes
// them. Instances are created through the static New() factory.
55 class sbRTQAddedEvent : public nsIRunnable
56 {
57  //----------------------------------------------------------------------------
58  //
59  // Public interface.
60  //
61  //----------------------------------------------------------------------------
62
63 public:
64
65  //
66  // Inherited interfaces.
67  //
68
70  NS_DECL_NSIRUNNABLE
71
72
73  //
74  // Public methods.
75  //
76
 // Creates an event bound to aRTQ and returns it through aEvent as an
 // nsIRunnable.
77  static nsresult New(sbRequestThreadQueue * aRTQ,
78  nsIRunnable** aEvent);
79
80  //----------------------------------------------------------------------------
81  //
82  // Protected and private interface.
83  //
84  //----------------------------------------------------------------------------
85 protected:
86  // The default constructor is protected; New() is the way to get an instance.
87  sbRTQAddedEvent() : mRTQ(nsnull) {}
 // Stores aRTQ in mRTQ.
91  nsresult Initialize(sbRequestThreadQueue * aRTQ);
92 private:
 // Raw pointer to the queue this event serves; Initialize() stores it without
 // taking a reference.
97  sbRequestThreadQueue * mRTQ;
98 };
99 
101  mCountableItems(0),
102  mRequestType(sbRequestThreadQueue::REQUEST_TYPE_NOT_SET)
103 {
104 }
105 
107 {
108  clear();
109 }
110 
112 {
 // Appends aItem to the batch, taking a reference on it, and keeps the
 // batch's request type and countable-item count up to date.
 // NOTE(review): the assertion below only reports a null aItem; the code that
 // follows dereferences it regardless.
113  NS_ASSERTION(aItem, "sbRequestThreadQueue::Batch::push_back passed null");
114  // If we don't have any request type then set it. If we have set a request
115  // type but it's not a user defined one then set it to the defined one.
116  // Ideally user defined and reserved requests shouldn't be mixed, but we'll
117  // handle it nicely here anyway.
118  if (aItem->GetIsCountable()) {
 // A countable item overrides the batch type as long as the current type is
 // not above USER_REQUEST_TYPES.
119  if (mRequestType <= USER_REQUEST_TYPES) {
120  mRequestType = aItem->GetType();
121  }
 // Countable items are numbered in insertion order.
122  aItem->SetBatchIndex(mCountableItems++);
123  }
124  else {
 // A non-countable item only sets the type when none has been set yet.
125  if (mRequestType == REQUEST_TYPE_NOT_SET) {
126  mRequestType = aItem->GetType();
127  }
128  }
 // The batch owns a reference to every item it stores.
129  NS_ADDREF(aItem);
130  mRequestItems.push_back(aItem);
131 }
132 
134 {
135  // If this is a countable item we'll need to update the indexes and
136  // mCountableItems
137  if ((*aIter)->GetIsCountable()) {
138  PRUint32 index = (*aIter)->GetBatchIndex();
139  const const_iterator end = mRequestItems.end();
140  iterator iter = aIter;
141  // Increment past the item to be removed
142  while (++iter != end) {
143  if ((*iter)->GetIsCountable()) {
144  (*iter)->SetBatchIndex(index++);
145  }
146  }
147  --mCountableItems;
148  }
149 
150  // Release the request item and remove it from the queue
151  NS_RELEASE(*aIter);
152  mRequestItems.erase(aIter);
153 
154  // Reset the request type if there are no more countable items
155  if (mCountableItems == 0) {
156  if (mRequestItems.empty()) {
158  }
159  else {
160  mRequestType = (*mRequestItems.begin())->GetType();
161  }
162  }
163 }
164 
165 inline void ReleaseRequestItem(sbRequestItem * aItem)
166 {
167  NS_RELEASE(aItem);
168 }
169 
171 {
172  // Release all the request items
173  std::for_each(mRequestItems.begin(), mRequestItems.end(), ReleaseRequestItem);
174  mRequestItems.clear();
176 }
177 
178 void sbRequestThreadQueue::Batch::RecalcBatchStats()
179 {
180  PRUint32 index = 0;
181  const RequestItems::const_iterator end = mRequestItems.end();
182  for (RequestItems::const_iterator iter = mRequestItems.begin();
183  iter != end;
184  ++iter)
185  {
186  if ((*iter)->GetIsCountable()) {
187  (*iter)->SetBatchIndex(index++);
188  }
189  }
190 }
191 
194 
200  mLock(nsnull),
201  mBatchDepth(0),
202  mStopWaitMonitor(nsnull),
207  mCurrentBatchId(1)
208 {
210 
211  mLock = nsAutoLock::NewLock("sbRequestThreadQueue::mLock");
212  // Create the request wait monitor.
214  nsAutoMonitor::NewMonitor("sbRequestThreadQueue::mStopWaitMonitor");
215 }
216 
218 {
219  NS_ASSERTION(mRequestQueue.size() == 0,
220  "sbRequestThreadQueue destructor with items in queue");
221  if (mStopWaitMonitor) {
222  nsAutoMonitor::DestroyMonitor(mStopWaitMonitor);
223  }
224  if (mLock) {
225  nsAutoLock::DestroyLock(mLock);
226  }
227 }
228 
230 {
231  TRACE_FUNCTION("");
232 
233  NS_ENSURE_STATE(mLock);
234  nsAutoLock lock(mLock);
235 
236  ++mBatchDepth;
237 
238  return NS_OK;
239 }
240 
242 {
243  TRACE_FUNCTION("");
244  NS_ENSURE_STATE(mLock);
245  nsAutoLock lock(mLock);
246  NS_ASSERTION(mBatchDepth > 0,
247  "sbRequestThreadQueue batch depth out of balance");
248  if (mBatchDepth > 0 && --mBatchDepth == 0) {
249  ++mCurrentBatchId;
250  ProcessRequest();
251  }
252  return NS_OK;
253 }
254 
258 static nsresult
260  nsresult (sbRequestThreadQueue::* aMethod)(int),
261  nsresult aFailureReturnValue,
262  int aArg1,
263  nsIRunnable ** aRunnable)
264 {
265  NS_ENSURE_ARG_POINTER(aRunnable);
266 
267  nsresult rv;
268 
270  Runnable * runnable = nsnull;
271 
272  // New addrefs runnable when returned
273  rv = Runnable::New(&runnable,
274  &aObject,
275  aMethod,
276  aFailureReturnValue,
277  aArg1);
278  NS_ENSURE_SUCCESS(rv, rv);
279 
280  *aRunnable = runnable;
281 
282  return NS_OK;
283 }
284 
286 {
 // Prepares the queue for use: creates the request-added event, binds the
 // shutdown action, and creates the request processing thread.
287  TRACE_FUNCTION("");
288  // Ensure we've allocated our lock and monitor
289  NS_ENSURE_TRUE(mLock, NS_ERROR_OUT_OF_MEMORY);
290  NS_ENSURE_TRUE(mStopWaitMonitor, NS_ERROR_OUT_OF_MEMORY);
291
292  nsresult rv;
293
294  // No need to lock since Start should not be called while the request thread
295  // is running or from multiple threads.
296  // Start should not be called more than once before Stop is called.
297  NS_ENSURE_FALSE(mThreadStarted, NS_ERROR_FAILURE);
298
299  // Clear the stop request processing flag. No need to lock since this is
300  // the only live thread with access to the object.
301  mStopProcessing = false;
302
303  // Create the request added event object.
304  rv = sbRTQAddedEvent::New(this, getter_AddRefs(mReqAddedEvent));
305  NS_ENSURE_SUCCESS(rv, rv);
306
307
 // Wrap ThreadShutdownAction in a runnable; sbRTQAddedEvent::Run dispatches it
 // to the main thread when it sees the stop request.
308  rv = sbRunnableMethod(*this,
309  &sbRequestThreadQueue::ThreadShutdownAction,
310  NS_ERROR_FAILURE,
311  0,
312  getter_AddRefs(mShutdownAction));
313  NS_ENSURE_SUCCESS(rv, rv);
314
 // NOTE(review): the started flag is set before the thread exists. If
 // NS_NewThread fails below, mThreadStarted stays true while mThread is unset,
 // so a later Start is rejected and Stop proceeds without a thread.
315  mThreadStarted = true;
316
317  // Create the request processing thread.
318  rv = NS_NewThread(getter_AddRefs(mThread), nsnull);
319  NS_ENSURE_SUCCESS(rv, rv);
320
 // NOTE(review): the statement that produces the rv checked next is not
 // present in this listing.
322  NS_ENSURE_SUCCESS(rv, rv);
323
324  return NS_OK;
325 }
326 
328 {
 // Marks the queue as stopped, wakes anyone waiting on the stop monitor, and
 // queues a REQUEST_THREAD_STOP item for the request thread. Returns
 // NS_ERROR_NOT_AVAILABLE when the queue is not started; failures to queue or
 // dispatch the stop request are only warned about.
329  TRACE_FUNCTION("");
330  NS_ENSURE_STATE(mLock);
331  nsresult rv;
332
333  // Check and set the thread state
334  {
335  nsAutoLock lock(mLock);
336
337  // If stop was called with no Start or after another stop return an error
338  if (!mThreadStarted) {
339  return NS_ERROR_NOT_AVAILABLE;
340  }
341
342  mThreadStarted = false;
343
344  }
345
346  // Notify external users that we're stopping
347  {
348  nsAutoMonitor monitor(mStopWaitMonitor);
349  // Set the stop processing requests flag.
350  mStopProcessing = true;
351  monitor.NotifyAll();
352  }
353
354  // Push the thread stop request onto the queue. This will signal the request
355  // thread to shutdown
 // NOTE(review): mLock was released above, so this push modifies mRequestQueue
 // without the lock that PushRequest and PopBatch hold around queue access.
 // NOTE(review): the item returned by sbRequestItem::New is passed straight
 // through; confirm who owns its initial reference.
356  rv = PushRequestInternal(sbRequestItem::New(REQUEST_THREAD_STOP));
357  NS_WARN_IF_FALSE(NS_SUCCEEDED(rv), "Failed to send thread stop message");
358
359  // Process the request
360  rv = ProcessRequest();
361  NS_WARN_IF_FALSE(NS_SUCCEEDED(rv), "Failed to process stop message request");
362
363  return NS_OK;
364 }
365 
366 nsresult sbRequestThreadQueue::ThreadShutdownAction(int)
367 {
368  TRACE_FUNCTION("");
369 
370  NS_ENSURE_TRUE(NS_IsMainThread(), NS_ERROR_FAILURE);
371 
372  OnThreadStop();
373 
374  mThread->Shutdown();
375 
376  return NS_OK;
377 }
378 
379 nsresult sbRequestThreadQueue::ProcessRequest()
380 {
381  NS_ENSURE_STATE(mReqAddedEvent);
382  // Dispatch processing of the request added event.
383  nsresult rv = mThread->Dispatch(mReqAddedEvent, NS_DISPATCH_NORMAL);
384  NS_ENSURE_SUCCESS(rv, rv);
385 
386  return NS_OK;
387 }
388 
// Scans the request queue from newest to oldest for a request that duplicates
// aItem, asking IsDuplicateRequest about each candidate. aIsDuplicate is set
// to false up front and receives the result. The scan stops at the first
// queued request whose batch id differs from mCurrentBatchId, or when
// IsDuplicateRequest reports that checking should not continue.
// This method does no locking of its own.
389 nsresult sbRequestThreadQueue::FindDuplicateRequest(sbRequestItem * aItem,
390  bool & aIsDuplicate)
391 {
392  NS_ENSURE_ARG_POINTER(aItem);
393
394  nsresult rv;
395
396  aIsDuplicate = false;
397  // System requests are never dupes
 // NOTE(review): the condition guarding this early return is not present in
 // this listing.
399  return NS_OK;
400  }
401  const RequestQueue::const_reverse_iterator rend = mRequestQueue.rend();
402  for (RequestQueue::const_reverse_iterator riter = mRequestQueue.rbegin();
403  riter != rend && !aIsDuplicate;
404  ++riter) {
405
406  sbRequestItem * request = *riter;
407  // System requests are never dupes, continue looking
 // NOTE(review): the condition guarding this continue is not present in this
 // listing.
409  continue;
410  }
411  // We only need to check within the current batch. Duplicate checking is
412  // an optimization and we're trying to consolidate multiple requests that
413  // come in quickly as a result of a single operation.
 // NOTE(review): the batch id is held in a PRInt32 here, while PopBatch keeps
 // the same value in a PRUint32.
414  const PRInt32 queueItemBatchId = request->GetBatchId();
415
416  if (queueItemBatchId != mCurrentBatchId) {
417  break;
418  }
419  bool continueChecking = false;
420  rv = IsDuplicateRequest(request, aItem, aIsDuplicate, continueChecking);
421  NS_ENSURE_SUCCESS(rv, rv);
422
 // IsDuplicateRequest decides whether older requests are still worth checking.
423  if (!continueChecking) {
424  break;
425  }
426  }
427
428  return NS_OK;
429 }
430 
431 nsresult sbRequestThreadQueue::PushRequestInternal(sbRequestItem * aRequestItem)
432 {
433  NS_ENSURE_ARG_POINTER(aRequestItem);
434  nsresult rv;
435 
436  bool isDupe;
437  rv = FindDuplicateRequest(aRequestItem, isDupe);
438  NS_ENSURE_SUCCESS(rv, rv);
439  if (isDupe) {
440  return NS_OK;
441  }
442 
443  aRequestItem->mBatchId = mCurrentBatchId;
444 
445  NS_ADDREF(aRequestItem);
446  mRequestQueue.push_back(aRequestItem);
447 
448  return NS_OK;
449 }
450 
452 {
 // Queues aRequestItem under mLock and the stop monitor and, when no batch is
 // open, asks the request thread to process the queue. Returns NS_ERROR_ABORT
 // from the guarded block below when requests are not being accepted.
 // NOTE(review): the trace line reads aRequestItem->mType before the null
 // check that follows it; if TRACE_FUNCTION evaluates its arguments, a null
 // item is dereferenced here.
 // NOTE(review): "%ui" looks like it was meant to be "%u" - confirm.
453  TRACE_FUNCTION("RequestType=%ui", aRequestItem->mType);
454  NS_ENSURE_ARG_POINTER(aRequestItem);
455
456  NS_ENSURE_STATE(mLock);
457
458  nsresult rv;
459
460  { /* scope for request lock */
461  nsAutoLock lock(mLock);
462
 // mLock is taken before mStopWaitMonitor, the same order CancelRequests uses.
463  nsAutoMonitor monitor(mStopWaitMonitor);
464  // If we're aborting or shutting down don't accept any more requests
 // NOTE(review): the condition for this block is not present in this listing.
466  {
467  return NS_ERROR_ABORT;
468  }
469
470  rv = PushRequestInternal(aRequestItem);
471  NS_ENSURE_SUCCESS(rv, rv);
472  }
473
 // NOTE(review): mBatchDepth is read below after mLock has been released,
 // while BatchBegin/BatchEnd style code modifies it under mLock.
 // NOTE(review): the assertion text names sbBaseDevice, not this class.
474  NS_ASSERTION(mBatchDepth >= 0,
475  "Batch depth out of whack in sbBaseDevice::PushRequest");
476  // Only process requests if we're not in a batch
477  if (mBatchDepth == 0) {
478  rv = ProcessRequest();
479  NS_ENSURE_SUCCESS(rv, rv);
480  }
481  return NS_OK;
482 }
483 
485 {
 // Moves the next unit of work from the front of the request queue into
 // aBatch, under mLock. aBatch is cleared first and is left empty when the
 // queue is empty or a batch is still open (mBatchDepth > 0). If the front
 // request is not countable it is returned alone; otherwise every consecutive
 // request sharing the front request's batch id is returned.
486  TRACE_FUNCTION("");
487  NS_ENSURE_STATE(mLock);
488
489  nsAutoLock lock(mLock);
490
491  aBatch.clear();
492
493  // If nothing was found then just return with an empty batch
494  if (mRequestQueue.empty()) {
495  LOG("No requests found\n");
496  return NS_OK;
497  }
498
499  // If we're in the middle of a batch just return an empty batch
500  if (mBatchDepth > 0) {
501  LOG("Waiting on batch to complete\n");
502  return NS_OK;
503  }
504  RequestQueue::iterator queueIter = mRequestQueue.begin();
505  // request queue holds on to the object
506  sbRequestItem * request = *queueIter;
507
508  // If this isn't countable just return it by itself
509  if (!request->GetIsCountable()) {
510  LOG("Single non-batch request found\n");
511  aBatch.push_back(request);
512  mRequestQueue.erase(queueIter);
513  // Release our reference to the request, aBatch is holding a reference to it
514  NS_RELEASE(request);
515  return NS_OK;
516  }
517
518  const PRUint32 requestBatchId = request->GetBatchId();
519
520  // find the end of the batch and keep track of the matching batch entries
 // The loop matches on batch id only, so non-countable requests that share
 // the id are moved into the batch as well.
521  const RequestQueue::const_iterator queueEnd = mRequestQueue.end();
522  while (queueIter != queueEnd &&
523  requestBatchId == (*queueIter)->GetBatchId()) {
524  request = *queueIter++;
525  aBatch.push_back(request);
526  // Release our reference to the request, aBatch is holding a reference to it
527  NS_RELEASE(request);
528  }
529
530  // Remove all the items we pushed onto the batch
 // queueIter now points one past the last request that was moved.
531  mRequestQueue.erase(mRequestQueue.begin(), queueIter);
532
533  return NS_OK;
534 }
535 
537 {
 // Moves every queued request into aBatch and empties the queue. No lock is
 // taken here; the callers in this file hold mLock around the call.
538  NS_ENSURE_STATE(mLock);
539
540  // Copy all the requests on the queue to aBatch
 // NOTE(review): std::insert_iterator comes from <iterator>, which this file
 // does not include directly - presumably it arrives through another header.
541  std::insert_iterator<Batch> insertIter(aBatch, aBatch.end());
542  std::copy(mRequestQueue.begin(), mRequestQueue.end(), insertIter);
543
544  // Release all of our objects
 // This drops the queue's references; it presumes the batch took its own
 // reference for each request when it was inserted above - TODO confirm.
545  std::for_each(mRequestQueue.begin(), mRequestQueue.end(), ReleaseRequestItem);
546
547  // Now that we have copied the requests clear our request queue
548  mRequestQueue.clear();
549
550  return NS_OK;
551 }
552 
554 {
 // Removes every request from the queue and hands the removed requests to
 // CleanupBatch. The queue is emptied under mLock; cleanup runs after the lock
 // has been released.
555  TRACE_FUNCTION("");
556
557  nsresult rv;
558
559  NS_ENSURE_STATE(mLock);
560
561  Batch batch;
562  // Lock the queue while we copy and clear it
563  {
564  nsAutoLock lock(mLock);
565
566  rv = ClearRequestsNoLock(batch);
567  NS_ENSURE_SUCCESS(rv, rv);
568  }
569
570  // Now that we have released the lock, clean up the unprocessed items in
571  // the batch. Cleanup can result in proxying to the main thread which can
572  // deadlock if we're holding the lock.
573  rv = CleanupBatch(batch);
574  NS_ENSURE_SUCCESS(rv, rv);
575
576  return NS_OK;
577 }
578 
580 {
 // Cancels outstanding work: unless an abort is already pending, raises the
 // abort flag when requests are being handled, resets the batch depth and
 // empties the queue. The removed requests are then handed to CleanupBatch
 // with neither mLock nor the monitor held.
581  TRACE_FUNCTION("");
582
 // NOTE(review): only the monitor is checked here although mLock is used
 // below as well.
583  NS_ENSURE_STATE(mStopWaitMonitor);
584
585  nsresult rv;
586
587  Batch batch;
588  {
589  // Have to lock mLock before mStopWaitMonitor to avoid deadlocks
590  nsAutoLock lock(mLock);
591  nsAutoMonitor monitor(mStopWaitMonitor);
592  // If we're not already aborting: set the flag (only when requests are being
 // handled), reset batch depth and clear requests
593  if (!mAbortRequests) {
594  if (mIsHandlingRequests) {
595  mAbortRequests = true;
 // Wake anyone waiting on the stop monitor so they can see the abort.
596  monitor.NotifyAll();
597  }
598  mBatchDepth = 0;
599  rv = ClearRequestsNoLock(batch);
600  NS_ENSURE_SUCCESS(rv, rv);
601  }
602  }
603  // must not hold onto the monitor while we create sbDeviceStatus objects
604  // because that involves proxying to the main thread
605  rv = CleanupBatch(batch);
606  NS_ENSURE_SUCCESS(rv, rv);
607
608  return NS_OK;
609 }
610 
612 {
 // Reports whether the current work should be abandoned, i.e. whether an
 // abort was requested or the queue is stopping. A pending abort request is
 // consumed by this call; the stop flag is left as it is.
613  TRACE_FUNCTION("");
614
615  NS_ASSERTION(mLock, "mLock null");
616
617  // Read and clear the abort flag while holding the stop wait monitor, the
618  // same monitor the flags are set under in Stop and CancelRequests.
619  // mLock itself is not taken here.
620  nsAutoMonitor monitor(mStopWaitMonitor);
621  const bool abort = mAbortRequests || mStopProcessing;
622  if (abort) {
623  mAbortRequests = false;
624  }
625  return abort;
626 }
627 
628 //------------------------------------------------------------------------------
629 //
630 // Device request added event nsISupports services.
631 //
632 //------------------------------------------------------------------------------
633 
635 
636 
637 //------------------------------------------------------------------------------
638 //
639 // Device request added event nsIRunnable services.
640 //
641 //------------------------------------------------------------------------------
642 
644 {
645  mRTQ = aRTQ;
646 
647  return NS_OK;
648 }
649 
651 {
652 public:
654  {
655  mAlreadyHandlingRequests = mRTQ->StartRequests();
656  }
658  {
659  if (!mAlreadyHandlingRequests) {
660  mRTQ->CompleteRequests();
661  }
662  }
664  {
665  return mAlreadyHandlingRequests;
666  }
667 private:
668  // Non-owning reference, object is assured to outlive this
669  sbRequestThreadQueue * mRTQ;
670  bool mAlreadyHandlingRequests;
671 };
// Processes queued requests batch by batch until the queue yields an empty
// batch. A thread stop batch hands the shutdown action to the main thread and
// ends the run; a pending abort cleans up the current batch and returns
// NS_ERROR_ABORT; an NS_ERROR_ABORT result from ProcessBatch ends the run with
// NS_OK. Any other failure is returned to the caller.
676 NS_IMETHODIMP
677 sbRTQAddedEvent::Run()
678 {
679  NS_ENSURE_STATE(mRTQ);
680
681  nsresult rv;
682
683  sbAutoRequestHandling autoIsHandlingRequests(mRTQ);
684
685  // Prevent re-entrancy. This can happen if some API waits and runs events on
686  // the request thread. Do nothing if already handling requests. Otherwise,
687  // indicate that requests are being handled and set up to automatically set
688  // the handling requests flag to false on exit. This check is only done on
689  // the request thread, so locking is not required.
690  if (autoIsHandlingRequests.AlreadyHandlingRequests()) {
691  return NS_OK;
692  }
693
694  // Start processing of the next request batch and set to automatically
695  // complete the current request on exit.
 // NOTE(review): the declaration of the local "batch" is not present in this
 // listing.
697  rv = mRTQ->PopBatch(batch);
698  NS_ENSURE_SUCCESS(rv, rv);
699
700  // Keep processing until there are no more batches.
701  while (!batch.empty()) {
702  PRUint32 batchRequestType = batch.RequestType();
703
704  // Need to dispatch the thread stop event and then return
705  if (batchRequestType == sbRequestThreadQueue::REQUEST_THREAD_STOP) {
706  NS_ENSURE_STATE(mRTQ->mShutdownAction);
 // NOTE(review): the result of the dispatch is not checked.
707  NS_DispatchToMainThread(mRTQ->mShutdownAction);
708
709  // Now that we've dispatched it, null it out - the runnable holds a ref to
710  // the RTQ.
711  mRTQ->mShutdownAction = NULL;
712  return NS_OK;
713  }
714
715  // Check for abort.
716  if (mRTQ->CheckAndResetRequestAbort()) {
717  rv = mRTQ->CleanupBatch(batch);
718  NS_ENSURE_SUCCESS(rv, rv);
719  return NS_ERROR_ABORT;
720  }
721
727  if (batchRequestType == sbRequestThreadQueue::REQUEST_THREAD_START) {
728  // Call OnThreadStart on the device thread, for things like
729  // CoInitialize
730  rv = mRTQ->OnThreadStart();
731  NS_ENSURE_SUCCESS(rv, rv);
732  }
733
 // The ProcessBatch result is examined only after cleanup has run.
734  rv = mRTQ->ProcessBatch(batch);
735
736  // Always clean up the batch to remove any failed items from
737  // the device library. They can show up as ghosts until the device
738  // is re-scanned.
739  nsresult rv2 = mRTQ->CleanupBatch(batch);
740  NS_ENSURE_SUCCESS(rv2, rv2);
741
742  // Check to see if the ProcessBatch call was aborted. If so we don't
743  // want to surface the error, just stop processing the batches.
744  if (rv == NS_ERROR_ABORT) {
745  return NS_OK;
746  }
747  NS_ENSURE_SUCCESS(rv, rv);
748
 // Fetch the next batch; an empty one ends the loop.
749  rv = mRTQ->PopBatch(batch);
750  NS_ENSURE_SUCCESS(rv, rv);
751  }
752  return NS_OK;
753 }
754 
755 
// Factory for sbRTQAddedEvent: allocates an event, binds it to aRTQ through
// Initialize(), and returns it to the caller as an nsIRunnable in aEvent.
765 /* static */
767  nsIRunnable** aEvent)
768 {
769  NS_ENSURE_ARG_POINTER(aEvent);
770
771  nsresult rv;
772
773  // Create the event object.
 // NOTE(review): an nsCOMPtr is holding a concrete class type here; confirm
 // this is acceptable in this code base rather than nsRefPtr.
774  nsCOMPtr<sbRTQAddedEvent> event = new sbRTQAddedEvent();
775  NS_ENSURE_TRUE(event, NS_ERROR_OUT_OF_MEMORY);
776
777  rv = event->Initialize(aRTQ);
778  NS_ENSURE_SUCCESS(rv, rv);
779
780  nsCOMPtr<nsIRunnable> runnable = do_QueryInterface(event, &rv);
781  NS_ENSURE_SUCCESS(rv, rv);
782
 // Hand the reference held by "runnable" over to the caller.
783  runnable.forget(aEvent);
784
785  return NS_OK;
786 }
NS_DECL_ISUPPORTS static NS_DECL_NSIRUNNABLE nsresult New(sbRequestThreadQueue *aRTQ, nsIRunnable **aEvent)
#define SB_PRLOG_SETUP(x)
Definition: sbDebugUtils.h:115
return NS_OK
#define LOG(args)
function clear(dbq)
PRUint32 GetType() const
Definition: sbRequestItem.h:65
virtual void CompleteRequests()
var event
const NS_ERROR_ABORT
sbAutoRequestHandling(sbRequestThreadQueue *aRTQ)
nsresult Initialize(sbRequestThreadQueue *aRTQ)
static sbRequestItem * New(PRUint32 aType, bool aIsCountable=false)
const_iterator end() const
NS_IMPL_THREADSAFE_RELEASE(sbRequestThreadQueue)
RequestItems::const_iterator const_iterator
this _document false
Definition: FeedWriter.js:1085
Songbird Thread Utilities Definitions.
nsresult PopBatch(Batch &aBatch)
#define TRACE_FUNCTION(...)
Definition: sbDebugUtils.h:118
nsresult ClearRequestsNoLock(Batch &aRequests)
NS_IMPL_THREADSAFE_ADDREF(sbRequestThreadQueue)
bool GetIsCountable() const
PRUint32 GetBatchId() const
Definition: sbRequestItem.h:81
NS_IMPL_THREADSAFE_ISUPPORTS1(sbRTQAddedEvent, nsIRunnable)
void ReleaseRequestItem(sbRequestItem *aItem)
nsresult PushRequest(sbRequestItem *aRequestItem)
static nsresult sbRunnableMethod(sbRequestThreadQueue &aObject, nsresult(sbRequestThreadQueue::*aMethod)(int), nsresult aFailureReturnValue, int aArg1, nsIRunnable **aRunnable)
void SetBatchIndex(PRInt32 aBatchIndex)
Definition: sbRequestItem.h:98
void push_back(sbRequestItem *aItem)
RequestItems::iterator iterator