gstreamer的collectpad源码分析

1. 背景:

gstreamer的collectpad是一类特殊的pad,这类pad工作于收集模式,用于管理控制若干个pad组成的pad集合的数据同步处理。大部分的合成器(muxer)均使用collectpad来收集音视频数据,并根据可重载的收集条件判断函数对不同pad之间的数据进行处理(或同步)。

由于collectpad中大部分处理函数均可重载(set_func),因此本文只讨论默认的处理函数。

2. 默认流程:

collectpad的简单流程如下图:

不同的pad工作与不同的线程中,当某一个pad有数据到来时,会对所有pad进行判断,看看是否可以满足收集条件,如果满足收集条件就向对应的element推送数据。如果不满足收集条件,就会将该线程挂起,等待其他线程的数据。

当某个pad处于挂起时,其他pad收到数据后,一样会对收集条件进行判断,如果满足条件,会将所有pad的数据推送至element,同时广播条件变量,唤醒所有挂起中的其他pad(线程)。

简单的函数调用关系如下:

3. 数据结构:

数据结构如下:一个_GstCollectPads中维护了一个_GstCollectData的链表,每个pad对应一个_GstCollectData,其中记录了pad中的数据的时间戳,buffer,已经对应pad的状态(如锁、等待等标志位),GstCollectPadsPrivate中则记录了collectpad中注册的各种事件回调函数,这里的回调函数都有接口可以进行重载。此外,GstCollectPadsPrivate还维护了线程间同步用的锁和条件变量。

  1. /**
  2.  * GstCollectPads:
  3.  * @data: (element-type GstBase.CollectData): #GList of #GstCollectData managed
  4.  *   by this #GstCollectPads.
  5.  *
  6.  * Collectpads object.
  7.  */
  8. struct _GstCollectPads {
  9.   /* 基类。  */
  10.   GstObject      object;
  11.   /*< public >*/ /* with LOCK and/or STREAM_LOCK */
  12.   /* 所有PAD的集合。  */
  13.   /*
  14.     * GstCollectData:
  15.     * @collect: owner #GstCollectPads
  16.     * @pad: #GstPad managed by this data
  17.     * @buffer: currently queued buffer.
  18.     * @pos: position in the buffer
  19.     * @segment: last segment received.
  20.     * @dts: the signed version of the DTS converted to running time. To access
  21.     *       this memeber, use %GST_COLLECT_PADS_DTS macro. (Since 1.6)
  22.     *
  23.     * Structure used by the collect_pads.
  24.     struct _GstCollectData
  25.     {
  26.       /* with STREAM_LOCK of @collect */
  27.       /* 指向回collectpad。  */
  28.       GstCollectPads        *collect;
  29.       GstPad                *pad;
  30.       GstBuffer             *buffer;
  31.       guint                  pos;
  32.       GstSegment             segment;
  33.       /*< private >*/
  34.       /* state: bitfield for easier extension;
  35.        * eos, flushing, new_segment, waiting */
  36.       GstCollectPadsStateFlags    state;
  37.       GstCollectDataPrivate *priv;
  38.       union {
  39.         struct {
  40.           /*< public >*/
  41.           gint64 dts;
  42.           /*< private >*/
  43.         } abi;
  44.         gpointer _gst_reserved[GST_PADDING];
  45.       } ABI;
  46.     };
  47.    */
  48.   GSList        *data;                  /* list of CollectData items */
  49.   /*< private >*/
  50.   GRecMutex      stream_lock;          /* used to serialize collection among several streams */
  51.   GstCollectPadsPrivate *priv;
  52.   gpointer _gst_reserved[GST_PADDING];
  53. };

4. 代码分析:

4.1 主入口函数:

