OBS源码分析三RTMP推流输出

mac2025-02-08  13

1.第一步先看RTMP推流输出流程图

 

2.第二步RTMP输出代码详细步骤说明 

首先是RtmpOutput输出对象的创建和编码器绑定

RtmpOutput::RtmpOutput() { CreateOutputWithUniqueName(); } RtmpOutput::~RtmpOutput() { obs_encoder_release(videoEncoder_); obs_encoder_release(delayVideoEncoder_); } void RtmpOutput::CreateOutputWithUniqueName() { uid_++; auto uid = uid_.load(); std::string rtmpNameFormat = "rtmp_stream_%d"; std::string rtmpOutputName = string_format(rtmpNameFormat, uid); obs_data_t* rtmpSettings = obs_data_create(); SetDefaultRtmpSettings(rtmpSettings); auto rtmpSettingsGuard = gsl::finally([&]() { obs_data_release(rtmpSettings); }); output_ = obs_output_create("rtmp_output", rtmpOutputName.c_str(), nullptr, nullptr); UpdateOutputSettings(rtmpSettings); auto rtmpOutputGuard = gsl::finally([&]() { obs_output_release(output_); }); std::string rtmpServiceNameFormat = "rtmp_service_%d"; std::string rtmpServiceName = string_format(rtmpServiceNameFormat, uid); service_ = obs_service_create("rtmp_custom", rtmpServiceName.c_str(), nullptr, nullptr); auto serviceGuard = gsl::finally([&]() { obs_service_release(service_); }); obs_output_set_service(output_, service_); }

调用obs_output_create()函数,根据输出id创建推流对象。与创建编码对象类似,推流对象在加载模块时已添加到obs->output_types中;获取到的推流输出对象赋值给streamOutput指针,然后调用obs_output_set_video_encoder()函数,将推流输出的video_encoder设置为编码器;

/*
 * Public entry point for starting an output.
 *
 * Returns false when the output handle is invalid or has no context data.
 * Encoded outputs with a non-zero delay_sec are handed to
 * obs_output_delay_start(); everything else goes through
 * obs_output_actual_start(), and the "starting" signal is emitted only when
 * that call succeeds.
 */
bool obs_output_start(obs_output_t *output)
{
	if (!obs_output_valid(output, "obs_output_start") ||
	    !output->context.data)
		return false;

	output->reconnect_total_retries = 0;

	const bool is_encoded =
		(output->info.flags & OBS_OUTPUT_ENCODED) != 0;

	/* Delayed start is only taken for encoded outputs. */
	if (is_encoded && output->delay_sec)
		return obs_output_delay_start(output);

	if (!obs_output_actual_start(output))
		return false;

	do_output_signal(output, "starting");
	return true;
}

调用obs_output_start() -> obs_output_actual_start(),通过推流对象的output->info.start()回调函数开启推流,其中start绑定至rtmp_stream_start()

/*
 * Performs the real start of an output by invoking its info.start callback.
 *
 * Returns true when the callback reports success, or when a delayed output is
 * already capturing (in which case the callback is not invoked at all).
 */
bool obs_output_actual_start(obs_output_t *output)
{
	bool started = false;

	os_event_wait(output->connecting_event);

	/* Delay is active and already capturing: nothing to start. */
	if (delay_active(output) && delay_capturing(output)) {
		os_event_signal(output->connecting_event);
		return true;
	}

	os_event_wait(output->stopping_event);
	output->stop_code = 0;

	/* Discard the error text left over from a previous run. */
	if (output->last_error_message) {
		bfree(output->last_error_message);
		output->last_error_message = NULL;
	}

	blog(LOG_INFO, "output %s actual start", obs_output_get_name(output));

	if (output->context.data) {
		started = output->info.start(output->context.data);

		if (!started) {
			/* Start failed: stop the audio mix / video output
			 * attached to this output and clear the pointers. */
			if (output->mixer) {
				obs_stop_core_audio_mix(output->mixer);
				output->mixer = NULL;
			}

			if (output->video_output) {
				obs_stop_video_output(output->video_output);
				output->video_output = NULL;
			}

			os_event_signal(output->connecting_event);
			blog(LOG_INFO, "output %s actual start return false",
			     obs_output_get_name(output));
		}
	}

	/* Snapshot the frame counters at the moment the output started. */
	if (started && output->video) {
		output->starting_frame_count =
			video_output_get_total_frames(output->video);
		output->starting_drawn_count = obs->video.total_frames;
		output->starting_lagged_count = obs->video.lagged_frames;
	}

	if (os_atomic_load_long(&output->delay_restart_refs))
		os_atomic_dec_long(&output->delay_restart_refs);

	output->caption_timestamp = 0;

	return started;
}

 static bool rtmp_stream_start(void *data),创建线程,执行connect_thread()函数,static void *connect_thread(void *data)

