![]() System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /usr/local/lib/node_modules/mediasoup/worker/src/RTC/ |
Upload File : |
#define MS_CLASS "RTC::SctpAssociation" // #define MS_LOG_DEV_LEVEL 3 #include "RTC/SctpAssociation.hpp" #include "DepUsrSCTP.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include "Channel/Notifier.hpp" #include <cstdlib> // std::malloc(), std::free() #include <cstring> // std::memset(), std::memcpy() #include <string> // Free send buffer threshold (in bytes) upon which send_cb will be executed. static uint32_t SendBufferThreshold{ 256u }; /* SCTP events to which we are subscribing. */ /* clang-format off */ uint16_t EventTypes[] = { SCTP_ADAPTATION_INDICATION, SCTP_ASSOC_CHANGE, SCTP_ASSOC_RESET_EVENT, SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT, SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT, SCTP_STREAM_CHANGE_EVENT }; /* clang-format on */ /* Static methods for usrsctp callbacks. */ inline static int onRecvSctpData( struct socket* /*sock*/, union sctp_sockstore /*addr*/, void* data, size_t len, struct sctp_rcvinfo rcv, int flags, void* ulpInfo) { auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast<uintptr_t>(ulpInfo)); if (!sctpAssociation) { MS_WARN_TAG(sctp, "no SctpAssociation found"); std::free(data); return 0; } if (flags & MSG_NOTIFICATION) { sctpAssociation->OnUsrSctpReceiveSctpNotification( static_cast<union sctp_notification*>(data), len); } else { uint16_t streamId = rcv.rcv_sid; uint32_t ppid = ntohl(rcv.rcv_ppid); uint16_t ssn = rcv.rcv_ssn; MS_DEBUG_TAG( sctp, "data chunk received [length:%zu, streamId:%" PRIu16 ", SSN:%" PRIu16 ", TSN:%" PRIu32 ", PPID:%" PRIu32 ", context:%" PRIu32 ", flags:%d]", len, rcv.rcv_sid, rcv.rcv_ssn, rcv.rcv_tsn, ntohl(rcv.rcv_ppid), rcv.rcv_context, flags); sctpAssociation->OnUsrSctpReceiveSctpData( streamId, ssn, ppid, flags, static_cast<uint8_t*>(data), len); } std::free(data); return 1; } inline static int onSendSctpData(struct socket* /*sock*/, uint32_t freeBuffer, void* ulpInfo) { auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast<uintptr_t>(ulpInfo)); if (!sctpAssociation) { MS_WARN_TAG(sctp, "no SctpAssociation found"); return 0; } sctpAssociation->OnUsrSctpSentData(freeBuffer); return 1; } namespace RTC { /* Static. */ static constexpr size_t SctpMtu{ 1200 }; static constexpr uint16_t MaxSctpStreams{ 65535 }; /* Instance methods. */ SctpAssociation::SctpAssociation( Listener* listener, uint16_t os, uint16_t mis, size_t maxSctpMessageSize, size_t sctpSendBufferSize, bool isDataChannel) : listener(listener), os(os), mis(mis), maxSctpMessageSize(maxSctpMessageSize), sctpSendBufferSize(sctpSendBufferSize), isDataChannel(isDataChannel) { MS_TRACE(); // Get a id for this SctpAssociation. this->id = DepUsrSCTP::GetNextSctpAssociationId(); // Register ourselves in usrsctp. // NOTE: This must be done before calling usrsctp_bind(). usrsctp_register_address(reinterpret_cast<void*>(this->id)); int ret; this->socket = usrsctp_socket( AF_CONN, SOCK_STREAM, IPPROTO_SCTP, onRecvSctpData, onSendSctpData, SendBufferThreshold, reinterpret_cast<void*>(this->id)); if (!this->socket) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_socket() failed: %s", std::strerror(errno)); } usrsctp_set_ulpinfo(this->socket, reinterpret_cast<void*>(this->id)); // Make the socket non-blocking. ret = usrsctp_set_non_blocking(this->socket, 1); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_set_non_blocking() failed: %s", std::strerror(errno)); } // Set SO_LINGER. // This ensures that the usrsctp close call deletes the association. This // prevents usrsctp from calling the global send callback with references to // this class as the address. struct linger lingerOpt; // NOLINT(cppcoreguidelines-pro-type-member-init) lingerOpt.l_onoff = 1; lingerOpt.l_linger = 0; ret = usrsctp_setsockopt(this->socket, SOL_SOCKET, SO_LINGER, &lingerOpt, sizeof(lingerOpt)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SO_LINGER) failed: %s", std::strerror(errno)); } // Set SCTP_ENABLE_STREAM_RESET. struct sctp_assoc_value av; // NOLINT(cppcoreguidelines-pro-type-member-init) av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_RESET_ASSOC_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; ret = usrsctp_setsockopt(this->socket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(av)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SCTP_ENABLE_STREAM_RESET) failed: %s", std::strerror(errno)); } // Set SCTP_NODELAY. uint32_t noDelay = 1; ret = usrsctp_setsockopt(this->socket, IPPROTO_SCTP, SCTP_NODELAY, &noDelay, sizeof(noDelay)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SCTP_NODELAY) failed: %s", std::strerror(errno)); } // Enable events. struct sctp_event event; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&event, 0, sizeof(event)); event.se_on = 1; for (size_t i{ 0 }; i < sizeof(EventTypes) / sizeof(uint16_t); ++i) { event.se_type = EventTypes[i]; ret = usrsctp_setsockopt(this->socket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SCTP_EVENT) failed: %s", std::strerror(errno)); } } // Init message. struct sctp_initmsg initmsg; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&initmsg, 0, sizeof(initmsg)); initmsg.sinit_num_ostreams = this->os; initmsg.sinit_max_instreams = this->mis; ret = usrsctp_setsockopt(this->socket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, sizeof(initmsg)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SCTP_INITMSG) failed: %s", std::strerror(errno)); } // Server side. struct sockaddr_conn sconn; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&sconn, 0, sizeof(sconn)); sconn.sconn_family = AF_CONN; sconn.sconn_port = htons(5000); sconn.sconn_addr = reinterpret_cast<void*>(this->id); #ifdef HAVE_SCONN_LEN sconn.sconn_len = sizeof(sconn); #endif ret = usrsctp_bind(this->socket, reinterpret_cast<struct sockaddr*>(&sconn), sizeof(sconn)); if (ret < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_bind() failed: %s", std::strerror(errno)); } auto bufferSize = static_cast<int>(sctpSendBufferSize); if (usrsctp_setsockopt(this->socket, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(int)) < 0) { usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); MS_THROW_ERROR("usrsctp_setsockopt(SO_SNDBUF) failed: %s", std::strerror(errno)); } // Register the SctpAssociation into the global map. DepUsrSCTP::RegisterSctpAssociation(this); } SctpAssociation::~SctpAssociation() { MS_TRACE(); usrsctp_set_ulpinfo(this->socket, nullptr); usrsctp_close(this->socket); // Deregister ourselves from usrsctp. usrsctp_deregister_address(reinterpret_cast<void*>(this->id)); // Register the SctpAssociation from the global map. DepUsrSCTP::DeregisterSctpAssociation(this); delete[] this->messageBuffer; } void SctpAssociation::TransportConnected() { MS_TRACE(); // Just run the SCTP stack if our state is 'new'. if (this->state != SctpState::NEW) return; try { int ret; struct sockaddr_conn rconn; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&rconn, 0, sizeof(rconn)); rconn.sconn_family = AF_CONN; rconn.sconn_port = htons(5000); rconn.sconn_addr = reinterpret_cast<void*>(this->id); #ifdef HAVE_SCONN_LEN rconn.sconn_len = sizeof(rconn); #endif ret = usrsctp_connect(this->socket, reinterpret_cast<struct sockaddr*>(&rconn), sizeof(rconn)); if (ret < 0 && errno != EINPROGRESS) MS_THROW_ERROR("usrsctp_connect() failed: %s", std::strerror(errno)); // Disable MTU discovery. sctp_paddrparams peerAddrParams; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&peerAddrParams, 0, sizeof(peerAddrParams)); std::memcpy(&peerAddrParams.spp_address, &rconn, sizeof(rconn)); peerAddrParams.spp_flags = SPP_PMTUD_DISABLE; // The MTU value provided specifies the space available for chunks in the // packet, so let's subtract the SCTP header size. peerAddrParams.spp_pathmtu = SctpMtu - sizeof(struct sctp_common_header); ret = usrsctp_setsockopt( this->socket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &peerAddrParams, sizeof(peerAddrParams)); if (ret < 0) MS_THROW_ERROR("usrsctp_setsockopt(SCTP_PEER_ADDR_PARAMS) failed: %s", std::strerror(errno)); // Announce connecting state. this->state = SctpState::CONNECTING; this->listener->OnSctpAssociationConnecting(this); } catch (const MediaSoupError& /*error*/) { this->state = SctpState::FAILED; this->listener->OnSctpAssociationFailed(this); } } void SctpAssociation::FillJson(json& jsonObject) const { MS_TRACE(); // Add port (always 5000). jsonObject["port"] = 5000; // Add OS. jsonObject["OS"] = this->os; // Add MIS. jsonObject["MIS"] = this->mis; // Add maxMessageSize. jsonObject["maxMessageSize"] = this->maxSctpMessageSize; // Add sendBufferSize. jsonObject["sendBufferSize"] = this->sctpSendBufferSize; // Add sctpBufferedAmountLowThreshold. jsonObject["sctpBufferedAmount"] = this->sctpBufferedAmount; // Add isDataChannel. jsonObject["isDataChannel"] = this->isDataChannel; } void SctpAssociation::ProcessSctpData(const uint8_t* data, size_t len) { MS_TRACE(); #if MS_LOG_DEV_LEVEL == 3 MS_DUMP_DATA(data, len); #endif usrsctp_conninput(reinterpret_cast<void*>(this->id), data, len, 0); } void SctpAssociation::SendSctpMessage( RTC::DataConsumer* dataConsumer, uint32_t ppid, const uint8_t* msg, size_t len, onQueuedCallback* cb) { MS_TRACE(); // This must be controlled by the DataConsumer. MS_ASSERT( len <= this->maxSctpMessageSize, "given message exceeds max allowed message size [message size:%zu, max message size:%zu]", len, this->maxSctpMessageSize); const auto& parameters = dataConsumer->GetSctpStreamParameters(); // Fill stcp_sendv_spa. struct sctp_sendv_spa spa; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&spa, 0, sizeof(spa)); spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; spa.sendv_sndinfo.snd_sid = parameters.streamId; spa.sendv_sndinfo.snd_ppid = htonl(ppid); spa.sendv_sndinfo.snd_flags = SCTP_EOR; // If ordered it must be reliable. if (parameters.ordered) { spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_NONE; spa.sendv_prinfo.pr_value = 0; } // Configure reliability: https://tools.ietf.org/html/rfc3758 else { spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; spa.sendv_sndinfo.snd_flags |= SCTP_UNORDERED; if (parameters.maxPacketLifeTime != 0) { spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; spa.sendv_prinfo.pr_value = parameters.maxPacketLifeTime; } else if (parameters.maxRetransmits != 0) { spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; spa.sendv_prinfo.pr_value = parameters.maxRetransmits; } } this->sctpBufferedAmount += len; // Notify the listener about the buffered amount increase regardless // usrsctp_sendv result. // In case of failure the correct value will be later provided by usrsctp // via onSendSctpData. this->listener->OnSctpAssociationBufferedAmount(this, this->sctpBufferedAmount); int ret = usrsctp_sendv( this->socket, msg, len, nullptr, 0, &spa, static_cast<socklen_t>(sizeof(spa)), SCTP_SENDV_SPA, 0); if (ret < 0) { MS_WARN_TAG( sctp, "error sending SCTP message [sid:%" PRIu16 ", ppid:%" PRIu32 ", message size:%zu]: %s", parameters.streamId, ppid, len, std::strerror(errno)); if (cb) { (*cb)(false); delete cb; } } else if (cb) { (*cb)(true); delete cb; } if (errno == EWOULDBLOCK || errno == EAGAIN) { Channel::Notifier::Emit(dataConsumer->id, "sctpsendbufferfull"); } } void SctpAssociation::HandleDataConsumer(RTC::DataConsumer* dataConsumer) { MS_TRACE(); auto streamId = dataConsumer->GetSctpStreamParameters().streamId; // We need more OS. if (streamId > this->os - 1) AddOutgoingStreams(/*force*/ false); } void SctpAssociation::DataProducerClosed(RTC::DataProducer* dataProducer) { MS_TRACE(); auto streamId = dataProducer->GetSctpStreamParameters().streamId; // Send SCTP_RESET_STREAMS to the remote. // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7 if (this->isDataChannel) ResetSctpStream(streamId, StreamDirection::OUTGOING); else ResetSctpStream(streamId, StreamDirection::INCOMING); } void SctpAssociation::DataConsumerClosed(RTC::DataConsumer* dataConsumer) { MS_TRACE(); auto streamId = dataConsumer->GetSctpStreamParameters().streamId; // Send SCTP_RESET_STREAMS to the remote. ResetSctpStream(streamId, StreamDirection::OUTGOING); } void SctpAssociation::ResetSctpStream(uint16_t streamId, StreamDirection direction) { MS_TRACE(); // Do nothing if an outgoing stream that could not be allocated by us. if (direction == StreamDirection::OUTGOING && streamId > this->os - 1) return; int ret; struct sctp_assoc_value av; // NOLINT(cppcoreguidelines-pro-type-member-init) socklen_t len = sizeof(av); ret = usrsctp_getsockopt(this->socket, IPPROTO_SCTP, SCTP_RECONFIG_SUPPORTED, &av, &len); if (ret == 0) { if (av.assoc_value != 1) { MS_DEBUG_TAG(sctp, "stream reconfiguration not negotiated"); return; } } else { MS_WARN_TAG( sctp, "could not retrieve whether stream reconfiguration has been negotiated: %s\n", std::strerror(errno)); return; } // As per spec: https://tools.ietf.org/html/rfc6525#section-4.1 len = sizeof(sctp_assoc_t) + (2 + 1) * sizeof(uint16_t); auto* srs = static_cast<struct sctp_reset_streams*>(std::malloc(len)); switch (direction) { case StreamDirection::INCOMING: srs->srs_flags = SCTP_STREAM_RESET_INCOMING; break; case StreamDirection::OUTGOING: srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; break; } srs->srs_number_streams = 1; srs->srs_stream_list[0] = streamId; // No need for htonl(). ret = usrsctp_setsockopt(this->socket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, len); if (ret == 0) { MS_DEBUG_TAG(sctp, "SCTP_RESET_STREAMS sent [streamId:%" PRIu16 "]", streamId); } else { MS_WARN_TAG(sctp, "usrsctp_setsockopt(SCTP_RESET_STREAMS) failed: %s", std::strerror(errno)); } std::free(srs); } void SctpAssociation::AddOutgoingStreams(bool force) { MS_TRACE(); uint16_t additionalOs{ 0 }; if (MaxSctpStreams - this->os >= 32) additionalOs = 32; else additionalOs = MaxSctpStreams - this->os; if (additionalOs == 0) { MS_WARN_TAG(sctp, "cannot add more outgoing streams [OS:%" PRIu16 "]", this->os); return; } auto nextDesiredOs = this->os + additionalOs; // Already in progress, ignore (unless forced). if (!force && nextDesiredOs == this->desiredOs) return; // Update desired value. this->desiredOs = nextDesiredOs; // If not connected, defer it. if (this->state != SctpState::CONNECTED) { MS_DEBUG_TAG(sctp, "SCTP not connected, deferring OS increase"); return; } struct sctp_add_streams sas; // NOLINT(cppcoreguidelines-pro-type-member-init) std::memset(&sas, 0, sizeof(sas)); sas.sas_instrms = 0; sas.sas_outstrms = additionalOs; MS_DEBUG_TAG(sctp, "adding %" PRIu16 " outgoing streams", additionalOs); int ret = usrsctp_setsockopt( this->socket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, static_cast<socklen_t>(sizeof(sas))); if (ret < 0) MS_WARN_TAG(sctp, "usrsctp_setsockopt(SCTP_ADD_STREAMS) failed: %s", std::strerror(errno)); } void SctpAssociation::OnUsrSctpSendSctpData(void* buffer, size_t len) { MS_TRACE(); const uint8_t* data = static_cast<uint8_t*>(buffer); #if MS_LOG_DEV_LEVEL == 3 MS_DUMP_DATA(data, len); #endif this->listener->OnSctpAssociationSendData(this, data, len); } void SctpAssociation::OnUsrSctpReceiveSctpData( uint16_t streamId, uint16_t ssn, uint32_t ppid, int flags, const uint8_t* data, size_t len) { // Ignore WebRTC DataChannel Control DATA chunks. if (ppid == 50) { MS_WARN_TAG(sctp, "ignoring SCTP data with ppid:50 (WebRTC DataChannel Control)"); return; } if (this->messageBufferLen != 0 && ssn != this->lastSsnReceived) { MS_WARN_TAG( sctp, "message chunk received with different SSN while buffer not empty, buffer discarded [ssn:%" PRIu16 ", last ssn received:%" PRIu16 "]", ssn, this->lastSsnReceived); this->messageBufferLen = 0; } // Update last SSN received. this->lastSsnReceived = ssn; auto eor = static_cast<bool>(flags & MSG_EOR); if (this->messageBufferLen + len > this->maxSctpMessageSize) { MS_WARN_TAG( sctp, "ongoing received message exceeds max allowed message size [message size:%zu, max message size:%zu, eor:%u]", this->messageBufferLen + len, this->maxSctpMessageSize, eor ? 1 : 0); this->lastSsnReceived = 0; return; } // If end of message and there is no buffered data, notify it directly. if (eor && this->messageBufferLen == 0) { MS_DEBUG_DEV("directly notifying listener [eor:1, buffer len:0]"); this->listener->OnSctpAssociationMessageReceived(this, streamId, ppid, data, len); } // If end of message and there is buffered data, append data and notify buffer. else if (eor && this->messageBufferLen != 0) { std::memcpy(this->messageBuffer + this->messageBufferLen, data, len); this->messageBufferLen += len; MS_DEBUG_DEV("notifying listener [eor:1, buffer len:%zu]", this->messageBufferLen); this->listener->OnSctpAssociationMessageReceived( this, streamId, ppid, this->messageBuffer, this->messageBufferLen); this->messageBufferLen = 0; } // If non end of message, append data to the buffer. else if (!eor) { // Allocate the buffer if not already done. if (!this->messageBuffer) this->messageBuffer = new uint8_t[this->maxSctpMessageSize]; std::memcpy(this->messageBuffer + this->messageBufferLen, data, len); this->messageBufferLen += len; MS_DEBUG_DEV("data buffered [eor:0, buffer len:%zu]", this->messageBufferLen); } } void SctpAssociation::OnUsrSctpReceiveSctpNotification(union sctp_notification* notification, size_t len) { if (notification->sn_header.sn_length != (uint32_t)len) return; switch (notification->sn_header.sn_type) { case SCTP_ADAPTATION_INDICATION: { MS_DEBUG_TAG( sctp, "SCTP adaptation indication [%x]", notification->sn_adaptation_event.sai_adaptation_ind); break; } case SCTP_ASSOC_CHANGE: { switch (notification->sn_assoc_change.sac_state) { case SCTP_COMM_UP: { MS_DEBUG_TAG( sctp, "SCTP association connected, streams [out:%" PRIu16 ", in:%" PRIu16 "]", notification->sn_assoc_change.sac_outbound_streams, notification->sn_assoc_change.sac_inbound_streams); // Update our OS. this->os = notification->sn_assoc_change.sac_outbound_streams; // Increase if requested before connected. if (this->desiredOs > this->os) AddOutgoingStreams(/*force*/ true); if (this->state != SctpState::CONNECTED) { this->state = SctpState::CONNECTED; this->listener->OnSctpAssociationConnected(this); } break; } case SCTP_COMM_LOST: { if (notification->sn_header.sn_length > 0) { static const size_t BufferSize{ 1024 }; static char buffer[BufferSize]; uint32_t len = notification->sn_header.sn_length; for (uint32_t i{ 0 }; i < len; ++i) { std::snprintf( buffer, BufferSize, " 0x%02x", notification->sn_assoc_change.sac_info[i]); } MS_DEBUG_TAG(sctp, "SCTP communication lost [info:%s]", buffer); } else { MS_DEBUG_TAG(sctp, "SCTP communication lost"); } if (this->state != SctpState::CLOSED) { this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } break; } case SCTP_RESTART: { MS_DEBUG_TAG( sctp, "SCTP remote association restarted, streams [out:%" PRIu16 ", int:%" PRIu16 "]", notification->sn_assoc_change.sac_outbound_streams, notification->sn_assoc_change.sac_inbound_streams); // Update our OS. this->os = notification->sn_assoc_change.sac_outbound_streams; // Increase if requested before connected. if (this->desiredOs > this->os) AddOutgoingStreams(/*force*/ true); if (this->state != SctpState::CONNECTED) { this->state = SctpState::CONNECTED; this->listener->OnSctpAssociationConnected(this); } break; } case SCTP_SHUTDOWN_COMP: { MS_DEBUG_TAG(sctp, "SCTP association gracefully closed"); if (this->state != SctpState::CLOSED) { this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } break; } case SCTP_CANT_STR_ASSOC: { if (notification->sn_header.sn_length > 0) { static const size_t BufferSize{ 1024 }; static char buffer[BufferSize]; uint32_t len = notification->sn_header.sn_length; for (uint32_t i{ 0 }; i < len; ++i) { std::snprintf( buffer, BufferSize, " 0x%02x", notification->sn_assoc_change.sac_info[i]); } MS_WARN_TAG(sctp, "SCTP setup failed: %s", buffer); } if (this->state != SctpState::FAILED) { this->state = SctpState::FAILED; this->listener->OnSctpAssociationFailed(this); } break; } default:; } break; } // https://tools.ietf.org/html/rfc6525#section-6.1.2. case SCTP_ASSOC_RESET_EVENT: { MS_DEBUG_TAG(sctp, "SCTP association reset event received"); break; } // An Operation Error is not considered fatal in and of itself, but may be // used with an ABORT chunk to report a fatal condition. case SCTP_REMOTE_ERROR: { static const size_t BufferSize{ 1024 }; static char buffer[BufferSize]; uint32_t len = notification->sn_remote_error.sre_length - sizeof(struct sctp_remote_error); for (uint32_t i{ 0 }; i < len; i++) { std::snprintf(buffer, BufferSize, "0x%02x", notification->sn_remote_error.sre_data[i]); } MS_WARN_TAG( sctp, "remote SCTP association error [type:0x%04x, data:%s]", notification->sn_remote_error.sre_error, buffer); break; } // When a peer sends a SHUTDOWN, SCTP delivers this notification to // inform the application that it should cease sending data. case SCTP_SHUTDOWN_EVENT: { MS_DEBUG_TAG(sctp, "remote SCTP association shutdown"); if (this->state != SctpState::CLOSED) { this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } break; } case SCTP_SEND_FAILED_EVENT: { static const size_t BufferSize{ 1024 }; static char buffer[BufferSize]; uint32_t len = notification->sn_send_failed_event.ssfe_length - sizeof(struct sctp_send_failed_event); for (uint32_t i{ 0 }; i < len; ++i) { std::snprintf(buffer, BufferSize, "0x%02x", notification->sn_send_failed_event.ssfe_data[i]); } MS_WARN_TAG( sctp, "SCTP message sent failure [streamId:%" PRIu16 ", ppid:%" PRIu32 ", sent:%s, error:0x%08x, info:%s]", notification->sn_send_failed_event.ssfe_info.snd_sid, ntohl(notification->sn_send_failed_event.ssfe_info.snd_ppid), (notification->sn_send_failed_event.ssfe_flags & SCTP_DATA_SENT) ? "yes" : "no", notification->sn_send_failed_event.ssfe_error, buffer); break; } case SCTP_STREAM_RESET_EVENT: { bool incoming{ false }; bool outgoing{ false }; uint16_t numStreams = (notification->sn_strreset_event.strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); if (notification->sn_strreset_event.strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) incoming = true; if (notification->sn_strreset_event.strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) outgoing = true; if (MS_HAS_DEBUG_TAG(sctp)) { std::string streamIds; for (uint16_t i{ 0 }; i < numStreams; ++i) { auto streamId = notification->sn_strreset_event.strreset_stream_list[i]; // Don't log more than 5 stream ids. if (i > 4) { streamIds.append("..."); break; } if (i > 0) streamIds.append(","); streamIds.append(std::to_string(streamId)); } MS_DEBUG_TAG( sctp, "SCTP stream reset event [flags:%x, i|o:%s|%s, num streams:%" PRIu16 ", stream ids:%s]", notification->sn_strreset_event.strreset_flags, incoming ? "true" : "false", outgoing ? "true" : "false", numStreams, streamIds.c_str()); } // Special case for WebRTC DataChannels in which we must also reset our // outgoing SCTP stream. if (incoming && !outgoing && this->isDataChannel) { for (uint16_t i{ 0 }; i < numStreams; ++i) { auto streamId = notification->sn_strreset_event.strreset_stream_list[i]; ResetSctpStream(streamId, StreamDirection::OUTGOING); } } break; } case SCTP_STREAM_CHANGE_EVENT: { if (notification->sn_strchange_event.strchange_flags == 0) { MS_DEBUG_TAG( sctp, "SCTP stream changed, streams [out:%" PRIu16 ", in:%" PRIu16 ", flags:%x]", notification->sn_strchange_event.strchange_outstrms, notification->sn_strchange_event.strchange_instrms, notification->sn_strchange_event.strchange_flags); } else if (notification->sn_strchange_event.strchange_flags & SCTP_STREAM_RESET_DENIED) { MS_WARN_TAG( sctp, "SCTP stream change denied, streams [out:%" PRIu16 ", in:%" PRIu16 ", flags:%x]", notification->sn_strchange_event.strchange_outstrms, notification->sn_strchange_event.strchange_instrms, notification->sn_strchange_event.strchange_flags); break; } else if (notification->sn_strchange_event.strchange_flags & SCTP_STREAM_RESET_FAILED) { MS_WARN_TAG( sctp, "SCTP stream change failed, streams [out:%" PRIu16 ", in:%" PRIu16 ", flags:%x]", notification->sn_strchange_event.strchange_outstrms, notification->sn_strchange_event.strchange_instrms, notification->sn_strchange_event.strchange_flags); break; } // Update OS. this->os = notification->sn_strchange_event.strchange_outstrms; break; } default: { MS_WARN_TAG( sctp, "unhandled SCTP event received [type:%" PRIu16 "]", notification->sn_header.sn_type); } } } void SctpAssociation::OnUsrSctpSentData(uint32_t freeBuffer) { auto previousSctpBufferedAmount = this->sctpBufferedAmount; this->sctpBufferedAmount = this->sctpSendBufferSize - freeBuffer; if (this->sctpBufferedAmount != previousSctpBufferedAmount) { this->listener->OnSctpAssociationBufferedAmount(this, this->sctpBufferedAmount); } } } // namespace RTC