大话ion系列(四)

2021年11月22日 阅读数:4
这篇文章主要向大家介绍大话ion系列(四),主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

点击上方“LiveVideoStack”关注咱们css

做者 | 王朋闯
本文为王朋闯老师创做的系列ion文章,LiveVideoStack已得到受权发布,将来将持续更新。

大话ion系列(一)web

大话ion系列(二)json

大话ion系列(三)swift


7、Simulcast流程微信


1. Simulcast概念markdown


先介绍WebRTC的一个概念——Simulcast(联播,俗称大小流):
session


推流端===f/h/q==>SFU--f--->收流端A                 |---q--->收流端B                 |---h--->收流端C

  • 上行通常是三路流,按分辨率和码率,通常分为fhq(大中小)三层闭包

  • 下行能够分给不一样的用户不一样的流,好比网很差时分发个小流q,网变好了再切回大流f架构

  • 三层的streamId、trackId是同样的,可是rid和ssrc是不一样的,rid通常是f、h、qapp

  • 对应的SDP部分


.........a=rid:f senda=rid:h senda=rid:q senda=simulcast:send f;h;q

2.收发流程

看本章以前,最好看一下前一章,熟悉一下收发流程,本文只重点介绍其中的Simulcast部分。

收发包逻辑打通步骤:

SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通

3.Simulcast上行流程

非Simulcast状况,OnTrack通常会触发两次:一个audioTrack+一个videoTrack。

Simulcast下,OnTrack通常会触发四次:一个audioTrack+三个videoTrack(rid分别为fhq)。

这个流程会触发四次:
OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack

