[Live-devel] Adding UDP Video to an Audio RTSP URL coming from an AXIS encoder in the same stream doesn't work

Eric Eastman Eric.Eastman at AITEngineering.com
Thu Nov 2 10:20:42 PDT 2023


I'm trying to combine video and audio in the same stream. I do see posts where you just add a subsession, but it doesn't work if the subsession is an RTSP URL. It also doesn't work the other way around, where I set up the RTSP URL audio first and then add the MPEG4 video subsession.

When I do the following, it works and the audio comes through with no video:
//This is just audio, there's no cameras hooked up to the encoder.
inputAudioStr = "rtsp://193.168.5.11:554/axis-media/media.amp?video=0&audio=1";
sms = ProxyServerMediaSession::createNew(*env, rtspServer, inputAudioStr, streamName);

When I add the video from the UDP it doesn't work:
//This is just audio, there's no cameras hooked up to the encoder.
inputAudioStr = "rtsp://193.168.5.11:554/axis-media/media.amp?video=0&audio=1";
//This is where the video comes from
inputAddressStr = "239.255.0.1";
inputPortNum = "11841";
intr_addr = "0.0.0.17";
isTransportingStream = false;
sms = ProxyServerMediaSession::createNew(*env, rtspServer, inputAudioStr, streamName);
sms->addSubsession(UDPServerMediaSubsession::createNew(*env, inputAddressStr, inputPortNum, intr_addr, isTransportingStream));

If I just do the video from the UDP it also works:
//This is where the video comes from
inputAddressStr = "239.255.0.1";
inputPortNum = "11841";
intr_addr = "0.0.0.17";
isTransportingStream = false;
sms = ServerMediaSession::createNew(*env, streamName, streamName, descriptionString);
sms->addSubsession(UDPServerMediaSubsession::createNew(*env, inputAddressStr, inputPortNum, intr_addr, isTransportingStream));

Any help or direction would be appreciated.

Thanks,

Eric Eastman

P.S. see attached code
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.live555.com/pipermail/live-devel/attachments/20231102/a2ce8114/attachment-0001.htm>
-------------- next part --------------
#pragma once

// Two-level stringification: the extra indirection makes the preprocessor expand
// the argument first, so STRINGIZE(VERSION_MAJOR) yields "1" rather than "VERSION_MAJOR".
#define STRINGIZE2(s) #s
#define STRINGIZE(s) STRINGIZE2(s)

// Numeric components of the version number.
#define VERSION_MAJOR               1
#define VERSION_MINOR               0
#define VERSION_REVISION            0
#define VERSION_BUILD               0

// File description and version, in both comma-separated numeric form and dotted string form.
#define VER_FILE_DESCRIPTION_STR    "An RTSP server that proxies existing RTSP Servers or takes in raw MPEGTS UDP streams as input."
#define VER_FILE_VERSION            VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION, VERSION_BUILD
// Note: the final line deliberately has no trailing backslash, so that the macro
// cannot accidentally absorb whatever line follows it.
#define VER_FILE_VERSION_STR        STRINGIZE(VERSION_MAJOR)        \
                                    "." STRINGIZE(VERSION_MINOR)    \
                                    "." STRINGIZE(VERSION_REVISION) \
                                    "." STRINGIZE(VERSION_BUILD)

// Product identification strings (the product version mirrors the file version).
#define VER_PRODUCTNAME_STR         "RtspProxyServer"
#define VER_PRODUCT_VERSION         VER_FILE_VERSION
#define VER_PRODUCT_VERSION_STR     VER_FILE_VERSION_STR
#define VER_ORIGINAL_FILENAME_STR   VER_PRODUCTNAME_STR ".exe"
#define VER_INTERNAL_NAME_STR       VER_ORIGINAL_FILENAME_STR
#define VER_COPYRIGHT_STR           "Copyright (C) 2021"

// USE_PAUSE enables a start-up pause for debugging; it stays defined only when
// _DEBUG is defined.
#define USE_PAUSE
#ifndef _DEBUG
#undef USE_PAUSE
#endif

// Debug flag value: VS_FF_DEBUG when _DEBUG is defined, otherwise 0.
#ifdef _DEBUG
#define VER_VER_DEBUG             VS_FF_DEBUG
#else
#define VER_VER_DEBUG             0
#endif

#define VER_FILEOS                  VOS_NT_WINDOWS32
#define VER_FILEFLAGS               VER_VER_DEBUG
#define VER_FILETYPE                VFT_APP
-------------- next part --------------
#include "OnDemandMediaSubsession.h"
#include <GroupsockHelper.hh>

// Constructs an on-demand subsession.
//   env                  - usage environment handed to the "ServerMediaSubsession" base class
//   reuseFirstSource     - stored in "fReuseFirstSource"; when set, "getStreamParameters()"
//                          reuses the most recently created stream state for new clients
//   initialPortNum       - first server port number tried when allocating sockets
//   multiplexRTCPWithRTP - when set, RTCP shares the RTP port, so no even-port rounding is done
OnDemandMediaSubsession
::OnDemandMediaSubsession(UsageEnvironment& env,
	Boolean reuseFirstSource,
	portNumBits initialPortNum,
	Boolean multiplexRTCPWithRTP)
	: ServerMediaSubsession(env),
	fSDPLines(NULL), fReuseFirstSource(reuseFirstSource),
	fMultiplexRTCPWithRTP(multiplexRTCPWithRTP), fLastStreamToken(NULL),
	fAppHandlerTask(NULL), fAppHandlerClientData(NULL) {
	fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
	if (fMultiplexRTCPWithRTP) {
		fInitialPortNum = initialPortNum;
	}
	else {
		// Make sure RTP ports are even-numbered:
		fInitialPortNum = (initialPortNum + 1)&~1;
	}

	// The host name is used as our RTCP CNAME.  If the lookup fails, the buffer
	// contents are not guaranteed, so fall back to an empty (but well-defined) string:
	if (gethostname(fCNAME, sizeof fCNAME) != 0) {
		fCNAME[0] = '\0';
	}
	fCNAME[sizeof fCNAME - 1] = '\0'; // just in case the name was truncated without a terminator
}

// Releases the cached SDP text and every per-client "Destinations" record
// still held in the destinations hash table, then the table itself.
OnDemandMediaSubsession::~OnDemandMediaSubsession() {
	delete[] fSDPLines;

	// Drain the table one entry at a time; "RemoveNext()" yields NULL once it is empty:
	Destinations* entry;
	while ((entry = (Destinations*)(fDestinationsHashTable->RemoveNext())) != NULL) {
		delete entry;
	}
	delete fDestinationsHashTable;
}

char const*
OnDemandMediaSubsession::sdpLines(int addressFamily) {
	if (fSDPLines == NULL) {
		// We need to construct a set of SDP lines that describe this
		// subsession (as a unicast stream).  To do so, we first create
		// dummy (unused) source and "RTPSink" objects,
		// whose parameters we use for the SDP lines:
		unsigned estBitrate;
		FramedSource* inputSource = createNewStreamSource(0, estBitrate);
		if (inputSource == NULL) return NULL; // file not found

		Groupsock* dummyGroupsock = createGroupsock(nullAddress(addressFamily), 0);
		unsigned char rtpPayloadType = 96 + trackNumber() - 1; // if dynamic
		RTPSink* dummyRTPSink = createNewRTPSink(dummyGroupsock, rtpPayloadType, inputSource);
		if (dummyRTPSink != NULL && dummyRTPSink->estimatedBitrate() > 0) estBitrate = dummyRTPSink->estimatedBitrate();

		setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate);
		Medium::close(dummyRTPSink);
		delete dummyGroupsock;
		closeStreamSource(inputSource);
	}

	return fSDPLines;
}

