基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

在阅读本文前需要对socket以及自定义协议有一个基本的了解,可以先查看上一篇文章《基于Java Socket的自定义协议,实现Android与服务器的长连接(一)》学习相关的基础知识点。

wkiol1hflu6dkj_3aao1zsneebq652-jpg-wh_651x-s_3817930003

一、协议定义

上一篇文章中,我们对socket编程和自定义协议做了一个简单的了解,本文将在此基础上加以深入,来实现Android和服务器之间的长连接,现定义协议如下:

  • 数据类协议(Data)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,0表示数据)
    • 业务类型(pattion,8bit,0表示push,其他暂未定)
    • 数据格式(dtype,8bit,0表示json,其他暂未定)
    • 消息id(msgId,32bit)
    • 正文数据(data)
  • 数据ack类协议(DataAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,1表示数据ack)
    • ack消息id(ackMsgId,32bit)
    • 预留信息(unused)
  • 心跳类协议(ping)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,2表示心跳)
    • 心跳id(pingId,32bit,client上报取奇数,即1,3,5…,server下发取偶数,即0,2,4…)
    • 预留信息(unused)
  • 心跳ack类协议(pingAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,3表示心跳ack)
    • ack心跳id(pingId,32bit,client上报取奇数,即1,3,5…,server下发取偶数,即0,2,4…)
    • 预留信息(unused)

二、协议实现

从上述的协议定义中,我们可以看出,四种协议有共同的3个要素,分别是:长度、版本号、数据类型,那么我们可以先抽象出一个基本的协议,如下:

1. BasicProtocol

  1. import android.util.Log;
  2. import com.shandiangou.sdgprotocol.lib.Config;
  3. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  4. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  5. import java.io.ByteArrayOutputStream;
  6. /**
  7.  * Created by meishan on 16/12/1.
  8.  * <p>
  9.  * 协议类型: 0表示数据,1表示数据Ack,2表示ping,3表示pingAck
  10.  */
  11. public abstract class BasicProtocol {
  12.     // 长度均以字节(byte)为单位
  13.     public static final int LENGTH_LEN = 4;       //记录整条数据长度数值的长度
  14.     protected static final int VER_LEN = 1;       //协议的版本长度(其中前3位作为预留位,后5位作为版本号)
  15.     protected static final int TYPE_LEN = 1;      //协议的数据类型长度
  16.     private int reserved = 0;                     //预留信息
  17.     private int version = Config.VERSION;         //版本号
  18.     /**
  19.      * 获取整条数据长度
  20.      * 单位:字节(byte)
  21.      *
  22.      * @return
  23.      */
  24.     protected int getLength() {
  25.         return LENGTH_LEN + VER_LEN + TYPE_LEN;
  26.     }
  27.     public int getReserved() {
  28.         return reserved;
  29.     }
  30.     public void setReserved(int reserved) {
  31.         this.reserved = reserved;
  32.     }
  33.     public int getVersion() {
  34.         return version;
  35.     }
  36.     public void setVersion(int version) {
  37.         this.version = version;
  38.     }
  39.     /**
  40.      * 获取协议类型,由子类实现
  41.      *
  42.      * @return
  43.      */
  44.     public abstract int getProtocolType();
  45.     /**
  46.      * 由预留值和版本号计算完整版本号的byte[]值
  47.      *
  48.      * @return
  49.      */
  50.     private int getVer(byte r, byte v, int vLen) {
  51.         int num = 0;
  52.         int rLen = 8 – vLen;
  53.         for (int i = 0; i < rLen; i++) {
  54.             num += (((r >> (rLen – 1 – i)) & 0x1) << (7 – i));
  55.         }
  56.         return num + v;
  57.     }
  58.     /**
  59.      * 拼接发送数据,此处拼接了协议版本、协议类型和数据长度,具体内容子类中再拼接
  60.      * 按顺序拼接
  61.      *
  62.      * @return
  63.      */
  64.     public byte[] genContentData() {
  65.         byte[] length = SocketUtil.int2ByteArrays(getLength());
  66.         byte reserved = (byte) getReserved();
  67.         byte version = (byte) getVersion();
  68.         byte[] ver = {(byte) getVer(reserved, version, 5)};
  69.         byte[] type = {(byte) getProtocolType()};
  70.         ByteArrayOutputStream baos = new ByteArrayOutputStream(LENGTH_LEN + VER_LEN + TYPE_LEN);
  71.         baos.write(length, 0, LENGTH_LEN);
  72.         baos.write(ver, 0, VER_LEN);
  73.         baos.write(type, 0, TYPE_LEN);
  74.         return baos.toByteArray();
  75.     }
  76.     /**
  77.      * 解析出整条数据长度
  78.      *
  79.      * @param data
  80.      * @return
  81.      */
  82.     protected int parseLength(byte[] data) {
  83.         return SocketUtil.byteArrayToInt(data, 0, LENGTH_LEN);
  84.     }
  85.     /**
  86.      * 解析出预留位
  87.      *
  88.      * @param data
  89.      * @return
  90.      */
  91.     protected int parseReserved(byte[] data) {
  92.         byte r = data[LENGTH_LEN];//前4个字节(0,1,2,3)为数据长度的int值,与版本号组成一个字节
  93.         return (r >> 5) & 0xFF;
  94.     }
  95.     /**
  96.      * 解析出版本号
  97.      *
  98.      * @param data
  99.      * @return
  100.      */
  101.     protected int parseVersion(byte[] data) {
  102.         byte v = data[LENGTH_LEN]; //与预留位组成一个字节
  103.         return ((v << 3) & 0xFF) >> 3;
  104.     }
  105.     /**
  106.      * 解析出协议类型
  107.      *
  108.      * @param data
  109.      * @return
  110.      */
  111.     public static int parseType(byte[] data) {
  112.         byte t = data[LENGTH_LEN + VER_LEN];//前4个字节(0,1,2,3)为数据长度的int值,以及ver占一个字节
  113.         return t & 0xFF;
  114.     }
  115.     /**
  116.      * 解析接收数据,此处解析了协议版本、协议类型和数据长度,具体内容子类中再解析
  117.      *
  118.      * @param data
  119.      * @return
  120.      * @throws ProtocolException 协议版本不一致,抛出异常
  121.      */
  122.     public int parseContentData(byte[] data) throws ProtocolException {
  123.         int reserved = parseReserved(data);
  124.         int version = parseVersion(data);
  125.         int protocolType = parseType(data);
  126.         if (version != getVersion()) {
  127.             throw new ProtocolException(“input version is error: “ + version);
  128.         }
  129.         return LENGTH_LEN + VER_LEN + TYPE_LEN;
  130.     }
  131.     @Override
  132.     public String toString() {
  133.         return “Version: “ + getVersion() + “, Type: “ + getProtocolType();
  134.     }
  135. }

