[Live-devel] [PATCH] RTP/RTCP: use the packets sent by the receiver to
Gilles Chanteperdrix
gilles.chanteperdrix at xenomai.org
Sun Dec 15 00:54:01 PST 2013
correct port numbers differences due to NAT. This implementation functions
correctly, but may not be the best integration into live architecture.
---
groupsock/NetAddress.cpp | 18 +++++
groupsock/include/NetAddress.hh | 5 +-
liveMedia/OnDemandServerMediaSubsession.cpp | 69 ++++++++++++++++++--
liveMedia/RTCP.cpp | 33 ++++++++++
liveMedia/include/OnDemandServerMediaSubsession.hh | 17 +++--
5 files changed, 133 insertions(+), 9 deletions(-)
diff --git a/groupsock/NetAddress.cpp b/groupsock/NetAddress.cpp
index ee2e605..66832dd 100644
--- a/groupsock/NetAddress.cpp
+++ b/groupsock/NetAddress.cpp
@@ -261,6 +261,11 @@ Boolean AddressPortLookupTable::Remove(netAddressBits address1,
return fTable->Remove((char*)key);
}
+Boolean AddressPortLookupTable::IsEmpty()
+{
+ return fTable->IsEmpty();
+}
+
AddressPortLookupTable::Iterator::Iterator(AddressPortLookupTable& table)
: fIter(HashTable::Iterator::create(*(table.fTable))) {
}
@@ -274,6 +279,19 @@ void* AddressPortLookupTable::Iterator::next() {
return fIter->next(key);
}
+void* AddressPortLookupTable::Iterator::next(netAddressBits &address1,
+ netAddressBits &address2,
+ portNumBits &port)
+{
+ const char *key;
+ const int *k;
+ void *v = fIter->next(key);
+ k = (const int *)key;
+ address1 = k[0];
+ address2 = k[1];
+ port = k[2];
+ return v;
+}
////////// isMulticastAddress() implementation //////////
diff --git a/groupsock/include/NetAddress.hh b/groupsock/include/NetAddress.hh
index 5b47c03..21b7588 100644
--- a/groupsock/include/NetAddress.hh
+++ b/groupsock/include/NetAddress.hh
@@ -122,6 +122,7 @@ class AddressPortLookupTable {
void* Lookup(netAddressBits address1, netAddressBits address2,
Port port);
// Returns 0 if not found
+ Boolean IsEmpty();
// Used to iterate through the entries in the table
class Iterator {
@@ -129,8 +130,10 @@ class AddressPortLookupTable {
Iterator(AddressPortLookupTable& table);
virtual ~Iterator();
- void* next(); // NULL iff none
+ void *next(); // NULL iff none
+ void *next(netAddressBits &address1, netAddressBits &address2,
+ portNumBits &port);
private:
HashTable::Iterator* fIter;
};
diff --git a/liveMedia/OnDemandServerMediaSubsession.cpp b/liveMedia/OnDemandServerMediaSubsession.cpp
index 978a965..da81a0a 100644
--- a/liveMedia/OnDemandServerMediaSubsession.cpp
+++ b/liveMedia/OnDemandServerMediaSubsession.cpp
@@ -72,6 +72,37 @@ OnDemandServerMediaSubsession::sdpLines() {
return fSDPLines;
}
+void StreamState::incomingRTP()
+{
+ struct sockaddr_in source;
+ int freei = fDestsCount;
+ unsigned i, id;
+
+ readSocket(fRTPSink->envir(), fRTPgs->socketNum(),
+ (unsigned char *)&id, sizeof(id), source);
+
+ for (i = 0; i < fDestsCount; i++) {
+ if (fDests[i].addr.sin_addr.s_addr == source.sin_addr.s_addr
+ && fDests[i].addr.sin_port == source.sin_port)
+ return;
+ if (fDests[i].addr.sin_addr.s_addr == 0)
+ freei = i;
+ }
+ i = freei;
+ if (i == fDestsCount) {
+ ++fDestsCount;
+ fDests = (struct Dests_t *)realloc(fDests, fDestsCount * sizeof(*fDests));
+ }
+ fDests[i].id = id;
+ fDests[i].addr = source;
+}
+
+static void _incomingRTP(void *instance, int)
+{
+ StreamState *stream = (StreamState *)instance;
+ stream->incomingRTP();
+}
+
void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,
netAddressBits clientAddress,
@@ -177,9 +208,9 @@ void OnDemandServerMediaSubsession
// Record these destinations as being for this client session id:
Destinations* destinations;
if (tcpSocketNum < 0) { // UDP
- destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
+ destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort, clientSessionId);
} else { // TCP
- destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
+ destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId, clientSessionId);
}
fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}
@@ -415,7 +446,12 @@ StreamState::StreamState(OnDemandServerMediaSubsession& master,
fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */,
- fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
+ fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS),
+ fDests(NULL), fDestsCount(0) {
+
+ fRTPSink->envir().taskScheduler().setBackgroundHandling
+ (fRTPgs->socketNum(), SOCKET_READABLE|SOCKET_EXCEPTION,
+ (TaskScheduler::BackgroundHandlerProc *)_incomingRTP, this);
}
StreamState::~StreamState() {
@@ -455,7 +491,30 @@ void StreamState
} else {
// Tell the RTP and RTCP 'groupsocks' about this destination
// (in case they don't already have it):
- if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
+ if (fRTPgs != NULL) {
+ unsigned i;
+ for (i = 0; i < fDestsCount; i++)
+ if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr
+ && (dests->rtpPort.num() == fDests[i].addr.sin_port ||
+ fDests[i].id == dests->sessionId))
+ break;
+ if (i == fDestsCount)
+ for (i = 0; i < fDestsCount; i++)
+ if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr)
+ break;
+ if (i < fDestsCount) {
+ if (dests->rtpPort.num() != fDests[i].addr.sin_port) {
+ fprintf(stderr, "Received RTP for session %x, "
+ "with different ports, changing %hu -> %hu\n",
+ dests->sessionId, ntohs(dests->rtpPort.num()),
+ ntohs(fDests[i].addr.sin_port));
+ dests->rtpPort = ntohs(fDests[i].addr.sin_port);
+ }
+ fDests[i].addr.sin_addr.s_addr = 0;
+ }
+
+ fRTPgs->addDestination(dests->addr, dests->rtpPort);
+ }
if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
if (fRTCPInstance != NULL) {
fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
@@ -519,6 +578,7 @@ void StreamState::endPlaying(Destinations* dests) {
}
void StreamState::reclaim() {
+ fRTPSink->envir().taskScheduler().turnOffBackgroundReadHandling(fRTPgs->socketNum());
// Delete allocated media objects
Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL;
Medium::close(fRTPSink); fRTPSink = NULL;
@@ -529,4 +589,5 @@ void StreamState::reclaim() {
delete fRTPgs; fRTPgs = NULL;
delete fRTCPgs; fRTCPgs = NULL;
+ free(fDests);
}
diff --git a/liveMedia/RTCP.cpp b/liveMedia/RTCP.cpp
index fce4624..dfe07d3 100644
--- a/liveMedia/RTCP.cpp
+++ b/liveMedia/RTCP.cpp
@@ -169,6 +169,8 @@ RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
}
struct RRHandlerRecord {
+ portNumBits origPortNum;
+ Boolean seen;
TaskFunc* rrHandlerTask;
void* rrHandlerClientData;
};
@@ -258,6 +260,8 @@ void RTCPInstance
}
RRHandlerRecord* rrHandler = new RRHandlerRecord;
+ rrHandler->seen = False;
+ rrHandler->origPortNum = fromPort.num();
rrHandler->rrHandlerTask = handlerTask;
rrHandler->rrHandlerClientData = clientData;
if (fSpecificRRHandlerTable == NULL) {
@@ -503,9 +507,38 @@ void RTCPInstance::incomingReportHandler1() {
RRHandlerRecord* rrHandler
= (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
if (rrHandler != NULL) {
+ rrHandler->seen = True;
if (rrHandler->rrHandlerTask != NULL) {
(*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
}
+ } else if (tcpReadStreamSocketNum < 0 && !fIsSSMSource &&
+ !fSpecificRRHandlerTable->IsEmpty()) {
+ AddressPortLookupTable::Iterator i = *fSpecificRRHandlerTable;
+ netAddressBits a1, a2;
+ portNumBits p;
+ while(1) {
+ RRHandlerRecord *r = (RRHandlerRecord *)i.next(a1, a2, p);
+ if (r == NULL)
+ break;
+ if (a1 != fromAddr)
+ continue;
+ if (r->seen && p == r->origPortNum)
+ continue;
+ fSpecificRRHandlerTable->Remove(a1, a2, ntohs(p));
+ fSpecificRRHandlerTable->Add(a1, (~0), fromPort, r);
+
+ struct in_addr ia1;
+ ia1.s_addr = a1;
+ RTCPgs()->removeDestination(ia1, ntohs(p));
+ RTCPgs()->addDestination(ia1, fromPort);
+ fprintf(stderr, "RTCP, changing port from %hu to %hu\n",
+ ntohs(p), fromPortNum);
+
+ r->seen = True;
+ if (r->rrHandlerTask != NULL)
+ (*(r->rrHandlerTask))(r->rrHandlerClientData);
+ break;
+ }
}
}
diff --git a/liveMedia/include/OnDemandServerMediaSubsession.hh b/liveMedia/include/OnDemandServerMediaSubsession.hh
index bdfc228..dbd8aa2 100644
--- a/liveMedia/include/OnDemandServerMediaSubsession.hh
+++ b/liveMedia/include/OnDemandServerMediaSubsession.hh
@@ -121,12 +121,14 @@ class Destinations {
public:
Destinations(struct in_addr const& destAddr,
Port const& rtpDestPort,
- Port const& rtcpDestPort)
- : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
+ Port const& rtcpDestPort, unsigned sessionId)
+ : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort),
+ sessionId(sessionId) {
}
- Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
+ Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId, unsigned sessionId)
: isTCP(True), rtpPort(0) /*dummy*/, rtcpPort(0) /*dummy*/,
- tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
+ tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId),
+ rtcpChannelId(rtcpChanId), sessionId(sessionId) {
}
public:
@@ -136,6 +138,7 @@ public:
Port rtcpPort;
int tcpSocketNum;
unsigned char rtpChannelId, rtcpChannelId;
+ unsigned sessionId;
};
class StreamState {
@@ -154,6 +157,7 @@ public:
void pause();
void endPlaying(Destinations* destinations);
void reclaim();
+ void incomingRTP();
unsigned& referenceCount() { return fReferenceCount; }
@@ -186,6 +190,11 @@ private:
Groupsock* fRTPgs;
Groupsock* fRTCPgs;
+ struct Dests_t {
+ unsigned id;
+ struct sockaddr_in addr;
+ } *fDests;
+ unsigned fDestsCount;
};
#endif
--
1.7.10.4
More information about the live-devel
mailing list