[Live-devel] [PATCH] RTP/RTCP: use the packets sent by the receiver to

Gilles Chanteperdrix gilles.chanteperdrix at xenomai.org
Sun Dec 15 00:54:01 PST 2013


correct port number differences due to NAT. This implementation works
correctly, but may not be the best way to integrate it into the live555 architecture.
---
 groupsock/NetAddress.cpp                           |   18 +++++
 groupsock/include/NetAddress.hh                    |    5 +-
 liveMedia/OnDemandServerMediaSubsession.cpp        |   69 ++++++++++++++++++--
 liveMedia/RTCP.cpp                                 |   33 ++++++++++
 liveMedia/include/OnDemandServerMediaSubsession.hh |   17 +++--
 5 files changed, 133 insertions(+), 9 deletions(-)

diff --git a/groupsock/NetAddress.cpp b/groupsock/NetAddress.cpp
index ee2e605..66832dd 100644
--- a/groupsock/NetAddress.cpp
+++ b/groupsock/NetAddress.cpp
@@ -261,6 +261,11 @@ Boolean AddressPortLookupTable::Remove(netAddressBits address1,
   return fTable->Remove((char*)key);
 }
 
+// Reports whether the underlying hash table is empty.
+Boolean AddressPortLookupTable::IsEmpty() {
+  return fTable->IsEmpty(); // forwarded to the HashTable held in fTable
+}
+
 AddressPortLookupTable::Iterator::Iterator(AddressPortLookupTable& table)
   : fIter(HashTable::Iterator::create(*(table.fTable))) {
 }
@@ -274,6 +279,19 @@ void* AddressPortLookupTable::Iterator::next() {
   return fIter->next(key);
 }
 
+// Like next(), but also hands back the entry's key via address1/address2/port.
+void* AddressPortLookupTable::Iterator::next(netAddressBits &address1,
+					netAddressBits &address2, portNumBits &port)
+{
+  const char *key = NULL; // fIter->next() may leave this unset when the table is exhausted
+  void *v = fIter->next(key);
+  if (v == NULL || key == NULL) return NULL; // end of table: outputs untouched, no key to read
+  const int *k = (const int *)key; // NOTE(review): assumes keys are int[3] {address1, address2, port} - confirm against Add()
+  address1 = k[0];
+  address2 = k[1];
+  port = k[2];
+  return v;
+}
 
 ////////// isMulticastAddress() implementation //////////
 
diff --git a/groupsock/include/NetAddress.hh b/groupsock/include/NetAddress.hh
index 5b47c03..21b7588 100644
--- a/groupsock/include/NetAddress.hh
+++ b/groupsock/include/NetAddress.hh
@@ -122,6 +122,7 @@ class AddressPortLookupTable {
 	void* Lookup(netAddressBits address1, netAddressBits address2,
 		     Port port);
 		// Returns 0 if not found
+	Boolean IsEmpty(); // forwards to the underlying hash table's IsEmpty()
 
 	// Used to iterate through the entries in the table
 	class Iterator {
@@ -129,8 +130,10 @@ class AddressPortLookupTable {
 		Iterator(AddressPortLookupTable& table);
 		virtual ~Iterator();
 
-		void* next(); // NULL iff none
+		void *next(); // NULL iff none
 
+		void *next(netAddressBits &address1, netAddressBits &address2,
+			portNumBits &port); // as next(), but also outputs the entry's key
 	    private:
 		HashTable::Iterator* fIter;
 	};
diff --git a/liveMedia/OnDemandServerMediaSubsession.cpp b/liveMedia/OnDemandServerMediaSubsession.cpp
index 978a965..da81a0a 100644
--- a/liveMedia/OnDemandServerMediaSubsession.cpp
+++ b/liveMedia/OnDemandServerMediaSubsession.cpp
@@ -72,6 +72,37 @@ OnDemandServerMediaSubsession::sdpLines() {
   return fSDPLines;
 }
 
+// Handles a packet received on our RTP socket: remembers the sender's
+// address/port together with the first 32 bits of the packet (its "id").
+void StreamState::incomingRTP() {
+  struct sockaddr_in source;
+  unsigned id = 0;
+  // Drop failed or too-short reads instead of recording an uninitialized id/address.
+  // NOTE(review): assumes readSocket() returns the byte count even for datagrams longer than 4 bytes - confirm.
+  if (readSocket(fRTPSink->envir(), fRTPgs->socketNum(),
+		 (unsigned char *)&id, sizeof(id), source) < (int)sizeof(id)) return;
+
+  unsigned slot = fDestsCount; // fDestsCount means "no free slot found"
+  for (unsigned i = 0; i < fDestsCount; i++) {
+    if (fDests[i].addr.sin_addr.s_addr == source.sin_addr.s_addr
+	&& fDests[i].addr.sin_port == source.sin_port) return; // source already recorded
+    if (fDests[i].addr.sin_addr.s_addr == 0) slot = i; // a zero address marks a free slot
+  }
+  if (slot == fDestsCount) { // grow by one entry; keep the old array if realloc fails
+    void *grown = realloc(fDests, (fDestsCount + 1) * sizeof(*fDests));
+    if (grown == NULL) return;
+    fDests = (struct Dests_t *)grown;
+    ++fDestsCount;
+  }
+  fDests[slot].id = id; fDests[slot].addr = source;
+}
+
+// Scheduler callback for the RTP socket; 'instance' is the StreamState that registered it.
+static void _incomingRTP(void *instance, int) {
+  StreamState *const state = static_cast<StreamState *>(instance);
+  state->incomingRTP();
+}
+
 void OnDemandServerMediaSubsession
 ::getStreamParameters(unsigned clientSessionId,
 		      netAddressBits clientAddress,
@@ -177,9 +208,9 @@ void OnDemandServerMediaSubsession
   // Record these destinations as being for this client session id:
   Destinations* destinations;
   if (tcpSocketNum < 0) { // UDP
-    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
+    destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort, clientSessionId);
   } else { // TCP
-    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
+    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId, clientSessionId);
   }
   fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
 }
