`

Jafka学习之Producer发送前调度

    博客分类:
  • mq
阅读更多
     我们再来看下消息的整个发送的过程,做工作的类为:ProducerSendThread 我们来看看:
   
final String threadName ;

    // 消息队列
    final BlockingQueue<QueueItem<T>> queue;

    // 消息的encoder
    final Encoder<T> serializer;

    // 底层的同步的消息发送器
    final SyncProducer underlyingProducer;

    // 事件处理类,它执行真实的消息发送
    final EventHandler<T> eventHandler;

    // 用户可干预的事件处理类
    final CallbackHandler<T> callbackHandler;

    final long queueTime ;

    final int batchSize ;

    private final Logger logger = LoggerFactory.getLogger(ProducerSendThread .class );

    /////////////////////////////////////////////////////////////////////
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    private volatile boolean shutdown = false;
    有几个属性我们目前不知道什么意思,待分析完代码后再看是什么意思吧。
  说到线程类,我们主要看下run方法即可:
    
public void run() {
        try {
            List<QueueItem<T>> remainingEvents = processEvents();
            //handle remaining events
            if (remainingEvents.size() > 0) {
                logger.debug(format( "Dispatching last batch of %d events to the event handler", remainingEvents.size()));
                tryToHandle(remainingEvents);
            }
        } catch (Exception e) {
            logger.error("Error in sending events: ", e);
        } finally {
            shutdownLatch.countDown();
        }
    }
    从这里可以看到,处理方式是比较简单的,调用了processEvents函数后,如果还有剩余,就调用tryToHandle函数再去处理。
  我们先来看看processEvents函数吧:
   
private List<QueueItem<T>> processEvents() {
        long lastSend = System.currentTimeMillis();
        final List<QueueItem<T>> events = new ArrayList<QueueItem<T>>();
        boolean full = false;
        while (!shutdown ) {
            try {
                // 从队列里面获取一个元素
                QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit. MILLISECONDS);
                long elapsed = System.currentTimeMillis() - lastSend;
                boolean expired = item == null;
                if (item != null) {
                    if (callbackHandler != null) {
                        // 如果callbackHandler不为空,则进行回调
                        List<QueueItem<T>> items = callbackHandler.afterDequeuingExistingData(item);
                        if (items != null) {
                            events.addAll(items);
                        }
                    } else {
                        events.add(item);
                    }
                    //
                    full = events.size() >= batchSize;
                }
                // 如果超时或者队列已满
                if (full || expired) {
                    if (logger .isDebugEnabled()) {
                        if (expired) {
                            logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..");
                        } else {
                            logger.debug(format( "Batch(%d) full. Sending.." , batchSize));
                        }
                    }
                    // 调用tryToHandle函数去处理事件
                    tryToHandle(events);
                    lastSend = System.currentTimeMillis();
                    events.clear();
                }
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
        if (queue .size() > 0) {
            throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue");
        }
        // 回调用户的lastBatchBeforeClose函数,这个函数还可以再返回一些消息
        if (this .callbackHandler != null) {
            List<QueueItem<T>> remainEvents = this.callbackHandler .lastBatchBeforeClose();
            if (remainEvents != null) {
                events.addAll(remainEvents);
            }
        }
        return events;
    }
    我们最后来看下tryToHandle函数
private void tryToHandle(List<QueueItem<T>> events) {
        if (logger .isDebugEnabled()) {
            logger.debug("handling " + events.size() + " events");
        }
        if (events.size() > 0) {
            try {
                this.eventHandler .handle(events, underlyingProducer, serializer );
            } catch (RuntimeException e) {
                logger.error("Error in handling batch of " + events.size() + " events", e);
            }
        }
    }
 
   从这个函数来看,最终的消息发送其实是eventHandler在起左右。
我们今天来看下DefaultEventHandler这个类的实现,它其实是底层的发送的处理类。
   
public class DefaultEventHandler<T> implements EventHandler<T> {
    // 用户的回调的handler,这个handler其实贯穿准备消息和用户发送消息的整个生命周期之中
    private final CallbackHandler<T> callbackHandler;
    // 从命名上就能看出是队列的压缩
    private final Set<String> compressedTopics ;
    // 消息是否编码
    private final CompressionCodec codec ;

    private final Logger logger = LoggerFactory.getLogger(DefaultEventHandler. class);
    // 重试次数
    private final int numRetries ;

下面我们再来看下比较重要的方法handle

 public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) {
        List<QueueItem<T>> processedEvents = events;
        if (this .callbackHandler != null) {
            processedEvents = this.callbackHandler .beforeSendingData(events);
        }
        send(collate(processedEvents, encoder), producer);
    }
   从这个地方来看,有两个函数,一个是collate,一个是send,这两个函数比较重要。
       我们先来看collate函数:
       
 private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) {
        if(events == null || events.isEmpty()){
            return Collections.emptyList();
        }
        final Map<String, Map<Integer, List<Message>>> topicPartitionData = new HashMap<String, Map<Integer, List<Message>>>();
        for (QueueItem<T> event : events) {
            // 初始化map,可以看出来,key是topic,值是每个partition的一个消息列表
            Map<Integer, List<Message>> partitionData = topicPartitionData.get(event.topic );
            if (partitionData == null) {
                partitionData = new HashMap<Integer, List<Message>>();
                topicPartitionData.put(event. topic, partitionData);
            }
            List<Message> data = partitionData.get(event.partition );
            if (data == null) {
                data = new ArrayList<Message>();
                partitionData.put(event. partition, data);
            }
            // 经过encoder处理后将消息入队列
            data.add(encoder.toMessage(event. data));
        }
        //
        final List<ProducerRequest> requests = new ArrayList<ProducerRequest>();
        for (Map.Entry<String, Map<Integer, List<Message>>> e : topicPartitionData.entrySet()) {
            final String topic = e.getKey();
            for (Map.Entry<Integer, List<Message>> pd : e.getValue().entrySet()) {
                final Integer partition = pd.getKey();
                // 后进ProducerRequest东西,这个里面包含topic,partition以及一个value列表
                requests.add( new ProducerRequest(topic, partition, convert(topic, pd.getValue())));
            }
        }
        return requests;
    }
      我们接下来再看下convert函数,这个函数明显就是一个工具函数,包含一个消息列表好codec,为发送MessageSet做准备。
     
private ByteBufferMessageSet convert(String topic, List<Message> messages) {
        //compress condition:
        if (codec != CompressionCodec.NoCompressionCodec //
                && ( compressedTopics.isEmpty() || compressedTopics.contains(topic))) {
            return new ByteBufferMessageSet(codec, messages.toArray(new Message[messages.size()]));
        }
        return new ByteBufferMessageSet(CompressionCodec.NoCompressionCodec, messages.toArray( new Message[messages
                .size()]));
    }
       最后我们来看下send函数:
   
private void send(List<ProducerRequest> produces, SyncProducer syncProducer) {
        if (produces.isEmpty()) {
            return;
        }
        final int maxAttempts = 1 + numRetries;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                syncProducer.multiSend(produces);
                break;
            } catch (RuntimeException e) {
                logger.warn("error sending message, attempts times: " + i, e);
                if (i == maxAttempts - 1) {
                    throw e;
                }
            }
        }
    }
      这个地方可以看出来,最终还是调用的是syncProducer的multiSend函数。
SyncProducer类以及multiSend函数我们在下一篇博客里面讲解。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics