1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft; import org.java_websocket.drafts.Draft_6455; import org.java_websocket.handshake.ServerHandshake;
import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch;
@Slf4j public class DefaultSyncWebSocketClient<T> extends WebSocketClient {
private static final int connectTimeout = 2000; private static final Draft protocolDraft = new Draft_6455(); private static final Map<String, String> httpHeaders = new HashMap<>();
private final Object lock = new Object(); private Object result = null; private CountDownLatch downLatch = null;
public DefaultSyncWebSocketClient(String serverUri) throws URISyntaxException { this(new URI(serverUri)); }
public DefaultSyncWebSocketClient(String serverUri, Map<String, String> httpHeaders) throws URISyntaxException { this(new URI(serverUri), httpHeaders); }
public DefaultSyncWebSocketClient(URI serverUri) { this(serverUri, protocolDraft); }
public DefaultSyncWebSocketClient(URI serverUri, Draft protocolDraft) { this(serverUri, protocolDraft, httpHeaders); }
public DefaultSyncWebSocketClient(URI serverUri, Map<String, String> httpHeaders) { this(serverUri, protocolDraft, httpHeaders); }
public DefaultSyncWebSocketClient(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders) { this(serverUri, protocolDraft, httpHeaders, connectTimeout); }
public DefaultSyncWebSocketClient(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders, int connectTimeout) { super(serverUri, protocolDraft, httpHeaders, connectTimeout); log.debug("构建 DefaultWebSocketClient" + "\n\tURI: {}" + "\n\tDraft: {}" + "\n\tHeaders: {}" + "\n\ttimeOut: {}", serverUri, protocolDraft, httpHeaders, connectTimeout); }
@SuppressWarnings("unchecked") public T sendExt(Object obj) throws RuntimeException { log.debug("sendExt…………"); synchronized (lock){ try { downLatch = new CountDownLatch(1); if (obj instanceof String) { send((String) obj); }else if (obj instanceof byte[]){ send((byte[]) obj); }else{ throw new RuntimeException("不支持的参数类型"); } downLatch.await(); return (T) result; } catch (InterruptedException e) { log.error("连接异常中断 {}", e.getMessage()); throw new RuntimeException("连接异常中断"); } finally { this.close(); } } }
@Override public void onOpen(ServerHandshake handshakedata) { log.debug("onOpen…………"); }
@Override public void onMessage(String message) { log.debug("onMessage…………"); result = message; if(downLatch!=null){ downLatch.countDown(); } }
@Override public void onMessage(ByteBuffer bytes) { log.debug("onMessage-bytes…………"); result = bytes; if(downLatch!=null){ downLatch.countDown(); } }
@Override public void onClose(int code, String reason, boolean remote) { log.debug("onClose…………"); }
@Override public void onError(Exception ex) { log.debug("onError………… {}", ex.getMessage()); } }
|