三个videoTrack,共用同一个WebRTCReceiver。
type WebRTCReceiver struct {。。。    receiver       *webrtc.RTPReceiver    codec          webrtc.RTPCodecParameters    rtcpCh         chan []rtcp.Packet    buffers        [3]*buffer.Buffer//须要三个buffer    upTracks       [3]*webrtc.TrackRemote//三个TrackRemote。。。    pendingTracks  [3][]*DownTrack//三个层,每层来订阅的downtrack。。。}

接下来看一下AddUpTrack是如何工做的:

func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool) {    if w.closed.get() {        return    }   //根据RID来区分layer    var layer int    switch track.RID() {//若是没开simulcast,为""    case fullResolution:        layer = 2    case halfResolution:        layer = 1    default:        layer = 0//若是没开simulcast,为0    }     w.Lock()  //设置空域层layer的track    w.upTracks[layer] = track   //设置空域层layer的buff    w.buffers[layer] = buff    w.available[layer].set(true)   //设置空域层layer的downtrack    w.downTracks[layer].Store(make([]*DownTrack,0, 10))    w.pendingTracks[layer] = make([]*DownTrack,0, 10)    w.Unlock()   //闭包函数,按最佳质量订阅,切到f层    subBestQuality := func(targetLayerint) {        for l := 0; l <targetLayer; l++ {            dts :=w.downTracks[l].Load()            if dts == nil{                continue            }            for _, dt :=range dts.([]*DownTrack) {                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)            }        }    }   //闭包函数,按最差质量订阅,切到q层    subLowestQuality := func(targetLayerint) {        for l := 2; l !=targetLayer; l-- {            dts :=w.downTracks[l].Load()            if dts == nil{                continue            }            for _, dt :=range dts.([]*DownTrack) {                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)            }        }    }   //是否开启大小流    if w.isSimulcast {    //若是配置最佳质量,则等到f层到来时,订阅它        if bestQualityFirst &&(!w.available[2].get() || layer == 2) {            subBestQuality(layer)      //若是配置最差质量,则等到q层到来时,订阅它        } else if!bestQualityFirst && (!w.available[0].get() ||layer == 0) {            subLowestQuality(layer)        }    }   //启动读写流程    go w.writeRTP(layer)}

真正的收发包流程来了:

func (w *WebRTCReceiver) writeRTP(layer int) {    defer func() {//这里设置自动清理函数        w.closeOnce.Do(func() {            w.closed.set(true)            w.closeTracks()        })    }()   //建立一个PLI包,后边要用    pli := []rtcp.Packet{        &rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer)},    }     for {    //这里能够看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer        pkt, err :=w.buffers[layer].ReadExtended()        if err ==io.EOF {            return        }     //若是开启大小流        if w.isSimulcast {      //一开始是pending状态            ifw.pending[layer].get() {        //若是收到的包是关键帧                ifpkt.KeyFrame {                    w.Lock()          //若是有切换中的layer,那就切一下                    for idx,dt := range w.pendingTracks[layer] {                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)                        w.storeDownTrack(layer, dt)                        dt.SwitchSpatialLayerDone(int32(layer))                       w.pendingTracks[layer][idx] = nil                    }                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]                   w.pending[layer].set(false)                    w.Unlock()                } else {          //若是是非关键字,说明须要发送PLI                    w.SendRTCP(pli)                }            }        }     //这里是否是有疑问,[]*downTracks是SessionLocal.Publish里塞过来的,后边会介绍:)        for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){      //下行track写入rtp包            if err = dt.WriteRTP(pkt, layer);err != nil {                if err ==io.EOF && err == io.ErrClosedPipe {                    w.Lock()                    w.deleteDownTrack(layer, dt.id)                    w.Unlock()                }                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")            }        }    } }

至此一个简单的Simulcast收发模型:
   SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK       |                                                    |....       |                                                    |--->downTracks[0][N].WriteRTP       |       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP                    |                                      |....                    |                                      |---->downTracks[0][N].WriteRTP                    |                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP                    |                                      |....                    |                                      |----->downTracks[1][N].WriteRTP                    |                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP                                                           |....                                                           |------>downTracks[2][N].WriteRTP                             

上面省略了SDK--->ReadStreamSRTP.buffer.Write,这个buffer和WebRTCReceiver.buffer是同一个。

订阅端SDK的切大小流操做,其实就是在0-2来回挂载downTrack而已。

4.Simulcast下行流程

读者前边的疑问,downTracks是哪里塞过来的?流程在这里:

OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack

pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver){    //Simulcast通常会触发OnTrack四次,一个audio,三个video    //因为三个video的trackId同样,共用一个WebRTCReceiver        r, pub := p.router.AddReceiver(receiver,track)        if pub {//这里video到来的第一次pub才为true      //这里把receiver发布到router里,其余peer的downtrack会挂载到receiver下            p.session.Publish(p.router, r)

这里为了方便,再贴一下整个流程的代码,比较繁琐,能够跳过。

SessionLocal.Publish

func (s *SessionLocal) Publish(router Router,r Receiver) {    for _, p := ranges.Peers() {        // Don't sub toself        if router.ID() == p.ID() || p.Subscriber() == nil{            continue        }        //表示根据r的信息建立downtrack,并增长到p.Subscriber()和r中        if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil {            Logger.Error(err, "Errorsubscribing transport to Router")            continue        }    }}

router.AddDownTracks

func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error {。。。//若是recv不为空,表示根据recv的信息建立downtrack,并增长到s和recv中    if recv != nil{        if _, err :=r.AddDownTrack(s, recv); err != nil {            return err        }        s.negotiate()        return nil    }//若是recv为空,表示遍历房间中全部的receivers,并增长到s和recv中    if len(r.receivers)> 0 {        for _, rcv := ranger.receivers {            if _, err :=r.AddDownTrack(s, rcv); err != nil {                return err            }        }        s.negotiate()    }    return nil}

router.AddDownTrack

根据recv的信息建立downtrack,并增长到sub和recv中。
func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error) {    for _, dt := rangesub.GetDownTracks(recv.StreamID()) {//避免重复添加        if dt.ID() ==recv.TrackID() {            return dt, nil        }    }     codec := recv.Codec()    if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil {        return nil,err    }    //建立downtrack,downtrack用来给客户端下发流    downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{        MimeType:     codec.MimeType,        ClockRate:    codec.ClockRate,        Channels:     codec.Channels,        SDPFmtpLine:  codec.SDPFmtpLine,        RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},    }, recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)    if err != nil{        return nil,err    }    //把downtrack增长到pc中    if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{        Direction:webrtc.RTPTransceiverDirectionSendonly,    }); err != nil {        return nil,err    }     // 设置关闭回调,关闭时pc自动删除track    downTrack.OnCloseHandler(func() {        if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed {            if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil {                if err ==webrtc.ErrConnectionClosed {                    return                }                Logger.Error(err, "Errorclosing down track")            } else {//若是删除成功,再从sub中删除,而后重协商                sub.RemoveDownTrack(recv.StreamID(), downTrack)                sub.negotiate()            }        }    })   //设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发    downTrack.OnBind(func() {        go sub.sendStreamDownTracksReports(recv.StreamID())    })   //增长downTracksub中,sub只是用来管理downtracks和生成SenderReport    sub.AddDownTrack(recv.StreamID(), downTrack)   //增长downTrackWebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP    recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)    return downTrack, nil}

5.Simulcast切换流程

第一种,自动切换。

上边的subBestQuality,会在f层receiver到来时,自动订阅f层。

第二种,手动切换。

经过信令或datachannel控制来切换。

先来说一下datachannel信令通道,在main里建立了一个内置dc,处理函数为datachannel.SubscriberAPI。

func main() {    nsfu := sfu.NewSFU(conf.Config)    dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)   dc.Use(datachannel.SubscriberAPI)    s :=server.NewWrapperedGRPCWebServer(options, nsfu)    if err := s.Serve(); err != nil{        logger.Error(err,"failed to serve")        os.Exit(1)    }    select {}}

客户端发过来的切大小流指令会进入此函数。

funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor {    return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs) {        srm := &setRemoteMedia{}        if err :=json.Unmarshal(args.Message.Data, srm); err != nil {            return        }        // Publisherchanging active layers        if srm.Layers !=nil && len(srm.Layers) > 0 {。。。//当前sdk逻辑不会进入这里        } else {      //按流ID查找downTracks            downTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)            for _, dt :=range downTracks {                switch dt.Kind() {                casewebrtc.RTPCodecTypeAudio:                    dt.Mute(!srm.Audio)//音频是否须要mute/unmute                casewebrtc.RTPCodecTypeVideo:                    switchsrm.Video {//视频是否须要切大小流/mute                    casehighValue:            //这里把d.reSync.set设置为true了,writeSimulcastRTP里会自动发PLI                        dt.Mute(false)                        dt.SwitchSpatialLayer(2, true)                    casemediumValue:                        dt.Mute(false)                        dt.SwitchSpatialLayer(1, true)                    caselowValue:                        dt.Mute(false)                        dt.SwitchSpatialLayer(0, true)                    casemutedValue:                        dt.Mute(true)                    }                    switchsrm.Framerate {//当前sdk逻辑也不会进入这里,srm.Framerate=""                    }                }             }        }        next.Process(ctx, args)    })}

DownTrack.SwitchSpatialLayer

func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error {    if d.trackType ==SimulcastDownTrack {        // Don't switchuntil previous switch is done or canceled        csl := atomic.LoadInt32(&d.currentSpatialLayer)         //若是当前运行layer不是正在切的layer,或当前layer是要切的        //换句话说,若是当前layer没切完成,或者当前layer和要切的同样,那就返回错误        if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer {            returnErrSpatialLayerBusy        }        //切换layer        if err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil {            atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)            if setAsMax {                atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)            }        }        return nil    }    returnErrSpatialNotSupported}

WebRTCReceiver.SwitchDownTrack

func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error {    if w.closed.get() {        returnerrNoReceiverFound    }    //切换就是把track放入pending    if w.available[layer].get() {        w.Lock()        w.pending[layer].set(true)        w.pendingTracks[layer] = append(w.pendingTracks[layer],track)        w.Unlock()        return nil    }    return errNoReceiverFound}

而后在writeRTP里切换:

func (w *WebRTCReceiver) writeRTP(layer int) {....    for {        pkt, err :=w.buffers[layer].ReadExtended()        if err ==io.EOF {            return        }         //若是是大小流        if w.isSimulcast {            //若是正在切换,pending[layer]get()为true            ifw.pending[layer].get() {                // 若是是关键帧,才会切换,好在前边Mute流程里发送了PLI,这里应该很快来一个关键帧                ifpkt.KeyFrame {                    w.Lock()                     //=========这里切换                    for idx, dt:= range w.pendingTracks[layer] {                    //删除原来的                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)                        //存储新的dt,之后writeRTP会写入新的dt                        w.storeDownTrack(layer, dt)                        //设置切换完成                        dt.SwitchSpatialLayerDone(int32(layer))                        //pending中此dt置空                       w.pendingTracks[layer][idx] = nil                    }                    //清空pendingTracks此layer                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]                    //标志位置为false                   w.pending[layer].set(false)                    w.Unlock()                } else {                    // 若是不是关键帧,再次发送PLI                    w.SendRTCP(pli)                }            }        }         for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){            if err = dt.WriteRTP(pkt, layer);err != nil {                if err ==io.EOF && err == io.ErrClosedPipe {                    w.Lock()                    w.deleteDownTrack(layer, dt.id)                    w.Unlock()                }                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")            }        }    }}

6. 总结

Simulcast在ion-sfu中,默认是经过datachannel来操做切换的。

首先,切换是操做pendingTracks:

SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->写入pendingTracks

而后,在WebRTCReceiver.writeRTP里进行实质切换:

WebRTCReceiver.writeRTP--->读取pendingTracks---》更换downTracks--》storeDownTrack--》OK

以后,写包就会写入新track。至此一个简单的Simulcast收发模型就建成了:

SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK       |                                              |....       |                                              |--->downTracks[0][N].WriteRTP       |       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP                    |                                    |....                    |                                    |---->downTracks[0][N].WriteRTP                    |                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP                    |                                     |....                    |                                     |----->downTracks[1][N].WriteRTP                    |                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP                                                         |....                                                         |------>downTracks[2][N].WriteRTP



 
做者简介:
王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:
本文发布于知乎,已得到做者受权转载。



讲师招募

LiveVideoStackCon 2022 音视频技术大会 上海站,正在面向社会公开招募讲师,不管你所处的公司大小,title高低,老鸟仍是菜鸟,只要你的内容对技术人有帮助,其余都是次要的。欢迎经过 speaker@livevideostack.com 提交我的资料及议题描述,咱们将会在24小时内给予反馈。

喜欢咱们的内容就点个“在看”吧!

本文分享自微信公众号 - LiveVideoStack(livevideostack)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。