上述涉及到的Config类和SocketUtil类如下:

  1. /**
  2.  * Created by meishan on 16/12/2.
  3.  */
  4. public class Config {
  5.     public static final int VERSION = 1;                 //协议版本号
  6.     public static final String ADDRESS = “10.17.64.237”; //服务器地址
  7.     public static final int PORT = 9013;                 //服务器端口号
  8. }
  1. import java.io.BufferedInputStream;
  2. import java.io.BufferedOutputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import java.nio.ByteBuffer;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10.  * Created by meishan on 16/12/1.
  11.  */
  12. public class SocketUtil {
  13.     private static Map<Integer, String> msgImp = new HashMap<>();
  14.     static {
  15.         msgImp.put(DataProtocol.PROTOCOL_TYPE, “com.shandiangou.sdgprotocol.lib.protocol.DataProtocol”);       //0
  16.         msgImp.put(DataAckProtocol.PROTOCOL_TYPE, “com.shandiangou.sdgprotocol.lib.protocol.DataAckProtocol”); //1
  17.         msgImp.put(PingProtocol.PROTOCOL_TYPE, “com.shandiangou.sdgprotocol.lib.protocol.PingProtocol”);       //2
  18.         msgImp.put(PingAckProtocol.PROTOCOL_TYPE, “com.shandiangou.sdgprotocol.lib.protocol.PingAckProtocol”); //3
  19.     }
  20.     /**
  21.      * 解析数据内容
  22.      *
  23.      * @param data
  24.      * @return
  25.      */
  26.     public static BasicProtocol parseContentMsg(byte[] data) {
  27.         int protocolType = BasicProtocol.parseType(data);
  28.         String className = msgImp.get(protocolType);
  29.         BasicProtocol basicProtocol;
  30.         try {
  31.             basicProtocol = (BasicProtocol) Class.forName(className).newInstance();
  32.             basicProtocol.parseContentData(data);
  33.         } catch (Exception e) {
  34.             basicProtocol = null;
  35.             e.printStackTrace();
  36.         }
  37.         return basicProtocol;
  38.     }
  39.     /**
  40.      * 读数据
  41.      *
  42.      * @param inputStream
  43.      * @return
  44.      * @throws SocketExceptions
  45.      */
  46.     public static BasicProtocol readFromStream(InputStream inputStream) {
  47.         BasicProtocol protocol;
  48.         BufferedInputStream bis;
  49.         //header中保存的是整个数据的长度值,4个字节表示。在下述write2Stream方法中,会先写入header
  50.         byte[] header = new byte[BasicProtocol.LENGTH_LEN];
  51.         try {
  52.             bis = new BufferedInputStream(inputStream);
  53.             int temp;
  54.             int len = 0;
  55.             while (len < header.length) {
  56.                 temp = bis.read(header, len, header.length – len);
  57.                 if (temp > 0) {
  58.                     len += temp;
  59.                 } else if (temp == -1) {
  60.                     bis.close();
  61.                     return null;
  62.                 }
  63.             }
  64.             len = 0;
  65.             int length = byteArrayToInt(header);//数据的长度值
  66.             byte[] content = new byte[length];
  67.             while (len < length) {
  68.                 temp = bis.read(content, len, length – len);
  69.                 if (temp > 0) {
  70.                     len += temp;
  71.                 }
  72.             }
  73.             protocol = parseContentMsg(content);
  74.         } catch (IOException e) {
  75.             e.printStackTrace();
  76.             return null;
  77.         }
  78.         return protocol;
  79.     }
  80.     /**
  81.      * 写数据
  82.      *
  83.      * @param protocol
  84.      * @param outputStream
  85.      */
  86.     public static void write2Stream(BasicProtocol protocol, OutputStream outputStream) {
  87.         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
  88.         byte[] buffData = protocol.genContentData();
  89.         byte[] header = int2ByteArrays(buffData.length);
  90.         try {
  91.             bufferedOutputStream.write(header);
  92.             bufferedOutputStream.write(buffData);
  93.             bufferedOutputStream.flush();
  94.         } catch (IOException e) {
  95.             e.printStackTrace();
  96.         }
  97.     }
  98.     /**
  99.      * 关闭输入流
  100.      *
  101.      * @param is
  102.      */
  103.     public static void closeInputStream(InputStream is) {
  104.         try {
  105.             if (is != null) {
  106.                 is.close();
  107.             }
  108.         } catch (IOException e) {
  109.             e.printStackTrace();
  110.         }
  111.     }
  112.     /**
  113.      * 关闭输出流
  114.      *
  115.      * @param os
  116.      */
  117.     public static void closeOutputStream(OutputStream os) {
  118.         try {
  119.             if (os != null) {
  120.                 os.close();
  121.             }
  122.         } catch (IOException e) {
  123.             e.printStackTrace();
  124.         }
  125.     }
  126.     public static byte[] int2ByteArrays(int i) {
  127.         byte[] result = new byte[4];
  128.         result[0] = (byte) ((i >> 24) & 0xFF);
  129.         result[1] = (byte) ((i >> 16) & 0xFF);
  130.         result[2] = (byte) ((i >> 8) & 0xFF);
  131.         result[3] = (byte) (i & 0xFF);
  132.         return result;
  133.     }
  134.     public static int byteArrayToInt(byte[] b) {
  135.         int intValue = 0;
  136.         for (int i = 0; i < b.length; i++) {
  137.             intValue += (b[i] & 0xFF) << (8 * (3 – i)); //int占4个字节(0,1,2,3)
  138.         }
  139.         return intValue;
  140.     }
  141.     public static int byteArrayToInt(byte[] b, int byteOffset, int byteCount) {
  142.         int intValue = 0;
  143.         for (int i = byteOffset; i < (byteOffset + byteCount); i++) {
  144.             intValue += (b[i] & 0xFF) << (8 * (3 – (i – byteOffset)));
  145.         }
  146.         return intValue;
  147.     }
  148.     public static int bytes2Int(byte[] b, int byteOffset) {
  149.         ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
  150.         byteBuffer.put(b, byteOffset, 4); //占4个字节
  151.         byteBuffer.flip();
  152.         return byteBuffer.getInt();
  153.     }
  154. }