// Sets up (or, with "fReuseFirstSource", reuses) the server-side state needed to stream
// this subsession to one client, and records that client's destination.
//   clientSessionId              - key under which the client's "Destinations" are stored
//   clientAddress                - used as the destination if "destinationAddress" is null
//   clientRTPPort/clientRTCPPort - client ports; a zero RTCP port selects raw-UDP streaming
//   tcpSocketNum                 - >= 0 selects RTP-over-TCP, using the two channel ids
//   destinationAddress           - in/out: filled in from "clientAddress" when null
//   isMulticast                  - out: always set to False
//   serverRTPPort/serverRTCPPort - out: the server ports chosen (or reused)
//   streamToken                  - out: the "OnDemandStreamState" for this stream
void OnDemandMediaSubsession
::getStreamParameters(unsigned clientSessionId,
	struct sockaddr_storage const& clientAddress,
	Port const& clientRTPPort,
	Port const& clientRTCPPort,
	int tcpSocketNum,
	unsigned char rtpChannelId,
	unsigned char rtcpChannelId,
	struct sockaddr_storage& destinationAddress,
	u_int8_t& /*destinationTTL*/,
	Boolean& isMulticast,
	Port& serverRTPPort,
	Port& serverRTCPPort,
	void*& streamToken) {
	if (addressIsNull(destinationAddress)) {
		// normal case - use the client address as the destination address:
		destinationAddress = clientAddress;
	}
	isMulticast = False;

	if (fLastStreamToken != NULL && fReuseFirstSource) {
		// Special case: Rather than creating a new 'OnDemandStreamState',
		// we reuse the one that we've already created:
		serverRTPPort = ((OnDemandStreamState*)fLastStreamToken)->serverRTPPort();
		serverRTCPPort = ((OnDemandStreamState*)fLastStreamToken)->serverRTCPPort();
		++((OnDemandStreamState*)fLastStreamToken)->referenceCount();
		streamToken = fLastStreamToken;
	}
	else {
		// Normal case: Create a new media source.
		// ("streamBitrate" is zero-initialized so that it has a defined value even if
		//  "createNewStreamSource()" does not assign it.)
		unsigned streamBitrate = 0;
		FramedSource* mediaSource
			= createNewStreamSource(clientSessionId, streamBitrate);

		// Create 'groupsock' and 'sink' objects for the destination,
		// using previously unused server port numbers:
		RTPSink* rtpSink = NULL;
		BasicUDPSink* udpSink = NULL;
		Groupsock* rtpGroupsock = NULL;
		Groupsock* rtcpGroupsock = NULL;

		if (clientRTPPort.num() != 0 || tcpSocketNum >= 0) { // Normal case: Create destinations
			if (clientRTCPPort.num() == 0) {
				// We're streaming raw UDP (not RTP). Create a single groupsock:
				NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
				for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
					serverRTPPort = serverPortNum;
					rtpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTPPort);
					if (rtpGroupsock->socketNum() >= 0) break; // success

					// This port was unusable.  Release the failed groupsock before trying the
					// next port, so that we don't leak one object per failed attempt:
					delete rtpGroupsock;
					rtpGroupsock = NULL;
				}

				udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
			}
			else {
				// Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
				// groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even).
				// (If we're multiplexing RTCP and RTP over the same port number, it can be odd or even.)
				NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
				for (portNumBits serverPortNum = fInitialPortNum; ; ++serverPortNum) {
					serverRTPPort = serverPortNum;
					rtpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTPPort);
					if (rtpGroupsock->socketNum() < 0) {
						delete rtpGroupsock;
						continue; // try again
					}

					if (fMultiplexRTCPWithRTP) {
						// Use the RTP 'groupsock' object for RTCP as well:
						serverRTCPPort = serverRTPPort;
						rtcpGroupsock = rtpGroupsock;
					}
					else {
						// Create a separate 'groupsock' object (with the next (odd) port number) for RTCP:
						serverRTCPPort = ++serverPortNum;
						rtcpGroupsock = createGroupsock(nullAddress(destinationAddress.ss_family), serverRTCPPort);
						if (rtcpGroupsock->socketNum() < 0) {
							delete rtpGroupsock;
							delete rtcpGroupsock;
							continue; // try again
						}
					}

					break; // success
				}

				unsigned char rtpPayloadType = 96 + trackNumber() - 1; // if dynamic
				rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
				if (rtpSink != NULL && rtpSink->estimatedBitrate() > 0) streamBitrate = rtpSink->estimatedBitrate();
			}

			// Turn off the destinations for each groupsock.  They'll get set later
			// (unless TCP is used instead):
			if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
			if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();

			if (rtpGroupsock != NULL) {
				// Try to use a big send buffer for RTP -  at least 0.1 second of
				// specified bandwidth and at least 50 KB
				unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
				if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
				increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
			}
		}

		// Set up the state of the stream.  The stream will get started later:
		streamToken = fLastStreamToken
			= new OnDemandStreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
				streamBitrate, mediaSource,
				rtpGroupsock, rtcpGroupsock);
	}

	// Record these destinations as being for this client session id:
	Destinations* destinations;
	if (tcpSocketNum < 0) { // UDP
		destinations = new Destinations(destinationAddress, clientRTPPort, clientRTCPPort);
	}
	else { // TCP
		destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
	}
	fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
}

