|
|
@@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.GBHookSubscribeFactory;
|
|
|
import com.genersoft.iot.vmp.gb28181.HookSubscribeForKey;
|
|
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
|
|
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
|
|
+import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
|
|
|
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
|
|
|
@@ -35,6 +36,8 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
|
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
|
|
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
|
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
|
|
+import com.genersoft.iot.vmp.vmanager.bean.ErrorHook;
|
|
|
+import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
|
|
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
|
|
import gov.nist.javax.sip.message.SIPRequest;
|
|
|
import gov.nist.javax.sip.message.SIPResponse;
|
|
|
@@ -53,6 +56,7 @@ import javax.sip.SipException;
|
|
|
import java.math.BigDecimal;
|
|
|
import java.math.RoundingMode;
|
|
|
import java.text.ParseException;
|
|
|
+import java.util.EventObject;
|
|
|
import java.util.List;
|
|
|
import java.util.UUID;
|
|
|
|
|
|
@@ -117,14 +121,14 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
|
|
|
@Override
|
|
|
public void play(MediaServerItem mediaServerItem, String deviceId,
|
|
|
- String channelId,int isUsePs,
|
|
|
- ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
|
|
|
- Runnable timeoutCallback) {
|
|
|
+ String channelId, int isUsePs,
|
|
|
+ ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
|
|
|
+ Runnable timeoutCallback) {
|
|
|
|
|
|
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
|
|
|
WVPResult wvpResult = new WVPResult();
|
|
|
RequestMessage msg = new RequestMessage();
|
|
|
- Device device = redisCatchStorage.getDevice(deviceId);
|
|
|
+ Device device = storager.queryVideoDevice(deviceId);
|
|
|
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
|
|
|
|
|
|
msg.setKey(key);
|
|
|
@@ -209,26 +213,25 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
}
|
|
|
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
|
|
|
logger.info(JSONObject.toJSONString(ssrcInfo));
|
|
|
- if (ssrcInfo == null) {
|
|
|
+ if (ssrcInfo == null) {
|
|
|
wvpResult.setCode(ErrorCode.ERROR100.getCode());
|
|
|
wvpResult.setMsg("开启收流失败");
|
|
|
msg.setData(wvpResult);
|
|
|
resultHolder.invokeAllResult(msg);
|
|
|
return;
|
|
|
}
|
|
|
- play(mediaServerItem, ssrcInfo, device, channelId,
|
|
|
- isUsePs,
|
|
|
+ play(mediaServerItem, ssrcInfo, device, channelId, isUsePs,
|
|
|
(mediaServerItemInUse, response) -> {
|
|
|
- if (hookEvent != null) {
|
|
|
- hookEvent.response(mediaServerItem, response);
|
|
|
- }
|
|
|
- }, event -> {
|
|
|
- // sip error错误
|
|
|
- logger.warn("sip 错误,点播失败");
|
|
|
- wvpResult.setCode(ErrorCode.ERROR100.getCode());
|
|
|
- wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
|
|
|
- msg.setData(wvpResult);
|
|
|
- resultHolder.invokeAllResult(msg);
|
|
|
+ if (hookEvent != null) {
|
|
|
+ hookEvent.response(mediaServerItem, response);
|
|
|
+ }
|
|
|
+ }, event -> {
|
|
|
+ // sip error错误
|
|
|
+ logger.warn("sip 错误,点播失败");
|
|
|
+ wvpResult.setCode(ErrorCode.ERROR100.getCode());
|
|
|
+ wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
|
|
|
+ msg.setData(wvpResult);
|
|
|
+ resultHolder.invokeAllResult(msg);
|
|
|
if (errorEvent != null) {
|
|
|
errorEvent.response(event);
|
|
|
}
|
|
|
@@ -280,6 +283,8 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
}
|
|
|
}
|
|
|
}, userSetting.getPlayTimeout());
|
|
|
+ RequestMessage msg = new RequestMessage();
|
|
|
+ msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
|
|
|
//端口获取失败的ssrcInfo 没有必要发送点播指令
|
|
|
if (ssrcInfo.getPort() <= 0) {
|
|
|
logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
|
|
|
@@ -287,88 +292,87 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
// 释放ssrc
|
|
|
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
-
|
|
|
- RequestMessage msg = new RequestMessage();
|
|
|
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
|
|
|
msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常"));
|
|
|
resultHolder.invokeAllResult(msg);
|
|
|
return;
|
|
|
}
|
|
|
try {
|
|
|
- cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isUsePs, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
|
|
|
- logger.info("收到订阅消息: " + response.toJSONString());
|
|
|
- logger.info("停止超时任务: " + timeOutTaskKey);
|
|
|
- dynamicTask.stop(timeOutTaskKey);
|
|
|
- // hook响应
|
|
|
- onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
|
|
|
- hookEvent.response(mediaServerItemInuse, response);
|
|
|
- logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
|
|
|
- String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
|
|
|
- String path = "snap";
|
|
|
- String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
|
|
|
- // 请求截图
|
|
|
- logger.info("[请求截图]: " + fileName);
|
|
|
- zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
|
|
|
-
|
|
|
- }, (event) -> {
|
|
|
- ResponseEvent responseEvent = (ResponseEvent) event.event;
|
|
|
- String contentString = new String(responseEvent.getResponse().getRawContent());
|
|
|
- // 获取ssrc
|
|
|
- int ssrcIndex = contentString.indexOf("y=");
|
|
|
- // 检查是否有y字段
|
|
|
- if (ssrcIndex >= 0) {
|
|
|
- //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
|
|
|
- String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
- // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
|
|
|
- if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
|
|
|
- if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
|
|
|
- logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
|
|
-
|
|
|
- if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
|
|
|
- // ssrc 不可用
|
|
|
- // 释放ssrc
|
|
|
- mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
- streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
- event.msg = "下级自定义了ssrc,但是此ssrc不可用";
|
|
|
- event.statusCode = 400;
|
|
|
- errorEvent.response(event);
|
|
|
- return;
|
|
|
- }
|
|
|
+ cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, isUsePs,
|
|
|
+ (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
|
|
|
+ logger.info("收到订阅消息: " + response.toJSONString());
|
|
|
+ logger.info("停止超时任务: " + timeOutTaskKey);
|
|
|
+ dynamicTask.stop(timeOutTaskKey);
|
|
|
+ // hook响应
|
|
|
+ onPublishHandlerForPlay(msg, mediaServerItemInuse, response, device.getDeviceId(), channelId, ssrcInfo.getSsrc());
|
|
|
+ hookEvent.response(mediaServerItemInuse, response);
|
|
|
+ logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
|
|
|
+ String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream());
|
|
|
+ String path = "snap";
|
|
|
+ String fileName = device.getDeviceId() + "_" + channelId + ".jpg";
|
|
|
+ // 请求截图
|
|
|
+ logger.info("[请求截图]: " + fileName);
|
|
|
+ zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
|
|
|
+
|
|
|
+ },
|
|
|
+ (okEvent) -> {
|
|
|
+ ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
|
|
|
+ String contentString = new String(responseEvent.getResponse().getRawContent());
|
|
|
+ // 获取ssrc
|
|
|
+ int ssrcIndex = contentString.indexOf("y=");
|
|
|
+ // 检查是否有y字段
|
|
|
+ if (ssrcIndex >= 0) {
|
|
|
+ //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
|
|
|
+ String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
+ // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
|
|
|
+ if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
|
|
|
+ if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
|
|
|
+ logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
|
|
|
+
|
|
|
+ if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
|
|
|
+ // ssrc 不可用
|
|
|
+ // 释放ssrc
|
|
|
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
+ streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
+ okEvent.msg = "下级自定义了ssrc,但是此ssrc不可用";
|
|
|
+ okEvent.statusCode = 400;
|
|
|
+ errorEvent.response(okEvent);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 单端口模式streamId也有变化,需要重新设置监听
|
|
|
- if (!mediaServerItem.isRtpEnable()) {
|
|
|
- // 添加订阅
|
|
|
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
|
|
|
- subscribe.removeSubscribe(hookSubscribe);
|
|
|
- hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
|
|
|
- subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
|
|
|
- logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
|
|
- dynamicTask.stop(timeOutTaskKey);
|
|
|
- // hook响应
|
|
|
- onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
|
|
|
- hookEvent.response(mediaServerItemInUse, response);
|
|
|
- });
|
|
|
- }
|
|
|
- // 关闭rtp server
|
|
|
- mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
|
|
|
- // 重新开启ssrc server
|
|
|
- mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort());
|
|
|
+ // 单端口模式streamId也有变化,需要重新设置监听
|
|
|
+ if (!mediaServerItem.isRtpEnable()) {
|
|
|
+ // 添加订阅
|
|
|
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
|
|
|
+ subscribe.removeSubscribe(hookSubscribe);
|
|
|
+ hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
|
|
|
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
|
|
|
+ logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
|
|
+ dynamicTask.stop(timeOutTaskKey);
|
|
|
+ // hook响应
|
|
|
+ onPublishHandlerForPlay(msg, mediaServerItemInUse, response, device.getDeviceId(), channelId, ssrcInfo.getSsrc());
|
|
|
+ hookEvent.response(mediaServerItemInUse, response);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ // 关闭rtp server
|
|
|
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
|
|
|
+ // 重新开启ssrc server
|
|
|
+ mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort());
|
|
|
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- , (event) -> {
|
|
|
- dynamicTask.stop(timeOutTaskKey);
|
|
|
- mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
|
|
|
- // 释放ssrc
|
|
|
- mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
+ , (errEvent) -> {
|
|
|
+ dynamicTask.stop(timeOutTaskKey);
|
|
|
+ mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
|
|
|
+ // 释放ssrc
|
|
|
+ mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
|
|
|
- streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
- errorEvent.response(event);
|
|
|
- });
|
|
|
+ streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
+ errorEvent.response(errEvent);
|
|
|
+ });
|
|
|
} catch (InvalidArgumentException | SipException | ParseException e) {
|
|
|
|
|
|
logger.error("[命令发送失败] 点播消息: {}", e.getMessage());
|
|
|
@@ -378,86 +382,300 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
|
|
|
|
|
|
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
|
|
|
- SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
|
|
|
- eventResult.msg = "命令发送失败";
|
|
|
+// 有什么大病一般的代码
|
|
|
+// SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
|
|
|
+// eventResult.msg = "命令发送失败";
|
|
|
+ EventObject eventObject = new EventObject("命令发送失败");
|
|
|
+ SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(eventObject);
|
|
|
errorEvent.response(eventResult);
|
|
|
+//
|
|
|
+// RequestMessage msg = new RequestMessage();
|
|
|
+// msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
|
|
|
+// msg.setData(WVPResult.fail(ErrorCode.ERR_Invite_fail, "点播命令发送失败: " + e.getMessage()));
|
|
|
+// resultHolder.invokeAllResult(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- RequestMessage msg = new RequestMessage();
|
|
|
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId);
|
|
|
- msg.setData(WVPResult.fail(ErrorCode.ERR_Invite_fail.getCode(), "点播命令发送至设备异常"));
|
|
|
- resultHolder.invokeAllResult(msg);
|
|
|
+ /**
|
|
|
+ * 从缓存中获取流信息
|
|
|
+ *
|
|
|
+ * @param deviceId
|
|
|
+ * @param channelId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private StreamInfo getStreamInfo(String deviceId, String channelId, WVPResult wvpResult) {
|
|
|
+ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
|
|
|
+ // 两种情况, 一种是已经在推流中了. 一种是还没有开始推流
|
|
|
+ if (streamInfo == null) {
|
|
|
+ // 未开始推流, 开启新推流
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 能够读取到视频缓存
|
|
|
+ String streamId = streamInfo.getStream();
|
|
|
+ // 应该是不可能出现的异常情况,
|
|
|
+ if (streamId == null) {
|
|
|
+ logger.warn("[读取推流信息] 失败, redis缓存异常 streamId 为空");
|
|
|
+ // 说明缓存的流信息有问题
|
|
|
+ // 移除redis 缓存
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ redisCatchStorage.deleteDeviceStream(deviceId, channelId);
|
|
|
+ storager.stopPlay(deviceId, channelId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String mediaServerId = streamInfo.getMediaServerId();
|
|
|
+ // 获取视频推流通道对应的流媒体信息
|
|
|
+ MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
|
|
|
+ JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
|
|
|
+ if (rtpInfo == null) {
|
|
|
+ logger.warn("[读取推流信息] 失败, rtpInfo为null");
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ redisCatchStorage.deleteDeviceStream(deviceId, channelId);
|
|
|
+ storager.stopPlay(deviceId, channelId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (rtpInfo.getInteger("code") != 0) {
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ redisCatchStorage.deleteDeviceStream(deviceId, channelId);
|
|
|
+ storager.stopPlay(deviceId, channelId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (!rtpInfo.getBoolean("exist")) {
|
|
|
+ // 说明流已经停止
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ redisCatchStorage.deleteDeviceStream(deviceId, channelId);
|
|
|
+ storager.stopPlay(deviceId, channelId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ int localPort = rtpInfo.getInteger("local_port");
|
|
|
+ if (localPort == 0) {
|
|
|
+ logger.warn("[读取推流信息] 失败, rtpServer存在,但是尚未开始推流");
|
|
|
+ // 此时说明rtpServer已经创建但是流还没有推上来, 所以稍微等待一下重试即可
|
|
|
+ // 使用 wvpResult 返回对应信息提示
|
|
|
+ wvpResult.setCode(ErrorCode.ERROR_Retry.getCode());
|
|
|
+ wvpResult.setMsg("点播已经在进行中,请稍后进行重试");
|
|
|
}
|
|
|
+ return streamInfo;
|
|
|
}
|
|
|
|
|
|
+ // 直接抛弃原有的播放 函数 创建新地播放函数, 重新梳理播放逻辑
|
|
|
+ public void startPlay(RequestMessage requestMsg, MediaServerItem mediaServerItem, Device device, String channelId, int isUsePs) {
|
|
|
+ // 获取缓存的流信息
|
|
|
+ WVPResult wvpResult = new WVPResult();
|
|
|
+ StreamInfo streamInfo = getStreamInfo(device.getDeviceId(), channelId, wvpResult);
|
|
|
|
|
|
+ if (streamInfo != null) {
|
|
|
+ logger.info("已经启用视频流 {}", wvpResult.toString());
|
|
|
+ // 说明已经在推流中了
|
|
|
+ if (wvpResult.getCode() != ErrorCode.ERROR_Retry.getCode()) {
|
|
|
+ // 在获取流信息没有更改 wvpResult 时将返回值设置为成功
|
|
|
+ logger.warn("{} 已经在推流, 并且无异常", device.getDeviceId(), channelId);
|
|
|
+ // 数据类型转换, 懒得改前端了
|
|
|
+ wvpResult = WVPResult.success(
|
|
|
+ new StreamContent(streamInfo)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ resultHolder.invokeResult(requestMsg.setData(wvpResult));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 说明还没有开始推流
|
|
|
+ if (!mediaServerItem.isRtpEnable()) {
|
|
|
+ logger.warn("设备应用的流媒体未启用");
|
|
|
+ resultHolder.invokeAllResult(
|
|
|
+ requestMsg.setData(WVPResult.fail(ErrorCode.ERROR100, "设备应用的流媒体未启用"))
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 生成streamId
|
|
|
+ String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
|
|
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
|
|
|
+ if (ssrcInfo == null) {
|
|
|
+ logger.warn("开启收流失败");
|
|
|
+ resultHolder.invokeAllResult(
|
|
|
+ requestMsg.setData(WVPResult.fail(ErrorCode.ERROR100, "开启收流失败"))
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 尝试启用收流
|
|
|
+ if (ssrcInfo.getPort() <= 0) {
|
|
|
+ logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
|
|
|
+ resultHolder.invokeAllResult(
|
|
|
+ requestMsg.setData(WVPResult.fail(ErrorCode.ERROR100, "点播端口分配异常, 请重试"))
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ executePlayLive(requestMsg, mediaServerItem, ssrcInfo, device, channelId, isUsePs);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行推流相关操作 发送命令
|
|
|
+ *
|
|
|
+ * @param requestMsg
|
|
|
+ * @param mediaServerItem
|
|
|
+ * @param device
|
|
|
+ * @param channelId
|
|
|
+ * @param isUsePs
|
|
|
+ */
|
|
|
+ private void executePlayLive(RequestMessage requestMsg, MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, int isUsePs) {
|
|
|
+ logger.info("[支持点播推流] 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}",
|
|
|
+ ssrcInfo.getPort(), device.getStreamMode(),
|
|
|
+ ssrcInfo.getSsrc(), device.isSsrcCheck());
|
|
|
+ String stream = ssrcInfo.getStream();
|
|
|
+ logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
|
|
|
+
|
|
|
+ // 构建errorHook 回调函数
|
|
|
+ ErrorHook errorHook = wvpResult ->
|
|
|
+ {
|
|
|
+ logger.info("收到点播命令回调 {}", wvpResult);
|
|
|
+ if (wvpResult.getCode() != ErrorCode.SUCCESS.getCode()) {
|
|
|
+ logger.error("[发送点播命令] 失败 {}", wvpResult.getMsg());
|
|
|
+ requestMsg.setData(wvpResult);
|
|
|
+ stopPlay(device, channelId);
|
|
|
+// resultHolder.invokeAllResult(requestMsg);
|
|
|
+ }
|
|
|
+ // 监听zlm的流改变事件, 在流改变时直接回复 http 信息
|
|
|
+ addHookSubscribeForStreamChange(requestMsg, mediaServerItem, device, channelId, ssrcInfo);
|
|
|
+ };
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 发送命令
|
|
|
+ cmder.sendPlayCmd(mediaServerItem, ssrcInfo, device, channelId, isUsePs, errorHook);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ logger.error("[发送点播命令] 失败 {}", e.getMessage());
|
|
|
+ // 未知的错误, 打印错误堆栈
|
|
|
+ requestMsg.setData(WVPResult.fail(ErrorCode.ERROR100, "发送点播命令失败"));
|
|
|
+ resultHolder.invokeAllResult(requestMsg);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 停止推流
|
|
|
+ public void stopPlay(Device device, String channelId) {
|
|
|
+ WVPResult wvpResult = new WVPResult();
|
|
|
+ StreamInfo streamInfo = getStreamInfo(device.getDeviceId(), channelId, wvpResult);
|
|
|
+ // 获取 流媒体
|
|
|
|
|
|
|
|
|
+ // 发送bye给设备
|
|
|
+ ErrorHook errorHook = wvpResult1 ->
|
|
|
+ {
|
|
|
+ logger.info("关闭推流回调 {}", wvpResult1);
|
|
|
+ if (wvpResult1.getCode() != ErrorCode.SUCCESS.getCode()) {
|
|
|
+ logger.error("[关闭推流回调] 失败 {}", wvpResult1.getMsg());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //
|
|
|
+ if (streamInfo != null) {
|
|
|
+ logger.info("找到正在推流的视频流 尝试停止 {}", wvpResult.toString());
|
|
|
+ MediaServerItem mediaServerItem = mediaServerService.getOne(
|
|
|
+ streamInfo.getMediaServerId()
|
|
|
+ );
|
|
|
+ // 获取ssrc
|
|
|
+ logger.info("移除并关闭推流通道");
|
|
|
+ redisCatchStorage.stopPlay(streamInfo);
|
|
|
+ redisCatchStorage.deleteDeviceStream(device.getDeviceId(), channelId);
|
|
|
+ mediaServerService.releaseSsrc(streamInfo.getMediaServerId(), streamInfo.getSsrc());
|
|
|
+ mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
|
|
|
+ } else {
|
|
|
+ logger.info("在没有服务端缓存的情况下关闭推流通道");
|
|
|
+ }
|
|
|
+ };
|
|
|
+ try {
|
|
|
+ // 发送命令
|
|
|
+ cmder.stopPlayCmd(device, channelId, errorHook);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ logger.error("[关闭推流] 失败 {}", e.getMessage());
|
|
|
+ // 未知的错误, 打印错误堆栈
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 截取视频快照
|
|
|
+ *
|
|
|
+ * @param mediaServerItemInUse
|
|
|
+ * @param deviceId
|
|
|
+ * @param channelId
|
|
|
+ * @param ssrcInfo
|
|
|
+ */
|
|
|
+ public void screenshot(MediaServerItem mediaServerItemInUse, String deviceId, String channelId, SSRCInfo ssrcInfo) {
|
|
|
+ String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv",
|
|
|
+ mediaServerItemInUse.getHttpPort(), "rtp", ssrcInfo.getStream());
|
|
|
+ String path = "snap";
|
|
|
+ String fileName = deviceId + "_" + channelId + ".jpg";
|
|
|
+ // 请求截图
|
|
|
+ logger.info("[尝试截取视频快照]: " + fileName);
|
|
|
+ zlmresTfulUtils.getSnap(mediaServerItemInUse, streamUrl, 15, 1, path, fileName);
|
|
|
+ }
|
|
|
+
|
|
|
public void openBroadcast(MediaServerItem mediaServerItem,
|
|
|
Device device,
|
|
|
int waitTime,
|
|
|
- BroadcastCallback callback){
|
|
|
- logger.warn("[语音广播] 开语音广播 新");
|
|
|
- JSONObject errJson = new JSONObject();
|
|
|
- try {
|
|
|
- cmder.audioBroadcastCmd(device);
|
|
|
- } catch (InvalidArgumentException | SipException | ParseException e) {
|
|
|
+ BroadcastCallback callback) {
|
|
|
+ logger.warn("[语音广播] 开语音广播 新");
|
|
|
+ JSONObject errJson = new JSONObject();
|
|
|
+ try {
|
|
|
+ cmder.audioBroadcastCmd(device);
|
|
|
+ } catch (InvalidArgumentException | SipException | ParseException e) {
|
|
|
logger.error("[命令发送失败] 发送broadcast中 errorMsg: {}", e.getMessage());
|
|
|
errJson.put("msg", "[命令发送失败] 无法发送broadcast消息");
|
|
|
callback.run(2, errJson, null);
|
|
|
}
|
|
|
logger.warn("等待设备返回invite");
|
|
|
- HookSubscribeForKey broadcastForInviteHook = GBHookSubscribeFactory.on_broadcast_invite(device.getDeviceId());
|
|
|
- // 创建计时器,计时结束未收到invite则自动进行失败处理
|
|
|
- String timeOutTaskKey = UUID.randomUUID().toString();
|
|
|
- dynamicTask.startDelay(timeOutTaskKey, () -> {
|
|
|
- // todo 发送 bye 通知给设备?
|
|
|
- logger.warn("invite超时");
|
|
|
- errJson.put("msg", "等待设备语音invite信息超时");
|
|
|
- callback.run(1, errJson, null);
|
|
|
- }, waitTime);
|
|
|
-
|
|
|
- GBHookSubscribe.addInviteSubscribe(broadcastForInviteHook,
|
|
|
- (int code, JSONObject json, SIPRequest request) -> {
|
|
|
- // invite信息返回
|
|
|
- logger.info("[语音广播] 接收到设备invite信息___订阅事件触发 JSONDATA: {}", json.toJSONString());
|
|
|
- // 取消计时器
|
|
|
- dynamicTask.stop(timeOutTaskKey);
|
|
|
- callback.run(0, json, request);
|
|
|
- });
|
|
|
- };
|
|
|
- @Override
|
|
|
- public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
|
|
|
+ HookSubscribeForKey broadcastForInviteHook = GBHookSubscribeFactory.on_broadcast_invite(device.getDeviceId());
|
|
|
+ // 创建计时器,计时结束未收到invite则自动进行失败处理
|
|
|
+ String timeOutTaskKey = UUID.randomUUID().toString();
|
|
|
+ dynamicTask.startDelay(timeOutTaskKey, () -> {
|
|
|
+ // todo 发送 bye 通知给设备?
|
|
|
+ logger.warn("invite超时");
|
|
|
+ errJson.put("msg", "等待设备语音invite信息超时");
|
|
|
+ callback.run(1, errJson, null);
|
|
|
+ }, waitTime);
|
|
|
+
|
|
|
+ GBHookSubscribe.addInviteSubscribe(
|
|
|
+ broadcastForInviteHook,
|
|
|
+ (int code, JSONObject json, SIPRequest request) -> {
|
|
|
+ // invite信息返回
|
|
|
+ logger.info("[语音广播] 接收到设备invite信息___订阅事件触发 JSONDATA: {}", json.toJSONString());
|
|
|
+ // 取消计时器
|
|
|
+ dynamicTask.stop(timeOutTaskKey);
|
|
|
+ callback.run(0, json, request);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ ;
|
|
|
+
|
|
|
+ private void onPublishHandlerForPlay(
|
|
|
+ RequestMessage requestMsg, MediaServerItem mediaServerItem,
|
|
|
+ JSONObject response, String deviceId, String channelId, String ssrc) {
|
|
|
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
|
|
|
- RequestMessage msg = new RequestMessage();
|
|
|
- msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
|
|
|
if (streamInfo != null) {
|
|
|
+ streamInfo.setBack(false);
|
|
|
+ streamInfo.setSsrc(ssrc);
|
|
|
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
|
|
|
if (deviceChannel != null) {
|
|
|
deviceChannel.setStreamId(streamInfo.getStream());
|
|
|
storager.startPlay(deviceId, channelId, streamInfo.getStream());
|
|
|
}
|
|
|
redisCatchStorage.startPlay(streamInfo);
|
|
|
-
|
|
|
- WVPResult wvpResult = new WVPResult();
|
|
|
- wvpResult.setCode(ErrorCode.SUCCESS.getCode());
|
|
|
- wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
|
|
|
- wvpResult.setData(streamInfo);
|
|
|
-
|
|
|
- msg.setData(wvpResult);
|
|
|
- resultHolder.invokeAllResult(msg);
|
|
|
-
|
|
|
+ // 流信息转换
|
|
|
+ requestMsg.setData(WVPResult.success(new StreamContent(streamInfo)));
|
|
|
+ resultHolder.invokeAllResult(requestMsg);
|
|
|
} else {
|
|
|
logger.warn("设备预览API调用失败!");
|
|
|
- msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!"));
|
|
|
- resultHolder.invokeAllResult(msg);
|
|
|
+ requestMsg.setData(WVPResult.fail(ErrorCode.ERROR100, "设备预览API调用失败!"));
|
|
|
+ resultHolder.invokeAllResult(requestMsg);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, PlayBackCallback playBackCallback) {
|
|
|
+ private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String ssrc, PlayBackCallback playBackCallback) {
|
|
|
|
|
|
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
|
|
|
PlayBackResult<StreamInfo> playBackResult = new PlayBackResult<>();
|
|
|
if (streamInfo != null) {
|
|
|
+ streamInfo.setBack(true);
|
|
|
+ streamInfo.setSsrc(ssrc);
|
|
|
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
|
|
|
if (deviceChannel != null) {
|
|
|
deviceChannel.setStreamId(streamInfo.getStream());
|
|
|
@@ -489,7 +707,6 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
} else {
|
|
|
// 尝试获取device配置的zlm服务
|
|
|
mediaServerItem = mediaServerService.getOne(device.getMediaServerId());
|
|
|
-
|
|
|
// 如果默认zlm无法找到则随机分配一个zlm
|
|
|
if(mediaServerItem == null){
|
|
|
logger.warn("无法找到设备默认流媒体服务,即将使用默认流媒体服务");
|
|
|
@@ -638,7 +855,7 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
|
|
dynamicTask.stop(playBackTimeOutTaskKey);
|
|
|
// hook响应
|
|
|
- onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);
|
|
|
+ onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, ssrcInfo.getSsrc(), playBackCallback);
|
|
|
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
|
|
|
});
|
|
|
}
|
|
|
@@ -777,7 +994,7 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
|
|
|
dynamicTask.stop(downLoadTimeOutTaskKey);
|
|
|
// hook响应
|
|
|
- onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack);
|
|
|
+ onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, ssrcInfo.getSsrc(), hookCallBack);
|
|
|
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
|
|
|
});
|
|
|
}
|
|
|
@@ -847,7 +1064,8 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
RequestMessage msg = new RequestMessage();
|
|
|
msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
|
|
|
msg.setId(uuid);
|
|
|
- StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
|
|
|
+ StreamInfo streamInfo = onPublishHandler(
|
|
|
+ inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
|
|
|
if (streamInfo != null) {
|
|
|
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
|
|
|
msg.setData(JSON.toJSONString(streamInfo));
|
|
|
@@ -860,9 +1078,9 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
|
|
|
- String streamId = resonse.getString("stream");
|
|
|
- JSONArray tracks = resonse.getJSONArray("tracks");
|
|
|
+ public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject json, String deviceId, String channelId) {
|
|
|
+ String streamId = json.getString("stream");
|
|
|
+ JSONArray tracks = json.getJSONArray("tracks");
|
|
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
|
|
|
streamInfo.setDeviceID(deviceId);
|
|
|
streamInfo.setChannelId(channelId);
|
|
|
@@ -1051,9 +1269,9 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
null,
|
|
|
null);
|
|
|
|
|
|
- callback.run(0, null,null);
|
|
|
- }catch(InvalidArgumentException | SipException | ParseException e){
|
|
|
- logger.error("[下发audio拉流invite失败]",e);
|
|
|
+ callback.run(0, null, null);
|
|
|
+ } catch (InvalidArgumentException | SipException | ParseException e) {
|
|
|
+ logger.error("[下发audio拉流invite失败]", e);
|
|
|
logger.error("[zlm控制异常] 创建媒体流失败");
|
|
|
errJson.put("msg", "[zlm控制异常] 创建媒体流失败");
|
|
|
callback.run(2, errJson, null);
|
|
|
@@ -1061,5 +1279,24 @@ public class PlayServiceImpl implements IPlayService {
|
|
|
// eventResult.msg = "命令发送失败";
|
|
|
// errorEvent.response(eventResult);
|
|
|
}
|
|
|
- };
|
|
|
+ }
|
|
|
+
|
|
|
+ ;
|
|
|
+
|
|
|
+
|
|
|
+ private void addHookSubscribeForStreamChange(
|
|
|
+ RequestMessage requestMsg, MediaServerItem mediaServerItem,
|
|
|
+ Device device, String channelId, SSRCInfo ssrcInfo) {
|
|
|
+ String stream = ssrcInfo.getStream();
|
|
|
+ HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
|
|
|
+ subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
|
|
|
+ // zlm 事件触发. 流改变事件
|
|
|
+ logger.info("[ZLM HOOK] 收到ZLM流 编号信息: {}", json.toJSONString());
|
|
|
+ logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
|
|
|
+ // 处理结果
|
|
|
+ onPublishHandlerForPlay(requestMsg, mediaServerItemInUse, json, device.getDeviceId(), channelId, ssrcInfo.getSsrc());
|
|
|
+ // 请求截图
|
|
|
+ screenshot(mediaServerItemInUse, device.getDeviceId(), channelId, ssrcInfo);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|