接下来我们实现具体的协议。

2. DataProtocol

  1. import android.util.Log;
  2. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  3. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  4. import java.io.ByteArrayOutputStream;
  5. import java.io.Serializable;
  6. import java.io.UnsupportedEncodingException;
  7. /**
  8.  * Created by meishan on 16/12/1.
  9.  */
  10. public class DataProtocol extends BasicProtocol implements Serializable {
  11.     public static final int PROTOCOL_TYPE = 0;
  12.     private static final int PATTION_LEN = 1;
  13.     private static final int DTYPE_LEN = 1;
  14.     private static final int MSGID_LEN = 4;
  15.     private int pattion;
  16.     private int dtype;
  17.     private int msgId;
  18.     private String data;
  19.     @Override
  20.     public int getLength() {
  21.         return super.getLength() + PATTION_LEN + DTYPE_LEN + MSGID_LEN + data.getBytes().length;
  22.     }
  23.     @Override
  24.     public int getProtocolType() {
  25.         return PROTOCOL_TYPE;
  26.     }
  27.     public int getPattion() {
  28.         return pattion;
  29.     }
  30.     public void setPattion(int pattion) {
  31.         this.pattion = pattion;
  32.     }
  33.     public int getDtype() {
  34.         return dtype;
  35.     }
  36.     public void setDtype(int dtype) {
  37.         this.dtype = dtype;
  38.     }
  39.     public void setMsgId(int msgId) {
  40.         this.msgId = msgId;
  41.     }
  42.     public int getMsgId() {
  43.         return msgId;
  44.     }
  45.     public String getData() {
  46.         return data;
  47.     }
  48.     public void setData(String data) {
  49.         this.data = data;
  50.     }
  51.     /**
  52.      * 拼接发送数据
  53.      *
  54.      * @return
  55.      */
  56.     @Override
  57.     public byte[] genContentData() {
  58.         byte[] base = super.genContentData();
  59.         byte[] pattion = {(byte) this.pattion};
  60.         byte[] dtype = {(byte) this.dtype};
  61.         byte[] msgid = SocketUtil.int2ByteArrays(this.msgId);
  62.         byte[] data = this.data.getBytes();
  63.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  64.         baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id
  65.         baos.write(pattion, 0, PATTION_LEN);       //业务类型
  66.         baos.write(dtype, 0, DTYPE_LEN);           //业务数据格式
  67.         baos.write(msgid, 0, MSGID_LEN);           //消息id
  68.         baos.write(data, 0, data.length);          //业务数据
  69.         return baos.toByteArray();
  70.     }
  71.     /**
  72.      * 解析接收数据,按顺序解析
  73.      *
  74.      * @param data
  75.      * @return
  76.      * @throws ProtocolException
  77.      */
  78.     @Override
  79.     public int parseContentData(byte[] data) throws ProtocolException {
  80.         int pos = super.parseContentData(data);
  81.         //解析pattion
  82.         pattion = data[pos] & 0xFF;
  83.         pos += PATTION_LEN;
  84.         //解析dtype
  85.         dtype = data[pos] & 0xFF;
  86.         pos += DTYPE_LEN;
  87.         //解析msgId
  88.         msgId = SocketUtil.byteArrayToInt(data, pos, MSGID_LEN);
  89.         pos += MSGID_LEN;
  90.         //解析data
  91.         try {
  92.             this.data = new String(data, pos, data.length – pos, “utf-8”);
  93.         } catch (UnsupportedEncodingException e) {
  94.             e.printStackTrace();
  95.         }
  96.         return pos;
  97.     }
  98.     @Override
  99.     public String toString() {
  100.         return “data: “ + data;
  101.     }
  102. }

3. DataAckProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6.  * Created by meishan on 16/12/1.
  7.  */
  8. public class DataAckProtocol extends BasicProtocol {
  9.     public static final int PROTOCOL_TYPE = 1;
  10.     private static final int ACKMSGID_LEN = 4;
  11.     private int ackMsgId;
  12.     private String unused;
  13.     @Override
  14.     public int getLength() {
  15.         return super.getLength() + ACKMSGID_LEN + unused.getBytes().length;
  16.     }
  17.     @Override
  18.     public int getProtocolType() {
  19.         return PROTOCOL_TYPE;
  20.     }
  21.     public int getAckMsgId() {
  22.         return ackMsgId;
  23.     }
  24.     public void setAckMsgId(int ackMsgId) {
  25.         this.ackMsgId = ackMsgId;
  26.     }
  27.     public String getUnused() {
  28.         return unused;
  29.     }
  30.     public void setUnused(String unused) {
  31.         this.unused = unused;
  32.     }
  33.     /**
  34.      * 拼接发送数据
  35.      *
  36.      * @return
  37.      */
  38.     @Override
  39.     public byte[] genContentData() {
  40.         byte[] base = super.genContentData();
  41.         byte[] ackMsgId = SocketUtil.int2ByteArrays(this.ackMsgId);
  42.         byte[] unused = this.unused.getBytes();
  43.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44.         baos.write(base, 0, base.length);              //协议版本+数据类型+数据长度+消息id
  45.         baos.write(ackMsgId, 0, ACKMSGID_LEN);         //消息id
  46.         baos.write(unused, 0, unused.length);          //unused
  47.         return baos.toByteArray();
  48.     }
  49.     @Override
  50.     public int parseContentData(byte[] data) throws ProtocolException {
  51.         int pos = super.parseContentData(data);
  52.         //解析ackMsgId
  53.         ackMsgId = SocketUtil.byteArrayToInt(data, pos, ACKMSGID_LEN);
  54.         pos += ACKMSGID_LEN;
  55.         //解析unused
  56.         try {
  57.             unused = new String(data, pos, data.length – pos, “utf-8”);
  58.         } catch (UnsupportedEncodingException e) {
  59.             e.printStackTrace();
  60.         }
  61.         return pos;
  62.     }
  63. }

4. PingProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6.  * Created by meishan on 16/12/1.
  7.  */
  8. public class PingProtocol extends BasicProtocol {
  9.     public static final int PROTOCOL_TYPE = 2;
  10.     private static final int PINGID_LEN = 4;
  11.     private int pingId;
  12.     private String unused;
  13.     @Override
  14.     public int getLength() {
  15.         return super.getLength() + PINGID_LEN + unused.getBytes().length;
  16.     }
  17.     @Override
  18.     public int getProtocolType() {
  19.         return PROTOCOL_TYPE;
  20.     }
  21.     public int getPingId() {
  22.         return pingId;
  23.     }
  24.     public void setPingId(int pingId) {
  25.         this.pingId = pingId;
  26.     }
  27.     public String getUnused() {
  28.         return unused;
  29.     }
  30.     public void setUnused(String unused) {
  31.         this.unused = unused;
  32.     }
  33.     /**
  34.      * 拼接发送数据
  35.      *
  36.      * @return
  37.      */
  38.     @Override
  39.     public byte[] genContentData() {
  40.         byte[] base = super.genContentData();
  41.         byte[] pingId = SocketUtil.int2ByteArrays(this.pingId);
  42.         byte[] unused = this.unused.getBytes();
  43.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44.         baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id
  45.         baos.write(pingId, 0, PINGID_LEN);         //消息id
  46.         baos.write(unused, 0, unused.length);            //unused
  47.         return baos.toByteArray();
  48.     }
  49.     @Override
  50.     public int parseContentData(byte[] data) throws ProtocolException {
  51.         int pos = super.parseContentData(data);
  52.         //解析pingId
  53.         pingId = SocketUtil.byteArrayToInt(data, pos, PINGID_LEN);
  54.         pos += PINGID_LEN;
  55.         try {
  56.             unused = new String(data, pos, data.length – pos, “utf-8”);
  57.         } catch (UnsupportedEncodingException e) {
  58.             e.printStackTrace();
  59.         }
  60.         return pos;
  61.     }
  62. }

5. PingAckProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6.  * Created by meishan on 16/12/1.
  7.  */
  8. public class PingAckProtocol extends BasicProtocol {
  9.     public static final int PROTOCOL_TYPE = 3;
  10.     private static final int ACKPINGID_LEN = 4;
  11.     private int ackPingId;
  12.     private String unused;
  13.     @Override
  14.     public int getLength() {
  15.         return super.getLength() + ACKPINGID_LEN + unused.getBytes().length;
  16.     }
  17.     @Override
  18.     public int getProtocolType() {
  19.         return PROTOCOL_TYPE;
  20.     }
  21.     public int getAckPingId() {
  22.         return ackPingId;
  23.     }
  24.     public void setAckPingId(int ackPingId) {
  25.         this.ackPingId = ackPingId;
  26.     }
  27.     public String getUnused() {
  28.         return unused;
  29.     }
  30.     public void setUnused(String unused) {
  31.         this.unused = unused;
  32.     }
  33.     /**
  34.      * 拼接发送数据
  35.      *
  36.      * @return
  37.      */
  38.     @Override
  39.     public byte[] genContentData() {
  40.         byte[] base = super.genContentData();
  41.         byte[] ackPingId = SocketUtil.int2ByteArrays(this.ackPingId);
  42.         byte[] unused = this.unused.getBytes();
  43.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44.         baos.write(base, 0, base.length);                //协议版本+数据类型+数据长度+消息id
  45.         baos.write(ackPingId, 0, ACKPINGID_LEN);         //消息id
  46.         baos.write(unused, 0, unused.length);            //unused
  47.         return baos.toByteArray();
  48.     }
  49.     @Override
  50.     public int parseContentData(byte[] data) throws ProtocolException {
  51.         int pos = super.parseContentData(data);
  52.         //解析ackPingId
  53.         ackPingId = SocketUtil.byteArrayToInt(data, pos, ACKPINGID_LEN);
  54.         pos += ACKPINGID_LEN;
  55.         //解析unused
  56.         try {
  57.             unused = new String(data, pos, data.length – pos, “utf-8”);
  58.         } catch (UnsupportedEncodingException e) {
  59.             e.printStackTrace();
  60.         }
  61.         return pos;
  62.     }
  63. }

