android socket 推送服务版本解析
不点
阅读:644
2021-04-01 10:14:40
评论:0
客户端代码:
主功能界面;
package com.rf.pushclient;
import android.os.Bundle;
import android.app.Activity;
import android.content.Intent;
import android.view.Menu;
import android.view.View;
import android.view.View.OnClickListener;
import android.widget.Button;
public class MainActivity extends Activity {
private Button mConnect;
private Button mDisConnect;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mConnect=(Button) findViewById(R.id.connect);
mDisConnect=(Button) findViewById(R.id.disconnect);
mConnect.setOnClickListener(new OnClickListener(){
@Override
public void onClick(View v) {
// TODO Auto-generated method stub
startService(new Intent(MainActivity.this, PushService.class));
}
});
mDisConnect.setOnClickListener(new OnClickListener(){
@Override
public void onClick(View v) {
// TODO Auto-generated method stub
stopService(new Intent(MainActivity.this, PushService.class));
}
});
}
}
服务类:
package com.rf.pushclient;
import java.net.InetSocketAddress;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.Intent;
import android.os.Handler;
import android.os.IBinder;
import android.os.Message;
public class PushService extends Service {
private PushClient mClient;
private Handler mHandler;
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onCreate() {
super.onCreate();
//加载Handler
initHandler();
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
//启动连接通道线程
mClient = new PushClient(new InetSocketAddress("192.168.10.111", 10001),
mHandler);
//线程启动
mClient.start();
return super.onStartCommand(intent, flags, startId);
}
@Override
public void onDestroy() {
//线程销毁
if (mClient != null) {
mClient.disConnect();
}
super.onDestroy();
}
/**
* 初始化Handler
*/
private void initHandler() {
mHandler=new Handler(){
@Override
public void handleMessage(Message msg) {
// TODO Auto-generated method stub
switch(msg.what){
case 100:
String message=msg.obj.toString();
//状态栏提示相关信息
showNotification(message);
break;
default:
break;
}
}
};
}
/**
*
* 在状态栏显示通知
*/
@SuppressWarnings("deprecation")
private void showNotification(String msg) {
// 创建一个NotificationManager的引用
NotificationManager notificationManager = (NotificationManager) getSystemService(android.content.Context.NOTIFICATION_SERVICE);
// 定义Notification的各种属性
Notification notification = new Notification(R.drawable.ic_launcher,
msg, System.currentTimeMillis());
// FLAG_AUTO_CANCEL 该通知能被状态栏的清除按钮给清除掉
// FLAG_NO_CLEAR 该通知不能被状态栏的清除按钮给清除掉
// FLAG_ONGOING_EVENT 通知放置在正在运行
// FLAG_INSISTENT 是否一直进行,比如音乐一直播放,知道用户响应
notification.flags |= Notification.FLAG_AUTO_CANCEL; // 表明在点击了通知栏中的"清除通知"后,此通知不清除,经常与FLAG_ONGOING_EVENT一起使用
// DEFAULT_ALL 使用所有默认值,比如声音,震动,闪屏等等
// DEFAULT_LIGHTS 使用默认闪光提示
// DEFAULT_SOUND 使用默认提示声音
// DEFAULT_VIBRATE 使用默认手机震动,需加上<uses-permission
// android:name="android.permission.VIBRATE" />权限
notification.defaults = Notification.DEFAULT_SOUND;
// 设置通知的事件消息
CharSequence contentTitle = msg; // 通知栏标题
CharSequence contentText = msg; // 通知栏内容
Intent notificationIntent = new Intent(this, TestActivity.class); // 点击该通知后要跳转的Activity
PendingIntent contentItent = PendingIntent.getActivity(this, 0,
notificationIntent, 0);
notification.setLatestEventInfo(this, contentTitle, contentText,
contentItent);
// 把Notification传递给NotificationManager
notificationManager.notify(0, notification);
}
}
线程类:
/*
*
*/
package com.rf.pushclient;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import android.os.Handler;
import android.os.Message;
/**
*
* @author zengjiantao
* @date 2013-4-8
*/
public class PushClient extends Thread {
private static final int BUFFER_SIZE = 1024;
/**
* 远程地址
*/
private final InetSocketAddress mRemoteAddress;
/**
* 连接通道
*/
private SocketChannel mSocketChannel;
/**
* 接收缓冲区
*/
private final ByteBuffer mReceiveBuf;
/**
* 端口选择器
*/
private Selector mSelector;
/**
* 线程是否结束的标志
*/
private final AtomicBoolean mShutdown;
/**
* 消息处理
*/
private final Handler mHandler;
static {
java.lang.System.setProperty("java.net.preferIPv4Stack", "true");
java.lang.System.setProperty("java.net.preferIPv6Addresses", "false");
}
public PushClient(InetSocketAddress remoteAddress, Handler handler) {
mRemoteAddress = remoteAddress;
mHandler = handler;
// 初始化缓冲区
mReceiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
if (mSelector == null) {
// 创建新的Selector
try {
mSelector = Selector.open();
} catch (final IOException e) {
e.printStackTrace();
}
}
mShutdown = new AtomicBoolean(false);
}
/**
* 打开通道
*/
private void startup() {
try {
// 打开通道
mSocketChannel = SocketChannel.open();
// 绑定到本地端口
mSocketChannel.socket().setSoTimeout(30000);
mSocketChannel.configureBlocking(false);
if (mSocketChannel.connect(mRemoteAddress)) {
System.out.println("开始建立连接:" + mRemoteAddress);
}
mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT
| SelectionKey.OP_READ, this);
System.out.println("端口打开成功");
} catch (final IOException e1) {
e1.printStackTrace();
}
}
private void select() {
int nums = 0;
try {
if (mSelector == null) {
return;
}
nums = mSelector.select(1000);
} catch (final Exception e) {
e.printStackTrace();
}
// 如果select返回大于0,处理事件
if (nums > 0) {
Iterator<SelectionKey> iterator = mSelector.selectedKeys()
.iterator();
while (iterator.hasNext()) {
// 得到下一个Key
final SelectionKey key = iterator.next();
iterator.remove();
// 检查其是否还有效
if (!key.isValid()) {
continue;
}
// 处理事件
try {
if (key.isConnectable()) {
connect();
} else if (key.isReadable()) {
read(key);
}
} catch (final Exception e) {
e.printStackTrace();
key.cancel();
}
}
}
}
@Override
public void run() {
//执行第一个方法
startup();
// 启动主循环流程
while (!mShutdown.get()) {
try {
// do select
select();
try {
Thread.sleep(1000);
} catch (final Exception e) {
e.printStackTrace();
}
} catch (final Exception e) {
e.printStackTrace();
}
}
shutdown();
}
private void connect() throws IOException {
if (isConnected()) {
return;
}
// 完成SocketChannel的连接
mSocketChannel.finishConnect();
while (!mSocketChannel.isConnected()) {
try {
Thread.sleep(300);
} catch (final InterruptedException e) {
e.printStackTrace();
}
mSocketChannel.finishConnect();
}
}
public void disConnect() {
mShutdown.set(true);
}
private void shutdown() {
if (isConnected()) {
try {
mSocketChannel.close();
while (mSocketChannel.isOpen()) {
try {
Thread.sleep(300);
} catch (final InterruptedException e) {
e.printStackTrace();
}
mSocketChannel.close();
}
System.out.println("端口关闭成功");
} catch (final IOException e) {
System.err.println("端口关闭错误:");
e.printStackTrace();
} finally {
mSocketChannel = null;
}
} else {
System.out.println("通道为空或者没有连接");
}
// 关闭端口选择器
if (mSelector != null) {
try {
mSelector.close();
System.out.println("端口选择器关闭成功");
} catch (IOException e) {
e.printStackTrace();
} finally {
mSelector = null;
}
}
}
private void read(SelectionKey key) throws IOException {
// 接收消息
final byte[] msg = recieve();
if (msg != null) {
String tmp = new String(msg);
System.out.println("返回内容:");
System.out.println(tmp);
if (mHandler != null) {
Message message = mHandler.obtainMessage(100);
message.obj = tmp;
mHandler.sendMessage(message);
}
}
}
private byte[] recieve() throws IOException {
if (isConnected()) {
int len = 0;
int readBytes = 0;
synchronized (mReceiveBuf) {
mReceiveBuf.clear();
try {
while ((len = mSocketChannel.read(mReceiveBuf)) > 0) {
readBytes += len;
}
} finally {
mReceiveBuf.flip();
}
if (readBytes > 0) {
final byte[] tmp = new byte[readBytes];
mReceiveBuf.get(tmp);
return tmp;
} else {
System.out.println("接收到数据为空,重新启动连接");
return null;
}
}
} else {
System.out.println("端口没有连接");
}
return null;
}
private boolean isConnected() {
return mSocketChannel != null && mSocketChannel.isConnected();
}
}
服务端代码:
package com.rf.pushserver;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 消息推送服务器
*
* @author zengjiantao
* @date 2013-4-8
*/
public class PushServer extends Thread {
private static final int BUFFER_SIZE = 1024;
/**
* 服务器连接通道
*/
private ServerSocketChannel serverSocketChannel;
/**
* 发送缓冲区
*/
private final ByteBuffer sendBuf;
/**
* 端口选择器
*/
private Selector selector;
/**
* 服务器端口
*/
private final int mPort;
/**
* 线程是否结束的标志
*/
private final AtomicBoolean shutdown;
/**
* 发送消息的开关
*/
private final AtomicBoolean sendable;
/**
* 发送消息的内容
*/
private String sendMsg;
private final ExecutorService executorService;
public PushServer(int port) {
mPort = port;
// 初始化缓冲区
sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
if (selector == null) {
// 创建新的Selector
try {
selector = Selector.open();
} catch (final IOException e) {
e.printStackTrace();
}
}
startup();
executorService = Executors.newFixedThreadPool(10);
shutdown = new AtomicBoolean(false);
sendable = new AtomicBoolean(false);
}
private void startup() {
try {
// 打开通道
serverSocketChannel = ServerSocketChannel.open();
// 绑定到本地端口
serverSocketChannel.socket().setSoTimeout(30000);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(mPort));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器端口打开成功");
} catch (final IOException e1) {
e1.printStackTrace();
}
}
private void select() {
int nums = 0;
try {
if (selector == null) {
return;
}
nums = selector.select(1000L);
} catch (final Exception e) {
e.printStackTrace();
}
// 如果select返回大于0,处理事件
if (nums > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys()
.iterator();
while (iterator.hasNext()) {
// 得到下一个Key
final SelectionKey key = iterator.next();
iterator.remove();
// 检查其是否还有效
if (!key.isValid()) {
continue;
}
// 处理事件
if (key.isAcceptable()) {
executorService.execute(new Accepter(key));
// accept(key);
} else if (key.isWritable()) {
if (sendable.get()) {
executorService.execute(new Sender(key, sendMsg));
}
}
}
if (sendable.get()) {
System.out.println("结束推送消息了");
}
sendable.set(false);
}
}
/**
* 用于连接的Runnable
*
* @author zengjiantao
* @date 2013-4-11
*/
class Accepter implements Runnable {
private final SelectionKey key;
public Accepter(SelectionKey key) {
this.key = key;
}
public void run() {
accept(key);
}
}
/**
* 用于发送消息的Runnable
*
* @author zengjiantao
* @date 2013-4-11
*/
class Sender implements Runnable {
private final SelectionKey key;
private final String msg;
public Sender(SelectionKey key, String msg) {
this.key = key;
this.msg = msg;
}
public void run() {
send(key, msg);
}
}
/**
* 接收客户端
*
* @param key
* @throws IOException
*/
private void accept(SelectionKey key) {
// 打开通道
try {
SocketChannel socketChannel = ((ServerSocketChannel) key.channel())
.accept();
// 绑定到本地端口
socketChannel.socket().setSoTimeout(30000);
socketChannel.configureBlocking(false);
synchronized (selector) {
socketChannel.register(selector, SelectionKey.OP_WRITE, this);
}
System.out.println("端口打开成功");
} catch (IOException e) {
System.out.println("端口打开失败");
e.printStackTrace();
key.cancel();
}
}
@Override
public void run() {
// 启动主循环流程
while (!shutdown.get()) {
try {
select();
try {
Thread.sleep(1000L);
} catch (final Exception e) {
e.printStackTrace();
}
} catch (final Exception e) {
e.printStackTrace();
}
}
shutdown();
}
/**
* 打开发送消息的开关
*
* @param msg
*/
void send(final String msg) {
sendMsg = msg;
sendable.set(true);
System.out.println("开始推送消息了");
}
/**
* 向指定连接发送消息
*
* @param key
* @param msg
*/
private void send(final SelectionKey key, final String msg) {
try {
byte[] out = msg.getBytes();
if (out == null || out.length < 1) {
return;
}
synchronized (sendBuf) {
sendBuf.clear();
sendBuf.put(out);
sendBuf.flip();
}
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(sendBuf);
} catch (final IOException e) {
e.printStackTrace();
}
}
/**
* 断开连接
*/
public void disConnect() {
shutdown.set(true);
}
/**
* 关闭端口选择器
*/
private void shutdown() {
if (serverSocketChannel != null) {
try {
serverSocketChannel.close();
while (serverSocketChannel.isOpen()) {
try {
Thread.sleep(300L);
} catch (final InterruptedException e) {
e.printStackTrace();
}
serverSocketChannel.close();
}
System.out.println("端口关闭成功");
} catch (IOException e1) {
System.err.println("端口关闭错误:");
e1.printStackTrace();
} finally {
serverSocketChannel = null;
}
}
// 关闭端口选择器
if (selector != null) {
try {
selector.close();
System.out.println("端口选择器关闭成功");
} catch (IOException e) {
e.printStackTrace();
} finally {
selector = null;
}
}
}
public static void main(String[] args) {
try {
final PushServer server = new PushServer(10001);
server.start();
new Thread(new Runnable() {
public void run() {
try {
InputStreamReader input = new InputStreamReader(
System.in);
BufferedReader br = new BufferedReader(input);
String sendText = br.readLine();
server.send(sendText);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。