主入口函数gst_collect_pads_chain,不同pad工作于不同线程中。代码分析如下:

  1. /* For each buffer we receive we check if our collected condition is reached
  2.  * and if so we call the collected function. When this is done we check if
  3.  * data has been unqueued. If data is still queued we wait holding the stream
  4.  * lock to make sure no EOS event can happen while we are ready to be
  5.  * collected 
  6.  */
  7. static GstFlowReturn
  8. gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
  9. {
  10.   GstCollectData *data;
  11.   GstCollectPads *pads;
  12.   GstFlowReturn ret;
  13.   GstBuffer **buffer_p;
  14.   guint32 cookie;
  15.   GST_DEBUG (“Got buffer for pad %s:%s”, GST_DEBUG_PAD_NAME (pad));
  16.   /* some magic to get the managing collect_pads */
  17.   GST_OBJECT_LOCK (pad);
  18.   data = (GstCollectData *) gst_pad_get_element_private (pad);
  19.   if (G_UNLIKELY (data == NULL))
  20.     goto no_data;
  21.   ref_data (data);
  22.   GST_OBJECT_UNLOCK (pad);
  23.   pads = data->collect;
  24.   GST_COLLECT_PADS_STREAM_LOCK (pads);
  25.   /* 状态判断。  */
  26.   /* if not started, bail out */
  27.   if (G_UNLIKELY (!pads->priv->started))
  28.     goto not_started;
  29.   /* check if this pad is flushing */
  30.   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
  31.               GST_COLLECT_PADS_STATE_FLUSHING)))
  32.     goto flushing;
  33.   /* pad was EOS, we can refuse this data */
  34.   if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
  35.               GST_COLLECT_PADS_STATE_EOS)))
  36.     goto eos;
  37.   /* see if we need to clip */
  38.   /* 数据前处理。  */
  39.   if (pads->priv->clip_func) {
  40.     GstBuffer *outbuf = NULL;
  41.     ret =
  42.         pads->priv->clip_func (pads, data, buffer, &outbuf,
  43.         pads->priv->clip_user_data);
  44.     buffer = outbuf;
  45.     if (G_UNLIKELY (outbuf == NULL))
  46.       goto clipped;
  47.     if (G_UNLIKELY (ret == GST_FLOW_EOS))
  48.       goto eos;
  49.     else if (G_UNLIKELY (ret != GST_FLOW_OK))
  50.       goto error;
  51.   }
  52.   GST_DEBUG_OBJECT (pads, “Queuing buffer %p for pad %s:%s”, buffer,
  53.       GST_DEBUG_PAD_NAME (pad));
  54.   /* One more pad has data queued */
  55.   // 如果当前collectpad处于WAITING状态会将queuedpads增加
  56.   if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
  57.     pads->priv->queuedpads++;
  58.   buffer_p = &data->buffer;
  59.   gst_buffer_replace (buffer_p, buffer);
  60.   /* update segment last position if in TIME */
  61.   /* 更新当前pad上对应的时间信息,后续用于重新计算等待状态需要用到。  */
  62.   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
  63.     GstClockTime timestamp;
  64.     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
  65.     if (GST_CLOCK_TIME_IS_VALID (timestamp))
  66.       data->segment.position = timestamp;
  67.   }
  68.   /* While we have data queued on this pad try to collect stuff */
  69.   do {
  70.     /* Check if our collected condition is matched and call the collected
  71.      * function if it is */
  72.     /* 主要处理函数,判断收集条件是否满足,后续分析。  */
  73.     ret = gst_collect_pads_check_collected (pads);
  74.     /* when an error occurs, we want to report this back to the caller ASAP
  75.      * without having to block if the buffer was not popped */
  76.     /* 数据流处理异常,进入异常处理分支。  */
  77.     if (G_UNLIKELY (ret != GST_FLOW_OK))
  78.       goto error;
  79.     /* data was consumed, we can exit and accept new data */
  80.     /* 当buffer在check_collected函数中被消费,会在其中减少引用次数,释放buffer。
  81.      * 数据被处理后退出循环,等待下一次buffer到来调用chain函数。  */
  82.     if (data->buffer == NULL)
  83.       break;
  84.     /* 数据未被处理,未满足数据收集条件,本pad对应线程将进行唤醒等待。  */
  85.     /* Having the _INIT here means we don’t care about any broadcast up to here
  86.      * (most of which occur with STREAM_LOCK held, so could not have happened
  87.      * anyway).  We do care about e.g. a remove initiated broadcast as of this
  88.      * point.  Putting it here also makes this thread ignores any evt it raised
  89.      * itself (as is a usual WAIT semantic).
  90.      */
  91.     GST_COLLECT_PADS_EVT_INIT (cookie);
  92.     /* pad could be removed and re-added */
  93.     unref_data (data);
  94.     GST_OBJECT_LOCK (pad);
  95.     if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
  96.       goto pad_removed;
  97.     ref_data (data);
  98.     GST_OBJECT_UNLOCK (pad);
  99.     GST_DEBUG_OBJECT (pads, “Pad %s:%s has a buffer queued, waiting”,
  100.         GST_DEBUG_PAD_NAME (pad));
  101.     /* wait to be collected, this must happen from another thread triggered
  102.      * by the _chain function of another pad. We release the lock so we
  103.      * can get stopped or flushed as well. We can however not get EOS
  104.      * because we still hold the STREAM_LOCK.
  105.      */
  106.     /* 等待条件变量被唤醒。  */
  107.     GST_COLLECT_PADS_STREAM_UNLOCK (pads);
  108.     GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
  109.     GST_COLLECT_PADS_STREAM_LOCK (pads);
  110.     GST_DEBUG_OBJECT (pads, “Pad %s:%s resuming”, GST_DEBUG_PAD_NAME (pad));
  111.     /* 唤醒后的状态判断。  */
  112.     /* after a signal, we could be stopped */
  113.     if (G_UNLIKELY (!pads->priv->started))
  114.       goto not_started;
  115.     /* check if this pad is flushing */
  116.     if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
  117.                 GST_COLLECT_PADS_STATE_FLUSHING)))
  118.       goto flushing;
  119.   }
  120.   while (data->buffer != NULL);
  121. unlock_done:
  122.   GST_COLLECT_PADS_STREAM_UNLOCK (pads);
  123.   /* data is definitely NULL if pad_removed goto was run. */
  124.   if (data)
  125.     unref_data (data);
  126.   if (buffer)
  127.     gst_buffer_unref (buffer);
  128.   return ret;
  129. /* 异常状态处理。  */
  130. pad_removed:
  131.   {
  132.     GST_WARNING (“%s got removed from collectpads”, GST_OBJECT_NAME (pad));
  133.     GST_OBJECT_UNLOCK (pad);
  134.     ret = GST_FLOW_NOT_LINKED;
  135.     goto unlock_done;
  136.   }
  137.   /* ERRORS */
  138. no_data:
  139.   {
  140.     GST_DEBUG (“%s got removed from collectpads”, GST_OBJECT_NAME (pad));
  141.     GST_OBJECT_UNLOCK (pad);
  142.     gst_buffer_unref (buffer);
  143.     return GST_FLOW_NOT_LINKED;
  144.   }
  145. not_started:
  146.   {
  147.     GST_DEBUG (“not started”);
  148.     gst_collect_pads_clear (pads, data);
  149.     ret = GST_FLOW_FLUSHING;
  150.     goto unlock_done;
  151.   }
  152. flushing:
  153.   {
  154.     GST_DEBUG (“pad %s:%s is flushing”, GST_DEBUG_PAD_NAME (pad));
  155.     gst_collect_pads_clear (pads, data);
  156.     ret = GST_FLOW_FLUSHING;
  157.     goto unlock_done;
  158.   }
  159. eos:
  160.   {
  161.     /* we should not post an error for this, just inform upstream that
  162.      * we don’t expect anything anymore */
  163.     GST_DEBUG (“pad %s:%s is eos”, GST_DEBUG_PAD_NAME (pad));
  164.     ret = GST_FLOW_EOS;
  165.     goto unlock_done;
  166.   }
  167. clipped:
  168.   {
  169.     GST_DEBUG (“clipped buffer on pad %s:%s”, GST_DEBUG_PAD_NAME (pad));
  170.     ret = GST_FLOW_OK;
  171.     goto unlock_done;
  172.   }
  173. error:
  174.   {
  175.     /* we print the error, the element should post a reasonable error
  176.      * message for fatal errors */
  177.     GST_DEBUG (“collect failed, reason %d (%s)”, ret, gst_flow_get_name (ret));
  178.     gst_collect_pads_clear (pads, data);
  179.     goto unlock_done;
  180.   }
  181. }

