`

jafka学习之网络

    博客分类:
  • mq
阅读更多
       首先我们先来看SocketServer这个的实现类,这个类虽然实现的很简单,但是包含了nio请求的最基本的过程。
 
这个类实现的比较经典
我们先来看下局部变量:
 
  // 从类的命名来看是一个RequestHandler的工程类
    private final RequestHandlerFactory handlerFactory;

    // 可以接受的最大请求数
    private final int maxRequestSize ;

    // 任务处理者的数量
    private final Processor[] processors ;

    // 封装的Acceptor
    private final Acceptor acceptor ;

    // SocketServer的统计类
    private final SocketServerStats stats ;

    // Server的配置类
    private final ServerConfig serverConfig ;
 接着我们来看下构造函数和startup函数
   
public SocketServer(RequestHandlerFactory handlerFactory, //
            ServerConfig serverConfig) {
        super();
        this.serverConfig = serverConfig;
        this.handlerFactory = handlerFactory;
        this.maxRequestSize = serverConfig.getMaxSocketRequestSize();
        // 初始化处理器
        this.processors = new Processor[serverConfig.getNumThreads()];
        this.stats = new SocketServerStats(1000L * 1000L * 1000L * serverConfig.getMonitoringPeriodSecs());
        // 初始化响应器
        this.acceptor = new Acceptor(serverConfig.getPort(), //
                processors, //
                serverConfig.getSocketSendBuffer(), //
                serverConfig.getSocketReceiveBuffer());
    }


 /**
     * Start the socket server and waiting for finished
     *
     * @throws InterruptedException
     */
    public void startup() throws InterruptedException {
        final int maxCacheConnectionPerThread = serverConfig.getMaxConnections() / processors.length ;
        logger.info("start " + processors. length + " Processor threads");
        // processor和acceptor全部都起来
        for (int i = 0; i < processors. length; i++) {
            processors[i] = new Processor(handlerFactory , stats , maxRequestSize , maxCacheConnectionPerThread);
            Utils. newThread("jafka-processor-" + i, processors[i], false ).start();
        }
        Utils. newThread("jafka-acceptor", acceptor, false).start();
        acceptor.awaitStartup();
    }
 
我们接下来来看Acceptor的实现类。
在看之前,我们先来了解下AbstractServerThread这个类,这个是服务器端所有线程的父类,里面包含一个Selector。
Acceptor类是一个线程类,我们还是先来看下它的局部变量。
 // 绑定的端口
    private int port ;
    // 一批任务处理器
    private Processor[] processors;
    // send和reciveBuffer的大小
    private int sendBufferSize ;
    private int receiveBufferSize ;
 既然是线程类,我们接着来看run方法。
1. run方法的开头进行了socket的绑定。
// 初始化ServerSocket,以非阻塞的方式启动,并经ACCEPT事件注册到selector,
    // 这样就可以处理accept事件了
        final ServerSocketChannel serverChannel;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking( false);
            serverChannel.socket().bind( new InetSocketAddress(port));
            serverChannel.register(getSelector(), SelectionKey.OP_ACCEPT );
        } catch (IOException e) {
            logger.error("listener on port " + port + " failed.");
            throw new RuntimeException(e);
        }
 2. 第二部分是socket绑定完毕后,通知server启动成功
  3. 从selector里面select出来一个请求,然后选择一个processor进行处理,我们简单来看代码
  
//
        int currentProcessor = 0;
        while(isRunning()) {
            int ready = -1;
            try {
                ready = getSelector().select(500L);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            if(ready<=0)continue;
            Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
            while(iter.hasNext() && isRunning())
                try {
                    SelectionKey key = iter.next();
                    iter.remove();
                    // 如果是accetp事件,则选择一个processor进行处理
                    if(key.isAcceptable()) {
                        accept(key,processors[currentProcessor]);
                    } else {
                        throw new IllegalStateException("Unrecognized key state for acceptor thread.");
                    }
                    // 选择下一个processor
                    currentProcessor = (currentProcessor + 1) % processors .length ;
                } catch (Throwable t) {
                    logger.error("Error in acceptor",t);
                }
            }
 
4. 当server退出后关闭server和selector
最后我们来看下accept私有函数的实现:
其实非常简单,将请求交个processor进行处理
 
private void accept(SelectionKey key, Processor processor) throws IOException{
    // 获取到ServerSocketChannel类
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize );
       
        // 配置刚才accept来的socket,进行设置后,交由processor进行处理
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking( false);
        socketChannel.socket().setTcpNoDelay( true);
        socketChannel.socket().setSendBufferSize(sendBufferSize );
        //
        processor.accept(socketChannel);
    }
 接下来我们接着来看Processor类的实现。
Processor我们先来看下accept函数
public void accept(SocketChannel socketChannel) {
        newConnections.add(socketChannel);
        getSelector().wakeup();
    }
 这个地方为什么要调用wakeup,我们目前还没有想到,还在继续思考
下来我们来看最主要的run函数:
public void run() {
    // 启动成功
        startupComplete();
   
       
        while (isRunning()) {
            try {
                // setup any new connections that have been queued up
          // 将新进来请求的read事件注册到channel上
                configureNewConnections();

                final Selector selector = getSelector();
                int ready = selector.select(500);
                if (ready <= 0) continue;
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext() && isRunning()) {
                    SelectionKey key = null;
                    try {
                        key = iter.next();
                        iter.remove();
                        // 如果通道可读,就触发读操作
                        if (key.isReadable()) {
                            read(key);
                        // 如果可写,就触发写操作
                        } else if (key.isWritable()) {
                            write(key);
                        } else if (!key.isValid()) {
                            close(key);
                        } else {
                            throw new IllegalStateException("Unrecognized key state for processor thread.");
                        }
                    // 如果发送读操作的异常
                    } catch (EOFException eofe) {
                        Socket socket = channelFor(key).socket();
                        logger.debug(format( "connection closed by %s:%d." , socket.getInetAddress(), socket.getPort()));
                        close(key);
                    // 如果发生InvalidRequestException异常
                    } catch (InvalidRequestException ire) {
                        Socket socket = channelFor(key).socket();
                        logger.info(format( "Closing socket connection to %s:%d due to invalid request: %s", socket.getInetAddress(), socket.getPort(),
                                ire.getMessage()));
                        close(key);
                    } catch (Throwable t) {
                        Socket socket = channelFor(key).socket();
                        final String msg = "Closing socket for %s:%d becaulse of error";
                        if (logger .isDebugEnabled()) {
                            logger.error(format(msg, socket.getInetAddress(), socket.getPort()), t);
                        } else {
                            logger.error(format(msg, socket.getInetAddress(), socket.getPort()));
                        }
                        close(key);
                    }
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }

        }
        //
        logger.info("Closing selector while shutting down");
        closeSelector();
        shutdownComplete();
    }
 1. 通知processor启动成功,这个里面我们也看到,程序中都散落这一些CountDownLatch
2. 将新来的请求的read事件注册到channel上
3. 从selector里面选择read和write事件,然后分别调用read和write函数
我们重点来看下read函数的实现:
private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = channelFor(key);
        Receive request = null;
        if (key.attachment() == null) {
            request = new BoundedByteBufferReceive(maxRequestSize );
            key.attach(request);
        } else {
            request = (Receive) key.attachment();
        }
        int read = request.readFrom(socketChannel);
        stats.recordBytesRead(read);
        if (read < 0) {
            close(key);
        // 如果读完毕了,进行处理,将response对象赋值给attach,然后发送响应
        } else if (request.complete()) {
            Send maybeResponse = handle(key, request);
            key.attach( null);
            // if there is a response, send it, otherwise do nothing
            if (maybeResponse != null) {
                key.attach(maybeResponse);
                key.interestOps(SelectionKey.OP_WRITE );
            }
        } else {
            // 如果没读完,则继续注册读事件,因为之前已经将read删除了
            // more reading to be done
            key.interestOps(SelectionKey. OP_READ);
            getSelector().wakeup();
            if (logger .isTraceEnabled()) {
                logger.trace("reading request not been done. " + request);
            }
        }
    }
 我们接着来看handle函数
 /**
     * Handle a completed request producing an optional response
     */
    private Send handle(SelectionKey key, Receive request) {
        // requestbuffer的第一个short就是RequestType enum的value
        final short requestTypeId = request.buffer().getShort();
        final RequestKeys requestType = RequestKeys.valueOf(requestTypeId);
        if (requestLogger .isTraceEnabled()) {
            if (requestType == null) {
                throw new InvalidRequestException("No mapping found for handler id " + requestTypeId);
            }
            String logFormat = "Handling %s request from %s";
            requestLogger.trace(format(logFormat, requestType, channelFor(key).socket().getRemoteSocketAddress()));
        }
        // 从ReqeustHandler工厂里面取得RequestHandler
        RequestHandler handlerMapping = requesthandlerFactory.mapping(requestType, request);
        if (handlerMapping == null) {
            throw new InvalidRequestException("No handler found for request");
        }
        // 调用handler将结果返回
        long start = System.nanoTime();
        Send maybeSend = handlerMapping.handler(requestType, request);
        stats.recordRequest(requestType, System.nanoTime() - start);
        return maybeSend;
    }
 我们也看下write函数,堪称比较经典的实现
private void write(SelectionKey key) throws IOException {
        // 因为我们注册read事件的时候,有一个attachment里面是send
        Send response = (Send) key.attachment();
        SocketChannel socketChannel = channelFor(key);
        // 将response写入socket流
        int written = response.writeTo(socketChannel);
        stats.recordBytesWritten(written);
        // 如果写完注册读事件,如果没写完,则继续注册写事件
        if (response.complete()) {
            key.attach( null);
            key.interestOps(SelectionKey. OP_READ);
        } else {
            key.interestOps(SelectionKey. OP_WRITE);
            getSelector().wakeup();
        }
    }
 
分享到:
评论

相关推荐

    jafka集群安装与部署

    jafka集群安装与部署 讲述jafka的整个安装和测试过程。

    jafka:快速,简单的分布式发布-订阅消息系统(mq)

    #A快速分布式消息传递系统(MQ) Jafka mq是从克隆的分布式发布-订阅消息系统。 因此它具有以下功能: 具有O(1)磁盘结构的持久消息传递即使在存储大量TB消息的情况下也能提供恒定的时间性能。 高吞吐量:即使使用...

    jafka, 一种快速简单的分布式发布订阅消息系统( mq ).zip

    jafka, 一种快速简单的分布式发布订阅消息系统( mq ) #A 快速分布式邮件系统( MQ ) Jafka是一个分布式发布订阅消息系统,从 Apache 克隆。因此,它具有以下特性:具有 O(1) 磁盘结构的持久消息传递,即使有大量的...

    kafka学习文档

    kafka 的 wiki 是徆丌错的学习文档: https://cwiki.apache.org/confluence/display/KAFKA/Index 接下来就是一系列文章,文章都是循序渐迕的方式带你了览 kafka: 关亍 kafka 的基本知识,分布式的基础:《分布式消息...

    java8集合源码分析-notes:读书笔记

    zeromq的作者之一用C语言重写的通信框架, OpenMQ Open-MQ 是一个开源的消息中间件,类似IBM的 WebSphere MQ(MQSeries),采用 C++ 和 Qt 库编写的,支持Windows、Unix 以及 Mac OS 平台,支持 JMS。 ZeroMQ ZeroMQ...

Global site tag (gtag.js) - Google Analytics