@@ -415,7 +446,12 @@ StreamState::StreamState(OnDemandServerMediaSubsession& master,
     fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
     fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
     fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */,
-    fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
+    fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS), 
+    fDests(NULL), fDestsCount(0) {
+  
+  fRTPSink->envir().taskScheduler().setBackgroundHandling
+    (fRTPgs->socketNum(), SOCKET_READABLE|SOCKET_EXCEPTION,
+      (TaskScheduler::BackgroundHandlerProc *)_incomingRTP, this);
 }
 
 StreamState::~StreamState() {
@@ -455,7 +491,30 @@ void StreamState
   } else {
     // Tell the RTP and RTCP 'groupsocks' about this destination
     // (in case they don't already have it):
-    if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
+    if (fRTPgs != NULL) {
+      unsigned i;
+      for (i = 0; i < fDestsCount; i++)
+	if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr
+	    && (dests->rtpPort.num() == fDests[i].addr.sin_port ||
+		fDests[i].id == dests->sessionId))
+	  break;
+      if (i == fDestsCount)
+	for (i = 0; i < fDestsCount; i++)
+	  if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr)
+	    break;
+      if (i < fDestsCount) {
+	if (dests->rtpPort.num() != fDests[i].addr.sin_port) {
+	  fprintf(stderr, "Received RTP for session %x, "
+		  "with different ports, changing %hu -> %hu\n", 
+		  dests->sessionId, ntohs(dests->rtpPort.num()), 
+		  ntohs(fDests[i].addr.sin_port));
+	  dests->rtpPort = ntohs(fDests[i].addr.sin_port);
+	}
+	fDests[i].addr.sin_addr.s_addr = 0;
+      }
+
+      fRTPgs->addDestination(dests->addr, dests->rtpPort);
+    }
     if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
     if (fRTCPInstance != NULL) {
       fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
@@ -519,6 +578,7 @@ void StreamState::endPlaying(Destinations* dests) {
 }
 
 void StreamState::reclaim() {
+  fRTPSink->envir().taskScheduler().turnOffBackgroundReadHandling(fRTPgs->socketNum());
   // Delete allocated media objects
   Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL;
   Medium::close(fRTPSink); fRTPSink = NULL;
@@ -529,4 +589,5 @@ void StreamState::reclaim() {
 
   delete fRTPgs; fRTPgs = NULL;
   delete fRTCPgs; fRTCPgs = NULL;
+  free(fDests);
 }
diff --git a/liveMedia/RTCP.cpp b/liveMedia/RTCP.cpp
index fce4624..dfe07d3 100644
--- a/liveMedia/RTCP.cpp
+++ b/liveMedia/RTCP.cpp
@@ -169,6 +169,8 @@ RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
 }
 
 struct RRHandlerRecord {
+  portNumBits origPortNum; // port number given when the handler was registered
+  Boolean seen; // False until a report matching this record has been received
   TaskFunc* rrHandlerTask;
   void* rrHandlerClientData;
 };
@@ -258,6 +260,8 @@ void RTCPInstance
   }
 
   RRHandlerRecord* rrHandler = new RRHandlerRecord;