4.2 框架上的收集条件判断

在check函数,首先对collectpads上面的pad状态进行检查,只有当有数据的pads和总的pads数满足一定条件时候,才会执行第二重的收集条件判断。函数为gst_collect_pads_check_collected,代码分析如下:

  1. static GstFlowReturn
  2. gst_collect_pads_check_collected (GstCollectPads * pads)
  3. {
  4.   GstFlowReturn flow_ret = GST_FLOW_OK;
  5.   GstCollectPadsFunction func;
  6.   gpointer user_data;
  7.   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
  8.   /* 获取回调数据。  */
  9.   GST_OBJECT_LOCK (pads);
  10.   func = pads->priv->func;
  11.   user_data = pads->priv->user_data;
  12.   GST_OBJECT_UNLOCK (pads);
  13.   g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
  14.   /* check for new pads, update stats etc.. */
  15.   /* 主要是对等待唤醒的pad的cookie进行校验。  */
  16.   gst_collect_pads_check_pads (pads);
  17.   /* 所有pad都是EOS状态。直接处理剩余的所有数据。  */
  18.   if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
  19.     /* If all our pads are EOS just collect once to let the element
  20.      * do its final EOS handling. */
  21.     GST_DEBUG_OBJECT (pads, “All active pads (%d) are EOS, calling %s”,
  22.         pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
  23.     if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
  24.                 TRUE, FALSE))) {
  25.       GST_INFO_OBJECT (pads, “finished seeking”);
  26.     }
  27.     do {
  28.       flow_ret = func (pads, user_data);
  29.     } while (flow_ret == GST_FLOW_OK);
  30.   } else {
  31.     /* 有pad处于非EOS状态。  */
  32.     gboolean collected = FALSE;
  33.     /* We call the collected function as long as our condition matches. */
  34.     /* 只有满足(有数据的有效pad数 + 无效pad数 >= 总的pad数)时,才可以进入下一步的
  35.      * 条件判断,这个判断是框架级别的判断,总是存在,其余重载的判断函数(func)都在这个循环中处理。
  36.      * 如果函数不执行,则buffer一定不会被消费,在外层会走入线程挂起等待唤醒的流程。  */
  37.     while (((pads->priv->queuedpads + pads->priv->eospads) >=
  38.             pads->priv->numpads)) {
  39.       GST_DEBUG_OBJECT (pads,
  40.           “All active pads (%d + %d >= %d) have data, “ “calling %s”,
  41.           pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
  42.           GST_DEBUG_FUNCPTR_NAME (func));
  43.       if (G_UNLIKELY (g_atomic_int_compare_and_exchange (&pads->priv->seeking,
  44.                   TRUE, FALSE))) {
  45.         GST_INFO_OBJECT (pads, “finished seeking”);
  46.       }
  47.       /* 具体数据的收集条件判断。  */
  48.       flow_ret = func (pads, user_data);
  49.       collected = TRUE;
  50.       /* 数据处理异常或者已经没有有数据的pad了,中断循环。  */
  51.       /* break on error */
  52.       if (flow_ret != GST_FLOW_OK)
  53.         break;
  54.       /* Don’t keep looping after telling the element EOS or flushing */
  55.       if (pads->priv->queuedpads == 0)
  56.         break;
  57.     }
  58.     if (!collected)
  59.       GST_DEBUG_OBJECT (pads, “Not all active pads (%d) have data, continuing”,
  60.           pads->priv->numpads);
  61.   }
  62.   return flow_ret;
  63. }

