android socket 推送服务版本解析

不点 阅读:644 2021-04-01 10:14:40 评论:0

客户端代码:

 

主功能界面;

package com.rf.pushclient; 
 
import android.os.Bundle; 
import android.app.Activity; 
import android.content.Intent; 
import android.view.Menu; 
import android.view.View; 
import android.view.View.OnClickListener; 
import android.widget.Button; 
 
public class MainActivity extends Activity { 
	private Button mConnect; 
	private Button mDisConnect; 
	@Override 
	protected void onCreate(Bundle savedInstanceState) { 
		super.onCreate(savedInstanceState); 
		setContentView(R.layout.activity_main); 
		 
		mConnect=(Button) findViewById(R.id.connect); 
		mDisConnect=(Button) findViewById(R.id.disconnect); 
		mConnect.setOnClickListener(new OnClickListener(){ 
 
			@Override 
			public void onClick(View v) { 
				// TODO Auto-generated method stub 
				startService(new Intent(MainActivity.this, PushService.class)); 
			} 
			 
		}); 
		mDisConnect.setOnClickListener(new OnClickListener(){ 
 
			@Override 
			public void onClick(View v) { 
				// TODO Auto-generated method stub 
				stopService(new Intent(MainActivity.this, PushService.class)); 
			} 
			 
		}); 
		 
	} 
 
 
 
}


服务类:

package com.rf.pushclient; 
 
import java.net.InetSocketAddress; 
 
import android.app.Notification; 
import android.app.NotificationManager; 
import android.app.PendingIntent; 
import android.app.Service; 
import android.content.Intent; 
import android.os.Handler; 
import android.os.IBinder; 
import android.os.Message; 
 
public class PushService extends Service { 
	private PushClient mClient; 
 
	private Handler mHandler; 
 
	@Override 
	public IBinder onBind(Intent intent) { 
		return null; 
	} 
 
	@Override 
	public void onCreate() { 
		super.onCreate(); 
		//加载Handler 
		initHandler(); 
	} 
 
	@Override 
	public int onStartCommand(Intent intent, int flags, int startId) { 
		//启动连接通道线程 
		mClient = new PushClient(new InetSocketAddress("192.168.10.111", 10001), 
				mHandler); 
		//线程启动 
		mClient.start(); 
		return super.onStartCommand(intent, flags, startId); 
	} 
 
	@Override 
	public void onDestroy() { 
	   //线程销毁 
		if (mClient != null) { 
			mClient.disConnect(); 
		} 
		super.onDestroy(); 
	} 
 
	/** 
	 * 初始化Handler 
	 */ 
	private void initHandler() { 
		mHandler=new Handler(){ 
 
			@Override 
			public void handleMessage(Message msg) { 
				// TODO Auto-generated method stub 
				switch(msg.what){ 
				case 100: 
					String message=msg.obj.toString(); 
					//状态栏提示相关信息 
					showNotification(message); 
					break; 
				default: 
					break; 
				 
				} 
			} 
			 
		}; 
	} 
 
	/** 
	 *  
	 * 在状态栏显示通知 
	 */ 
 