void OnDemandMediaSubsession::startStream(unsigned clientSessionId,
	void* streamToken,
	TaskFunc* rtcpRRHandler,
	void* rtcpRRHandlerClientData,
	unsigned short& rtpSeqNum,
	unsigned& rtpTimestamp,
	ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
	void* serverRequestAlternativeByteHandlerClientData) {
	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	Destinations* destinations
		= (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
	if (streamState != NULL) {
		streamState->startPlaying(destinations, clientSessionId,
			rtcpRRHandler, rtcpRRHandlerClientData,
			serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
		RTPSink* rtpSink = streamState->rtpSink(); // alias
		if (rtpSink != NULL) {
			rtpSeqNum = rtpSink->currentSeqNo();
			rtpTimestamp = rtpSink->presetNextTimestamp();
		}
	}
}

void OnDemandMediaSubsession::pauseStream(unsigned /*clientSessionId*/,
	void* streamToken) {
	// Pausing isn't allowed if multiple clients are receiving data from
	// the same source:
	if (fReuseFirstSource) return;

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	if (streamState != NULL) streamState->pause();
}

void OnDemandMediaSubsession::seekStream(unsigned /*clientSessionId*/,
	void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) {
	numBytes = 0; // by default: unknown

	// Seeking isn't allowed if multiple clients are receiving data from the same source:
	if (fReuseFirstSource) return;

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	if (streamState != NULL && streamState->mediaSource() != NULL) {
		seekStreamSource(streamState->mediaSource(), seekNPT, streamDuration, numBytes);

		streamState->startNPT() = (float)seekNPT;
		RTPSink* rtpSink = streamState->rtpSink(); // alias
		if (rtpSink != NULL) rtpSink->resetPresentationTimes();
	}
}

void OnDemandMediaSubsession::seekStream(unsigned /*clientSessionId*/,
	void* streamToken, char*& absStart, char*& absEnd) {
	// Seeking isn't allowed if multiple clients are receiving data from the same source:
	if (fReuseFirstSource) return;

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	if (streamState != NULL && streamState->mediaSource() != NULL) {
		seekStreamSource(streamState->mediaSource(), absStart, absEnd);
	}
}

void OnDemandMediaSubsession::nullSeekStream(unsigned /*clientSessionId*/, void* streamToken,
	double streamEndTime, u_int64_t& numBytes) {
	numBytes = 0; // by default: unknown

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	if (streamState != NULL && streamState->mediaSource() != NULL) {
		// Because we're not seeking here, get the current NPT, and remember it as the new 'start' NPT:
		streamState->startNPT() = getCurrentNPT(streamToken);

		double duration = streamEndTime - streamState->startNPT();
		if (duration < 0.0) duration = 0.0;
		setStreamSourceDuration(streamState->mediaSource(), duration, numBytes);

		RTPSink* rtpSink = streamState->rtpSink(); // alias
		if (rtpSink != NULL) rtpSink->resetPresentationTimes();
	}
}

void OnDemandMediaSubsession::setStreamScale(unsigned /*clientSessionId*/,
	void* streamToken, float scale) {
	// Changing the scale factor isn't allowed if multiple clients are receiving data
	// from the same source:
	if (fReuseFirstSource) return;

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	if (streamState != NULL && streamState->mediaSource() != NULL) {
		setStreamSourceScale(streamState->mediaSource(), scale);
	}
}

float OnDemandMediaSubsession::getCurrentNPT(void* streamToken) {
	do {
		if (streamToken == NULL) break;

		OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
		RTPSink* rtpSink = streamState->rtpSink();
		if (rtpSink == NULL) break;

		return streamState->startNPT()
			+ (rtpSink->mostRecentPresentationTime().tv_sec - rtpSink->initialPresentationTime().tv_sec)
			+ (rtpSink->mostRecentPresentationTime().tv_usec - rtpSink->initialPresentationTime().tv_usec) / 1000000.0f;
	} while (0);

	return 0.0;
}

// Returns the media source held by the stream state "streamToken" (NULL for a NULL token).
FramedSource* OnDemandMediaSubsession::getStreamSource(void* streamToken) {
	OnDemandStreamState* state = (OnDemandStreamState*)streamToken;
	return state == NULL ? NULL : state->mediaSource();
}

void OnDemandMediaSubsession
::getRTPSinkandRTCP(void* streamToken,
	RTPSink const*& rtpSink, RTCPInstance const*& rtcp) {
	if (streamToken == NULL) {
		rtpSink = NULL;
		rtcp = NULL;
		return;
	}

	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;
	rtpSink = streamState->rtpSink();
	rtcp = streamState->rtcpInstance();
}

void OnDemandMediaSubsession::deleteStream(unsigned clientSessionId,
	void*& streamToken) {
	OnDemandStreamState* streamState = (OnDemandStreamState*)streamToken;

	// Look up (and remove) the destinations for this client session:
	Destinations* destinations
		= (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
	if (destinations != NULL) {
		fDestinationsHashTable->Remove((char const*)clientSessionId);

		// Stop streaming to these destinations:
		if (streamState != NULL) streamState->endPlaying(destinations, clientSessionId);
	}

	// Delete the "OnDemandStreamState" structure if it's no longer being used:
	if (streamState != NULL) {
		if (streamState->referenceCount() > 0) --streamState->referenceCount();
		if (streamState->referenceCount() == 0) {
			delete streamState;
			streamToken = NULL;
		}
	}

	// Finally, delete the destinations themselves:
	delete destinations;
}

// Default implementation: the auxiliary SDP line is whatever the sink reports
// (NULL if there is no sink).  The input source is not consulted.
char const* OnDemandMediaSubsession
::getAuxSDPLine(RTPSink* rtpSink, FramedSource* /*inputSource*/) {
	if (rtpSink == NULL) return NULL;
	return rtpSink->auxSDPLine();
}

// Default implementation of NPT-based seeking: no seek is performed;
// "numBytes" is simply reported as 0.
void OnDemandMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
	double& /*seekNPT*/, double /*streamDuration*/, u_int64_t& numBytes) {
	numBytes = 0;
}

// Default implementation of absolute-time seeking: no seek is performed.  Both strings
// are freed and set to NULL, to show the caller that this kind of seek isn't handled.
void OnDemandMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
	char*& absStart, char*& absEnd) {
	delete[] absStart;
	absStart = NULL;

	delete[] absEnd;
	absEnd = NULL;
}

// Default implementation of scale changes: intentionally empty (the scale is ignored).
void OnDemandMediaSubsession
::setStreamSourceScale(FramedSource* /*inputSource*/, float /*scale*/) {
}

// Default implementation of duration limiting: the duration is ignored;
// "numBytes" is simply reported as 0.
void OnDemandMediaSubsession
::setStreamSourceDuration(FramedSource* /*inputSource*/, double /*streamDuration*/, u_int64_t& numBytes) {
	numBytes = 0;
}

// Disposes of a stream source by handing it to "Medium::close()".
void OnDemandMediaSubsession::closeStreamSource(FramedSource* inputSource) {
	Medium::close(inputSource);
}

// Default 'groupsock' factory; may be redefined by subclasses.
// NOTE(review): the trailing 255 is presumably the TTL - confirm against "Groupsock"'s constructor.
Groupsock* OnDemandMediaSubsession
::createGroupsock(struct sockaddr_storage const& addr, Port port) {
	Groupsock* newGroupsock = new Groupsock(envir(), addr, port, 255);
	return newGroupsock;
}

// Default "RTCPInstance" factory; may be redefined by subclasses.
// "totSessionBW" is in kbps.  No RTP source is passed, because we're a server.
RTCPInstance* OnDemandMediaSubsession
::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
	unsigned char const* cname, RTPSink* sink) {
	RTCPInstance* newRTCP
		= RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/);
	return newRTCP;
}

// Remembers the handler (and its client data) for RTCP "APP" packets.  The pair is handed
// to each "RTCPInstance" when "OnDemandStreamState::startPlaying()" creates it.
void OnDemandMediaSubsession
::setRTCPAppPacketHandler(RTCPAppHandlerFunc* handler, void* clientData) {
	fAppHandlerClientData = clientData;
	fAppHandlerTask = handler;
}

// Sends a RTCP "APP" packet via the most recently created stream state, if there is one.
void OnDemandMediaSubsession
::sendRTCPAppPacket(u_int8_t subtype, char const* name,
	u_int8_t* appDependentData, unsigned appDependentDataSize) {
	OnDemandStreamState* latestState = (OnDemandStreamState*)fLastStreamToken;
	if (latestState == NULL) return;

	latestState->sendRTCPAppPacket(subtype, name, appDependentData, appDependentDataSize);
}

// Builds this subsession's SDP description from the parameters of "rtpSink" (and,
// through "getAuxSDPLine()", possibly "inputSource"), and stores it in "fSDPLines",
// replacing any previous value.  Does nothing if "rtpSink" is NULL.
void OnDemandMediaSubsession
::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
	if (rtpSink == NULL) return;

	// Gather the individual pieces.  (The order of these calls is kept as-is.)
	char const* media = rtpSink->sdpMediaType();
	unsigned char payloadType = rtpSink->rtpPayloadType();
	struct sockaddr_storage const& sdpAddress = rtpSink->groupsockBeingUsed().groupAddress();
	portNumBits sdpPort = ntohs(rtpSink->groupsockBeingUsed().port().num());

	AddressString sdpAddressStr(sdpAddress);
	char* rtpmap = rtpSink->rtpmapLine();
	char const* rtcpMux = fMultiplexRTCPWithRTP ? "a=rtcp-mux\r\n" : "";
	char const* range = rangeSDPLine();
	char const* aux = getAuxSDPLine(rtpSink, inputSource);
	if (aux == NULL) aux = "";

	char const* const sdpFmt =
		"m=%s %u RTP/AVP %d\r\n"
		"c=IN %s %s\r\n"
		"b=AS:%u\r\n"
		"%s"
		"%s"
		"%s"
		"%s"
		"a=control:%s\r\n";

	// Upper bound on the formatted length: the format string itself plus the
	// longest possible expansion of each field.
	unsigned sdpFmtSize = strlen(sdpFmt)
		+ strlen(media) + 5 /* max short len */ + 3 /* max char len */
		+ 3/*IP4 or IP6*/ + strlen(sdpAddressStr.val())
		+ 20 /* max int len */
		+ strlen(rtpmap)
		+ strlen(rtcpMux)
		+ strlen(range)
		+ strlen(aux)
		+ strlen(trackId());
	char* formatted = new char[sdpFmtSize];
	sprintf(formatted, sdpFmt,
		media, // m= <media>
		sdpPort, // m= <port>
		payloadType, // m= <fmt list>
		sdpAddress.ss_family == AF_INET ? "IP4" : "IP6", sdpAddressStr.val(), // c= address
		estBitrate, // b=AS:<bandwidth>
		rtpmap, // a=rtpmap:... (if present)
		rtcpMux, // a=rtcp-mux:... (if present)
		range, // a=range:... (if present)
		aux, // optional extra SDP line
		trackId()); // a=control:<track-id>

	// These two strings were allocated for us; free them now that they've been copied:
	delete[](char*)range;
	delete[] rtpmap;

	// Swap in an exactly-sized copy of the result, then discard the work buffer:
	delete[] fSDPLines;
	fSDPLines = strDup(formatted);
	delete[] formatted;
}