三、任务调度

上述已经给出了四种协议的实现,接下来我们将使用它们来实现app和服务端之间的通信,这里我们把数据的发送、接收和心跳分别用一个线程去实现,具体如下:

1. 客户端

  1. import android.os.Handler;
  2. import android.os.Looper;
  3. import android.os.Message;
  4. import android.util.Log;
  5. import com.shandiangou.sdgprotocol.lib.protocol.BasicProtocol;
  6. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  7. import com.shandiangou.sdgprotocol.lib.protocol.PingProtocol;
  8. import java.io.IOException;
  9. import java.io.InputStream;
  10. import java.io.OutputStream;
  11. import java.net.ConnectException;
  12. import java.net.Socket;
  13. import java.util.concurrent.ConcurrentLinkedQueue;
  14. import javax.net.SocketFactory;
  15. /**
  16.  * 写数据采用死循环,没有数据时wait,有新消息时notify
  17.  * <p>
  18.  * Created by meishan on 16/12/1.
  19.  */
  20. public class ClientRequestTask implements Runnable {
  21.     private static final int SUCCESS = 100;
  22.     private static final int FAILED = -1;
  23.     private boolean isLongConnection = true;
  24.     private Handler mHandler;
  25.     private SendTask mSendTask;
  26.     private ReciveTask mReciveTask;
  27.     private HeartBeatTask mHeartBeatTask;
  28.     private Socket mSocket;
  29.     private boolean isSocketAvailable;
  30.     private boolean closeSendTask;
  31.     protected volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
  32.     public ClientRequestTask(RequestCallBack requestCallBacks) {
  33.         mHandler = new MyHandler(requestCallBacks);
  34.     }
  35.     @Override
  36.     public void run() {
  37.         try {
  38.             try {
  39.                 mSocket = SocketFactory.getDefault().createSocket(Config.ADDRESS, Config.PORT);
  40. //                mSocket.setSoTimeout(10);
  41.             } catch (ConnectException e) {
  42.                 failedMessage(-1, “服务器连接异常,请检查网络”);
  43.                 return;
  44.             }
  45.             isSocketAvailable = true;
  46.             //开启接收线程
  47.             mReciveTask = new ReciveTask();
  48.             mReciveTask.inputStream = mSocket.getInputStream();
  49.             mReciveTask.start();
  50.             //开启发送线程
  51.             mSendTask = new SendTask();
  52.             mSendTask.outputStream = mSocket.getOutputStream();
  53.             mSendTask.start();
  54.             //开启心跳线程
  55.             if (isLongConnection) {
  56.                 mHeartBeatTask = new HeartBeatTask();
  57.                 mHeartBeatTask.outputStream = mSocket.getOutputStream();
  58.                 mHeartBeatTask.start();
  59.             }
  60.         } catch (IOException e) {
  61.             failedMessage(-1, “网络发生异常,请稍后重试”);
  62.             e.printStackTrace();
  63.         }
  64.     }
  65.     public void addRequest(DataProtocol data) {
  66.         dataQueue.add(data);
  67.         toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程
  68.     }
  69.     public synchronized void stop() {
  70.         //关闭接收线程
  71.         closeReciveTask();
  72.         //关闭发送线程
  73.         closeSendTask = true;
  74.         toNotifyAll(dataQueue);
  75.         //关闭心跳线程
  76.         closeHeartBeatTask();
  77.         //关闭socket
  78.         closeSocket();
  79.         //清除数据
  80.         clearData();
  81.         failedMessage(-1, “断开连接”);
  82.     }
  83.     /**
  84.      * 关闭接收线程
  85.      */
  86.     private void closeReciveTask() {
  87.         if (mReciveTask != null) {
  88.             mReciveTask.interrupt();
  89.             mReciveTask.isCancle = true;
  90.             if (mReciveTask.inputStream != null) {
  91.                 try {
  92.                     if (isSocketAvailable && !mSocket.isClosed() && mSocket.isConnected()) {
  93.                         mSocket.shutdownInput();//解决java.net.SocketException问题,需要先shutdownInput
  94.                     }
  95.                 } catch (IOException e) {
  96.                     e.printStackTrace();
  97.                 }
  98.                 SocketUtil.closeInputStream(mReciveTask.inputStream);
  99.                 mReciveTask.inputStream = null;
  100.             }
  101.             mReciveTask = null;
  102.         }
  103.     }
  104.     /**
  105.      * 关闭发送线程
  106.      */
  107.     private void closeSendTask() {
  108.         if (mSendTask != null) {
  109.             mSendTask.isCancle = true;
  110.             mSendTask.interrupt();
  111.             if (mSendTask.outputStream != null) {
  112.                 synchronized (mSendTask.outputStream) {//防止写数据时停止,写完再停
  113.                     SocketUtil.closeOutputStream(mSendTask.outputStream);
  114.                     mSendTask.outputStream = null;
  115.                 }
  116.             }
  117.             mSendTask = null;
  118.         }
  119.     }
  120.     /**
  121.      * 关闭心跳线程
  122.      */
  123.     private void closeHeartBeatTask() {
  124.         if (mHeartBeatTask != null) {
  125.             mHeartBeatTask.isCancle = true;
  126.             if (mHeartBeatTask.outputStream != null) {
  127.                 SocketUtil.closeOutputStream(mHeartBeatTask.outputStream);
  128.                 mHeartBeatTask.outputStream = null;
  129.             }
  130.             mHeartBeatTask = null;
  131.         }
  132.     }
  133.     /**
  134.      * 关闭socket
  135.      */
  136.     private void closeSocket() {
  137.         if (mSocket != null) {
  138.             try {
  139.                 mSocket.close();
  140.                 isSocketAvailable = false;
  141.             } catch (IOException e) {
  142.                 e.printStackTrace();
  143.             }
  144.         }
  145.     }
  146.     /**
  147.      * 清除数据
  148.      */
  149.     private void clearData() {
  150.         dataQueue.clear();
  151.         isLongConnection = false;
  152.     }
  153.     private void toWait(Object o) {
  154.         synchronized (o) {
  155.             try {
  156.                 o.wait();
  157.             } catch (InterruptedException e) {
  158.                 e.printStackTrace();
  159.             }
  160.         }
  161.     }
  162.     /**
  163.      * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
  164.      *
  165.      * @param o
  166.      */
  167.     protected void toNotifyAll(Object o) {
  168.         synchronized (o) {
  169.             o.notifyAll();
  170.         }
  171.     }
  172.     private void failedMessage(int code, String msg) {
  173.         Message message = mHandler.obtainMessage(FAILED);
  174.         message.what = FAILED;
  175.         message.arg1 = code;
  176.         message.obj = msg;
  177.         mHandler.sendMessage(message);
  178.     }
  179.     private void successMessage(BasicProtocol protocol) {
  180.         Message message = mHandler.obtainMessage(SUCCESS);
  181.         message.what = SUCCESS;
  182.         message.obj = protocol;
  183.         mHandler.sendMessage(message);
  184.     }
  185.     private boolean isConnected() {
  186.         if (mSocket.isClosed() || !mSocket.isConnected()) {
  187.             ClientRequestTask.this.stop();
  188.             return false;
  189.         }
  190.         return true;
  191.     }
  192.     /**
  193.      * 服务器返回处理,主线程运行
  194.      */
  195.     public class MyHandler extends Handler {
  196.         private RequestCallBack mRequestCallBack;
  197.         public MyHandler(RequestCallBack callBack) {
  198.             super(Looper.getMainLooper());
  199.             this.mRequestCallBack = callBack;
  200.         }
  201.         @Override
  202.         public void handleMessage(Message msg) {
  203.             super.handleMessage(msg);
  204.             switch (msg.what) {
  205.                 case SUCCESS:
  206.                     mRequestCallBack.onSuccess((BasicProtocol) msg.obj);
  207.                     break;
  208.                 case FAILED:
  209.                     mRequestCallBack.onFailed(msg.arg1, (String) msg.obj);
  210.                     break;
  211.                 default:
  212.                     break;
  213.             }
  214.         }
  215.     }
  216.     /**
  217.      * 数据接收线程
  218.      */
  219.     public class ReciveTask extends Thread {
  220.         private boolean isCancle = false;
  221.         private InputStream inputStream;
  222.         @Override
  223.         public void run() {
  224.             while (!isCancle) {
  225.                 if (!isConnected()) {
  226.                     break;
  227.                 }
  228.                 if (inputStream != null) {
  229.                     BasicProtocol reciverData = SocketUtil.readFromStream(inputStream);
  230.                     if (reciverData != null) {
  231.                         if (reciverData.getProtocolType() == 1 || reciverData.getProtocolType() == 3) {
  232.                             successMessage(reciverData);
  233.                         }
  234.                     } else {
  235.                         break;
  236.                     }
  237.                 }
  238.             }
  239.             SocketUtil.closeInputStream(inputStream);//循环结束则退出输入流
  240.         }
  241.     }
  242.     /**
  243.      * 数据发送线程
  244.      * 当没有发送数据时让线程等待
  245.      */
  246.     public class SendTask extends Thread {
  247.         private boolean isCancle = false;
  248.         private OutputStream outputStream;
  249.         @Override
  250.         public void run() {
  251.             while (!isCancle) {
  252.                 if (!isConnected()) {
  253.                     break;
  254.                 }
  255.                 BasicProtocol dataContent = dataQueue.poll();
  256.                 if (dataContent == null) {
  257.                     toWait(dataQueue);//没有发送数据则等待
  258.                     if (closeSendTask) {
  259.                         closeSendTask();//notify()调用后,并不是马上就释放对象锁的,所以在此处中断发送线程
  260.                     }
  261.                 } else if (outputStream != null) {
  262.                     synchronized (outputStream) {
  263.                         SocketUtil.write2Stream(dataContent, outputStream);
  264.                     }
  265.                 }
  266.             }
  267.             SocketUtil.closeOutputStream(outputStream);//循环结束则退出输出流
  268.         }
  269.     }
  270.     /**
  271.      * 心跳实现,频率5秒
  272.      * Created by meishan on 16/12/1.
  273.      */
  274.     public class HeartBeatTask extends Thread {
  275.         private static final int REPEATTIME = 5000;
  276.         private boolean isCancle = false;
  277.         private OutputStream outputStream;
  278.         private int pingId;
  279.         @Override
  280.         public void run() {
  281.             pingId = 1;
  282.             while (!isCancle) {
  283.                 if (!isConnected()) {
  284.                     break;
  285.                 }
  286.                 try {
  287.                     mSocket.sendUrgentData(0xFF);
  288.                 } catch (IOException e) {
  289.                     isSocketAvailable = false;
  290.                     ClientRequestTask.this.stop();
  291.                     break;
  292.                 }
  293.                 if (outputStream != null) {
  294.                     PingProtocol pingProtocol = new PingProtocol();
  295.                     pingProtocol.setPingId(pingId);
  296.                     pingProtocol.setUnused(“ping…”);
  297.                     SocketUtil.write2Stream(pingProtocol, outputStream);
  298.                     pingId = pingId + 2;
  299.                 }
  300.                 try {
  301.                     Thread.sleep(REPEATTIME);
  302.                 } catch (InterruptedException e) {
  303.                     e.printStackTrace();
  304.                 }
  305.             }
  306.             SocketUtil.closeOutputStream(outputStream);
  307.         }
  308.     }
  309. }