	@SuppressWarnings("deprecation") 
	private void showNotification(String msg) { 
		// 创建一个NotificationManager的引用 
		NotificationManager notificationManager = (NotificationManager) getSystemService(android.content.Context.NOTIFICATION_SERVICE); 
 
		// 定义Notification的各种属性 
		Notification notification = new Notification(R.drawable.ic_launcher, 
				msg, System.currentTimeMillis()); 
 
		// FLAG_AUTO_CANCEL 该通知能被状态栏的清除按钮给清除掉 
		// FLAG_NO_CLEAR 该通知不能被状态栏的清除按钮给清除掉 
		// FLAG_ONGOING_EVENT 通知放置在正在运行 
		// FLAG_INSISTENT 是否一直进行,比如音乐一直播放,知道用户响应 
		notification.flags |= Notification.FLAG_AUTO_CANCEL; // 表明在点击了通知栏中的"清除通知"后,此通知不清除,经常与FLAG_ONGOING_EVENT一起使用 
 
		// DEFAULT_ALL 使用所有默认值,比如声音,震动,闪屏等等 
		// DEFAULT_LIGHTS 使用默认闪光提示 
		// DEFAULT_SOUND 使用默认提示声音 
		// DEFAULT_VIBRATE 使用默认手机震动,需加上<uses-permission 
		// android:name="android.permission.VIBRATE" />权限 
		notification.defaults = Notification.DEFAULT_SOUND; 
 
		// 设置通知的事件消息 
		CharSequence contentTitle = msg; // 通知栏标题 
		CharSequence contentText = msg; // 通知栏内容 
		Intent notificationIntent = new Intent(this, TestActivity.class); // 点击该通知后要跳转的Activity 
 
		PendingIntent contentItent = PendingIntent.getActivity(this, 0, 
				notificationIntent, 0); 
		notification.setLatestEventInfo(this, contentTitle, contentText, 
				contentItent); 
		// 把Notification传递给NotificationManager 
		notificationManager.notify(0, notification); 
	} 
 
} 


线程类:

/* 
 *  
 */ 
package com.rf.pushclient; 
 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
import java.util.concurrent.atomic.AtomicBoolean; 
import android.os.Handler; 
import android.os.Message; 
 
 
 
/** 
 *  
 * @author zengjiantao 
 * @date 2013-4-8 
 */ 
public class PushClient extends Thread { 
 
	private static final int BUFFER_SIZE = 1024; 
 
	/** 
	 * 远程地址 
	 */ 
	private final InetSocketAddress mRemoteAddress; 
 
	/** 
	 * 连接通道 
	 */ 
	private SocketChannel mSocketChannel; 
 
	/** 
	 * 接收缓冲区 
	 */ 
	private final ByteBuffer mReceiveBuf; 
 
	/** 
	 * 端口选择器 
	 */ 
	private Selector mSelector; 
 
	/** 
	 * 线程是否结束的标志 
	 */ 
	private final AtomicBoolean mShutdown; 
	 
	/** 
	 *  消息处理 
	 */ 
	private final Handler mHandler; 
 
	static { 
		java.lang.System.setProperty("java.net.preferIPv4Stack", "true"); 
		java.lang.System.setProperty("java.net.preferIPv6Addresses", "false"); 
	} 
 