////////// OnDemandStreamState implementation //////////

static void afterPlayingOnDemandStreamState(void* clientData) {
	OnDemandStreamState* streamState = (OnDemandStreamState*)clientData;
	if (streamState->streamDuration() == 0.0) {
		// When the input stream ends, tear it down.  This will cause a RTCP "BYE"
		// to be sent to each client, teling it that the stream has ended.
		// (Because the stream didn't have a known duration, there was no other
		//  way for clients to know when the stream ended.)
		streamState->reclaim();
	}
	// Otherwise, keep the stream alive, in case a client wants to
	// subsequently re-play the stream starting from somewhere other than the end.
	// (This can be done only on streams that have a known duration.)
}

// Records the objects that make up one stream.  The stream starts out not playing, with a
// reference count of 1 and a start NPT of 0; its duration is taken from "master".
// The "RTCPInstance" is not created here, but later in "startPlaying()".
OnDemandStreamState::OnDemandStreamState(OnDemandMediaSubsession& master,
	Port const& serverRTPPort, Port const& serverRTCPPort,
	RTPSink* rtpSink, BasicUDPSink* udpSink,
	unsigned totalBW, FramedSource* mediaSource,
	Groupsock* rtpGS, Groupsock* rtcpGS)
	: fMaster(master),
	fAreCurrentlyPlaying(False),
	fReferenceCount(1),
	fServerRTPPort(serverRTPPort),
	fServerRTCPPort(serverRTCPPort),
	fRTPSink(rtpSink),
	fUDPSink(udpSink),
	fStreamDuration(master.duration()),
	fTotalBW(totalBW),
	fRTCPInstance(NULL) /* created later */,
	fMediaSource(mediaSource),
	fStartNPT(0.0),
	fRTPgs(rtpGS),
	fRTCPgs(rtcpGS) {
}

// All teardown work lives in "reclaim()", which resets each pointer it releases.
OnDemandStreamState::~OnDemandStreamState() {
	reclaim();
}

// Adds the client described by "dests" as a receiver of this stream and, if the stream
// is not already playing, starts the sink pulling data from the media source.
//   dests                  - the client's destination (TCP socket + channel ids, or UDP
//                            address + ports); nothing happens if this is NULL
//   clientSessionId        - passed to the groupsocks to identify the UDP destination
//   rtcpRRHandler (+data)  - registered with the "RTCPInstance" for this destination
//   serverRequestAlternativeByteHandler (+data)
//                          - registered for the TCP socket (TCP case only)
void OnDemandStreamState
::startPlaying(Destinations* dests, unsigned clientSessionId,
	TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
	ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
	void* serverRequestAlternativeByteHandlerClientData) {
	if (dests == NULL) return;

	// The "RTCPInstance" is created on the first call only, and only for RTP streams:
	if (fRTCPInstance == NULL && fRTPSink != NULL) {
		// Create (and start) a 'RTCP instance' for this RTP sink:
		fRTCPInstance = fMaster.createRTCP(fRTCPgs, fTotalBW, (unsigned char*)fMaster.fCNAME, fRTPSink);
		// Note: This starts RTCP running automatically
		fRTCPInstance->setAppHandler(fMaster.fAppHandlerTask, fMaster.fAppHandlerClientData);
	}

	if (dests->isTCP) {
		// Change RTP and RTCP to use the TCP socket instead of UDP:
		if (fRTPSink != NULL) {
			fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
			RTPInterface
				::setServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum,
					serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
			// So that we continue to handle RTSP commands from the client
		}
		if (fRTCPInstance != NULL) {
			fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);

			// The RR handler is keyed by an address; for TCP, the socket number is stored
			// in the address field instead (only the family and address are set here):
			struct sockaddr_storage tcpSocketNumAsAddress; // hack
			tcpSocketNumAsAddress.ss_family = AF_INET;
			((sockaddr_in&)tcpSocketNumAsAddress).sin_addr.s_addr = dests->tcpSocketNum;
			fRTCPInstance->setSpecificRRHandler(tcpSocketNumAsAddress, dests->rtcpChannelId,
				rtcpRRHandler, rtcpRRHandlerClientData);
		}
	}
	else {
		// Tell the RTP and RTCP 'groupsocks' about this destination
		// (in case they don't already have it):
		if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort, clientSessionId);
		// (Skip the RTCP destination when it would duplicate the RTP one: same groupsock, same port.)
		if (fRTCPgs != NULL && !(fRTCPgs == fRTPgs && dests->rtcpPort.num() == dests->rtpPort.num())) {
			fRTCPgs->addDestination(dests->addr, dests->rtcpPort, clientSessionId);
		}
		if (fRTCPInstance != NULL) {
			fRTCPInstance->setSpecificRRHandler(dests->addr, dests->rtcpPort,
				rtcpRRHandler, rtcpRRHandlerClientData);
		}
	}

	// Some clients dont like the RTCP packet before the initial RTP packet
	//if (fRTCPInstance != NULL) {
	//	// Hack: Send an initial RTCP "SR" packet, before the initial RTP packet, so that receivers will (likely) be able to
	//	// get RTCP-synchronized presentation times immediately:
	//	fRTCPInstance->sendReport();
	//}

	// Start the sink (RTP if present, otherwise raw UDP) unless the stream is already playing:
	if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
		if (fRTPSink != NULL) {
			fRTPSink->startPlaying(*fMediaSource, afterPlayingOnDemandStreamState, this);
			fAreCurrentlyPlaying = True;
		}
		else if (fUDPSink != NULL) {
			fUDPSink->startPlaying(*fMediaSource, afterPlayingOnDemandStreamState, this);
			fAreCurrentlyPlaying = True;
		}
	}
}

// Pauses the stream: stops whichever sinks exist, stops the media source from
// delivering frames, and marks the stream as not playing.
void OnDemandStreamState::pause() {
	if (fRTPSink != NULL) {
		fRTPSink->stopPlaying();
	}
	if (fUDPSink != NULL) {
		fUDPSink->stopPlaying();
	}
	if (fMediaSource != NULL) {
		fMediaSource->stopGettingFrames();
	}

	fAreCurrentlyPlaying = False;
}

