/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the Free
Software Foundation; either version 2.1 of the License, or (at your option)
any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
**********/
// "liveMedia"
// Copyright (c) 1996-2007 Live Networks, Inc. All rights reserved.
// Duplication of Streams
// Implementation
// Author: Bernhard Feiten, 11.8.2008, based on MPEG1or2Demux

#include "StreamDup.h"

////////// StreamDupFrame ///////////////////////////
// instantiated for each duplicated stream and called e.g. from the different RTPSink objects.
class StreamDupFrame: public FramedSource { public: StreamDup& source() const { return fOurSource; } private: // We are created only by a StreamDup (a friend) StreamDupFrame( UsageEnvironment& env, StreamDup& source, unsigned int streamId); virtual ~StreamDupFrame(); private: // redefined virtual functions: virtual void doGetNextFrame(); // virtual void doStopGettingFrames(); virtual char const* MIMEtype() const; virtual unsigned maxFrameSize() const; private: static void afterGettingFrame( void* clientData, unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds); void afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds); private: StreamDup& fOurSource; unsigned int fStreamId; unsigned long fFrameCount; char const* fMIMEtype; friend class StreamDup; }; ///////// StreamDupFrame implementation /////// StreamDupFrame::StreamDupFrame( UsageEnvironment& env, StreamDup& source, unsigned int streamId) : FramedSource(env), fOurSource(source), fStreamId(streamId), fFrameCount(0) { // Set our MIME type string for known media types: fMIMEtype = MediaSource::MIMEtype(); } StreamDupFrame::~StreamDupFrame() { fOurSource.unregisterDuplicateStream(fStreamId); } void StreamDupFrame::doGetNextFrame() { // Arrange to read data directly into the client's buffer: fFrameCount++; fOurSource.getNextFrame( fStreamId, fFrameCount, fTo, fMaxSize, afterGettingFrame, this, handleClosure, this); } //void StreamDupFrame::doStopGettingFrames() //{ // fOurSource.stopGettingFrames(); //} char const* StreamDupFrame::MIMEtype() const { return fMIMEtype; } unsigned StreamDupFrame::maxFrameSize() const { return 65535; } void StreamDupFrame::afterGettingFrame( void* clientData, unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { StreamDupFrame* stream = (StreamDupFrame*)clientData; 
stream->afterGettingFrame1( frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); } void StreamDupFrame::afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { fFrameSize = frameSize; fPresentationTime = presentationTime; fDurationInMicroseconds = durationInMicroseconds; FramedSource::afterGetting(this); } //////////////// end StreamDupFrame implementation ////////////////// //////////////// StreamDup implementation ///////////////////////////// StreamDup* StreamDup::createNew( UsageEnvironment& env, FramedSource* inputSource, Boolean reclaimWhenLastReferenceDies) { // Need to add source type checking here??? ##### return new StreamDup(env, inputSource, reclaimWhenLastReferenceDies); } StreamDup::StreamDup( UsageEnvironment& env, FramedSource* inputSource, Boolean reclaimWhenLastReferenceDies) : Medium(env), fInputSource(inputSource), fReclaimWhenLastReferenceDies(reclaimWhenLastReferenceDies), fNumOfStreamDups(0), fNumPendingReads(0), fSourceFrameCount(0), fIsCurrentlyAwaitingData(false) { for (unsigned i = 0; i < 256; ++i) { fOutput[i].isPotentiallyReadable = False; fOutput[i].isCurrentlyActive = False; fOutput[i].isCurrentlyAwaitingData = False; } } StreamDup::~StreamDup() { Medium::close(fInputSource); } FramedSource* StreamDup::registerDuplicateStream() { fNumOfStreamDups++ ; return new StreamDupFrame(envir(), *this, fNumOfStreamDups-1); // Where is it deleted ? 
} void StreamDup::unregisterDuplicateStream(unsigned int streamId) { if (fNumOfStreamDups> 0) fNumOfStreamDups --; fOutput[streamId].isCurrentlyActive = False; fOutput[streamId].isCurrentlyAwaitingData = False; } void StreamDup::getNextFrame( unsigned int streamId, unsigned long frameCount, unsigned char* to, unsigned maxSize, StreamDup::afterGettingFunc* afterGettingFunc, void* afterGettingClientData, StreamDup::onCloseFunc* onCloseFunc, void* onCloseClientData) { // register the request in the out stream array // envir() << "FramedSource[" << this << "]::getNextFrame() streamId: " << streamId << "\n"; fOutput[streamId].to = to; fOutput[streamId].maxSize = maxSize; fOutput[streamId].fAfterGettingFunc = afterGettingFunc; fOutput[streamId].afterGettingClientData = afterGettingClientData; fOutput[streamId].fOnCloseFunc = onCloseFunc; fOutput[streamId].onCloseClientData = onCloseClientData; fOutput[streamId].isCurrentlyActive = True; fOutput[streamId].isCurrentlyAwaitingData = True; if (fIsCurrentlyAwaitingData) return; // check if a new frame from the source should be fetched if (frameCount > fSourceFrameCount) doGetNextFrame(); else { memmove(fOutput[streamId].to, fTo, fFrameSize); // memmove(fOutput[streamId].afterGettingClientData,this,sizeof(this)); fOutput[streamId].isCurrentlyAwaitingData = false; (*fOutput[streamId].fAfterGettingFunc) (fOutput[streamId].afterGettingClientData, fFrameSize, fNumTruncatedBytes, fPresentationTime, fDurationInMicroseconds); } return; } void StreamDup::doGetNextFrame( ) { if ( fIsCurrentlyAwaitingData ) return; fIsCurrentlyAwaitingData = true; fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, handleClosure, this); } void StreamDup::afterGettingFrame( void* clientdata, unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds ) { StreamDup * stdp; if((stdp = (StreamDup*)clientdata) == NULL ) return; stdp->afterGettingFrame1(frameSize, numTruncatedBytes, 
presentationTime, durationInMicroseconds); } void StreamDup::afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds ) { fSourceFrameCount ++; fFrameSize = frameSize; fNumTruncatedBytes = numTruncatedBytes; fPresentationTime = presentationTime; fDurationInMicroseconds = durationInMicroseconds; fIsCurrentlyAwaitingData = false; for (int i=0;i<256;i++){ if (fOutput[i].isCurrentlyAwaitingData) { memmove(fOutput[i].to, fTo, fFrameSize); // memmove(fOutput[i].afterGettingClientData,this,sizeof(this)); fOutput[i].isCurrentlyAwaitingData = false; (*fOutput[i].fAfterGettingFunc) (fOutput[i].afterGettingClientData, fFrameSize, 0 /* numTruncatedBytes */, fPresentationTime, fDurationInMicroseconds); } } } void StreamDup::handleClosure(void* clientData) { FramedSource* source = (FramedSource*)clientData; // source->fIsCurrentlyAwaitingData = False; // because we got a close instead // if (source->fOnCloseFunc != NULL) // (*(source->fOnCloseFunc))(source->fOnCloseClientData); }