	public PushClient(InetSocketAddress remoteAddress, Handler handler) { 
		mRemoteAddress = remoteAddress; 
		mHandler = handler; 
 
		// 初始化缓冲区 
		mReceiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE); 
		if (mSelector == null) { 
			// 创建新的Selector 
			try { 
				mSelector = Selector.open(); 
			} catch (final IOException e) { 
				e.printStackTrace(); 
			} 
		} 
		mShutdown = new AtomicBoolean(false); 
	} 
 
	/** 
	 * 打开通道 
	 */ 
	private void startup() { 
		try { 
			// 打开通道 
			mSocketChannel = SocketChannel.open(); 
			// 绑定到本地端口 
			mSocketChannel.socket().setSoTimeout(30000); 
			mSocketChannel.configureBlocking(false); 
			if (mSocketChannel.connect(mRemoteAddress)) { 
				System.out.println("开始建立连接:" + mRemoteAddress); 
			} 
			mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT 
					| SelectionKey.OP_READ, this); 
			System.out.println("端口打开成功"); 
 
		} catch (final IOException e1) { 
			e1.printStackTrace(); 
		} 
	} 
 
	private void select() { 
		int nums = 0; 
		try { 
			if (mSelector == null) { 
				return; 
			} 
			nums = mSelector.select(1000); 
		} catch (final Exception e) { 
			e.printStackTrace(); 
		} 
 
		// 如果select返回大于0,处理事件 
		if (nums > 0) { 
			Iterator<SelectionKey> iterator = mSelector.selectedKeys() 
					.iterator(); 
			while (iterator.hasNext()) { 
				// 得到下一个Key 
				final SelectionKey key = iterator.next(); 
				iterator.remove(); 
				// 检查其是否还有效 
				if (!key.isValid()) { 
					continue; 
				} 
				// 处理事件 
				try { 
					if (key.isConnectable()) { 
						connect(); 
					} else if (key.isReadable()) { 
						read(key); 
					} 
				} catch (final Exception e) { 
					e.printStackTrace(); 
					key.cancel(); 
				} 
			} 
		} 
	} 
 
	@Override 
	public void run() { 
		//执行第一个方法 
		startup(); 
		// 启动主循环流程 
		while (!mShutdown.get()) { 
			try { 
				// do select 
				select(); 
				try { 
					Thread.sleep(1000); 
				} catch (final Exception e) { 
					e.printStackTrace(); 
				} 
			} catch (final Exception e) { 
				e.printStackTrace(); 
			} 
		} 
		shutdown(); 
	} 
 
	private void connect() throws IOException { 
		if (isConnected()) { 
			return; 
		} 
		// 完成SocketChannel的连接 
		mSocketChannel.finishConnect(); 
		while (!mSocketChannel.isConnected()) { 
			try { 
				Thread.sleep(300); 
			} catch (final InterruptedException e) { 
				e.printStackTrace(); 
			} 
			mSocketChannel.finishConnect(); 
		} 
 
	} 
 
	public void disConnect() { 
		mShutdown.set(true); 
	} 
 
	private void shutdown() { 
		if (isConnected()) { 
			try { 
				mSocketChannel.close(); 
				while (mSocketChannel.isOpen()) { 
					try { 
						Thread.sleep(300); 
					} catch (final InterruptedException e) { 
						e.printStackTrace(); 
					} 
					mSocketChannel.close(); 
				} 
				System.out.println("端口关闭成功"); 
			} catch (final IOException e) { 
				System.err.println("端口关闭错误:"); 
				e.printStackTrace(); 
			} finally { 
				mSocketChannel = null; 
			} 
		} else { 
			System.out.println("通道为空或者没有连接"); 
		} 
		// 关闭端口选择器 
		if (mSelector != null) { 
			try { 
				mSelector.close(); 
				System.out.println("端口选择器关闭成功"); 
			} catch (IOException e) { 
				e.printStackTrace(); 
			} finally { 
				mSelector = null; 
			} 
		} 
	} 
 
	private void read(SelectionKey key) throws IOException { 
		// 接收消息 
		final byte[] msg = recieve(); 
		if (msg != null) { 
			String tmp = new String(msg); 
			System.out.println("返回内容:"); 
			System.out.println(tmp); 
			if (mHandler != null) { 
				Message message = mHandler.obtainMessage(100); 
				message.obj = tmp; 
				mHandler.sendMessage(message); 
			} 
		} 
	} 
 
	private byte[] recieve() throws IOException { 
		if (isConnected()) { 
			int len = 0; 
			int readBytes = 0; 
 
			synchronized (mReceiveBuf) { 
				mReceiveBuf.clear(); 
				try { 
					while ((len = mSocketChannel.read(mReceiveBuf)) > 0) { 
						readBytes += len; 
					} 
				} finally { 
					mReceiveBuf.flip(); 
				} 
				if (readBytes > 0) { 
					final byte[] tmp = new byte[readBytes]; 
					mReceiveBuf.get(tmp); 
					return tmp; 
				} else { 
					System.out.println("接收到数据为空,重新启动连接"); 
					return null; 
				} 
			} 
		} else { 
			System.out.println("端口没有连接"); 
		} 
		return null; 
	} 
 
	private boolean isConnected() { 
		return mSocketChannel != null && mSocketChannel.isConnected(); 
	} 
} 


