Explorar o código

1. 暂存,完成invite信息交互

kindring %!s(int64=2) %!d(string=hai) anos
pai
achega
60b8311966

+ 97 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/GBEventSubscribe.java

@@ -22,9 +22,14 @@ public class GBEventSubscribe {
     public interface Event {
         void response(int code, JSONObject response);
     }
+    public interface InviteEvent {
+        void response(int code, JSONObject response,ServerTransaction serverTransaction);
+    }
 
     private Map<String, Map<IGBHookSubscribe, GBEventSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
 
+    private Map<String, Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent>> allInviteSubscribes = new ConcurrentHashMap<>();
+
     public void addSubscribe(IGBHookSubscribe hookSubscribe, GBEventSubscribe.Event event) {
         if (hookSubscribe.getExpires() == null) {
             // 默认5分钟过期
@@ -34,9 +39,18 @@ public class GBEventSubscribe {
         allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
     }
 
-    public GBEventSubscribe.Event sendStreamNotify(String type, JSONObject hookResponse, ServerTransaction serverTransaction){
-        GBEventSubscribe.Event event = null;
-        Map<IGBHookSubscribe, GBEventSubscribe.Event> eventMap = allSubscribes.get(type);
+    public void addInviteSubscribe(IGBHookSubscribe hookSubscribe, GBEventSubscribe.InviteEvent event) {
+        if (hookSubscribe.getExpires() == null) {
+            // 默认5分钟过期
+            Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
+            hookSubscribe.setExpires(expiresInstant);
+        }
+        allInviteSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
+    }
+
+    public GBEventSubscribe.InviteEvent sendStreamNotify(String type, JSONObject hookResponse){
+        GBEventSubscribe.InviteEvent event = null;
+        Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent> eventMap = allInviteSubscribes.get(type);
         if (eventMap == null) {
             logger.warn("[发送国标通知失败] {}",type);
             return null;
@@ -129,6 +143,47 @@ public class GBEventSubscribe {
         }
     }
 
+
+    public void removeInviteSubscribe(IGBHookSubscribe hookSubscribe) {
+        Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent> eventMap = allInviteSubscribes.get(hookSubscribe.getHookType());
+        if (eventMap == null) {
+            return;
+        }
+        Set<Map.Entry<IGBHookSubscribe, GBEventSubscribe.InviteEvent>> entries = eventMap.entrySet();
+
+        if (entries.size() > 0) {
+            List<Map.Entry<IGBHookSubscribe, GBEventSubscribe.InviteEvent>> entriesToRemove = new ArrayList<>();
+            for (Map.Entry<IGBHookSubscribe, GBEventSubscribe.InviteEvent> entry : entries) {
+                JSONObject content = entry.getKey().getContent();
+                if (content == null || content.size() == 0) {
+                    entriesToRemove.add(entry);
+                    continue;
+                }
+                Boolean result = null;
+                for (String s : content.keySet()) {
+                    if (result == null) {
+                        result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
+                    } else {
+                        if (content.getString(s) == null) {
+                            continue;
+                        }
+                        result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
+                    }
+                }
+                if (result) {
+                    entriesToRemove.add(entry);
+                }
+            }
+
+            if (!CollectionUtils.isEmpty(entriesToRemove)) {
+                for (Map.Entry<IGBHookSubscribe, GBEventSubscribe.InviteEvent> entry : entriesToRemove) {
+                    entries.remove(entry);
+                }
+            }
+
+        }
+    }
+
     public List<GBEventSubscribe.Event> getSubscribes(HookType type) {
         Map<IGBHookSubscribe, GBEventSubscribe.Event> eventMap = allSubscribes.get(type);
         if (eventMap == null) {
@@ -141,6 +196,18 @@ public class GBEventSubscribe {
         return result;
     }
 
+    public List<GBEventSubscribe.InviteEvent> getInviteSubscribes(HookType type) {
+        Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent> eventMap = allInviteSubscribes.get(type);
+        if (eventMap == null) {
+            return null;
+        }
+        List<GBEventSubscribe.InviteEvent> result = new ArrayList<>();
+        for (IGBHookSubscribe key : eventMap.keySet()) {
+            result.add(eventMap.get(key));
+        }
+        return result;
+    }
+
 
     public List<IGBHookSubscribe> getAll(){
         ArrayList<IGBHookSubscribe> result = new ArrayList<>();
@@ -151,6 +218,15 @@ public class GBEventSubscribe {
         return result;
     }
 
+    public List<IGBHookSubscribe> getInviteAll(){
+        ArrayList<IGBHookSubscribe> result = new ArrayList<>();
+        Collection<Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent>> values = allInviteSubscribes.values();
+        for (Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent> value : values) {
+            result.addAll(value.keySet());
+        }
+        return result;
+    }
+
     @Scheduled(cron="0 0/5 * * * ?")   //每5分钟执行一次
     public void execute(){
         Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
@@ -169,5 +245,23 @@ public class GBEventSubscribe {
         }
     }
 
+    @Scheduled(cron="0 0/5 * * * ?")   //每5分钟执行一次
+    public void executeInviteSubscribes(){
+        Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
+        int total = 0;
+        for (String hookType : allInviteSubscribes.keySet()) {
+            Map<IGBHookSubscribe, GBEventSubscribe.InviteEvent> hookSubscribeEventMap = allInviteSubscribes.get(hookType);
+            if (hookSubscribeEventMap.size() > 0) {
+                for (IGBHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
+                    if (hookSubscribe.getExpires().isBefore(instant)) {
+                        // 过期的
+                        hookSubscribeEventMap.remove(hookSubscribe);
+                        total ++;
+                    }
+                }
+            }
+        }
+    }
+
 
 }

+ 46 - 23
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.HfyAiInfo;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -12,9 +13,11 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.TestInviteRequestProcessor;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -86,6 +89,13 @@ public class SIPCommander implements ISIPCommander {
     @Autowired
     private IMediaServerService mediaServerService;
 
+    @Autowired
+    private DynamicTask dynamicTask;
+
+    @Autowired
+    private TestInviteRequestProcessor testInviteRequestProcessor;
+
+
 
     /**
      * 云台方向放控制,使用配置文件中的默认镜头移动速度
@@ -511,14 +521,8 @@ public class SIPCommander implements ISIPCommander {
     }
 
 
-    public void sendBoradcastInviteCmd(ServerTransaction serverTransaction,MediaServerItem mediaServerItem,SSRCInfo ssrcInfo,Device device,
-                                       String startTime, String endTime,
-                                       InviteStreamCallback inviteStreamCallback,
-                                       SipSubscribe.Event okEvent,
-                                       SipSubscribe.Event errorEvent)throws
-            InvalidArgumentException, SipException, ParseException{
+    public String createBroadcastInviteSdp(MediaServerItem mediaServerItem,SSRCInfo ssrcInfo){
         logger.info("{} 语音对讲拉语音流的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
-        int testPort = 31234;
         // 创建sdp
         StringBuffer content = new StringBuffer(200);
         content.append("v=0\r\n");
@@ -532,8 +536,19 @@ public class SIPCommander implements ISIPCommander {
         content.append("a=setup:passive\r\n");
         content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
         content.append("f=v/////a/1/8/1\r\n");//f 参数?
+//        String _ssrc = ssrcInfo.getSsrc();
+        return content.toString();
+    }
+    public void sendBoradcastInviteCmd(ServerTransaction serverTransaction,MediaServerItem mediaServerItem,SSRCInfo ssrcInfo,Device device,
+                                       String startTime, String endTime,
+                                       InviteStreamCallback inviteStreamCallback,
+                                       SipSubscribe.Event okEvent,
+                                       SipSubscribe.Event errorEvent)throws
+            InvalidArgumentException, SipException, ParseException{
+        String sdpContent = createBroadcastInviteSdp(mediaServerItem,ssrcInfo);
         String _ssrc = ssrcInfo.getSsrc();
         // 使用 返回sdp ack 数据给设备
+        logger.info("返回给设备的 audio invite sdp部分为{}",sdpContent);
         try {
             CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                     : udpSipProvider.getNewCallId();
@@ -548,11 +563,17 @@ public class SIPCommander implements ISIPCommander {
 //                    logger.error("[命令发送失败] 语音广播 发送BYE: {}", e.getMessage());
 //                }
             }, 60 * 1000);
-            responseSdpACK(serverTransaction,
-                    content.toString(),
-                    device.getDeviceId(),
-                    device.getHostAddress(),
-                    device.getPort()
+//            testInviteRequestProcessor.responseBroadcastSdpACK(serverTransaction,
+//                    sdpContent,
+//                    device.getDeviceId(),
+//                    device.getHostAddress(),
+//                    device.getPort()
+//            );
+            testInviteRequestProcessor.responseBroadcastSdpACK(serverTransaction,
+                    sdpContent,
+                    sipConfig.getId(),
+                    mediaServerItem.getSdpIp(),
+                    sipConfig.getPort()
             );
         }catch(SipException e){
             logger.warn("下发audio invite 失败");
@@ -560,21 +581,23 @@ public class SIPCommander implements ISIPCommander {
 
 
 
-        logger.info("下发audio invite 为{}",content.toString());
         // todo 订阅audio推流流变化
-        Request request = headerProvider.createPlaybackInviteRequest(device,null, content.toString(), null, SipUtils.getNewFromTag(), null, callIdHeader, ssrcInfo.getSsrc());
-        transmitRequest(device.getTransport(), request, (e)->{
-            logger.warn("下发audio invite 失败");
-        }, event -> {
-            ResponseEvent responseEvent = (ResponseEvent) event.event;
-            SIPResponse response = (SIPResponse) responseEvent.getResponse();
-            streamSession.put(device.getDeviceId(), null, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback);
-            okEvent.response(event);
-        });
+//        Request request = headerProvider.createPlaybackInviteRequest(device,null, content.toString(), null, SipUtils.getNewFromTag(), null, callIdHeader, ssrcInfo.getSsrc());
+//        transmitRequest(device.getTransport(), request, (e)->{
+//            logger.warn("下发audio invite 失败");
+//        }, event -> {
+//            ResponseEvent responseEvent = (ResponseEvent) event.event;
+//            SIPResponse response = (SIPResponse) responseEvent.getResponse();
+//            streamSession.put(device.getDeviceId(), null, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback);
+//            okEvent.response(event);
+//        });
 //        if (inviteStreamCallback != null) {
 //            inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
 //        }
     }
+
+
+
     /**
      * 请求历史媒体下载
      *
@@ -737,7 +760,7 @@ public class SIPCommander implements ISIPCommander {
         broadcastXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
 
 //        broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
-        broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
+        broadcastXml.append("<SourceID>" + device.getDeviceId() + "</SourceID>\r\n");
 
         broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
         broadcastXml.append("</Notify>\r\n");

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -786,6 +786,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         return null;
     }
 
+
     public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
 
         // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
@@ -857,9 +858,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             subscribeKey.put("port", port);
             subscribeKey.put("ssrc", ssrc);
 
-            GBEventSubscribe.Event subscribe = GBHookSubscribe.sendNotify(GB_Event.HOOK_BROADCAST_INVITE,subscribeKey);
+            GBEventSubscribe.InviteEvent subscribe = GBHookSubscribe.sendStreamNotify(GB_Event.HOOK_BROADCAST_INVITE,subscribeKey);
             if (subscribe != null ) {
-                subscribe.response(0, subscribeKey);
+                subscribe.response(0, subscribeKey,serverTransaction);
             }
         } else {
             logger.warn("来自无效设备/平台的请求");

+ 22 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/TestInviteRequestProcessor.java

@@ -0,0 +1,22 @@
+package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
+
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
+import gov.nist.javax.sip.message.SIPResponse;
+import org.springframework.stereotype.Component;
+
+import javax.sip.InvalidArgumentException;
+import javax.sip.ServerTransaction;
+import javax.sip.SipException;
+import java.text.ParseException;
+
+@Component
+public class TestInviteRequestProcessor extends SIPRequestProcessorParent {
+    public SIPResponse responseBroadcastSdpACK(ServerTransaction serverTransaction, String sdp, String gbId, String addr, int port)  throws SipException, InvalidArgumentException, ParseException {
+        return responseSdpACK(serverTransaction,
+                sdp,
+                gbId,
+                addr,
+                port
+        );
+    }
+}

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -314,8 +314,8 @@ public class ZLMRTPServerFactory {
 //        param.put("from_mp4", 1);
         param.put("is_udp", 0);
         // start send rtp
-//        param.put("dst_url", dst_url);
-//        param.put("dst_port", dst_port);
+        param.put("dst_url", dst_url);
+        param.put("dst_port", dst_port);
         param.put("use_ps", 0);
         param.put("pt", 8);
         param.put("close_delay_ms", 15000);

+ 11 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -8,7 +8,9 @@ import java.util.*;
 
 import javax.sip.InvalidArgumentException;
 import javax.sip.ResponseEvent;
+import javax.sip.ServerTransaction;
 import javax.sip.SipException;
+import javax.sip.header.CallIdHeader;
 
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
@@ -24,6 +26,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import gov.nist.javax.sdp.TimeDescriptionImpl;
 import gov.nist.javax.sdp.fields.TimeField;
+import gov.nist.javax.sip.SipProviderImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -119,6 +122,7 @@ public class PlayServiceImpl implements IPlayService {
 
 
 
+
     @Override
     public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,int isUsePs,
                            ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
@@ -511,7 +515,7 @@ public class PlayServiceImpl implements IPlayService {
 
 
         // 注册subScript事件
-        GBHookSubscribe.addSubscribe(broadcastForInviteHook,(int code,JSONObject json)->{
+        GBHookSubscribe.addInviteSubscribe(broadcastForInviteHook,(int code, JSONObject json, ServerTransaction serverTransaction)->{
             logger.info("[语音广播] 接收到设备invite信息___订阅事件触发 JSONDATA: {}",json.toJSONString());
             String streamId = null;
             String ssrcStr = json.getString("ssrc");
@@ -537,7 +541,11 @@ public class PlayServiceImpl implements IPlayService {
 
             // TODO: 2023/3/7 开始下发invite信息给设备
             try {
-                cmder.sendBoradcastInviteCmd(mediaServerItem, ssrcInfo, device, null, null,
+//                String sdpContent = cmder.createBroadcastInviteSdp(mediaServerItem, ssrcInfo);
+//                String _ssrc = ssrcInfo.getSsrc();
+
+
+                cmder.sendBoradcastInviteCmd(serverTransaction,mediaServerItem, ssrcInfo, device, null, null,
                 null,
                 (_event) -> {
                 },
@@ -557,7 +565,7 @@ public class PlayServiceImpl implements IPlayService {
 
                 msg.setData(wvpResult);
                 resultHolder.invokeAllResult(msg);
-            }catch(InvalidArgumentException | SipException | ParseException e){
+            }catch(InvalidArgumentException |  SipException | ParseException e){
                 logger.error("[下发audio拉流invite失败]",e);
 //                    SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null));
 //                    eventResult.msg = "命令发送失败";

+ 1 - 1
web_src/config/index.js

@@ -3,7 +3,7 @@
 // see http://vuejs-templates.github.io/webpack for documentation.
 
 const path = require('path')
-const baseUrl = "https://192.168.1.26:29001"
+const baseUrl = "https://192.168.31.250:29001"
 const ZLMServer = "https://kindring.cn:29010"
 module.exports = {
   dev: {