+  rrHandler->seen = False;
+  rrHandler->origPortNum = fromPort.num();
   rrHandler->rrHandlerTask = handlerTask;
   rrHandler->rrHandlerClientData = clientData;
   if (fSpecificRRHandlerTable == NULL) {
@@ -503,9 +507,38 @@ void RTCPInstance::incomingReportHandler1() {
 	      RRHandlerRecord* rrHandler
 		= (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
 	      if (rrHandler != NULL) {
+		rrHandler->seen = True;
 		if (rrHandler->rrHandlerTask != NULL) {
 		  (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
 		}
+	      } else if (tcpReadStreamSocketNum < 0 && !fIsSSMSource && 
+			!fSpecificRRHandlerTable->IsEmpty()) {
+		AddressPortLookupTable::Iterator i = *fSpecificRRHandlerTable;
+		netAddressBits a1, a2;
+		portNumBits p;
+		while(1) {
+		  RRHandlerRecord *r = (RRHandlerRecord *)i.next(a1, a2, p);
+		  if (r == NULL)
+		    break;
+		  if (a1 != fromAddr)
+		    continue;
+		  if (r->seen && p == r->origPortNum)
+		    continue;
+		  fSpecificRRHandlerTable->Remove(a1, a2, ntohs(p));
+		  fSpecificRRHandlerTable->Add(a1, (~0), fromPort, r);
+
+		  struct in_addr ia1;
+		  ia1.s_addr = a1;
+		  RTCPgs()->removeDestination(ia1, ntohs(p));
+		  RTCPgs()->addDestination(ia1, fromPort);
+		  fprintf(stderr, "RTCP, changing port from %hu to %hu\n",
+			ntohs(p), fromPortNum);
+
+		  r->seen = True;
+		  if (r->rrHandlerTask != NULL)
+		    (*(r->rrHandlerTask))(r->rrHandlerClientData);
+		  break;
+		}
 	      }
 	    }
 
diff --git a/liveMedia/include/OnDemandServerMediaSubsession.hh b/liveMedia/include/OnDemandServerMediaSubsession.hh
index bdfc228..dbd8aa2 100644
--- a/liveMedia/include/OnDemandServerMediaSubsession.hh
+++ b/liveMedia/include/OnDemandServerMediaSubsession.hh
@@ -121,12 +121,14 @@ class Destinations {
 public:
   Destinations(struct in_addr const& destAddr,
                Port const& rtpDestPort,
-               Port const& rtcpDestPort)
-    : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
+	      Port const& rtcpDestPort, unsigned sessionId)
+    : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort),
+      sessionId(sessionId) {
   }
-  Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
+  Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId, unsigned sessionId)
     : isTCP(True), rtpPort(0) /*dummy*/, rtcpPort(0) /*dummy*/,
-      tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
+      tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), 
+      rtcpChannelId(rtcpChanId), sessionId(sessionId) {
   }
 
 public:
@@ -136,6 +138,7 @@ public:
   Port rtcpPort;
   int tcpSocketNum;
   unsigned char rtpChannelId, rtcpChannelId;
+  unsigned sessionId;
 };
 
 class StreamState {
@@ -154,6 +157,7 @@ public:
   void pause();
   void endPlaying(Destinations* destinations);
   void reclaim();
+  void incomingRTP();
 
   unsigned& referenceCount() { return fReferenceCount; }
 
@@ -186,6 +190,11 @@ private:
 
   Groupsock* fRTPgs;
   Groupsock* fRTCPgs;
+  struct Dests_t {
+    unsigned id; // first 4 bytes of the packet received from this source
+    struct sockaddr_in addr; // sender's address/port; sin_addr of 0 marks a free slot
+  } *fDests; // realloc()-grown array filled by incomingRTP(), freed in reclaim()
+  unsigned fDestsCount; // number of slots in fDests (used and free)
 };
 
 #endif
-- 
1.7.10.4



More information about the live-devel mailing list