diff --git a/lib/upipe-av/upipe_av.c b/lib/upipe-av/upipe_av.c index a03bf2019..3afd61eb3 100644 --- a/lib/upipe-av/upipe_av.c +++ b/lib/upipe-av/upipe_av.c @@ -10,7 +10,6 @@ * @short common functions for libav wrappers */ -#include "upipe/udeal.h" #include "upipe/uprobe.h" #include "upipe/upipe.h" #include "upipe/uref_pic.h" @@ -25,8 +24,6 @@ #include -/** structure to protect exclusive access to avcodec_open() */ -struct udeal upipe_av_deal; /** @internal true if only avcodec was initialized */ static bool avcodec_only = false; /** @internal probe used by upipe_av_vlog, defined in upipe_av_init() */ @@ -72,11 +69,6 @@ bool upipe_av_init(bool init_avcodec_only, struct uprobe *uprobe) { avcodec_only = init_avcodec_only; - if (unlikely(!udeal_init(&upipe_av_deal))) { - uprobe_release(uprobe); - return false; - } - if (unlikely(avcodec_only)) { #if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58, 10, 100) avcodec_register_all(); @@ -102,7 +94,6 @@ void upipe_av_clean(void) { if (likely(!avcodec_only)) avformat_network_deinit(); - udeal_clean(&upipe_av_deal); if (logprobe) uprobe_release(logprobe); } diff --git a/lib/upipe-av/upipe_av_codecs.c b/lib/upipe-av/upipe_av_codecs.c index 139c05460..915f63d5b 100644 --- a/lib/upipe-av/upipe_av_codecs.c +++ b/lib/upipe-av/upipe_av_codecs.c @@ -12,6 +12,7 @@ */ #include +#include "upipe/ubase.h" #include "upipe_av_internal.h" /** @This allows to convert from avcodec ID to flow definition codec. diff --git a/lib/upipe-av/upipe_av_internal.h b/lib/upipe-av/upipe_av_internal.h index dd1e2680d..300748994 100644 --- a/lib/upipe-av/upipe_av_internal.h +++ b/lib/upipe-av/upipe_av_internal.h @@ -14,9 +14,6 @@ /** @hidden */ #define _UPIPE_AV_INTERNAL_H_ -#include "upipe/udeal.h" -#include "upipe/upump.h" - #include #include @@ -38,63 +35,6 @@ enum CodecID; #define AV_CODEC_ID_FIRST_AUDIO CODEC_ID_FIRST_AUDIO #endif -/** structure to protect exclusive access to avcodec_open() */ -extern struct udeal upipe_av_deal; - -/** @This allocates a watcher triggering when exclusive access to avcodec_open() - * is granted. - * - * @param upump_mgr management structure for this event loop - * @param cb function to call when the watcher triggers - * @param opaque pointer to the module's internal structure - * @param refcount pointer to urefcount structure to increment during callback, - * or NULL - * @return pointer to allocated watcher, or NULL in case of failure - */ -static inline struct upump *upipe_av_deal_upump_alloc( - struct upump_mgr *upump_mgr, upump_cb cb, void *opaque, - struct urefcount *refcount) -{ - return udeal_upump_alloc(&upipe_av_deal, upump_mgr, cb, opaque, refcount); -} - -/** @This starts the watcher on exclusive access to avcodec_open(). - * - * @param upump watcher allocated by @ref udeal_upump_alloc - */ -static inline void upipe_av_deal_start(struct upump *upump) -{ - udeal_start(&upipe_av_deal, upump); -} - -/** @This tries to grab the exclusive access to avcodec_open(). - */ -static inline bool upipe_av_deal_grab(void) -{ - return udeal_grab(&upipe_av_deal); -} - -/** @This yields exclusive access to avcodec_open() previously acquired from - * @ref upipe_av_deal_grab. - * - * @param upump watcher allocated by @ref udeal_upump_alloc - */ -static inline void upipe_av_deal_yield(struct upump *upump) -{ - udeal_yield(&upipe_av_deal, upump); -} - -/** @This aborts the watcher before it has had a chance to run. It must only - * be called in case of abort, otherwise @ref upipe_av_deal_yield does the - * same job. - * - * @param upump watcher allocated by @ref udeal_upump_alloc - */ -static inline void upipe_av_deal_abort(struct upump *upump) -{ - udeal_abort(&upipe_av_deal, upump); -} - /** @This allows to convert from avcodec ID to flow definition codec. * * @param id avcodec ID diff --git a/lib/upipe-av/upipe_avcodec_decode.c b/lib/upipe-av/upipe_avcodec_decode.c index 5fcf748dc..ec75666d2 100644 --- a/lib/upipe-av/upipe_avcodec_decode.c +++ b/lib/upipe-av/upipe_avcodec_decode.c @@ -33,9 +33,6 @@ #include "upipe/upipe_helper_output.h" #include "upipe/upipe_helper_flow_def.h" #include "upipe/upipe_helper_flow_def_check.h" -#include "upipe/upipe_helper_upump_mgr.h" -#include "upipe/upipe_helper_upump.h" -#include "upipe/upipe_helper_input.h" #include "upipe-av/upipe_avcodec_decode.h" #include "upipe-av/ubuf_av.h" #include "upipe-framers/uref_h26x.h" @@ -107,8 +104,6 @@ struct upipe_avcdec { /** ubuf manager request */ struct urequest ubuf_mgr_request; - /** upump mgr */ - struct upump_mgr *upump_mgr; /** pixel format used for the ubuf manager */ enum AVPixelFormat pix_fmt; /** sample format used for the ubuf manager */ @@ -116,17 +111,6 @@ struct upipe_avcdec { /** number of channels used for the ubuf manager */ unsigned int channels; - /** avcodec_open watcher */ - struct upump *upump_av_deal; - /** temporary uref storage (used during udeal) */ - struct uchain urefs; - /** nb urefs in storage */ - unsigned int nb_urefs; - /** max urefs in storage */ - unsigned int max_urefs; - /** list of blockers (used during udeal) */ - struct uchain blockers; - /** frame counter */ uint64_t counter; /** rap offset */ @@ -164,15 +148,13 @@ struct upipe_avcdec { AVFrame *frame; /** avcodec packet */ AVPacket *avpkt; - /** true if the context will be closed */ - bool close; /** public upipe structure */ struct upipe upipe; }; UPIPE_HELPER_UPIPE(upipe_avcdec, upipe, UPIPE_AVCDEC_SIGNATURE); -UPIPE_HELPER_UREFCOUNT(upipe_avcdec, urefcount, upipe_avcdec_close) +UPIPE_HELPER_UREFCOUNT(upipe_avcdec, urefcount, upipe_avcdec_free) UPIPE_HELPER_VOID(upipe_avcdec) UPIPE_HELPER_OUTPUT(upipe_avcdec, output, flow_def, output_state, request_list) UPIPE_HELPER_FLOW_DEF(upipe_avcdec, flow_def_input, flow_def_attr) @@ -182,12 +164,6 @@ UPIPE_HELPER_UBUF_MGR(upipe_avcdec, ubuf_mgr, flow_format, ubuf_mgr_request, upipe_avcdec_check, upipe_avcdec_register_output_request, upipe_avcdec_unregister_output_request) -UPIPE_HELPER_UPUMP_MGR(upipe_avcdec, upump_mgr) -UPIPE_HELPER_UPUMP(upipe_avcdec, upump_av_deal, upump_mgr) -UPIPE_HELPER_INPUT(upipe_avcdec, urefs, nb_urefs, max_urefs, blockers, upipe_avcdec_decode) - -/** @hidden */ -static void upipe_avcdec_free(struct upipe *upipe); /** @internal @This provides a ubuf_mgr request. * @@ -712,21 +688,6 @@ static int upipe_avcdec_get_buffer_sound(struct AVCodecContext *context, return 0; /* success */ } -/** @This aborts and frees an existing upump watching for exclusive access to - * avcodec_open(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcdec_abort_av_deal(struct upipe *upipe) -{ - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - if (unlikely(upipe_avcdec->upump_av_deal != NULL)) { - upipe_av_deal_abort(upipe_avcdec->upump_av_deal); - upump_free(upipe_avcdec->upump_av_deal); - upipe_avcdec->upump_av_deal = NULL; - } -} - static enum AVPixelFormat upipe_avcodec_get_format(AVCodecContext *context, const enum AVPixelFormat *pix_fmts) { @@ -743,27 +704,21 @@ static enum AVPixelFormat upipe_avcodec_get_format(AVCodecContext *context, return AV_PIX_FMT_NONE; } -/** @internal @This actually calls avcodec_open(). It may only be called by - * one thread at a time. +/** @internal @This calls avcodec_open(). * * @param upipe description structure of the pipe - * @return false if the buffers mustn't be dequeued + * @return an error code */ -static bool upipe_avcdec_do_av_deal(struct upipe *upipe) +static int upipe_avcdec_open(struct upipe *upipe) { - assert(upipe); struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); AVCodecContext *context = upipe_avcdec->context; - if (upipe_avcdec->close) { - upipe_notice_va(upipe, "codec %s (%s) %d closed", context->codec->name, - context->codec->long_name, context->codec->id); + if (!context) + return UBASE_ERR_INVALID; -#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(60, 40, 100) - avcodec_close(context); -#endif - return false; - } + if (avcodec_is_open(context)) + return UBASE_ERR_NONE; switch (context->codec->type) { case AVMEDIA_TYPE_SUBTITLE: @@ -785,7 +740,7 @@ static bool upipe_avcdec_do_av_deal(struct upipe *upipe) /* This should not happen */ upipe_err_va(upipe, "Unsupported media type (%d)", context->codec->type); - return false; + return UBASE_ERR_UNHANDLED; } /* open hardware decoder */ @@ -797,8 +752,7 @@ static bool upipe_avcdec_do_av_deal(struct upipe *upipe) NULL, 0)) < 0)) { upipe_warn_va(upipe, "could not create hw device context (%s)", av_err2str(err)); - upipe_throw_fatal(upipe, UBASE_ERR_EXTERNAL); - return false; + return UBASE_ERR_EXTERNAL; } upipe_notice_va(upipe, "created %s hw device context (%s)", av_hwdevice_get_type_name(upipe_avcdec->hw_device_type), @@ -812,121 +766,11 @@ static bool upipe_avcdec_do_av_deal(struct upipe *upipe) /* open new context */ if (unlikely((err = avcodec_open2(context, context->codec, NULL)) < 0)) { upipe_warn_va(upipe, "could not open codec (%s)", av_err2str(err)); - upipe_throw_fatal(upipe, UBASE_ERR_EXTERNAL); - return false; + return UBASE_ERR_EXTERNAL; } upipe_notice_va(upipe, "codec %s (%s) %d opened", context->codec->name, context->codec->long_name, context->codec->id); - - return true; -} - -/** @internal @This is called to try an exclusive access on avcodec_open() or - * avcodec_close(). - * - * @param upump description structure of the pump - */ -static void upipe_avcdec_cb_av_deal(struct upump *upump) -{ - assert(upump); - struct upipe *upipe = upump_get_opaque(upump, struct upipe *); - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - - /* check udeal */ - if (unlikely(!upipe_av_deal_grab())) - return; - - /* real open_codec function */ - bool ret = upipe_avcdec_do_av_deal(upipe); - - /* clean dealer */ - upipe_av_deal_yield(upump); - upump_free(upipe_avcdec->upump_av_deal); - upipe_avcdec->upump_av_deal = NULL; - - if (upipe_avcdec->close) { - upipe_avcdec_free(upipe); - return; - } - - if (ret) - upipe_avcdec_output_input(upipe); - else - upipe_avcdec_flush_input(upipe); - upipe_avcdec_unblock_input(upipe); - /* All packets have been output, release again the pipe that has been - * used in @ref upipe_avcdec_start_av_deal. */ - upipe_release(upipe); -} - -/** @internal @This is called to trigger avcodec_open() or avcodec_close(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcdec_start_av_deal(struct upipe *upipe) -{ - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - /* abort a pending open request */ - upipe_avcdec_abort_av_deal(upipe); - - /* use udeal/upump callback if available */ - upipe_avcdec_check_upump_mgr(upipe); - if (upipe_avcdec->upump_mgr == NULL) { - upipe_dbg(upipe, "no upump_mgr present, direct call to avcodec_open"); - upipe_avcdec_do_av_deal(upipe); - if (upipe_avcdec->close) - upipe_avcdec_free(upipe); - return; - } - - upipe_dbg(upipe, "upump_mgr present, using udeal"); - struct upump *upump_av_deal = - upipe_av_deal_upump_alloc(upipe_avcdec->upump_mgr, - upipe_avcdec_cb_av_deal, upipe, upipe->refcount); - if (unlikely(!upump_av_deal)) { - upipe_err(upipe, "can't create dealer"); - upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); - return; - } - upipe_avcdec->upump_av_deal = upump_av_deal; - /* Increment upipe refcount to avoid disappearing before all packets - * have been sent. */ - upipe_use(upipe); - upipe_av_deal_start(upump_av_deal); -} - -/** @internal @This is called to trigger avcodec_open(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcdec_open(struct upipe *upipe) -{ - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - upipe_avcdec->close = false; - upipe_avcdec_start_av_deal(upipe); -} - -/** @internal @This is called to trigger avcodec_close(). - * - * We close the context even if it was not opened because it supposedly - * "frees allocated structures". - * - * @param upipe description structure of the pipe - */ -static void upipe_avcdec_close(struct upipe *upipe) -{ - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - if (upipe_avcdec->context == NULL) { - upipe_avcdec_free(upipe); - return; - } - - if (upipe_avcdec->context->codec->capabilities & AV_CODEC_CAP_DELAY) { - /* Feed avcodec with NULL packet to output the remaining frames */ - upipe_avcdec_decode_avpkt(upipe, NULL, NULL); - } - upipe_avcdec->close = true; - upipe_avcdec_start_av_deal(upipe); + return UBASE_ERR_NONE; } /** @internal @This sets the various time attributes. @@ -1666,19 +1510,10 @@ static bool upipe_avcdec_decode(struct upipe *upipe, struct uref *uref, static void upipe_avcdec_input(struct upipe *upipe, struct uref *uref, struct upump **upump_p) { - struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); - - while (unlikely(!avcodec_is_open(upipe_avcdec->context))) { - if (upipe_avcdec->upump_av_deal != NULL) { - upipe_avcdec_hold_input(upipe, uref); - upipe_avcdec_block_input(upipe, upump_p); - return; - } - - upipe_avcdec_open(upipe); - } - - upipe_avcdec_decode(upipe, uref, upump_p); + if (unlikely(!ubase_check(upipe_avcdec_open(upipe)))) + uref_free(uref); + else + upipe_avcdec_decode(upipe, uref, upump_p); } /** @internal @This looks for a decoder with suitable hw support. @@ -1923,10 +1758,6 @@ static int upipe_avcdec_control(struct upipe *upipe, int command, va_list args) return UBASE_ERR_NONE; return upipe_avcdec_free_output_proxy(upipe, request); } - case UPIPE_ATTACH_UPUMP_MGR: - upipe_avcdec_set_upump_av_deal(upipe, NULL); - upipe_avcdec_abort_av_deal(upipe); - return upipe_avcdec_attach_upump_mgr(upipe); case UPIPE_SET_FLOW_DEF: { struct uref *flow_def = va_arg(args, struct uref *); @@ -1964,6 +1795,32 @@ static int upipe_avcdec_control(struct upipe *upipe, int command, va_list args) } } +/** @internal @This calls avcodec_close(). + * + * We close the context even if it was not opened because it supposedly + * "frees allocated structures". + * + * @param upipe description structure of the pipe + */ +static void upipe_avcdec_close(struct upipe *upipe) +{ + struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); + AVCodecContext *context = upipe_avcdec->context; + + if (context) { + if (upipe_avcdec->context->codec->capabilities & AV_CODEC_CAP_DELAY) { + /* Feed avcodec with NULL packet to output the remaining frames */ + upipe_avcdec_decode_avpkt(upipe, NULL, NULL); + } + upipe_notice_va(upipe, "codec %s (%s) %d closed", context->codec->name, + context->codec->long_name, context->codec->id); + +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(60, 40, 100) + avcodec_close(context); +#endif + } +} + /** @This frees a upipe. * * @param upipe description structure of the pipe @@ -1972,6 +1829,8 @@ static void upipe_avcdec_free(struct upipe *upipe) { struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); + upipe_avcdec_close(upipe); + avcodec_free_context(&upipe_avcdec->context); av_frame_free(&upipe_avcdec->frame); av_packet_free(&upipe_avcdec->avpkt); @@ -1981,14 +1840,10 @@ static void upipe_avcdec_free(struct upipe *upipe) uref_free(upipe_avcdec->uref); uref_free(upipe_avcdec->flow_def_format); uref_free(upipe_avcdec->flow_def_provided); - upipe_avcdec_abort_av_deal(upipe); - upipe_avcdec_clean_input(upipe); upipe_avcdec_clean_output(upipe); upipe_avcdec_clean_flow_def(upipe); upipe_avcdec_clean_flow_def_check(upipe); upipe_avcdec_clean_ubuf_mgr(upipe); - upipe_avcdec_clean_upump_av_deal(upipe); - upipe_avcdec_clean_upump_mgr(upipe); upipe_avcdec_clean_urefcount(upipe); upipe_avcdec_free_void(upipe); } @@ -2023,12 +1878,9 @@ static struct upipe *upipe_avcdec_alloc(struct upipe_mgr *mgr, } upipe_avcdec_init_urefcount(upipe); upipe_avcdec_init_ubuf_mgr(upipe); - upipe_avcdec_init_upump_mgr(upipe); - upipe_avcdec_init_upump_av_deal(upipe); upipe_avcdec_init_output(upipe); upipe_avcdec_init_flow_def(upipe); upipe_avcdec_init_flow_def_check(upipe); - upipe_avcdec_init_input(upipe); struct upipe_avcdec *upipe_avcdec = upipe_avcdec_from_upipe(upipe); upipe_avcdec->hw_device_type = AV_HWDEVICE_TYPE_NONE; @@ -2038,7 +1890,6 @@ static struct upipe *upipe_avcdec_alloc(struct upipe_mgr *mgr, upipe_avcdec->frame = frame; upipe_avcdec->avpkt = avpkt; upipe_avcdec->counter = 0; - upipe_avcdec->close = false; upipe_avcdec->pix_fmt = AV_PIX_FMT_NONE; upipe_avcdec->sample_fmt = AV_SAMPLE_FMT_NONE; upipe_avcdec->channels = 0; diff --git a/lib/upipe-av/upipe_avcodec_encode.c b/lib/upipe-av/upipe_avcodec_encode.c index b9b98846b..0435624c8 100644 --- a/lib/upipe-av/upipe_avcodec_encode.c +++ b/lib/upipe-av/upipe_avcodec_encode.c @@ -34,8 +34,6 @@ #include "upipe/upipe_helper_flow_format.h" #include "upipe/upipe_helper_flow_def.h" #include "upipe/upipe_helper_flow_def_check.h" -#include "upipe/upipe_helper_upump_mgr.h" -#include "upipe/upipe_helper_upump.h" #include "upipe/upipe_helper_input.h" #include "upipe-av/upipe_avcodec_encode.h" #include "upipe-av/ubuf_av.h" @@ -130,18 +128,13 @@ struct upipe_avcenc { /** flow format request */ struct urequest flow_format_request; - /** upump mgr */ - struct upump_mgr *upump_mgr; - - /** avcodec_open watcher */ - struct upump *upump_av_deal; - /** temporary uref storage (used during udeal) */ + /** temporary uref storage */ struct uchain urefs; /** nb urefs in storage */ unsigned int nb_urefs; /** max urefs in storage */ unsigned int max_urefs; - /** list of blockers (used during udeal) */ + /** list of blockers */ struct uchain blockers; /** temporary uref storage (used for sound processing) */ @@ -184,10 +177,6 @@ struct upipe_avcenc { AVFrame *frame; /** avcodec packet */ AVPacket *avpkt; - /** true if the context will be closed */ - bool close; - /** true if the context will be reinitialized */ - bool reinit; /** true if the pipe need to be released after output_input */ bool release_needed; @@ -196,7 +185,7 @@ struct upipe_avcenc { }; UPIPE_HELPER_UPIPE(upipe_avcenc, upipe, UPIPE_AVCENC_SIGNATURE); -UPIPE_HELPER_UREFCOUNT(upipe_avcenc, urefcount, upipe_avcenc_close) +UPIPE_HELPER_UREFCOUNT(upipe_avcenc, urefcount, upipe_avcenc_free) UPIPE_HELPER_FLOW(upipe_avcenc, "block.") UPIPE_HELPER_OUTPUT(upipe_avcenc, output, flow_def, output_state, request_list) UPIPE_HELPER_INPUT(upipe_avcenc, urefs, nb_urefs, max_urefs, blockers, upipe_avcenc_handle) @@ -210,33 +199,13 @@ UPIPE_HELPER_UBUF_MGR(upipe_avcenc, ubuf_mgr, flow_format, ubuf_mgr_request, upipe_avcenc_check_ubuf_mgr, upipe_avcenc_register_output_request, upipe_avcenc_unregister_output_request) -UPIPE_HELPER_UPUMP_MGR(upipe_avcenc, upump_mgr) -UPIPE_HELPER_UPUMP(upipe_avcenc, upump_av_deal, upump_mgr) - -/** @hidden */ -static void upipe_avcenc_free(struct upipe *upipe); - -/** @This aborts and frees an existing upump watching for exclusive access to - * avcodec_open(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcenc_abort_av_deal(struct upipe *upipe) -{ - struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); - if (unlikely(upipe_avcenc->upump_av_deal != NULL)) { - upipe_av_deal_abort(upipe_avcenc->upump_av_deal); - upump_free(upipe_avcenc->upump_av_deal); - upipe_avcenc->upump_av_deal = NULL; - } -} /** @internal @This closes and reinitializes the avcodec context. * * @param upipe description structure of the pipe * @return an error code */ -static int upipe_avcenc_do_reinit(struct upipe *upipe) +static int upipe_avcenc_reinit(struct upipe *upipe) { struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); AVCodecContext *context = upipe_avcenc->context; @@ -299,143 +268,31 @@ static int upipe_avcenc_do_reinit(struct upipe *upipe) return UBASE_ERR_NONE; } -/** @internal @This actually calls avcodec_open(). It may only be called by - * one thread at a time. +/** @internal @This is called to trigger avcodec_open(). * * @param upipe description structure of the pipe - * @return false if the buffers mustn't be dequeued + * @return an error code */ -static bool upipe_avcenc_do_av_deal(struct upipe *upipe) +static int upipe_avcenc_open(struct upipe *upipe) { - assert(upipe); struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); AVCodecContext *context = upipe_avcenc->context; - if (upipe_avcenc->close) { - upipe_notice_va(upipe, "codec %s (%s) %d closed", context->codec->name, - context->codec->long_name, context->codec->id); -#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(60, 40, 100) - avcodec_close(context); -#endif - return false; - } + if (!context) + return UBASE_ERR_INVALID; - /* reinit context */ - if (upipe_avcenc->reinit) - return ubase_check(upipe_avcenc_do_reinit(upipe)); + if (avcodec_is_open(context)) + return UBASE_ERR_NONE; /* open new context */ int err; if (unlikely((err = avcodec_open2(context, context->codec, NULL)) < 0)) { upipe_warn_va(upipe, "could not open codec (%s)", av_err2str(err)); - upipe_throw_fatal(upipe, UBASE_ERR_EXTERNAL); - return false; + return UBASE_ERR_EXTERNAL; } upipe_notice_va(upipe, "codec %s (%s) %d opened", context->codec->name, context->codec->long_name, context->codec->id); - - return true; -} - -/** @internal @This is called to try an exclusive access on avcodec_open() or - * avcodec_close(). - * - * @param upump description structure of the pump - */ -static void upipe_avcenc_cb_av_deal(struct upump *upump) -{ - assert(upump); - struct upipe *upipe = upump_get_opaque(upump, struct upipe *); - struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); - - /* check udeal */ - if (unlikely(!upipe_av_deal_grab())) - return; - - /* real open_codec function */ - bool ret = upipe_avcenc_do_av_deal(upipe); - - /* clean dealer */ - upipe_av_deal_yield(upump); - upump_free(upipe_avcenc->upump_av_deal); - upipe_avcenc->upump_av_deal = NULL; - - if (upipe_avcenc->close) { - upipe_avcenc_free(upipe); - return; - } - - bool was_buffered = !upipe_avcenc_check_input(upipe); - if (ret) - upipe_avcenc_output_input(upipe); - else - upipe_avcenc_flush_input(upipe); - upipe_avcenc_unblock_input(upipe); - if (was_buffered && upipe_avcenc_check_input(upipe)) { - /* All packets have been output, release again the pipe that has been - * used in @ref upipe_avcenc_input. */ - if (upipe_avcenc->release_needed) { - upipe_release(upipe); - upipe_avcenc->release_needed = false; - } - } -} - -/** @internal @This is called to trigger avcodec_open() or avcodec_close(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcenc_start_av_deal(struct upipe *upipe) -{ - struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); - /* abort a pending open request */ - upipe_avcenc_abort_av_deal(upipe); - - /* use udeal/upump callback if available */ - upipe_avcenc_check_upump_mgr(upipe); - if (upipe_avcenc->upump_mgr == NULL) { - upipe_dbg(upipe, "no upump_mgr present, direct call to avcodec_open"); - upipe_avcenc_do_av_deal(upipe); - if (upipe_avcenc->close) - upipe_avcenc_free(upipe); - return; - } - - upipe_dbg(upipe, "upump_mgr present, using udeal"); - struct upump *upump_av_deal = - upipe_av_deal_upump_alloc(upipe_avcenc->upump_mgr, - upipe_avcenc_cb_av_deal, upipe, upipe->refcount); - if (unlikely(!upump_av_deal)) { - upipe_err(upipe, "can't create dealer"); - upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); - return; - } - upipe_avcenc->upump_av_deal = upump_av_deal; - upipe_av_deal_start(upump_av_deal); -} - -/** @internal @This is called to trigger avcodec_open(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avcenc_open(struct upipe *upipe) -{ - struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); - upipe_avcenc->close = false; - upipe_avcenc->reinit = false; - upipe_avcenc_start_av_deal(upipe); -} - -/** @internal @This is called to trigger context reinitialization. - * - * @param upipe description structure of the pipe - */ -static void upipe_avcenc_reinit(struct upipe *upipe) -{ - struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); - upipe_avcenc->close = false; - upipe_avcenc->reinit = true; - upipe_avcenc_start_av_deal(upipe); + return UBASE_ERR_NONE; } /** @internal @This is called to trigger avcodec_close(). @@ -449,10 +306,8 @@ static void upipe_avcenc_close(struct upipe *upipe) { struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); AVCodecContext *context = upipe_avcenc->context; - if ((context == NULL) || !avcodec_is_open(context)) { - upipe_avcenc_free(upipe); + if ((context == NULL) || !avcodec_is_open(context)) return; - } if (avcodec_is_open(context)) { if (!ulist_empty(&upipe_avcenc->sound_urefs)) @@ -464,8 +319,12 @@ static void upipe_avcenc_close(struct upipe *upipe) upipe_avcenc_encode_frame(upipe, NULL, NULL); } } - upipe_avcenc->close = true; - upipe_avcenc_start_av_deal(upipe); + + upipe_notice_va(upipe, "codec %s (%s) %d closed", context->codec->name, + context->codec->long_name, context->codec->id); +#if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(60, 40, 100) + avcodec_close(context); +#endif } /** @internal @This builds the flow definition packet. @@ -1147,11 +1006,9 @@ static bool upipe_avcenc_handle(struct upipe *upipe, struct uref *uref, av_frame_free(&frame); } - while (unlikely(!avcodec_is_open(upipe_avcenc->context))) { - if (upipe_avcenc->upump_av_deal != NULL) - return false; - - upipe_avcenc_open(upipe); + if (unlikely(!ubase_check(upipe_avcenc_open(upipe)))) { + uref_free(uref); + return true; } uref_clock_get_rate(uref, &upipe_avcenc->drift_rate); @@ -1945,10 +1802,6 @@ static int upipe_avcenc_control(struct upipe *upipe, return UBASE_ERR_NONE; return upipe_avcenc_free_output_proxy(upipe, request); } - case UPIPE_ATTACH_UPUMP_MGR: - upipe_avcenc_set_upump_av_deal(upipe, NULL); - upipe_avcenc_abort_av_deal(upipe); - return upipe_avcenc_attach_upump_mgr(upipe); case UPIPE_SET_FLOW_DEF: { struct uref *flow_def = va_arg(args, struct uref *); @@ -1983,6 +1836,7 @@ static void upipe_avcenc_free(struct upipe *upipe) { struct upipe_avcenc *upipe_avcenc = upipe_avcenc_from_upipe(upipe); + upipe_avcenc_close(upipe); avcodec_free_context(&upipe_avcenc->context); av_frame_free(&upipe_avcenc->frame); av_packet_free(&upipe_avcenc->avpkt); @@ -2000,11 +1854,8 @@ static void upipe_avcenc_free(struct upipe *upipe) upipe_throw_dead(upipe); uref_free(upipe_avcenc->flow_def_requested); uref_free(upipe_avcenc->options); - upipe_avcenc_abort_av_deal(upipe); upipe_avcenc_clean_input(upipe); upipe_avcenc_clean_ubuf_mgr(upipe); - upipe_avcenc_clean_upump_av_deal(upipe); - upipe_avcenc_clean_upump_mgr(upipe); upipe_avcenc_clean_output(upipe); upipe_avcenc_clean_flow_format(upipe); upipe_avcenc_clean_flow_def(upipe); @@ -2082,8 +1933,6 @@ static struct upipe *upipe_avcenc_alloc(struct upipe_mgr *mgr, upipe_avcenc_init_urefcount(upipe); upipe_avcenc_init_ubuf_mgr(upipe); - upipe_avcenc_init_upump_mgr(upipe); - upipe_avcenc_init_upump_av_deal(upipe); upipe_avcenc_init_output(upipe); upipe_avcenc_init_input(upipe); upipe_avcenc_init_flow_format(upipe); diff --git a/lib/upipe-av/upipe_avformat_source.c b/lib/upipe-av/upipe_avformat_source.c index bc89f9201..331a6018c 100644 --- a/lib/upipe-av/upipe_avformat_source.c +++ b/lib/upipe-av/upipe_avformat_source.c @@ -122,8 +122,6 @@ struct upipe_avfsrc { /** URL */ char *url; - /** avcodec initialization watcher */ - struct upump *upump_av_deal; /** avformat options */ AVDictionary *options; /** avformat context opened from URL */ @@ -474,32 +472,15 @@ static struct upipe *upipe_avfsrc_alloc(struct upipe_mgr *mgr, upipe_avfsrc->systime_rap = UINT64_MAX; upipe_avfsrc->cr_id = UINT64_MAX; upipe_avfsrc->last_cr_id = UINT64_MAX; - upipe_avfsrc->url = NULL; - - upipe_avfsrc->upump_av_deal = NULL; upipe_avfsrc->options = NULL; upipe_avfsrc->context = NULL; upipe_avfsrc->probed = false; + upipe_throw_ready(upipe); return upipe; } -/** @This aborts and frees an existing upump watching for exclusive access to - * avcodec_open(). - * - * @param upipe description structure of the pipe - */ -static void upipe_avfsrc_abort_av_deal(struct upipe *upipe) -{ - struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - if (unlikely(upipe_avfsrc->upump_av_deal != NULL)) { - upipe_av_deal_abort(upipe_avfsrc->upump_av_deal); - upump_free(upipe_avfsrc->upump_av_deal); - upipe_avfsrc->upump_av_deal = NULL; - } -} - /** @internal @This finds the given id in the list of output subpipes. * * @param upipe description structure of the pipe @@ -564,156 +545,6 @@ static void upipe_avfsrc_update_cr(struct upipe *upipe) upipe_avfsrc->cr_id = cr_id; } -/** @internal @This reads data from the source and outputs it. - * It is called either when the idler triggers (permanent storage mode) or - * when data is available on the file descriptor (live stream mode). - * - * @param upump description structure of the read watcher - */ -static void upipe_avfsrc_worker(struct upump *upump) -{ - struct upipe *upipe = upump_get_opaque(upump, struct upipe *); - struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - AVPacket pkt; - - int error = av_read_frame(upipe_avfsrc->context, &pkt); - if (unlikely(error < 0)) { - if (error != AVERROR_EOF) { - upipe_err_va(upipe, "read error from %s (%s)", - upipe_avfsrc->url, av_err2str(error)); - } - upipe_avfsrc_set_upump(upipe, NULL); - upipe_throw_source_end(upipe); - return; - } - - struct upipe_avfsrc_sub *output = - upipe_avfsrc_find_output(upipe, pkt.stream_index); - if (output == NULL) { - av_packet_unref(&pkt); - return; - } - if (unlikely(output->ubuf_mgr == NULL)) { - if (unlikely(!upipe_avfsrc_sub_demand_ubuf_mgr(upipe_avfsrc_sub_to_upipe(output), uref_dup(output->flow_def)))) { - av_packet_unref(&pkt); - return; - } - } - - struct uref *uref = uref_block_alloc(upipe_avfsrc->uref_mgr, - output->ubuf_mgr, pkt.size); - if (unlikely(uref == NULL)) { - av_packet_unref(&pkt); - upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); - return; - } - - if (upipe_avfsrc->cr_id == UINT64_MAX) - upipe_avfsrc_update_cr(upipe); - - AVStream *stream = upipe_avfsrc->context->streams[pkt.stream_index]; - uint64_t systime = upipe_avfsrc->uclock != NULL ? - uclock_now(upipe_avfsrc->uclock) : UINT64_MAX; - uint8_t *buffer; - int read_size = -1; - if (unlikely(!ubase_check(uref_block_write(uref, 0, &read_size, &buffer)))) { - uref_free(uref); - av_packet_unref(&pkt); - upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); - return; - } - assert(read_size == pkt.size); - memcpy(buffer, pkt.data, pkt.size); - uref_block_unmap(uref, 0); - - bool ts = false; - if (upipe_avfsrc->uclock != NULL) - uref_clock_set_cr_sys(uref, systime); - if (pkt.flags & AV_PKT_FLAG_KEY) { - UBASE_FATAL(upipe, uref_pic_set_key(uref)) - upipe_avfsrc->systime_rap = systime; - } - - av_packet_rescale_ts(&pkt, stream->time_base, UCLOCK_TIME_BASE); - - uint64_t dts_orig = UINT64_MAX, dts_pts_delay = 0; - if (pkt.dts != AV_NOPTS_VALUE) { - dts_orig = (uint64_t)pkt.dts - INT64_MIN; - if (pkt.pts != AV_NOPTS_VALUE) { - if (pkt.pts < pkt.dts) { - upipe_warn_va(upipe, "pts in the past (pts=%"PRIi64", " - "dts=%"PRIi64")", pkt.pts, pkt.dts); - } else { - dts_pts_delay = pkt.pts - pkt.dts; - } - } - } else if (pkt.pts != AV_NOPTS_VALUE) { - dts_orig = (uint64_t)pkt.pts - INT64_MIN; - } - - if (dts_orig != UINT64_MAX) { - uref_clock_set_dts_orig(uref, dts_orig); - uref_clock_set_dts_pts_delay(uref, dts_pts_delay); - - if (!upipe_avfsrc->timestamp_offset) - upipe_avfsrc->timestamp_offset = upipe_avfsrc->timestamp_highest - - dts_orig + PCR_OFFSET; - uint64_t dts = dts_orig + upipe_avfsrc->timestamp_offset; - if (output->last_dts_prog != UINT64_MAX) { - if (output->last_dts_prog > dts) { - upipe_warn_va(upipe, "dts %.3f ms in the past, resetting", - (float)(output->last_dts_prog - dts) / - (float)(UCLOCK_FREQ / 1000)); - dts = output->last_dts_prog; - } - } - output->last_dts_prog = dts; - uref_clock_set_dts_prog(uref, dts); - if (upipe_avfsrc->timestamp_highest < dts + dts_pts_delay) - upipe_avfsrc->timestamp_highest = dts + dts_pts_delay; - ts = true; - - /* this is subtly wrong, but whatever */ - if (output->id == upipe_avfsrc->cr_id) { - int discontinuity = 0; - if (upipe_avfsrc->last_cr_id != UINT64_MAX && - upipe_avfsrc->last_cr_id != upipe_avfsrc->cr_id) - discontinuity = 1; - upipe_throw_clock_ref(upipe, uref, dts - PCR_OFFSET, discontinuity); - upipe_avfsrc->last_cr_id = upipe_avfsrc->cr_id; - } - } - if (pkt.duration > 0) - UBASE_FATAL(upipe, uref_clock_set_duration(uref, pkt.duration)) - if (upipe_avfsrc->systime_rap != UINT64_MAX) - uref_clock_set_rap_sys(uref, upipe_avfsrc->systime_rap); - - if (ts) - upipe_throw_clock_ts(upipe, uref); - av_packet_unref(&pkt); - - upipe_input(output->last_inner, uref, &upipe_avfsrc->upump); -} - -/** @internal @This starts the worker. - * - * @param upipe description structure of the pipe - */ -static bool upipe_avfsrc_start(struct upipe *upipe) -{ - struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - struct upump *upump = upump_alloc_idler(upipe_avfsrc->upump_mgr, - upipe_avfsrc_worker, upipe, - upipe->refcount); - if (unlikely(upump == NULL)) { - upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); - return false; - } - upipe_avfsrc_set_upump(upipe, upump); - upump_start(upump); - return true; -} - /** @internal @This returns a flow definition for a raw audio media type. * * @param upipe description structure of the pipe @@ -877,17 +708,14 @@ static struct uref *alloc_data_def(struct upipe *upipe, /** @internal @This probes all flows from the source. * - * @param upump description structure of the dealer + * @param upipe description structure of the pipe + * @return an error code */ -static void upipe_avfsrc_probe(struct upump *upump) +static int upipe_avfsrc_probe(struct upipe *upipe) { - struct upipe *upipe = upump_get_opaque(upump, struct upipe *); struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); AVFormatContext *context = upipe_avfsrc->context; - if (unlikely(!upipe_av_deal_grab())) - return; - unsigned nb_streams = context->nb_streams; int error = 0; if (nb_streams) { @@ -904,22 +732,13 @@ static void upipe_avfsrc_probe(struct upump *upump) else error = avformat_find_stream_info(context, NULL); - upipe_av_deal_yield(upump); - upump_free(upipe_avfsrc->upump_av_deal); - upipe_avfsrc->upump_av_deal = NULL; - upipe_avfsrc->probed = true; - if (unlikely(error < 0)) { upipe_err_va(upipe, "can't probe URL %s (%s)", upipe_avfsrc->url, av_err2str(error)); - if (likely(upipe_avfsrc->url != NULL)) - upipe_notice_va(upipe, "closing URL %s", upipe_avfsrc->url); - avformat_close_input(&upipe_avfsrc->context); - upipe_avfsrc->context = NULL; - ubase_clean_str(&upipe_avfsrc->url); - return; + return UBASE_ERR_EXTERNAL; } + upipe_avfsrc->probed = true; upipe_avfsrc->streams = calloc(context->nb_streams, sizeof(struct uref *)); for (int i = 0; i < context->nb_streams; i++) { @@ -960,27 +779,190 @@ static void upipe_avfsrc_probe(struct upump *upump) codecpar->codec_type, codecpar->codec_id); continue; } - UBASE_FATAL(upipe, uref_flow_set_id(flow_def, i)) + UBASE_RETURN(uref_flow_set_id(flow_def, i)); AVDictionaryEntry *lang = av_dict_get(stream->metadata, "language", NULL, 0); if (lang != NULL && lang->value != NULL) { - UBASE_FATAL(upipe, uref_flow_set_languages(flow_def, 1)) - UBASE_FATAL(upipe, uref_flow_set_language(flow_def, lang->value, 0)) + UBASE_RETURN(uref_flow_set_languages(flow_def, 1)) + UBASE_RETURN(uref_flow_set_language(flow_def, lang->value, 0)) } if (codecpar->extradata_size) { - UBASE_FATAL(upipe, uref_flow_set_global(flow_def)) - UBASE_FATAL(upipe, uref_flow_set_headers(flow_def, codecpar->extradata, - codecpar->extradata_size)) + UBASE_RETURN(uref_flow_set_global(flow_def)) + UBASE_RETURN(uref_flow_set_headers(flow_def, codecpar->extradata, + codecpar->extradata_size)) } upipe_avfsrc->streams[i] = flow_def; } + return upipe_split_throw_update(upipe); +} + +/** @internal @This closes the current context. + * + * @param upipe description structure of the pipe + */ +static void upipe_avfsrc_close(struct upipe *upipe) +{ + struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); + AVFormatContext *context = upipe_avfsrc->context; + + upipe_avfsrc->context = NULL; + if (unlikely(context != NULL)) { + if (likely(upipe_avfsrc->url != NULL)) + upipe_notice_va(upipe, "closing URL %s", upipe_avfsrc->url); + for (int i = 0; i < context->nb_streams; i++) + uref_free(upipe_avfsrc->streams[i]); + avformat_close_input(&context); + upipe_avfsrc_set_upump(upipe, NULL); + upipe_avfsrc_throw_sub_subs(upipe, UPROBE_SOURCE_END); + free(upipe_avfsrc->streams); + upipe_avfsrc->streams = NULL; + } + ubase_clean_str(&upipe_avfsrc->url); + upipe_avfsrc->probed = false; upipe_split_throw_update(upipe); - upipe_avfsrc_start(upipe); } +/** @internal @This reads data from the source and outputs it. + * It is called either when the idler triggers (permanent storage mode) or + * when data is available on the file descriptor (live stream mode). + * + * @param upump description structure of the read watcher + */ +static void upipe_avfsrc_worker(struct upump *upump) +{ + struct upipe *upipe = upump_get_opaque(upump, struct upipe *); + struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); + AVPacket pkt; + + if (unlikely(!upipe_avfsrc->probed)) { + if (unlikely(!ubase_check(upipe_avfsrc_probe(upipe)))) { + upipe_warn_va(upipe, "fail to probe %s", upipe_avfsrc->url); + upipe_avfsrc_close(upipe); + return; + } + } + + int error = av_read_frame(upipe_avfsrc->context, &pkt); + if (unlikely(error < 0)) { + if (error != AVERROR_EOF) { + upipe_err_va(upipe, "read error from %s (%s)", + upipe_avfsrc->url, av_err2str(error)); + } + upipe_avfsrc_set_upump(upipe, NULL); + upipe_throw_source_end(upipe); + return; + } + + struct upipe_avfsrc_sub *output = + upipe_avfsrc_find_output(upipe, pkt.stream_index); + if (output == NULL) { + av_packet_unref(&pkt); + return; + } + if (unlikely(output->ubuf_mgr == NULL)) { + if (unlikely(!upipe_avfsrc_sub_demand_ubuf_mgr(upipe_avfsrc_sub_to_upipe(output), uref_dup(output->flow_def)))) { + av_packet_unref(&pkt); + return; + } + } + + struct uref *uref = uref_block_alloc(upipe_avfsrc->uref_mgr, + output->ubuf_mgr, pkt.size); + if (unlikely(uref == NULL)) { + av_packet_unref(&pkt); + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + return; + } + + if (upipe_avfsrc->cr_id == UINT64_MAX) + upipe_avfsrc_update_cr(upipe); + + AVStream *stream = upipe_avfsrc->context->streams[pkt.stream_index]; + uint64_t systime = upipe_avfsrc->uclock != NULL ? + uclock_now(upipe_avfsrc->uclock) : UINT64_MAX; + uint8_t *buffer; + int read_size = -1; + if (unlikely(!ubase_check(uref_block_write(uref, 0, &read_size, &buffer)))) { + uref_free(uref); + av_packet_unref(&pkt); + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + return; + } + assert(read_size == pkt.size); + memcpy(buffer, pkt.data, pkt.size); + uref_block_unmap(uref, 0); + + bool ts = false; + if (upipe_avfsrc->uclock != NULL) + uref_clock_set_cr_sys(uref, systime); + if (pkt.flags & AV_PKT_FLAG_KEY) { + UBASE_FATAL(upipe, uref_pic_set_key(uref)) + upipe_avfsrc->systime_rap = systime; + } + + av_packet_rescale_ts(&pkt, stream->time_base, UCLOCK_TIME_BASE); + + uint64_t dts_orig = UINT64_MAX, dts_pts_delay = 0; + if (pkt.dts != AV_NOPTS_VALUE) { + dts_orig = (uint64_t)pkt.dts - INT64_MIN; + if (pkt.pts != AV_NOPTS_VALUE) { + if (pkt.pts < pkt.dts) { + upipe_warn_va(upipe, "pts in the past (pts=%"PRIi64", " + "dts=%"PRIi64")", pkt.pts, pkt.dts); + } else { + dts_pts_delay = pkt.pts - pkt.dts; + } + } + } else if (pkt.pts != AV_NOPTS_VALUE) { + dts_orig = (uint64_t)pkt.pts - INT64_MIN; + } + + if (dts_orig != UINT64_MAX) { + uref_clock_set_dts_orig(uref, dts_orig); + uref_clock_set_dts_pts_delay(uref, dts_pts_delay); + + if (!upipe_avfsrc->timestamp_offset) + upipe_avfsrc->timestamp_offset = upipe_avfsrc->timestamp_highest - + dts_orig + PCR_OFFSET; + uint64_t dts = dts_orig + upipe_avfsrc->timestamp_offset; + if (output->last_dts_prog != UINT64_MAX) { + if (output->last_dts_prog > dts) { + upipe_warn_va(upipe, "dts %.3f ms in the past, resetting", + (float)(output->last_dts_prog - dts) / + (float)(UCLOCK_FREQ / 1000)); + dts = output->last_dts_prog; + } + } + output->last_dts_prog = dts; + uref_clock_set_dts_prog(uref, dts); + if (upipe_avfsrc->timestamp_highest < dts + dts_pts_delay) + upipe_avfsrc->timestamp_highest = dts + dts_pts_delay; + ts = true; + + /* this is subtly wrong, but whatever */ + if (output->id == upipe_avfsrc->cr_id) { + int discontinuity = 0; + if (upipe_avfsrc->last_cr_id != UINT64_MAX && + upipe_avfsrc->last_cr_id != upipe_avfsrc->cr_id) + discontinuity = 1; + upipe_throw_clock_ref(upipe, uref, dts - PCR_OFFSET, discontinuity); + upipe_avfsrc->last_cr_id = upipe_avfsrc->cr_id; + } + } + if (pkt.duration > 0) + UBASE_FATAL(upipe, uref_clock_set_duration(uref, pkt.duration)) + if (upipe_avfsrc->systime_rap != UINT64_MAX) + uref_clock_set_rap_sys(uref, upipe_avfsrc->systime_rap); + + if (ts) + upipe_throw_clock_ts(upipe, uref); + av_packet_unref(&pkt); + + upipe_input(output->last_inner, uref, &upipe_avfsrc->upump); +} /** @internal @This iterates over output flow definitions. * @@ -1086,24 +1068,13 @@ static int upipe_avfsrc_set_uri(struct upipe *upipe, const char *url) { struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - if (unlikely(upipe_avfsrc->context != NULL)) { - if (likely(upipe_avfsrc->url != NULL)) - upipe_notice_va(upipe, "closing URL %s", upipe_avfsrc->url); - avformat_close_input(&upipe_avfsrc->context); - upipe_avfsrc->context = NULL; - upipe_avfsrc_set_upump(upipe, NULL); - upipe_avfsrc_abort_av_deal(upipe); - upipe_avfsrc_throw_sub_subs(upipe, UPROBE_SOURCE_END); - free(upipe_avfsrc->streams); - } - ubase_clean_str(&upipe_avfsrc->url); + upipe_avfsrc_close(upipe); if (unlikely(url == NULL)) return UBASE_ERR_NONE; if (unlikely(!upipe_avfsrc_demand_uref_mgr(upipe))) return UBASE_ERR_ALLOC; - upipe_avfsrc_check_upump_mgr(upipe); struct uref *flow_def = uref_alloc_control(upipe_avfsrc->uref_mgr); uref_flow_set_def(flow_def, "void."); @@ -1178,7 +1149,6 @@ static int _upipe_avfsrc_control(struct upipe *upipe, switch (command) { case UPIPE_ATTACH_UPUMP_MGR: upipe_avfsrc_set_upump(upipe, NULL); - upipe_avfsrc_abort_av_deal(upipe); return upipe_avfsrc_attach_upump_mgr(upipe); case UPIPE_ATTACH_UCLOCK: upipe_avfsrc_set_upump(upipe, NULL); @@ -1228,43 +1198,47 @@ static int _upipe_avfsrc_control(struct upipe *upipe, } } -/** @internal @This processes control commands on an avformat source pipe, and - * checks the status of the pipe afterwards. +/** @internal @This checks the status of the pipe. * * @param upipe description structure of the pipe - * @param command type of command to process - * @param args arguments of the command * @return an error code */ -static int upipe_avfsrc_control(struct upipe *upipe, int command, va_list args) +static int upipe_avfsrc_check(struct upipe *upipe) { - UBASE_RETURN(_upipe_avfsrc_control(upipe, command, args)); - struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - if (upipe_avfsrc->upump_mgr != NULL && upipe_avfsrc->url != NULL && - upipe_avfsrc->upump == NULL) { - if (unlikely(upipe_avfsrc->probed)) - return upipe_avfsrc_start(upipe) ? - UBASE_ERR_NONE : UBASE_ERR_EXTERNAL; - if (unlikely(upipe_avfsrc->upump_av_deal != NULL)) - return UBASE_ERR_NONE; + upipe_avfsrc_check_upump_mgr(upipe); - struct upump *upump_av_deal = - upipe_av_deal_upump_alloc(upipe_avfsrc->upump_mgr, - upipe_avfsrc_probe, upipe, upipe->refcount); - if (unlikely(upump_av_deal == NULL)) { - upipe_err(upipe, "can't create dealer"); - upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); - return UBASE_ERR_UPUMP; + if (upipe_avfsrc->upump_mgr != NULL && upipe_avfsrc->url != NULL && + upipe_avfsrc->upump == NULL) { + struct upump *upump = + upump_alloc_idler(upipe_avfsrc->upump_mgr, upipe_avfsrc_worker, + upipe, upipe->refcount); + if (unlikely(upump == NULL)) { + upipe_warn(upipe, "fail to allocate idler"); + return UBASE_ERR_ALLOC; + upipe_avfsrc_set_upump(upipe, upump); + upump_start(upump); } - upipe_avfsrc->upump_av_deal = upump_av_deal; - upipe_av_deal_start(upump_av_deal); } return UBASE_ERR_NONE; } +/** @internal @This processes control commands on an avformat source pipe, and + * checks the status of the pipe afterwards. + * + * @param upipe description structure of the pipe + * @param command type of command to process + * @param args arguments of the command + * @return an error code + */ +static int upipe_avfsrc_control(struct upipe *upipe, int command, va_list args) +{ + UBASE_RETURN(_upipe_avfsrc_control(upipe, command, args)); + return upipe_avfsrc_check(upipe); +} + /** @This frees a upipe. * * @param urefcount_real pointer to urefcount_real structure @@ -1276,17 +1250,6 @@ static void upipe_avfsrc_free(struct urefcount *urefcount_real) struct upipe *upipe = upipe_avfsrc_to_upipe(upipe_avfsrc); upipe_avfsrc_clean_sub_subs(upipe); - upipe_avfsrc_abort_av_deal(upipe); - if (likely(upipe_avfsrc->context != NULL)) { - if (likely(upipe_avfsrc->url != NULL)) - upipe_notice_va(upipe, "closing URL %s", upipe_avfsrc->url); - - for (int i = 0; i < upipe_avfsrc->context->nb_streams; i++) - uref_free(upipe_avfsrc->streams[i]); - - free(upipe_avfsrc->streams); - avformat_close_input(&upipe_avfsrc->context); - } upipe_throw_dead(upipe); av_dict_free(&upipe_avfsrc->options); @@ -1309,8 +1272,7 @@ static void upipe_avfsrc_free(struct urefcount *urefcount_real) static void upipe_avfsrc_no_input(struct upipe *upipe) { struct upipe_avfsrc *upipe_avfsrc = upipe_avfsrc_from_upipe(upipe); - upipe_avfsrc_throw_sub_subs(upipe, UPROBE_SOURCE_END); - upipe_split_throw_update(upipe); + upipe_avfsrc_close(upipe); urefcount_release(upipe_avfsrc_to_urefcount_real(upipe_avfsrc)); }