1. QueueItem介绍,从类的定义来看,这个里面包含一个数据,超那个topic的那个分区发送
public class QueueItem<T> { public final T data ; public final int partition ; public final String topic ; }
2. EventHandler介绍,从类的定义来看,可以初始化,可以被关闭,在handle方法里面包含一个encoder,一个SyncProducer,和一批消息
public interface EventHandler<T> extends Closeable{ /** * Initializes the event handler using a Properties object * * @param properties the properties used to initialize the event * handler */ void init(Properties properties); /** * Callback to dispatch the batched data and send it to a Jafka server * * @param events the data sent to the producer * @param producer the low- level producer used to send the data * @param encoder data encoder */ void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder); /** * Cleans up and shuts down the event handler */ void close(); }
3. CallBackHandler从类的命名来看就是一个回调函数,这个在异步发送的时候,可以回调用户端的一些程序
public interface EventHandler<T> extends Closeable{ /** * Initializes the event handler using a Properties object * * @param properties the properties used to initialize the event * handler */ void init(Properties properties); /** * Callback to dispatch the batched data and send it to a Jafka server * * @param events the data sent to the producer * @param producer the low- level producer used to send the data * @param encoder data encoder */ void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder); /** * Cleans up and shuts down the event handler */ void close(); }
4 我们接着来看下SyncProducer的实现
public class SyncProducer implements Closeable { private final Logger logger = Logger.getLogger(SyncProducer .class ); //private static final RequestKeys RequestKey = RequestKeys.Produce;//0 ///////////////////////////////////////////////////////////////////// // 同步发送器的配置 private final SyncProducerConfig config ; // BlockingChannel这个待会再讲,感觉是封装了一个channel private final BlockingChannel blockingChannel ; // 这个是一个对象锁,待会讲解锁的妙用 private final Object lock = new Object(); // 是否已经关闭的标志位 private volatile boolean shutdown = false; // 主机和端口号 private final String host ; private final int port ; 接着是它的构造函数: public SyncProducer(SyncProducerConfig config) { super(); this.config = config; this.host = config.getHost(); this.port = config.getPort(); // this.blockingChannel = new BlockingChannel(host, port , -1, config.socketTimeoutMs, config.bufferSize ); }
这个里面讲readBufferSize设置成-1了,因为它不需要读数据
我们接下来看下这三个send的重载函数
// 采用随机partition进行发送 public void send(String topic, ByteBufferMessageSet message) { send(topic, ProducerRequest. RandomPartition, message); } // 检验消息大小后,构建ProducerRequest对象进行send public void send(String topic, int partition, ByteBufferMessageSet messages) { messages.verifyMessageSize(config .maxMessageSize ); send( new ProducerRequest(topic, partition, messages)); } private void send(Request request) { // 从request对象构建出send对象 BoundedByteBufferSend send = new BoundedByteBufferSend(request); synchronized (lock ) { long startTime = System.nanoTime(); int written = -1; try { // 建立链接并发送send,最终返回一个number,表明发送的字节数量 written = connect().send(send); } catch (IOException e) { // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry disconnect(); throw new RuntimeException(e); } finally { if (logger .isDebugEnabled()) { logger.debug(format( "write %d bytes data to %s:%d", written, host, port)); } } final long endTime = System.nanoTime (); SyncProducerStats.recordProduceRequest(endTime - startTime); } }
相关推荐
Kafka Producer机制优化-提高发送消息可靠性
Helix Producer Plus设置
ProShow Producer模板,玫瑰婚礼,绚丽多彩的画面,替换图片即可。
Photodex ProShow Producer 模板资源和插件5G,爱好制作电子相册的朋友一定喜欢,一定能满足你的需求,资源难得。
Easy RealMedia Producer V1.94
Laravel开发-producer 基于规则的简单类解析
producer:生产者,消息发送者 producer group:生产者组,由多个生产者组成, nameSrv:路由注册中心,将 Broker:代理服务器,负责消息的存储,投递,查询 BrokerCluster:代理服务器集群,保证高可用和高可靠 ...
v10内核,根据实际使用的需要提供了比Helix RealMedia Producer和RealProducer10还要多的过滤设置。 基本上可以用它来替代Helix RealMedia Producer和RealProducer v10,使用Real v10内核时,全面支持Real10文件...
amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per
配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...
Helix Producer Plus V9.01 拷贝gui.rpui到安装文件夹"Helix Producer Plus\resources"目录下替换同名文件(若想恢复E文版的可先备份该文件)
kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...
spring-boot-activemq-producer 源码
Proshow能够轻易地制作漂亮的幻灯片,只需将选择的图片拖入即可,还可以将自己的解说或cd音轨作为声音背景,提供超过280种幻灯变换...相对于Gold版,Producer版有更多的一些功能,主要是为了那些商业设计专家而准备的。
利用信号量实现的多线程之间的同步与互斥,详情看博客文章Linux多线程编程(二)---线程之间的同步与互斥进阶实验
最为方便快捷的CD转MP3工具,占用资源小,运行稳定,破解版
想简单的做直播吗,这里给大家提供一个简单的方案。该文章主要介绍怎样搭建Helix Server以及简单的配置,和怎样配合Helix producer做直播详细方法。
Helix Producer Plus V9.01 realmedia专业转码器的算号器 不用资源分哦
无需安装的绿色版OidProducer,