// Stops streaming to a single client, undoing the per-destination registrations made by
// "startPlaying()".  The sinks and the media source themselves are left running.
//   dests           - the client's destination; dereferenced without a NULL check (the
//                     caller in this file, "deleteStream()", only passes a non-NULL value)
//   clientSessionId - identifies the UDP destination to remove from the groupsocks
void OnDemandStreamState::endPlaying(Destinations* dests, unsigned clientSessionId) {
#if 0
	// The following code is temporarily disabled, because it erroneously sends RTCP "BYE"s to all clients if multiple
	// clients are streaming from the same data source (i.e., if "reuseFirstSource" is True), and we don't want that to happen
	// if we're being called as a result of a single one of these clients having sent a "TEARDOWN" (rather than the whole stream
	// having been closed, for all clients).
	// This will be fixed for real later.
	if (fRTCPInstance != NULL) {
		// Hack: Explicitly send a RTCP "BYE" packet now, because the code below will prevent that from happening later,
		// when "fRTCPInstance" gets deleted:
		fRTCPInstance->sendBYE();
	}
#endif

	if (dests->isTCP) {
		// TCP case: detach the client's TCP socket from the RTP sink and the RTCP instance:
		if (fRTPSink != NULL) {
			// Comment out the following, because it prevents the "RTSPClientConnection" object
			// from being closed after handling a "TEARDOWN": #####
			//RTPInterface::clearServerRequestAlternativeByteHandler(fRTPSink->envir(), dests->tcpSocketNum);
			fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
		}
		if (fRTCPInstance != NULL) {
			fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);

			// Rebuild the same socket-number-as-address key that "startPlaying()" used
			// when it registered the RR handler:
			struct sockaddr_storage tcpSocketNumAsAddress; // hack
			tcpSocketNumAsAddress.ss_family = AF_INET;
			((sockaddr_in&)tcpSocketNumAsAddress).sin_addr.s_addr = dests->tcpSocketNum;
			fRTCPInstance->unsetSpecificRRHandler(tcpSocketNumAsAddress, dests->rtcpChannelId);
		}
	}
	else {
		// Tell the RTP and RTCP 'groupsocks' to stop using these destinations:
		if (fRTPgs != NULL) fRTPgs->removeDestination(clientSessionId);
		// (When RTCP shares the RTP groupsock, the destination was already removed above.)
		if (fRTCPgs != NULL && fRTCPgs != fRTPgs) fRTCPgs->removeDestination(clientSessionId);
		if (fRTCPInstance != NULL) {
			fRTCPInstance->unsetSpecificRRHandler(dests->addr, dests->rtcpPort);
		}
	}
}

// Forwards a RTCP "APP" packet to this stream's "RTCPInstance".  Does nothing if the
// instance doesn't exist (it is created in "startPlaying()").
void OnDemandStreamState::sendRTCPAppPacket(u_int8_t subtype, char const* name,
	u_int8_t* appDependentData, unsigned appDependentDataSize) {
	if (fRTCPInstance == NULL) return;

	fRTCPInstance->sendAppPacket(subtype, name, appDependentData, appDependentDataSize);
}

void OnDemandStreamState::reclaim() {
	// Delete allocated media objects
	Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL;
	Medium::close(fRTPSink); fRTPSink = NULL;
	Medium::close(fUDPSink); fUDPSink = NULL;

	fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
	if (fMaster.fLastStreamToken == this) fMaster.fLastStreamToken = NULL;

	delete fRTPgs;
	if (fRTCPgs != fRTPgs) delete fRTCPgs;
	fRTPgs = NULL; fRTCPgs = NULL;
}
-------------- next part --------------
#include "liveMedia.hh"
#include "BasicUsageEnvironment.hh"
#include <GroupsockHelper.hh> 
#include "UDPServerMediaSubsession.h"
#include <tclap/CmdLine.h>
#include "version.h"

// Program-wide usage environment; assigned in main() once the task scheduler exists.
UsageEnvironment* env;

// Declared here so that it can be referenced before its definition.
static void announceURL(RTSPServer* rtspServer, ServerMediaSession* sms); // forward

int main(int argc, char** argv) {
#ifdef USE_PAUSE
	// Debug pause used to help attach remote debugger.
	std::cout << "Press Enter to continue . . .";
	std::cin.get();
#endif
	// Begin by setting up our usage environment:
	TaskScheduler* scheduler = BasicTaskScheduler::createNew();
	env = BasicUsageEnvironment::createNew(*scheduler);

	char const* descriptionString = "Session streamed by \"" VER_PRODUCTNAME_STR "\"";

	try {

		// Set up the argument parser
		TCLAP::CmdLine cmd(VER_FILE_DESCRIPTION_STR, ' ', VER_PRODUCT_VERSION_STR);

		TCLAP::MultiArg<std::string> inputArgs("i", "input", "The input for the RTSP server, " 
			"can be either an RTSP URL or a Raw MPEGTS stream.\n\n" 
			"RTSP input: an rtsp url using either the rtsp or rtsps protocol prefix\n\n" 
			"MPEGTS input: <[interface@]host:port> for the UDP stream." 
			" Interface is optional and can either be the IP addresss or the interface Id. Ex: 192.168.1.1 at 239.255.0.0:1234", true,"string");
		cmd.add(inputArgs);

		TCLAP::ValueArg<portNumBits> portArg("p", "port", "The port to host the RTSP server on. Default is 554", false, 554, "int");
		cmd.add(portArg);

		TCLAP::ValueArg<std::string> streamNameArg("s", "streamNames", "Stream name(s) that will be appended to the end of the RTSP url." 
			"To specify a unique stream name for each input url provide a comma(,) seperated list.", false, "stream", "string");
		cmd.add(streamNameArg);

		TCLAP::SwitchArg isRawMPEG4("r", "raw", "Does this video need to be transported? -r if not and it's raw MPEG4 and there's no need to transport.");
		cmd.add(isRawMPEG4);

		TCLAP::MultiArg<std::string> inputSeparateAudio("a", "audio", 
			"(optional) The audio input for the RTSP server, can be either an RTSP URL or\n"
			"a Raw MPEGTS stream.The audio is mixed with the MPEGTS stream or RTSP URL\n"
			"specified in the input parameter.If left blank none is used..", false, "string");
		cmd.add(inputSeparateAudio);

		cmd.parse(argc, argv);

		// Parse arguments
		std::vector<std::string> inputUrls = inputArgs.getValue();
		std::vector<std::string> inputAudioUrls = inputSeparateAudio.getValue();
		portNumBits rtspPort = portArg.getValue();
		std::string streamNamePrefix = streamNameArg.getValue().c_str();
		std::vector<std::string> streamNamePrefixes;
		std::stringstream ss(streamNamePrefix);
		bool isTransportingStream = !isRawMPEG4.getValue();

		// Parse stream names, either 1 or equal to input urls
		while (ss.good()) {
			std::string substr;
			std::getline(ss, substr, ',');
			streamNamePrefixes.push_back(substr);
		}
		std::vector<std::string>::size_type namePefixSize = streamNamePrefixes.size();
		if (namePefixSize > 1 && namePefixSize != inputUrls.size()) {
			throw TCLAP::ArgException("Number of string name prefixes must be 1 or match the number of input urls.", "streamName");
		}

		UserAuthenticationDatabase* authDB = NULL;
#ifdef ACCESS_CONTROL
		// To implement client access control to the RTSP server, do the following:
		authDB = new UserAuthenticationDatabase;
		authDB->addUserRecord("username1", "password1"); // replace these with real strings
		// Repeat the above with each <username>, <password> that you wish to allow
		// access to the server.
#endif

		// Create the RTSP server:
		RTSPServer* rtspServer = RTSPServer::createNew(*env, rtspPort, authDB);
		if (rtspServer == NULL) {
			*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
			exit(1);
		}

		char const* rtspPrefix = "rtsp://";
		unsigned const prefix1Length = 7;
		char const* rtspsPrefix = "rtsps://";
		unsigned const prefix2Length = 8;

		int i = 1;
		// Parse input urls
		for (std::string inputUrl : inputUrls) {

			//Should be big enough
			char streamName[128];

			//Get stream name
			if (namePefixSize > 1) {
				sprintf(streamName, "%s", streamNamePrefixes.at(i-1).c_str());
			}
			else {
				if (inputUrls.size() == 1) {
					sprintf(streamName, "%s", streamNamePrefixes.front().c_str()); // there's just one stream; give it this name
				}
				else {
					sprintf(streamName, "%s-%d", streamNamePrefixes.front().c_str(), i); // there's more than one stream; distinguish them by name
				}
			}

			ServerMediaSession* sms = NULL;

			// Handle Proxying RTSP stream
			if (inputUrl.find(rtspPrefix) == 0) {

				char const* inputAddressStr = inputUrl.c_str();

#pragma warning(suppress: 4018)
				if (inputAudioUrls.size() >= i && !inputAudioUrls.at(i - 1).empty()) {
					char const* inputAudioStr = inputAudioUrls.at(i - 1).c_str();
					//This works fine by itself and plays video only.
					sms = ProxyServerMediaSession::createNew(*env, rtspServer, inputAddressStr, streamName);
					//TODO: How do I add the audio?
					//sms->addSubsession(ProxyServerMediaSession::createNew(*env, rtspServer, inputAudioStr, streamName));
				}
				else {
					//This works fine by itself and plays the video stream with no audio.
					sms = ProxyServerMediaSession::createNew(*env, rtspServer, inputAddressStr, streamName);
				}

				*env << "\n RTSP input source\n";
				*env << "\t(proxying the rtsp stream \""<< inputAddressStr <<")\n";
			}
			// Else Handle MPEGTS stream input
			else {

				// Parse interface IP
				size_t indx = inputUrl.find_first_of("@");
				std::string interface_addr = "";
				if (indx < inputUrl.length()) {
					interface_addr = inputUrl.substr(0, indx);
					inputUrl = inputUrl.substr(indx + 1);
				}

				//Parse IP:Port from remaining Input URL
				indx = inputUrl.find_first_of(":");
				std::string inputHost = inputUrl.substr(0, indx);
				std::string inputPort = inputUrl.substr(indx + 1);

				if (inputHost.empty() || inputPort.empty()) {
					throw TCLAP::ArgException("Invalid input url", "input");
				}

				portNumBits inputPortNum;
				if (sscanf(inputPort.c_str(), "%hu", &inputPortNum) != 1 || inputPortNum <= 0) {
					throw TCLAP::ArgException("Invalid input url", "input");
				}

				// Set up each of the possible streams that can be served by the
				// RTSP server.  Each such stream is implemented using a
				// "ServerMediaSession" object, plus one or more
				// "ServerMediaSubsession" objects for each audio/video substream.

				char const* inputAddressStr = inputHost.c_str();

				char const* intr_addr = interface_addr.c_str();

#pragma warning(suppress: 4018)
				if (inputAudioUrls.size() >= i && !inputAudioUrls.at(i - 1).empty()) {
					char const* inputAudioStr = inputAudioUrls.at(i - 1).c_str();
					//This works fine by itself and plays audio only. If I add any video subsessions then this doesn't work.
					sms = ProxyServerMediaSession::createNew(*env, rtspServer, inputAudioStr, streamName);
					//This breaks the audio and, bonus feature, doesn't add the video.
					sms->addSubsession(UDPServerMediaSubsession::createNew(*env, inputAddressStr, inputPortNum, intr_addr, isTransportingStream));
				}
				else {
					//This works fine by itself and plays the video stream with no audio.
					sms = ServerMediaSession::createNew(*env, streamName, streamName, descriptionString);
					sms->addSubsession(UDPServerMediaSubsession::createNew(*env, inputAddressStr, inputPortNum, intr_addr, isTransportingStream));
				}

				*env << "\n UDP Transport Stream input source \n\t(";
				*env << "IP multicast address " << inputAddressStr << ",";
				*env << " port " << inputPortNum;
#pragma warning(suppress: 4018)
				if (inputAudioUrls.size() >= i && !inputAudioUrls.at(i - 1).empty()) {
					*env << ", Audio ";
					*env << inputAudioUrls.at(i - 1).c_str();
				}
				if (intr_addr != NULL && intr_addr[0] != '\0') {
					*env << ", Listening on interface " << intr_addr << ")\n";
				}
				else {
					*env << ")\n";
				}
			}

			// Add the session to our media server
			rtspServer->addServerMediaSession(sms);
			announceURL(rtspServer, sms);
			i++;
		}


		// Also, attempt to create a HTTP server for RTSP-over-HTTP tunneling.
		// Try first with the default HTTP port (80), and then with the alternative HTTP
		// port numbers (8000 and 8080).

		if (rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080)) {
			*env << "\n(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling.)\n";
		}
		else {
			*env << "\n(RTSP-over-HTTP tunneling is not available.)\n";
		}


		env->taskScheduler().doEventLoop(); // does not return

	}
	catch (TCLAP::ArgException &e)  // catch exceptions
	{
		std::cerr << "error: " << e.error() << " for arg " << e.argId() << std::endl;
	}

	return 0; // only to prevent compiler warning
}