4.3 默认的第二重收集条件判断

第二重收集条件的函数是可以进行重载的,可以使用gst_collect_pads_set_function进行设置,这里只分析默认的函数gst_collect_pads_default_collected。

代码分析如下:

  1. /*
  2.  * Default collect callback triggered when #GstCollectPads gathered all data.
  3.  *
  4.  * Called with STREAM_LOCK.
  5.  */
  6. static GstFlowReturn
  7. gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
  8. {
  9.   GstCollectData *best = NULL;
  10.   GstBuffer *buffer;
  11.   GstFlowReturn ret = GST_FLOW_OK;
  12.   GstCollectPadsBufferFunction func;
  13.   gpointer buffer_user_data;
  14.   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
  15.   /* 获取回调数据。  */
  16.   GST_OBJECT_LOCK (pads);
  17.   func = pads->priv->buffer_func;
  18.   buffer_user_data = pads->priv->buffer_user_data;
  19.   GST_OBJECT_UNLOCK (pads);
  20.   g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
  21.   /* Find the oldest pad at all cost */
  22.   /* 寻找最合适的pad,并计算最早的数据和时间戳。
  23.    * 最后返回gst_collect_pads_recalculate_waiting的返回值,
  24.    * TRUE表示从非等待状态变为等待状态。
  25.    * 在默认场景下,只有使用了set_wait为FALSE时候才会标记为non-waiting状态。
  26.    * 因此如果在默认框架下主动设置了non-waiting状态,需要留意时间比较函数。
  27.    * 否则这里会进入一个
  28.    * FLOW_OK -> 数据没有POP -> pad_num没有变化 -> gst_collect_pads_check_collected主循环中死循环的问题。  */
  29.   if (gst_collect_pads_recalculate_full (pads)) {
  30.     /* waiting was switched on,
  31.      * so give another thread a chance to deliver a possibly
  32.      * older buffer; don’t charge on yet with the current oldest */
  33.     ret = GST_FLOW_OK;
  34.     goto done;
  35.   }
  36.   best = pads->priv->earliest_data;
  37.   /* No data collected means EOS. */
  38.   /* 在waiting状态下,但是没有最新的数据包,因此认为这个pad已经进入EOS状态了。无法接收数据。
  39.    * 注意,这里设置non-waiting以后并修改了时间比较函数后,其他地方调用默认函数,也会导致一个问题:
  40.    * 由于non-waiting增加了queuedpad,因此如果总的有两个pad,且两个pad都设置了non-waiting后,在函数
  41.    * gst_collect_pads_check_collected中条件判断总是成立,且queuedpad在non-waiting状态下无法自减,
  42.    * 第一次进入时候就会把所有的pad的数据直接处理,及时处理完所有数据后,依旧走到这里进行判断,
  43.    * 但是这时候buffer已经为空,导致collectpad认为这个pad的数据已经进入EOS状态,处理异常。  */
  44.   if (G_UNLIKELY (best == NULL)) {
  45.     ret = func (pads, best, NULL, buffer_user_data);
  46.     if (ret == GST_FLOW_OK)
  47.       ret = GST_FLOW_EOS;
  48.     goto done;
  49.   }
  50.   /* make sure that the pad we take a buffer from is waiting;
  51.    * otherwise popping a buffer will seem not to have happened
  52.    * and collectpads can get into a busy loop */
  53.   gst_collect_pads_set_waiting (pads, best, TRUE);
  54.   /* Send buffer */
  55.   /* 使用pop弹出buffer,并将buffer发送给buffer_func进行处理。  */
  56.   buffer = gst_collect_pads_pop (pads, best);
  57.   ret = func (pads, best, buffer, buffer_user_data);
  58.   /* maybe non-waiting was forced to waiting above due to
  59.    * newsegment events coming too sparsely,
  60.    * so re-check to restore state to avoid hanging/waiting */
  61.   gst_collect_pads_recalculate_full (pads);
  62. done:
  63.   return ret;
  64. }