服务端代码:

package com.rf.pushserver; 
 
import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.atomic.AtomicBoolean; 
 
/** 
 * 消息推送服务器 
 *  
 * @author zengjiantao 
 * @date 2013-4-8 
 */ 
public class PushServer extends Thread { 
 
	private static final int BUFFER_SIZE = 1024; 
 
	/** 
	 * 服务器连接通道 
	 */ 
	private ServerSocketChannel serverSocketChannel; 
 
	/** 
	 * 发送缓冲区 
	 */ 
	private final ByteBuffer sendBuf; 
 
	/** 
	 * 端口选择器 
	 */ 
	private Selector selector; 
 
	/** 
	 * 服务器端口 
	 */ 
	private final int mPort; 
 
	/** 
	 * 线程是否结束的标志 
	 */ 
	private final AtomicBoolean shutdown; 
 
	/** 
	 * 发送消息的开关 
	 */ 
	private final AtomicBoolean sendable; 
 
	/** 
	 * 发送消息的内容 
	 */ 
	private String sendMsg; 
 
	private final ExecutorService executorService; 
 
	public PushServer(int port) { 
		mPort = port; 
		// 初始化缓冲区 
		sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE); 
		if (selector == null) { 
			// 创建新的Selector 
			try { 
				selector = Selector.open(); 
			} catch (final IOException e) { 
				e.printStackTrace(); 
			} 
		} 
 