// Prints, to the server's usage environment, the URL(s) a client can use to
// play "sms" from "rtspServer".
// An IPv4 URL is printed when weHaveAnIPv4Address() reports one, an IPv6 URL
// when weHaveAnIPv6Address() reports one; if both are printed they are joined
// by " or ". The line always ends with a newline.
static void announceURL(RTSPServer* rtspServer, ServerMediaSession* sms) {
	UsageEnvironment& out = rtspServer->envir();
	out << " Play this stream using the URL ";

	if (weHaveAnIPv4Address(out)) {
		char* const ipv4Url = rtspServer->ipv4rtspURL(sms);
		out << "\"" << ipv4Url << "\"";
		delete[] ipv4Url; // the URL string was allocated for us; release it

		// Separator is only needed when an IPv6 URL will follow.
		if (weHaveAnIPv6Address(out)) {
			out << " or ";
		}
	}

	if (weHaveAnIPv6Address(out)) {
		char* const ipv6Url = rtspServer->ipv6rtspURL(sms);
		out << "\"" << ipv6Url << "\"";
		delete[] ipv6Url; // the URL string was allocated for us; release it
	}

	out << "\n";
}
-------------- next part --------------
#include "UDPServerMediaSubsession.h"
#include "BasicUDPSource.hh"
#include "SimpleRTPSource.hh"
#include "MPEG2TransportStreamFramer.hh"
#include "SimpleRTPSink.hh"
#include "GroupsockHelper.hh"

//	Media Subsession for receiving a live MPEGTS multicast stream 

// Factory function: allocates a new subsession on the heap, forwarding every
// argument unchanged to the (protected) constructor, and returns it.
UDPServerMediaSubsession* UDPServerMediaSubsession::createNew(UsageEnvironment& env,
	char const* inputAddressStr, Port const& inputPort, char const* interfaceAddressStr, bool isTransported) {
	UDPServerMediaSubsession* const subsession =
		new UDPServerMediaSubsession(env, inputAddressStr, inputPort, interfaceAddressStr, isTransported);
	return subsession;
}

// Constructs the subsession.
// The two address strings are duplicated with strDup(), so the caller's
// buffers need not outlive this object; the copies are released in the
// destructor. No socket is opened here - "fInputGroupsock" starts out NULL
// and is only created by createNewStreamSource().
// The base class is always constructed with reuseFirstSource == True.
UDPServerMediaSubsession::UDPServerMediaSubsession(UsageEnvironment& env,
	char const* inputAddressStr, Port const& inputPort, char const* interfaceAddressStr, bool isTransported)
	: OnDemandMediaSubsession(env, True/*reuseFirstSource*/),
	fInputAddressStr(strDup(inputAddressStr)),
	fInterfaceAddressStr(strDup(interfaceAddressStr)),
	fInputPort(inputPort),
	fInputGroupsock(NULL),
	isStreamTransported(isTransported) {
}

// Releases everything this object owns: the input socket (if one was ever
// created) and the two address strings duplicated by the constructor.
UDPServerMediaSubsession::~UDPServerMediaSubsession() {
	delete fInputGroupsock;

	// The strings are stored as "char const*", so the constness has to be
	// cast away before they can be handed to delete[].
	delete[] const_cast<char*>(fInputAddressStr);
	delete[] const_cast<char*>(fInterfaceAddressStr);
}