注意,这里如果对某些函数进行重载或者设置了非等待状态,有两个潜在的异常流程。

4.3.1 异常流程1:

当使用默认的时间比较函数,且设置了非等待状态的pad有数据到来时,在函数gst_collect_pads_recalculate_waiting,当earliest_data检测到本PAD时,这时候时间戳应该是相等的,但是这时候如果处于非等待状态,无论是否加锁最后都会返回TRUE,这时候gst_collect_pads_default_collected函数中的第一个判断总会直接返回GST_FLOW_OK,但是实际并没有弹出任何buffer,但是gst_collect_pads_check_collected的循环条件并没有改变,导致这个线程会一直在这里循环。如果其他pad没有数据进入,则这个pad会进入死循环。

4.3.2 异常流程2:

当所有的pads都设置了non-waiting状态,则在框架的收集条件判断函数gst_collect_pads_check_collected中的pads数量比较循环总是成立,且所有pads数据弹出时都不会减少当前的queuedpad数,因此当有一个buffer弹出后,会持续弹出所有buffer,当buffer为空时,循环条件依旧成立,在处理空buffer时,认为这个pad已经进入了EOS状态,从而导致异常。异常流程如下图:

4.4 寻找最优的可用buffer和pad

这个函数流程比较简单,就是遍历collectpads中的所有pad,并和earliest_time进行比较,寻找最早的时间点的buffer。