其中涉及到的RequestCallBack接口如下:

  1. /**
  2.  * Created by meishan on 16/12/1.
  3.  */
  4. public interface RequestCallBack {
  5.     void onSuccess(BasicProtocol msg);
  6.     void onFailed(int errorCode, String msg);
  7. }

2. 服务端

  1. import java.io.DataInputStream;
  2. import java.io.DataOutputStream;
  3. import java.net.Socket;
  4. import java.util.Iterator;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import java.util.concurrent.ConcurrentLinkedQueue;
  7. /**
  8.  * Created by meishan on 16/12/1.
  9.  */
  10. public class ServerResponseTask implements Runnable {
  11.     private ReciveTask reciveTask;
  12.     private SendTask sendTask;
  13.     private Socket socket;
  14.     private ResponseCallback tBack;
  15.     private volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
  16.     private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();
  17.     private String userIP;
  18.     public String getUserIP() {
  19.         return userIP;
  20.     }
  21.     public ServerResponseTask(Socket socket, ResponseCallback tBack) {
  22.         this.socket = socket;
  23.         this.tBack = tBack;
  24.         this.userIP = socket.getInetAddress().getHostAddress();
  25.         System.out.println(“用户IP地址:” + userIP);
  26.     }
  27.     @Override
  28.     public void run() {
  29.         try {
  30.             //开启接收线程
  31.             reciveTask = new ReciveTask();
  32.             reciveTask.inputStream = new DataInputStream(socket.getInputStream());
  33.             reciveTask.start();
  34.             //开启发送线程
  35.             sendTask = new SendTask();
  36.             sendTask.outputStream = new DataOutputStream(socket.getOutputStream());
  37.             sendTask.start();
  38.         } catch (Exception e) {
  39.             e.printStackTrace();
  40.         }
  41.     }
  42.     public void stop() {
  43.         if (reciveTask != null) {
  44.             reciveTask.isCancle = true;
  45.             reciveTask.interrupt();
  46.             if (reciveTask.inputStream != null) {
  47.                 SocketUtil.closeInputStream(reciveTask.inputStream);
  48.                 reciveTask.inputStream = null;
  49.             }
  50.             reciveTask = null;
  51.         }
  52.         if (sendTask != null) {
  53.             sendTask.isCancle = true;
  54.             sendTask.interrupt();
  55.             if (sendTask.outputStream != null) {
  56.                 synchronized (sendTask.outputStream) {//防止写数据时停止,写完再停
  57.                     sendTask.outputStream = null;
  58.                 }
  59.             }
  60.             sendTask = null;
  61.         }
  62.     }
  63.     public void addMessage(BasicProtocol data) {
  64.         if (!isConnected()) {
  65.             return;
  66.         }
  67.         dataQueue.offer(data);
  68.         toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程
  69.     }
  70.     public Socket getConnectdClient(String clientID) {
  71.         return onLineClient.get(clientID);
  72.     }
  73.     /**
  74.      * 打印已经链接的客户端
  75.      */
  76.     public static void printAllClient() {
  77.         if (onLineClient == null) {
  78.             return;
  79.         }
  80.         Iterator<String> inter = onLineClient.keySet().iterator();
  81.         while (inter.hasNext()) {
  82.             System.out.println(“client:” + inter.next());
  83.         }
  84.     }
  85.     public void toWaitAll(Object o) {
  86.         synchronized (o) {
  87.             try {
  88.                 o.wait();
  89.             } catch (InterruptedException e) {
  90.                 e.printStackTrace();
  91.             }
  92.         }
  93.     }
  94.     public void toNotifyAll(Object obj) {
  95.         synchronized (obj) {
  96.             obj.notifyAll();
  97.         }
  98.     }
  99.     private boolean isConnected() {
  100.         if (socket.isClosed() || !socket.isConnected()) {
  101.             onLineClient.remove(userIP);
  102.             ServerResponseTask.this.stop();
  103.             System.out.println(“socket closed…”);
  104.             return false;
  105.         }
  106.         return true;
  107.     }
  108.     public class ReciveTask extends Thread {
  109.         private DataInputStream inputStream;
  110.         private boolean isCancle;
  111.         @Override
  112.         public void run() {
  113.             while (!isCancle) {
  114.                 if (!isConnected()) {
  115.                     isCancle = true;
  116.                     break;
  117.                 }
  118.                 BasicProtocol clientData = SocketUtil.readFromStream(inputStream);
  119.                 if (clientData != null) {
  120.                     if (clientData.getProtocolType() == 0) {
  121.                         System.out.println(“dtype: “ + ((DataProtocol) clientData).getDtype() + “, pattion: “ + ((DataProtocol) clientData).getPattion() + “, msgId: “ + ((DataProtocol) clientData).getMsgId() + “, data: “ + ((DataProtocol) clientData).getData());
  122.                         DataAckProtocol dataAck = new DataAckProtocol();
  123.                         dataAck.setUnused(“收到消息:” + ((DataProtocol) clientData).getData());
  124.                         dataQueue.offer(dataAck);
  125.                         toNotifyAll(dataQueue); //唤醒发送线程
  126.                         tBack.targetIsOnline(userIP);
  127.                     } else if (clientData.getProtocolType() == 2) {
  128.                         System.out.println(“pingId: “ + ((PingProtocol) clientData).getPingId());
  129.                         PingAckProtocol pingAck = new PingAckProtocol();
  130.                         pingAck.setUnused(“收到心跳”);
  131.                         dataQueue.offer(pingAck);
  132.                         toNotifyAll(dataQueue); //唤醒发送线程
  133.                         tBack.targetIsOnline(userIP);
  134.                     }
  135.                 } else {
  136.                     System.out.println(“client is offline…”);
  137.                     break;
  138.                 }
  139.             }
  140.             SocketUtil.closeInputStream(inputStream);
  141.         }
  142.     }
  143.     public class SendTask extends Thread {
  144.         private DataOutputStream outputStream;
  145.         private boolean isCancle;
  146.         @Override
  147.         public void run() {
  148.             while (!isCancle) {
  149.                 if (!isConnected()) {
  150.                     isCancle = true;
  151.                     break;
  152.                 }
  153.                 BasicProtocol procotol = dataQueue.poll();
  154.                 if (procotol == null) {
  155.                     toWaitAll(dataQueue);
  156.                 } else if (outputStream != null) {
  157.                     synchronized (outputStream) {
  158.                         SocketUtil.write2Stream(procotol, outputStream);
  159.                     }
  160.                 }
  161.             }
  162.             SocketUtil.closeOutputStream(outputStream);
  163.         }
  164.     }

其中涉及到的ResponseCallback接口如下:

  1. /**
  2.  * Created by meishan on 16/12/1.
  3.  */
  4. public interface ResponseCallback {
  5.     void targetIsOffline(DataProtocol reciveMsg);
  6.     void targetIsOnline(String clientIp);
  7. }

上述代码中处理了几种情况下的异常,比如,建立连接后,服务端停止运行,此时客户端的输入流还在阻塞状态,怎么保证客户端不抛出异常,这些处理可以结合SocketUtil类来看。

四、调用封装

1. 客户端

import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;

  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  2. /**
  3.  * Created by meishan on 16/12/1.
  4.  */
  5. public class ConnectionClient {
  6.     private boolean isClosed;
  7.     private ClientRequestTask mClientRequestTask;
  8.     public ConnectionClient(RequestCallBack requestCallBack) {
  9.         mClientRequestTask = new ClientRequestTask(requestCallBack);
  10.         new Thread(mClientRequestTask).start();
  11.     }
  12.     public void addNewRequest(DataProtocol data) {
  13.         if (mClientRequestTask != null && !isClosed)
  14.             mClientRequestTask.addRequest(data);
  15.     }
  16.     public void closeConnect() {
  17.         isClosed = true;
  18.         mClientRequestTask.stop();
  19.     }
  20. }

2. 服务端

  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  2. import java.io.IOException;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. /**
  8.  * Created by meishan on 16/12/1.
  9.  */
  10. public class ConnectionServer {
  11.     private static boolean isStart = true;
  12.     private static ServerResponseTask serverResponseTask;
  13.     public ConnectionServer() {
  14.     }
  15.     public static void main(String[] args) {
  16.         ServerSocket serverSocket = null;
  17.         ExecutorService executorService = Executors.newCachedThreadPool();
  18.         try {
  19.             serverSocket = new ServerSocket(Config.PORT);
  20.             while (isStart) {
  21.                 Socket socket = serverSocket.accept();
  22.                 serverResponseTask = new ServerResponseTask(socket,
  23.                         new ResponseCallback() {
  24.                             @Override
  25.                             public void targetIsOffline(DataProtocol reciveMsg) {// 对方不在线
  26.                                 if (reciveMsg != null) {
  27.                                     System.out.println(reciveMsg.getData());
  28.                                 }
  29.                             }
  30.                             @Override
  31.                             public void targetIsOnline(String clientIp) {
  32.                                 System.out.println(clientIp + ” is onLine”);
  33.                                 System.out.println(“—————————————–“);
  34.                             }
  35.                         });
  36.                 if (socket.isConnected()) {
  37.                     executorService.execute(serverResponseTask);
  38.                 }
  39.             }
  40.             serverSocket.close();
  41.         } catch (IOException e) {
  42.             e.printStackTrace();
  43.         } finally {
  44.             if (serverSocket != null) {
  45.                 try {
  46.                     isStart = false;
  47.                     serverSocket.close();
  48.                     if (serverSocket != null)
  49.                         serverResponseTask.stop();
  50.                 } catch (IOException e) {
  51.                     e.printStackTrace();
  52.                 }
  53.             }
  54.         }
  55.     }
  56. }

总结

实现自定义协议的关键在于协议的拼装和解析,上述已给出了关键的代码,如果需要查看完整的代码以及demo,可以下载源码

注意:先运行服务端demo的main函数,再查看本机的ip地址,然后修改客户端(android)代码中Config.java里面的ip地址,当然,要确保android手机和服务端在同一个局域里面,最后再打开客户端。

未经允许不得转载:JX BLOG » 基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

赞 (0)

评论 0

评论前必须登录!

登陆 注册