// Creates the input source for this subsession: a UDP source reading from
// "fInputAddressStr":"fInputPort", optionally wrapped in an MPEG-2 Transport
// Stream framer (when "isStreamTransported" is true).
//
// clientSessionId: unused.
// estBitrate: set to a fixed estimate of 5000 kbps.
// Returns the new source, or NULL if "fInputAddressStr" cannot be resolved
// to any address.
//
// Any input socket left over from a previous call is deleted first, so at
// most one input Groupsock exists per subsession.
FramedSource* UDPServerMediaSubsession::createNewStreamSource(unsigned/* clientSessionId*/, unsigned& estBitrate) {
	estBitrate = 5000; // kbps, estimate

	// Clean up the old socket, and clear the pointer straight away: if we
	// return early below, a stale pointer would otherwise be deleted a second
	// time by the next call or by the destructor.
	delete fInputGroupsock;
	fInputGroupsock = NULL;

	struct sockaddr_storage inputAddress;
	if (fInputAddressStr == NULL) {
		inputAddress = nullAddress();
	}
	else {
		NetAddressList inputAddresses(fInputAddressStr);
		if (inputAddresses.numAddresses() == 0) return NULL;
		copyAddress(inputAddress, inputAddresses.firstAddress());
	}

	if (fInterfaceAddressStr != NULL && fInterfaceAddressStr[0] != '\0') {
		// Set receiving address to the one defined for this subsession,
		// this gets used in the Groupsock class when creating the socket
		ReceivingInterfaceAddr = inet_addr(fInterfaceAddressStr);
	}

	// Create group sock with appropriate ReceivingInterfaceAddr
	fInputGroupsock = new Groupsock(envir(), inputAddress, fInputPort, 255);

	// Set it back to the default
	// NOTE(review): this resets the global to INADDR_ANY even if it held some
	// other value before this call - confirm nothing else relies on it.
	ReceivingInterfaceAddr = INADDR_ANY;

	// Raw UDP payloads are delivered as-is unless the input is a Transport
	// Stream, in which case they are passed through the MPEGTS framer.
	if (isStreamTransported)
	{
		return MPEG2TransportStreamFramer::createNew(envir(), BasicUDPSource::createNew(envir(), fInputGroupsock));
	}
	else
	{
		return BasicUDPSource::createNew(envir(), fInputGroupsock);
	}
}

// Creates the RTP sink used to send this subsession's data to clients.
// The sink is always a "SimpleRTPSink" advertising media type "video" with
// payload format name "MP2T", payload type 33 and a 90000 timestamp
// frequency; the dynamic payload type and the input source are ignored.
// NOTE(review): the same "MP2T" sink is used even when "isStreamTransported"
// is false (i.e. the source is not wrapped in the Transport Stream framer) -
// confirm this is intended.
RTPSink* UDPServerMediaSubsession::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char /*rtpPayloadTypeIfDynamic*/, FramedSource* /*inputSource*/) {
	unsigned char const payloadType = 33;
	unsigned const timestampFrequency = 90000;
	unsigned const channelCount = 1;

	return SimpleRTPSink::createNew(envir(), rtpGroupsock,
		payloadType, timestampFrequency,
		"video", "MP2T",
		channelCount,
		True,
		False /*no 'M' bit*/);
}
-------------- next part --------------
#pragma once

#ifndef _ON_DEMAND_SERVER_MEDIA_SUBSESSION_HH
#include "OnDemandServerMediaSubsession.hh"
#endif // !_ON_DEMAND_SERVER_MEDIA_SUBSESSION_HH


#ifndef _SERVER_MEDIA_SESSION_HH
#include "ServerMediaSession.hh"
#endif
#ifndef _RTP_SINK_HH
#include "RTPSink.hh"
#endif
#ifndef _BASIC_UDP_SINK_HH
#include "BasicUDPSink.hh"
#endif
#ifndef _RTCP_HH
#include "RTCP.hh"
#endif

// Abstract base class for a "ServerMediaSubsession" whose input source and
// RTP sink are supplied by a subclass, through the pure virtual functions
// "createNewStreamSource()" and "createNewRTPSink()" declared below.
// ("UDPServerMediaSubsession" in this project derives from it.)
// NOTE(review): this appears to be a renamed local variant of the library's
// "OnDemandServerMediaSubsession" (whose header is included above) - confirm,
// and keep the two in step when upgrading the library.
class OnDemandMediaSubsession : public ServerMediaSubsession {
protected: // we're a virtual base class
	// "initialPortNum" defaults to 6970 and "multiplexRTCPWithRTP" to False.
	// NOTE(review): "reuseFirstSource" presumably makes all clients share one
	// input source (see the "sendRTCPAppPacket()" comment below) - confirm.
	OnDemandMediaSubsession(UsageEnvironment& env, Boolean reuseFirstSource,
		portNumBits initialPortNum = 6970,
		Boolean multiplexRTCPWithRTP = False);
	virtual ~OnDemandMediaSubsession();

protected: // redefined virtual functions
	virtual char const* sdpLines(int addressFamily);
	virtual void getStreamParameters(unsigned clientSessionId,
		struct sockaddr_storage const& clientAddress,
		Port const& clientRTPPort,
		Port const& clientRTCPPort,
		int tcpSocketNum,
		unsigned char rtpChannelId,
		unsigned char rtcpChannelId,
		struct sockaddr_storage& destinationAddress,
		u_int8_t& destinationTTL,
		Boolean& isMulticast,
		Port& serverRTPPort,
		Port& serverRTCPPort,
		void*& streamToken);
	virtual void startStream(unsigned clientSessionId, void* streamToken,
		TaskFunc* rtcpRRHandler,
		void* rtcpRRHandlerClientData,
		unsigned short& rtpSeqNum,
		unsigned& rtpTimestamp,
		ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
		void* serverRequestAlternativeByteHandlerClientData);
	virtual void pauseStream(unsigned clientSessionId, void* streamToken);
	// Two overloads of "seekStream()": the first seeks by relative (NPT) time,
	// the second by 'absolute' time strings (cf. "seekStreamSource()" below).
	virtual void seekStream(unsigned clientSessionId, void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes);
	virtual void seekStream(unsigned clientSessionId, void* streamToken, char*& absStart, char*& absEnd);
	virtual void nullSeekStream(unsigned clientSessionId, void* streamToken,
		double streamEndTime, u_int64_t& numBytes);
	virtual void setStreamScale(unsigned clientSessionId, void* streamToken, float scale);
	virtual float getCurrentNPT(void* streamToken);
	virtual FramedSource* getStreamSource(void* streamToken);
	virtual void getRTPSinkandRTCP(void* streamToken,
		RTPSink const*& rtpSink, RTCPInstance const*& rtcp);
	virtual void deleteStream(unsigned clientSessionId, void*& streamToken);

protected: // new virtual functions, possibly redefined by subclasses
	virtual char const* getAuxSDPLine(RTPSink* rtpSink,
		FramedSource* inputSource);
	virtual void seekStreamSource(FramedSource* inputSource, double& seekNPT, double streamDuration, u_int64_t& numBytes);
	// This routine is used to seek by relative (i.e., NPT) time.
	// "streamDuration", if >0.0, specifies how much data to stream, past "seekNPT".  (If <=0.0, all remaining data is streamed.)
	// "numBytes" returns the size (in bytes) of the data to be streamed, or 0 if unknown or unlimited.
	virtual void seekStreamSource(FramedSource* inputSource, char*& absStart, char*& absEnd);
	// This routine is used to seek by 'absolute' time.
	// "absStart" should be a string of the form "YYYYMMDDTHHMMSSZ" or "YYYYMMDDTHHMMSS.<frac>Z".
	// "absEnd" should be either NULL (for no end time), or a string of the same form as "absStart".
	// These strings may be modified in-place, or can be reassigned to a newly-allocated value (after delete[]ing the original).
	virtual void setStreamSourceScale(FramedSource* inputSource, float scale);
	virtual void setStreamSourceDuration(FramedSource* inputSource, double streamDuration, u_int64_t& numBytes);
	virtual void closeStreamSource(FramedSource* inputSource);

protected: // new virtual functions, defined by all subclasses
	// Must return the input source for a stream, and set "estBitrate".
	virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
		unsigned& estBitrate) = 0;
	// "estBitrate" is the stream's estimated bitrate, in kbps
	// Must return the RTP sink that will send "inputSource"'s data over "rtpGroupsock".
	virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
		unsigned char rtpPayloadTypeIfDynamic,
		FramedSource* inputSource) = 0;

