我们再来看下消息的整个发送的过程,做工作的类为:ProducerSendThread 我们来看看:
final String threadName ; // 消息队列 final BlockingQueue<QueueItem<T>> queue; // 消息的encoder final Encoder<T> serializer; // 底层的同步的消息发送器 final SyncProducer underlyingProducer; // 事件处理类,它执行真实的消息发送 final EventHandler<T> eventHandler; // 用户可干预的事件处理类 final CallbackHandler<T> callbackHandler; final long queueTime ; final int batchSize ; private final Logger logger = LoggerFactory.getLogger(ProducerSendThread .class ); ///////////////////////////////////////////////////////////////////// private final CountDownLatch shutdownLatch = new CountDownLatch(1); private volatile boolean shutdown = false;有几个属性我们目前不知道什么意思,待分析完代码后再看是什么意思吧。
说到线程类,我们主要看下run方法即可:
public void run() { try { List<QueueItem<T>> remainingEvents = processEvents(); //handle remaining events if (remainingEvents.size() > 0) { logger.debug(format( "Dispatching last batch of %d events to the event handler", remainingEvents.size())); tryToHandle(remainingEvents); } } catch (Exception e) { logger.error("Error in sending events: ", e); } finally { shutdownLatch.countDown(); } }从这里可以看到,处理方式是比较简单的,调用了processEvents函数后,如果还有剩余,就调用tryToHandle函数再去处理。
我们先来看看processEvents函数吧:
private List<QueueItem<T>> processEvents() { long lastSend = System.currentTimeMillis(); final List<QueueItem<T>> events = new ArrayList<QueueItem<T>>(); boolean full = false; while (!shutdown ) { try { // 从队列里面获取一个元素 QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit. MILLISECONDS); long elapsed = System.currentTimeMillis() - lastSend; boolean expired = item == null; if (item != null) { if (callbackHandler != null) { // 如果callbackHandler不为空,则进行回调 List<QueueItem<T>> items = callbackHandler.afterDequeuingExistingData(item); if (items != null) { events.addAll(items); } } else { events.add(item); } // full = events.size() >= batchSize; } // 如果超时或者队列已满 if (full || expired) { if (logger .isDebugEnabled()) { if (expired) { logger.debug(elapsed + " ms elapsed. Queue time reached. Sending.."); } else { logger.debug(format( "Batch(%d) full. Sending.." , batchSize)); } } // 调用tryToHandle函数去处理事件 tryToHandle(events); lastSend = System.currentTimeMillis(); events.clear(); } } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } if (queue .size() > 0) { throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue"); } // 回调用户的lastBatchBeforeClose函数,这个函数还可以再返回一些消息 if (this .callbackHandler != null) { List<QueueItem<T>> remainEvents = this.callbackHandler .lastBatchBeforeClose(); if (remainEvents != null) { events.addAll(remainEvents); } } return events; }我们最后来看下tryToHandle函数
private void tryToHandle(List<QueueItem<T>> events) { if (logger .isDebugEnabled()) { logger.debug("handling " + events.size() + " events"); } if (events.size() > 0) { try { this.eventHandler .handle(events, underlyingProducer, serializer ); } catch (RuntimeException e) { logger.error("Error in handling batch of " + events.size() + " events", e); } } }
从这个函数来看,最终的消息发送其实是eventHandler在起左右。
我们今天来看下DefaultEventHandler这个类的实现,它其实是底层的发送的处理类。
public class DefaultEventHandler<T> implements EventHandler<T> { // 用户的回调的handler,这个handler其实贯穿准备消息和用户发送消息的整个生命周期之中 private final CallbackHandler<T> callbackHandler; // 从命名上就能看出是队列的压缩 private final Set<String> compressedTopics ; // 消息是否编码 private final CompressionCodec codec ; private final Logger logger = LoggerFactory.getLogger(DefaultEventHandler. class); // 重试次数 private final int numRetries ; 下面我们再来看下比较重要的方法handle public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) { List<QueueItem<T>> processedEvents = events; if (this .callbackHandler != null) { processedEvents = this.callbackHandler .beforeSendingData(events); } send(collate(processedEvents, encoder), producer); }从这个地方来看,有两个函数,一个是collate,一个是send,这两个函数比较重要。
我们先来看collate函数:
private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) { if(events == null || events.isEmpty()){ return Collections.emptyList(); } final Map<String, Map<Integer, List<Message>>> topicPartitionData = new HashMap<String, Map<Integer, List<Message>>>(); for (QueueItem<T> event : events) { // 初始化map,可以看出来,key是topic,值是每个partition的一个消息列表 Map<Integer, List<Message>> partitionData = topicPartitionData.get(event.topic ); if (partitionData == null) { partitionData = new HashMap<Integer, List<Message>>(); topicPartitionData.put(event. topic, partitionData); } List<Message> data = partitionData.get(event.partition ); if (data == null) { data = new ArrayList<Message>(); partitionData.put(event. partition, data); } // 经过encoder处理后将消息入队列 data.add(encoder.toMessage(event. data)); } // final List<ProducerRequest> requests = new ArrayList<ProducerRequest>(); for (Map.Entry<String, Map<Integer, List<Message>>> e : topicPartitionData.entrySet()) { final String topic = e.getKey(); for (Map.Entry<Integer, List<Message>> pd : e.getValue().entrySet()) { final Integer partition = pd.getKey(); // 后进ProducerRequest东西,这个里面包含topic,partition以及一个value列表 requests.add( new ProducerRequest(topic, partition, convert(topic, pd.getValue()))); } } return requests; }我们接下来再看下convert函数,这个函数明显就是一个工具函数,包含一个消息列表好codec,为发送MessageSet做准备。
private ByteBufferMessageSet convert(String topic, List<Message> messages) { //compress condition: if (codec != CompressionCodec.NoCompressionCodec // && ( compressedTopics.isEmpty() || compressedTopics.contains(topic))) { return new ByteBufferMessageSet(codec, messages.toArray(new Message[messages.size()])); } return new ByteBufferMessageSet(CompressionCodec.NoCompressionCodec, messages.toArray( new Message[messages .size()])); }最后我们来看下send函数:
private void send(List<ProducerRequest> produces, SyncProducer syncProducer) { if (produces.isEmpty()) { return; } final int maxAttempts = 1 + numRetries; for (int i = 0; i < maxAttempts; i++) { try { syncProducer.multiSend(produces); break; } catch (RuntimeException e) { logger.warn("error sending message, attempts times: " + i, e); if (i == maxAttempts - 1) { throw e; } } } }这个地方可以看出来,最终还是调用的是syncProducer的multiSend函数。
SyncProducer类以及multiSend函数我们在下一篇博客里面讲解。
相关推荐
Kafka Producer机制优化-提高发送消息可靠性
ProShow Producer模板,玫瑰婚礼,绚丽多彩的画面,替换图片即可。
Photodex ProShow Producer 模板资源和插件5G,爱好制作电子相册的朋友一定喜欢,一定能满足你的需求,资源难得。
Easy RealMedia Producer V1.94
producer:生产者,消息发送者 producer group:生产者组,由多个生产者组成, nameSrv:路由注册中心,将 Broker:代理服务器,负责消息的存储,投递,查询 BrokerCluster:代理服务器集群,保证高可用和高可靠 ...
Laravel开发-producer 基于规则的简单类解析
软件介绍 一个批量RealMedia文件生成器。采用全新的RealVideo v9&RealVideo; v10内核,根据实际使用的需要提供了比Helix RealMedia ...自动关机前有30秒响应时间,用户可以取消关机。 任务结束后有详细的信息报告。
Helix Producer Plus设置
amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per
配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...
Helix Producer Plus V9.01 拷贝gui.rpui到安装文件夹"Helix Producer Plus\resources"目录下替换同名文件(若想恢复E文版的可先备份该文件)
kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...
spring-boot-activemq-producer 源码
Proshow能够轻易地制作漂亮的幻灯片,只需将选择的图片拖入即可,还可以将自己的解说或cd音轨作为声音背景,提供超过280种幻灯变换...相对于Gold版,Producer版有更多的一些功能,主要是为了那些商业设计专家而准备的。
利用信号量实现的多线程之间的同步与互斥,详情看博客文章Linux多线程编程(二)---线程之间的同步与互斥进阶实验
想简单的做直播吗,这里给大家提供一个简单的方案。该文章主要介绍怎样搭建Helix Server以及简单的配置,和怎样配合Helix producer做直播详细方法。
多个生产者多个消费者同步问题——进程调度--操作系统模拟
Helix Producer Plus V9.01 realmedia专业转码器的算号器 不用资源分哦
最为方便快捷的CD转MP3工具,占用资源小,运行稳定,破解版