这里涉及到时间比较的函数,这里的默认时间比较函数比较简单,就是单纯判断时间点的大小,相等返回0,第一个时间点大于第二个返回1,小于返回-1。

  1. /**
  2.  * gst_collect_pads_find_best_pad:
  3.  * @pads: the collectpads to use
  4.  * @data: returns the collectdata for earliest data
  5.  * @time: returns the earliest available buffertime
  6.  *
  7.  * Find the oldest/best pad, i.e. pad holding the oldest buffer and
  8.  * and return the corresponding #GstCollectData and buffertime.
  9.  *
  10.  * This function should be called with STREAM_LOCK held,
  11.  * such as in the callback.
  12.  */
  13. static void
  14. gst_collect_pads_find_best_pad (GstCollectPads * pads,
  15.     GstCollectData ** data, GstClockTime * time)
  16. {
  17.   GSList *collected;
  18.   GstCollectData *best = NULL;
  19.   GstClockTime best_time = GST_CLOCK_TIME_NONE;
  20.   g_return_if_fail (data != NULL);
  21.   g_return_if_fail (time != NULL);
  22.   /* 遍历所有pads,对所有pads中的数据与当前的earliest_time进行比较,
  23.    * 寻找时间最靠前的buffer及其对应的pad。  */
  24.   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
  25.     GstBuffer *buffer;
  26.     GstCollectData *data = (GstCollectData *) collected->data;
  27.     GstClockTime timestamp;
  28.     buffer = gst_collect_pads_peek (pads, data);
  29.     /* if we have a buffer check if it is better then the current best one */
  30.     if (buffer != NULL) {
  31.       timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
  32.       gst_buffer_unref (buffer);
  33.       if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
  34.               best, best_time, pads->priv->compare_user_data) < 0) {
  35.         best = data;
  36.         best_time = timestamp;
  37.       }
  38.     }
  39.   }
  40.   /* set earliest time */
  41.   *data = best;
  42.   *time = best_time;
  43.   GST_DEBUG_OBJECT (pads, “best pad %s, best time %” GST_TIME_FORMAT,
  44.       best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : “(nil)”,
  45.       GST_TIME_ARGS (best_time));
  46. }

3.5 重新计算等待状态函数

函数gst_collect_pads_recalculate_waiting会根据earliest_time和所有pad上的数据进行比较,计算collectpad是否需要重新进入等待状态,返回TRUE表示从非等待状态进入等待状态。

这里如果设置了non-waiting状态,则要小心4.3中出现的异常。

  1. /* General overview:
  2.  * – only pad with a buffer can determine earliest_data (and earliest_time)
  3.  * – only segment info determines (non-)waiting state
  4.  * – ? perhaps use _stream_time for comparison
  5.  *   (which muxers might have use as well ?)
  6.  */
  7. /*
  8.  * Function to recalculate the waiting state of all pads.
  9.  *
  10.  * Must be called with STREAM_LOCK.
  11.  *
  12.  * Returns %TRUE if a pad was set to waiting
  13.  * (from non-waiting state).
  14.  */
  15. static gboolean
  16. gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
  17. {
  18.   GSList *collected;
  19.   gboolean result = FALSE;
  20.   /* If earliest time is not known, there is nothing to do. */
  21.   /* 没有数据可以比较。  */
  22.   if (pads->priv->earliest_data == NULL)
  23.     return FALSE;
  24.   /* 遍历所有pads。  */
  25.   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
  26.     GstCollectData *data = (GstCollectData *) collected->data;
  27.     int cmp_res;
  28.     GstClockTime comp_time;
  29.     /* check if pad has a segment */
  30.     /* 检查本pad上对应的时间信息。  */
  31.     if (data->segment.format == GST_FORMAT_UNDEFINED) {
  32.       GST_WARNING_OBJECT (pads,
  33.           “GstCollectPads has no time segment, assuming 0 based.”);
  34.       gst_segment_init (&data->segment, GST_FORMAT_TIME);
  35.       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
  36.     }
  37.     /* check segment format */
  38.     if (data->segment.format != GST_FORMAT_TIME) {
  39.       GST_ERROR_OBJECT (pads, “GstCollectPads can handle only time segments.”);
  40.       continue;
  41.     }
  42.     /* check if the waiting state should be changed */
  43.     /* 将earliest_time和当前pad上的时间信息进行比较。
  44.      * 当cmp_res为1,表示本pad的时间比earliest_time晚,这时候数据可以消费,不需要等待。
  45.      * 将返回FALSE,在函数gst_collect_pads_default_collected执行buffer_func消费buffer。
  46.      * 否则表示本pad时间比earliest_time早或者相等,如果这时候是在非等待状态,则要设置成等待状态,
  47.      * 同时返回TRUE,并在gst_collect_pads_default_collected不处理buffer,返回GST_FLOW_OK,重新计算best。
  48.      * 这里要注意设置了non-waiting后的第一个包,第一个包的时间有可能是相同的,即0:00 == 0:00
  49.      */
  50.     comp_time = data->segment.position;
  51.     cmp_res = pads->priv->compare_func (pads, data, comp_time,
  52.         pads->priv->earliest_data, pads->priv->earliest_time,
  53.         pads->priv->compare_user_data);
  54.     if (cmp_res > 0)
  55.       /* stop waiting */
  56.       gst_collect_pads_set_waiting (pads, data, FALSE);
  57.     else {
  58.       if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
  59.         /* start waiting */
  60.         gst_collect_pads_set_waiting (pads, data, TRUE);
  61.         result = TRUE;
  62.       }
  63.     }
  64.   }
  65.   return result;
  66. }