protected: // new virtual functions, may be redefined by a subclass:
	virtual Groupsock* createGroupsock(struct sockaddr_storage const& addr, Port port);
	virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
		unsigned char const* cname, RTPSink* sink);

public:
	void multiplexRTCPWithRTP() { fMultiplexRTCPWithRTP = True; }
	// An alternative to passing the "multiplexRTCPWithRTP" parameter as True in the constructor

	void setRTCPAppPacketHandler(RTCPAppHandlerFunc* handler, void* clientData);
	// Sets a handler to be called if a RTCP "APP" packet arrives from any future client.
	// (Any current clients are not affected; any "APP" packets from them will continue to be
	// handled by whatever handler existed when the client sent its first RTSP "PLAY" command.)
	// (Call with (NULL, NULL) to remove an existing handler - for future clients only)

	void sendRTCPAppPacket(u_int8_t subtype, char const* name,
		u_int8_t* appDependentData, unsigned appDependentDataSize);
	// Sends a custom RTCP "APP" packet to the most recent client (if "reuseFirstSource" was False),
	// or to all current clients (if "reuseFirstSource" was True).
	// The parameters correspond to their
	// respective fields as described in the RTP/RTCP definition (RFC 3550).
	// Note that only the low-order 5 bits of "subtype" are used, and only the first 4 bytes
	// of "name" are used.  (If "name" has fewer than 4 bytes, or is NULL,
	// then the remaining bytes are '\0'.)

protected:
	void setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource,
		unsigned estBitrate);
	// used to implement "sdpLines()"

protected:
	char* fSDPLines; // NOTE(review): presumably the cached result of "sdpLines()" - confirm
	HashTable* fDestinationsHashTable; // indexed by client session id

private:
	// The first three members mirror the constructor parameters of the same names.
	Boolean fReuseFirstSource;
	portNumBits fInitialPortNum;
	Boolean fMultiplexRTCPWithRTP;
	void* fLastStreamToken; // NOTE(review): presumably the most recently created stream token - confirm
	char fCNAME[100]; // for RTCP
	// Handler (and its client data) registered via "setRTCPAppPacketHandler()"
	// - presumably; the setter's definition is not in this header.
	RTCPAppHandlerFunc* fAppHandlerTask;
	void* fAppHandlerClientData;
	friend class OnDemandStreamState; // lets the stream state class (declared below) reach the private members
};


// Bundles the objects that make up one stream served by an
// "OnDemandMediaSubsession": the server's RTP/RTCP ports, the RTP (or plain
// UDP) sink, the media source and the two groupsocks, together with a
// 'currently playing' flag and a reference count.
// NOTE(review): presumably this is what the "void* streamToken" values used
// by "OnDemandMediaSubsession" point to - confirm against the implementation.
class OnDemandStreamState {
public:
	// "master" is the subsession this stream belongs to; the remaining
	// arguments correspond to the data members of the same names below.
	OnDemandStreamState(OnDemandMediaSubsession& master,
		Port const& serverRTPPort, Port const& serverRTCPPort,
		RTPSink* rtpSink, BasicUDPSink* udpSink,
		unsigned totalBW, FramedSource* mediaSource,
		Groupsock* rtpGS, Groupsock* rtcpGS);
	virtual ~OnDemandStreamState();

	// Playback control. The handler arguments of "startPlaying()" match those
	// of "OnDemandMediaSubsession::startStream()".
	void startPlaying(Destinations* destinations, unsigned clientSessionId,
		TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
		ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
		void* serverRequestAlternativeByteHandlerClientData);
	void pause();
	void sendRTCPAppPacket(u_int8_t subtype, char const* name,
		u_int8_t* appDependentData, unsigned appDependentDataSize);
	void endPlaying(Destinations* destinations, unsigned clientSessionId);
	void reclaim();

	// Returned by reference, so callers can modify the count in place.
	unsigned& referenceCount() { return fReferenceCount; }

	Port const& serverRTPPort() const { return fServerRTPPort; }
	Port const& serverRTCPPort() const { return fServerRTCPPort; }

	RTPSink* rtpSink() const { return fRTPSink; }
	RTCPInstance* rtcpInstance() const { return fRTCPInstance; }

	float streamDuration() const { return fStreamDuration; }

	FramedSource* mediaSource() const { return fMediaSource; }
	// Returned by reference, so callers can update the start time in place.
	float& startNPT() { return fStartNPT; }

private:
	OnDemandMediaSubsession& fMaster; // the owning subsession, as passed to the constructor
	Boolean fAreCurrentlyPlaying;
	unsigned fReferenceCount;

	Port fServerRTPPort, fServerRTCPPort;

	// NOTE(review): presumably only one of these two sinks is non-NULL for a
	// given stream (RTP vs. raw UDP delivery) - confirm.
	RTPSink* fRTPSink;
	BasicUDPSink* fUDPSink;

	float fStreamDuration;
	unsigned fTotalBW;
	RTCPInstance* fRTCPInstance;

	FramedSource* fMediaSource;
	float fStartNPT; // initial 'normal play time'; reset after each seek

	Groupsock* fRTPgs;
	Groupsock* fRTCPgs;
};
-------------- next part --------------
#pragma once

#include "OnDemandMediaSubsession.h"

// Media subsession that reads a live stream from a UDP (typically IP
// multicast) input and serves it to RTSP clients.
// Instances are created only through "createNew()"; the constructor and
// destructor are protected.
class UDPServerMediaSubsession : public OnDemandMediaSubsession {
public:
	// Creates a new subsession.
	//   inputAddressStr:     address to receive from (see the note on the parameter below)
	//   inputPort:           UDP port to receive on
	//   interfaceAddressStr: optional address of the local interface to receive on;
	//                        NULL or "" leaves the interface choice at its default
	//   isTransported:       true if the input is an MPEG Transport Stream that should
	//                        be passed through the Transport Stream framer; false to
	//                        deliver the raw UDP payloads unchanged
	static UDPServerMediaSubsession*
		createNew(UsageEnvironment& env,
			char const* inputAddressStr, // An IP multicast address, or use "0.0.0.0" or NULL for unicast input
			Port const& inputPort,
			char const* interfaceAddressStr = NULL,
			bool isTransported = true);

	// Not copyable: the object owns heap-allocated address strings and an input
	// socket that the destructor frees, so a member-wise copy would lead to the
	// same memory being freed twice.
	UDPServerMediaSubsession(UDPServerMediaSubsession const&) = delete;
	UDPServerMediaSubsession& operator=(UDPServerMediaSubsession const&) = delete;

protected:
	UDPServerMediaSubsession(UsageEnvironment& env,
		char const* inputAddressStr, Port const& inputPort, char const* interfaceAddressStr, bool isTransported = true);
	// called only by createNew();
	virtual ~UDPServerMediaSubsession();

protected: // redefined virtual functions
	virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
		unsigned& estBitrate);
	virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
		unsigned char rtpPayloadTypeIfDynamic,
		FramedSource* inputSource);
protected:
	char const* fInputAddressStr;     // owned copy of the input address string (may be NULL)
	char const* fInterfaceAddressStr; // owned copy of the interface address string (may be NULL)
	Port fInputPort;                  // UDP port the input is received on
	Groupsock* fInputGroupsock;       // owned input socket; NULL until a stream source is created
	bool isStreamTransported;         // true => wrap the UDP source in the Transport Stream framer
};


More information about the live-devel mailing list