		startup(); 
		executorService = Executors.newFixedThreadPool(10); 
		shutdown = new AtomicBoolean(false); 
		sendable = new AtomicBoolean(false); 
	} 
 
	private void startup() { 
		try { 
			// 打开通道 
			serverSocketChannel = ServerSocketChannel.open(); 
			// 绑定到本地端口 
			serverSocketChannel.socket().setSoTimeout(30000); 
			serverSocketChannel.configureBlocking(false); 
			serverSocketChannel.socket().bind(new InetSocketAddress(mPort)); 
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
			System.out.println("服务器端口打开成功"); 
 
		} catch (final IOException e1) { 
			e1.printStackTrace(); 
		} 
	} 
 
	private void select() { 
		int nums = 0; 
		try { 
			if (selector == null) { 
				return; 
			} 
			nums = selector.select(1000L); 
		} catch (final Exception e) { 
			e.printStackTrace(); 
		} 
 
		// 如果select返回大于0,处理事件 
		if (nums > 0) { 
			Iterator<SelectionKey> iterator = selector.selectedKeys() 
					.iterator(); 
			while (iterator.hasNext()) { 
				// 得到下一个Key 
				final SelectionKey key = iterator.next(); 
				iterator.remove(); 
				// 检查其是否还有效 
				if (!key.isValid()) { 
					continue; 
				} 
 
				// 处理事件 
				if (key.isAcceptable()) { 
					executorService.execute(new Accepter(key)); 
					// accept(key); 
				} else if (key.isWritable()) { 
					if (sendable.get()) { 
						executorService.execute(new Sender(key, sendMsg)); 
					} 
				} 
			} 
			if (sendable.get()) { 
				System.out.println("结束推送消息了"); 
			} 
			sendable.set(false); 
		} 
	} 
 
	/** 
	 * 用于连接的Runnable 
	 *  
	 * @author zengjiantao 
	 * @date 2013-4-11 
	 */ 
	class Accepter implements Runnable { 
 
		private final SelectionKey key; 
 
		public Accepter(SelectionKey key) { 
			this.key = key; 
		} 
 
		public void run() { 
			accept(key); 
		} 
 
	} 
 
	/** 
	 * 用于发送消息的Runnable 
	 *  
	 * @author zengjiantao 
	 * @date 2013-4-11 
	 */ 
	class Sender implements Runnable { 
 
		private final SelectionKey key; 
 
		private final String msg; 
 
		public Sender(SelectionKey key, String msg) { 
			this.key = key; 
			this.msg = msg; 
		} 
 
		public void run() { 
			send(key, msg); 
		} 
 
	} 
 
	/** 
	 * 接收客户端 
	 *  
	 * @param key 
	 * @throws IOException 
	 */ 
	private void accept(SelectionKey key) { 
		// 打开通道 
		try { 
			SocketChannel socketChannel = ((ServerSocketChannel) key.channel()) 
					.accept(); 
			// 绑定到本地端口 
			socketChannel.socket().setSoTimeout(30000); 
			socketChannel.configureBlocking(false); 
			synchronized (selector) { 
				socketChannel.register(selector, SelectionKey.OP_WRITE, this); 
			} 
			System.out.println("端口打开成功"); 
		} catch (IOException e) { 
			System.out.println("端口打开失败"); 
			e.printStackTrace(); 
			key.cancel(); 
		} 
	} 
 
	@Override 
	public void run() { 
		// 启动主循环流程 
		while (!shutdown.get()) { 
			try { 
				select(); 
				try { 
					Thread.sleep(1000L); 
				} catch (final Exception e) { 
					e.printStackTrace(); 
				} 
			} catch (final Exception e) { 
				e.printStackTrace(); 
			} 
		} 
		shutdown(); 
	} 
 
	/** 
	 * 打开发送消息的开关 
	 *  
	 * @param msg 
	 */ 
	void send(final String msg) { 
		sendMsg = msg; 
		sendable.set(true); 
		System.out.println("开始推送消息了"); 
	} 
 
	/** 
	 * 向指定连接发送消息 
	 *  
	 * @param key 
	 * @param msg 
	 */ 
	private void send(final SelectionKey key, final String msg) { 
		try { 
			byte[] out = msg.getBytes(); 
			if (out == null || out.length < 1) { 
				return; 
			} 
			synchronized (sendBuf) { 
				sendBuf.clear(); 
				sendBuf.put(out); 
				sendBuf.flip(); 
			} 
			SocketChannel socketChannel = (SocketChannel) key.channel(); 
			socketChannel.write(sendBuf); 
		} catch (final IOException e) { 
			e.printStackTrace(); 
		} 
	} 
 
	/** 
	 * 断开连接 
	 */ 
	public void disConnect() { 
		shutdown.set(true); 
	} 
 
	/** 
	 * 关闭端口选择器 
	 */ 
	private void shutdown() { 
		if (serverSocketChannel != null) { 
			try { 
				serverSocketChannel.close(); 
				while (serverSocketChannel.isOpen()) { 
					try { 
						Thread.sleep(300L); 
					} catch (final InterruptedException e) { 
						e.printStackTrace(); 
					} 
					serverSocketChannel.close(); 
				} 
				System.out.println("端口关闭成功"); 
			} catch (IOException e1) { 
				System.err.println("端口关闭错误:"); 
				e1.printStackTrace(); 
			} finally { 
				serverSocketChannel = null; 
			} 
		} 
		// 关闭端口选择器 
		if (selector != null) { 
			try { 
				selector.close(); 
				System.out.println("端口选择器关闭成功"); 
			} catch (IOException e) { 
				e.printStackTrace(); 
			} finally { 
				selector = null; 
			} 
		} 
	} 
 
	public static void main(String[] args) { 
		try { 
			final PushServer server = new PushServer(10001); 
			server.start(); 
			new Thread(new Runnable() { 
 
				public void run() {										 
						 
					try { 
						InputStreamReader input = new InputStreamReader( 
								System.in); 
						BufferedReader br = new BufferedReader(input); 
						String sendText = br.readLine(); 
						server.send(sendText); 
					} catch (IOException e) { 
						e.printStackTrace(); 
					} 
 
						 
 
				 
				} 
			}).start(); 
 
		} catch (Exception e) { 
			e.printStackTrace(); 
		} 
	} 
}


 

标签:安卓Android
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

全民解析

全民解析

关注我们