![]() System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /usr/local/lib/node_modules/mediasoup/worker/src/RTC/ |
Upload File : |
#define MS_CLASS "RTC::SimpleConsumer" // #define MS_LOG_DEV_LEVEL 3 #include "RTC/SimpleConsumer.hpp" #include "DepLibUV.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include "Channel/Notifier.hpp" #include "RTC/Codecs/Tools.hpp" namespace RTC { /* Instance methods. */ SimpleConsumer::SimpleConsumer( const std::string& id, const std::string& producerId, RTC::Consumer::Listener* listener, json& data) : RTC::Consumer::Consumer(id, producerId, listener, data, RTC::RtpParameters::Type::SIMPLE) { MS_TRACE(); // Ensure there is a single encoding. if (this->consumableRtpEncodings.size() != 1u) MS_THROW_TYPE_ERROR("invalid consumableRtpEncodings with size != 1"); auto& encoding = this->rtpParameters.encodings[0]; const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); this->keyFrameSupported = RTC::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType); // Create RtpStreamSend instance for sending a single stream to the remote. CreateRtpStream(); } SimpleConsumer::~SimpleConsumer() { MS_TRACE(); delete this->rtpStream; } void SimpleConsumer::FillJson(json& jsonObject) const { MS_TRACE(); // Call the parent method. RTC::Consumer::FillJson(jsonObject); // Add rtpStream. this->rtpStream->FillJson(jsonObject["rtpStream"]); } void SimpleConsumer::FillJsonStats(json& jsonArray) const { MS_TRACE(); // Add stats of our send stream. jsonArray.emplace_back(json::value_t::object); this->rtpStream->FillJsonStats(jsonArray[0]); // Add stats of our recv stream. if (this->producerRtpStream) { jsonArray.emplace_back(json::value_t::object); this->producerRtpStream->FillJsonStats(jsonArray[1]); } } void SimpleConsumer::FillJsonScore(json& jsonObject) const { MS_TRACE(); MS_ASSERT(this->producerRtpStreamScores, "producerRtpStreamScores not set"); jsonObject["score"] = this->rtpStream->GetScore(); if (this->producerRtpStream) jsonObject["producerScore"] = this->producerRtpStream->GetScore(); else jsonObject["producerScore"] = 0; jsonObject["producerScores"] = *this->producerRtpStreamScores; } void SimpleConsumer::HandleRequest(Channel::Request* request) { MS_TRACE(); switch (request->methodId) { case Channel::Request::MethodId::CONSUMER_REQUEST_KEY_FRAME: { if (IsActive()) RequestKeyFrame(); request->Accept(); break; } case Channel::Request::MethodId::CONSUMER_SET_PREFERRED_LAYERS: { // Do nothing. request->Accept(); break; } default: { // Pass it to the parent class. RTC::Consumer::HandleRequest(request); } } } void SimpleConsumer::ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t /*mappedSsrc*/) { MS_TRACE(); this->producerRtpStream = rtpStream; } void SimpleConsumer::ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t /*mappedSsrc*/) { MS_TRACE(); this->producerRtpStream = rtpStream; // Emit the score event. EmitScore(); } void SimpleConsumer::ProducerRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { MS_TRACE(); // Emit the score event. EmitScore(); } void SimpleConsumer::ProducerRtcpSenderReport(RTC::RtpStream* /*rtpStream*/, bool /*first*/) { MS_TRACE(); // Do nothing. } uint8_t SimpleConsumer::GetBitratePriority() const { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); // Audio SimpleConsumer does not play the BWE game. if (this->kind != RTC::Media::Kind::VIDEO) return 0u; if (!IsActive()) return 0u; return this->priority; } uint32_t SimpleConsumer::IncreaseLayer(uint32_t bitrate, bool /*considerLoss*/) { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); MS_ASSERT(IsActive(), "should be active"); // If this is not the first time this method is called within the same iteration, // return 0 since a video SimpleConsumer does not keep state about this. if (this->managingBitrate) return 0u; this->managingBitrate = true; // Video SimpleConsumer does not really play the BWE game when. However, let's // be honest and try to be nice. auto nowMs = DepLibUV::GetTimeMs(); auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs); if (desiredBitrate < bitrate) return desiredBitrate; else return bitrate; } void SimpleConsumer::ApplyLayers() { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video"); MS_ASSERT(IsActive(), "should be active"); this->managingBitrate = false; // SimpleConsumer does not play the BWE game (even if video kind). } uint32_t SimpleConsumer::GetDesiredBitrate() const { MS_TRACE(); MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed"); // Audio SimpleConsumer does not play the BWE game. if (this->kind != RTC::Media::Kind::VIDEO) return 0u; if (!IsActive()) return 0u; auto nowMs = DepLibUV::GetTimeMs(); auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs); // If consumer.rtpParameters.encodings[0].maxBitrate was given and it's // greater than computed one, then use it. auto maxBitrate = this->rtpParameters.encodings[0].maxBitrate; if (maxBitrate > desiredBitrate) desiredBitrate = maxBitrate; return desiredBitrate; } void SimpleConsumer::SendRtpPacket(RTC::RtpPacket* packet) { MS_TRACE(); if (!IsActive()) return; auto payloadType = packet->GetPayloadType(); // NOTE: This may happen if this Consumer supports just some codecs of those // in the corresponding Producer. if (this->supportedCodecPayloadTypes.find(payloadType) == this->supportedCodecPayloadTypes.end()) { MS_DEBUG_DEV("payload type not supported [payloadType:%" PRIu8 "]", payloadType); return; } // If we need to sync, support key frames and this is not a key frame, ignore // the packet. if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame()) return; // Whether this is the first packet after re-sync. bool isSyncPacket = this->syncRequired; // Sync sequence number and timestamp if required. if (isSyncPacket) { if (packet->IsKeyFrame()) MS_DEBUG_TAG(rtp, "sync key frame received"); this->rtpSeqManager.Sync(packet->GetSequenceNumber() - 1); this->syncRequired = false; } // Update RTP seq number and timestamp. uint16_t seq; this->rtpSeqManager.Input(packet->GetSequenceNumber(), seq); // Save original packet fields. auto origSsrc = packet->GetSsrc(); auto origSeq = packet->GetSequenceNumber(); // Rewrite packet. packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); packet->SetSequenceNumber(seq); if (isSyncPacket) { MS_DEBUG_TAG( rtp, "sending sync packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "] from original [seq:%" PRIu16 "]", packet->GetSsrc(), packet->GetSequenceNumber(), packet->GetTimestamp(), origSeq); } // Process the packet. if (this->rtpStream->ReceivePacket(packet)) { // Send the packet. this->listener->OnConsumerSendRtpPacket(this, packet); // May emit 'trace' event. EmitTraceEventRtpAndKeyFrameTypes(packet); } else { MS_WARN_TAG( rtp, "failed to send packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "] from original [seq:%" PRIu16 "]", packet->GetSsrc(), packet->GetSequenceNumber(), packet->GetTimestamp(), origSeq); } // Restore packet fields. packet->SetSsrc(origSsrc); packet->SetSequenceNumber(origSeq); } void SimpleConsumer::GetRtcp( RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) { MS_TRACE(); MS_ASSERT(rtpStream == this->rtpStream, "RTP stream does not match"); if (static_cast<float>((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) return; auto* report = this->rtpStream->GetRtcpSenderReport(nowMs); if (!report) return; packet->AddSenderReport(report); // Build SDES chunk for this sender. auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk(); packet->AddSdesChunk(sdesChunk); this->lastRtcpSentTime = nowMs; } void SimpleConsumer::NeedWorstRemoteFractionLost( uint32_t /*mappedSsrc*/, uint8_t& worstRemoteFractionLost) { MS_TRACE(); if (!IsActive()) return; auto fractionLost = this->rtpStream->GetFractionLost(); // If our fraction lost is worse than the given one, update it. if (fractionLost > worstRemoteFractionLost) worstRemoteFractionLost = fractionLost; } void SimpleConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) { MS_TRACE(); if (!IsActive()) return; // May emit 'trace' event. EmitTraceEventNackType(); this->rtpStream->ReceiveNack(nackPacket); } void SimpleConsumer::ReceiveKeyFrameRequest( RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) { MS_TRACE(); switch (messageType) { case RTC::RTCP::FeedbackPs::MessageType::PLI: { EmitTraceEventPliType(ssrc); break; } case RTC::RTCP::FeedbackPs::MessageType::FIR: { EmitTraceEventFirType(ssrc); break; } default:; } this->rtpStream->ReceiveKeyFrameRequest(messageType); if (IsActive()) RequestKeyFrame(); } void SimpleConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) { MS_TRACE(); this->rtpStream->ReceiveRtcpReceiverReport(report); } uint32_t SimpleConsumer::GetTransmissionRate(uint64_t nowMs) { MS_TRACE(); if (!IsActive()) return 0u; return this->rtpStream->GetBitrate(nowMs); } float SimpleConsumer::GetRtt() const { MS_TRACE(); return this->rtpStream->GetRtt(); } void SimpleConsumer::UserOnTransportConnected() { MS_TRACE(); this->syncRequired = true; if (IsActive()) RequestKeyFrame(); } void SimpleConsumer::UserOnTransportDisconnected() { MS_TRACE(); this->rtpStream->Pause(); } void SimpleConsumer::UserOnPaused() { MS_TRACE(); this->rtpStream->Pause(); if (this->externallyManagedBitrate && this->kind == RTC::Media::Kind::VIDEO) this->listener->OnConsumerNeedZeroBitrate(this); } void SimpleConsumer::UserOnResumed() { MS_TRACE(); this->syncRequired = true; if (IsActive()) RequestKeyFrame(); } void SimpleConsumer::CreateRtpStream() { MS_TRACE(); auto& encoding = this->rtpParameters.encodings[0]; const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); MS_DEBUG_TAG( rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType); // Set stream params. RTC::RtpStream::Params params; params.ssrc = encoding.ssrc; params.payloadType = mediaCodec->payloadType; params.mimeType = mediaCodec->mimeType; params.clockRate = mediaCodec->clockRate; params.cname = this->rtpParameters.rtcp.cname; // Check in band FEC in codec parameters. if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1) { MS_DEBUG_TAG(rtp, "in band FEC enabled"); params.useInBandFec = true; } // Check DTX in codec parameters. if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1) { MS_DEBUG_TAG(rtp, "DTX enabled"); params.useDtx = true; } // Check DTX in the encoding. if (encoding.dtx) { MS_DEBUG_TAG(rtp, "DTX enabled"); params.useDtx = true; } for (const auto& fb : mediaCodec->rtcpFeedback) { if (!params.useNack && fb.type == "nack" && fb.parameter.empty()) { MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported"); params.useNack = true; } else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli") { MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported"); params.usePli = true; } else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir") { MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported"); params.useFir = true; } } // Create a RtpStreamSend for sending a single media stream. size_t bufferSize = params.useNack ? 600u : 0u; this->rtpStream = new RTC::RtpStreamSend(this, params, bufferSize); this->rtpStreams.push_back(this->rtpStream); // If the Consumer is paused, tell the RtpStreamSend. if (IsPaused() || IsProducerPaused()) this->rtpStream->Pause(); const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding); if (rtxCodec && encoding.hasRtx) this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc); } void SimpleConsumer::RequestKeyFrame() { MS_TRACE(); if (this->kind != RTC::Media::Kind::VIDEO) return; auto mappedSsrc = this->consumableRtpEncodings[0].ssrc; this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc); } inline void SimpleConsumer::EmitScore() const { MS_TRACE(); json data = json::object(); FillJsonScore(data); Channel::Notifier::Emit(this->id, "score", data); } inline void SimpleConsumer::OnRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { MS_TRACE(); // Emit the score event. EmitScore(); } inline void SimpleConsumer::OnRtpStreamRetransmitRtpPacket( RTC::RtpStreamSend* /*rtpStream*/, RTC::RtpPacket* packet) { MS_TRACE(); this->listener->OnConsumerRetransmitRtpPacket(this, packet); // May emit 'trace' event. EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx()); } } // namespace RTC