4.6 锁和等待状态

collectpad提供了接口gst_collect_pads_set_waiting可以给其他组件设置某个pad为等待或者非等待状态。其设置与锁GST_COLLECT_PADS_STATE_LOCKED标志位有关系。

默认情况下的pad(注意,这里的pad为单独的一个pad,并非整个collectpad,这些状态为单个pad私有,并不是collectpad的属性)均为等待状态,而锁的初始化则根据element调用collect添加pad的函数gst_collect_pads_add_pad的最后一个参数决定。

  1. /**
  2.  * gst_collect_pads_set_waiting:
  3.  * @pads: the collectpads
  4.  * @data: the data to use
  5.  * @waiting: boolean indicating whether this pad should operate
  6.  *           in waiting or non-waiting mode
  7.  *
  8.  * Sets a pad to waiting or non-waiting mode, if at least this pad
  9.  * has not been created with locked waiting state,
  10.  * in which case nothing happens.
  11.  *
  12.  * This function should be called with @pads STREAM_LOCK held, such as
  13.  * in the callback.
  14.  *
  15.  * MT safe.
  16.  */
  17. void
  18. gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
  19.     gboolean waiting)
  20. {
  21.   g_return_if_fail (pads != NULL);
  22.   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
  23.   g_return_if_fail (data != NULL);
  24.   GST_DEBUG_OBJECT (pads, “Setting pad %s to waiting %d, locked %d”,
  25.       GST_PAD_NAME (data->pad), waiting,
  26.       GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
  27.   /* Do something only on a change and if not locked */
  28.   /* 修改等待状态标志位需要在没有上锁的情况下处理,
  29.    * 可以通过GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);方式加解锁。
  30.    * 如果设置为非等待,则会把对应的queuedpad自增,当所有pad都处于非等待状态,则框架收集条件总是满足。
  31.    * 可能存在4.3.2的问题。
  32.    */
  33.   if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
  34.       (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
  35.           ! !waiting)) {
  36.     /* Set waiting state for this pad */
  37.     if (waiting)
  38.       GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
  39.     else
  40.       GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
  41.     /* Update number of queued pads if needed */
  42.     if (!data->buffer &&
  43.         !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
  44.       if (waiting)
  45.         pads->priv->queuedpads–;
  46.       else
  47.         pads->priv->queuedpads++;
  48.     }
  49.     /* signal waiters because something changed */
  50.     GST_COLLECT_PADS_EVT_BROADCAST (pads);
  51.   }
  52. }

未经允许不得转载:博客 » gstreamer的collectpad源码分析

赞 (0)

评论 0

评论前必须登录!

登陆 注册