init_connect()初始化推流:调用free_packets清空stream->packets,获取推流设置并赋值到stream;try_connect()连接rtmp服务器;

static void *connect_thread(void *data) { struct rtmp_stream *stream = data; int ret; os_set_thread_name("rtmp-stream: connect_thread"); if (!init_connect(stream)) { obs_output_signal_stop(stream->output, OBS_OUTPUT_BAD_PATH); return NULL; } ret = try_connect(stream); if (ret != OBS_OUTPUT_SUCCESS) { obs_output_signal_stop(stream->output, ret); info("Connection to %s failed: %d", stream->path.array, ret); } if (!stopping(stream)) pthread_detach(stream->connect_thread); os_atomic_set_bool(&stream->connecting, false); return NULL; }

 RTMP_Init(&stream->rtmp)初始化rtmp客户端,设置推流服务器地址、用户名、密码、流地址、音频编码名称(为何没有添加视频编码名称);

RTMP_Connect()连接rtmp服务器;

RTMP_ConnectStream()连接rtmp流地址; init_send()启动发送函数,reset_semaphore()重置发送信号量,创建推流执行线程send_thread,发送视频关键数据(元数据)send_meta_data();

然后开启推流数据捕获obs_output_begin_data_capture();

send_thread()线程函数:循环等待信号量stream->send_sem 被唤醒,唤醒后 get_next_packet()取出队列中的第一个已编码数据包,执行 send_packet 函数,调用flv_packet_mux进行flv数据封包,再调用RTMP_Write()发送数据包,完成视频数据推流.

static void *send_thread(void *data) { struct rtmp_stream *stream = data; os_set_thread_name("rtmp-stream: send_thread"); while (os_sem_wait(stream->send_sem) == 0) { struct encoder_packet packet; if (stopping(stream) && stream->stop_ts == 0) { break; } if (!get_next_packet(stream, &packet)) continue; if (stopping(stream)) { if (can_shutdown_stream(stream, &packet)) { obs_encoder_packet_release(&packet); break; } } if (!stream->sent_headers) { if (!send_headers(stream)) { os_atomic_set_bool(&stream->disconnected, true); break; } } if (send_packet(stream, &packet, false, packet.track_idx) < 0) { os_atomic_set_bool(&stream->disconnected, true); break; } } if (disconnected(stream)) { info("Disconnected from %s", stream->path.array); } else { info("User stopped the stream"); } if (stream->new_socket_loop) { os_event_signal(stream->send_thread_signaled_exit); os_event_signal(stream->buffer_has_data_event); pthread_join(stream->socket_thread, NULL); stream->socket_thread_active = false; stream->rtmp.m_bCustomSend = false; } set_output_error(stream); RTMP_Close(&stream->rtmp); if (!stopping(stream)) { pthread_detach(stream->send_thread); obs_output_signal_stop(stream->output, stream->sent_headers? OBS_OUTPUT_DISCONNECTED: OBS_OUTPUT_CONNECT_FAILED); } else { obs_output_end_data_capture(stream->output); } free_packets(stream); os_event_reset(stream->stop_event); os_atomic_set_bool(&stream->active, false); stream->sent_headers = false; return NULL; }

 

最新回复(0)