`

Jafka学习之Producer发送之组件说明

    博客分类:
  • mq
阅读更多

         1. QueueItem介绍,从类的定义来看,这个里面包含一个数据,超那个topic的那个分区发送

         

public class QueueItem<T> {

    public final T data ;

    public final int partition ;

    public final String topic ;
}

         2. EventHandler介绍,从类的定义来看,可以初始化,可以被关闭,在handle方法里面包含一个encoder,一个SyncProducer,和一批消息

      

public interface EventHandler<T> extends Closeable{

    /**
     * Initializes the event handler using a Properties object
     *
     * @param properties the properties used to initialize the event
     *        handler
     */
    void init(Properties properties);

    /**
     * Callback to dispatch the batched data and send it to a Jafka server
     *
     * @param events the data sent to the producer
     * @param producer the low- level producer used to send the data
     * @param encoder data encoder
     */
    void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);

    /**
     * Cleans up and shuts down the event handler
     */
    void close();
}

 

           3. CallBackHandler从类的命名来看就是一个回调函数,这个在异步发送的时候,可以回调用户端的一些程序

         

public interface EventHandler<T> extends Closeable{

    /**
     * Initializes the event handler using a Properties object
     *
     * @param properties the properties used to initialize the event
     *        handler
     */
    void init(Properties properties);

    /**
     * Callback to dispatch the batched data and send it to a Jafka server
     *
     * @param events the data sent to the producer
     * @param producer the low- level producer used to send the data
     * @param encoder data encoder
     */
    void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder);

    /**
     * Cleans up and shuts down the event handler
     */
    void close();
}

          4 我们接着来看下SyncProducer的实现

        

public class SyncProducer implements Closeable {

    private final Logger logger = Logger.getLogger(SyncProducer .class );

    //private static final RequestKeys RequestKey = RequestKeys.Produce;//0

    /////////////////////////////////////////////////////////////////////
    // 同步发送器的配置
    private final SyncProducerConfig config ;
    // BlockingChannel这个待会再讲,感觉是封装了一个channel
    private final BlockingChannel blockingChannel ;
    // 这个是一个对象锁,待会讲解锁的妙用
    private final Object lock = new Object();
    // 是否已经关闭的标志位
    private volatile boolean shutdown = false;

    // 主机和端口号
    private final String host ;

    private final int port ;

  接着是它的构造函数:
 public SyncProducer(SyncProducerConfig config) {
        super();
        this.config = config;
        this.host = config.getHost();
        this.port = config.getPort();
        //
        this.blockingChannel = new BlockingChannel(host, port , -1, config.socketTimeoutMs, config.bufferSize );
    }

        这个里面讲readBufferSize设置成-1了,因为它不需要读数据

        我们接下来看下这三个send的重载函数
        
// 采用随机partition进行发送
public void send(String topic, ByteBufferMessageSet message) {
        send(topic, ProducerRequest. RandomPartition, message);
    }

// 检验消息大小后,构建ProducerRequest对象进行send
    public void send(String topic, int partition, ByteBufferMessageSet messages) {
        messages.verifyMessageSize(config .maxMessageSize );
        send( new ProducerRequest(topic, partition, messages));
    }

    private void send(Request request) {
        // 从request对象构建出send对象
        BoundedByteBufferSend send = new BoundedByteBufferSend(request);
        synchronized (lock ) {
            long startTime = System.nanoTime();
            int written = -1;
            try {
                // 建立链接并发送send,最终返回一个number,表明发送的字节数量
                written = connect().send(send);
            } catch (IOException e) {
                // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
                disconnect();
                throw new RuntimeException(e);
            } finally {
                if (logger .isDebugEnabled()) {
                    logger.debug(format( "write %d bytes data to %s:%d", written, host, port));
                }
            }
            final long endTime = System.nanoTime ();
            SyncProducerStats.recordProduceRequest(endTime - startTime);
        }
    }
 
        
分享到:
评论

相关推荐

    Kafka Producer机制优化-提高发送消息可靠性

    Kafka Producer机制优化-提高发送消息可靠性

    Helix Producer Plus设置说明.doc

    Helix Producer Plus设置

    ProShow Producer 模板 玫瑰婚礼

    ProShow Producer模板,玫瑰婚礼,绚丽多彩的画面,替换图片即可。

    Photodex ProShow Producer 模板资源5G

    Photodex ProShow Producer 模板资源和插件5G,爱好制作电子相册的朋友一定喜欢,一定能满足你的需求,资源难得。

    Easy RealMedia Producer V1.94

    Easy RealMedia Producer V1.94

    Laravel开发-producer

    Laravel开发-producer 基于规则的简单类解析

    RocketMQ概念 producer:生产者,消息发送者

    producer:生产者,消息发送者 producer group:生产者组,由多个生产者组成, nameSrv:路由注册中心,将 Broker:代理服务器,负责消息的存储,投递,查询 BrokerCluster:代理服务器集群,保证高可用和高可靠 ...

    Easy Real Media Producer v1.93

    v10内核,根据实际使用的需要提供了比Helix RealMedia Producer和RealProducer10还要多的过滤设置。 基本上可以用它来替代Helix RealMedia Producer和RealProducer v10,使用Real v10内核时,全面支持Real10文件...

    amazon-kinesis-producer, 亚马逊Kinesis制作库.zip

    amazon-kinesis-producer, 亚马逊Kinesis制作库 室Producer库简介在亚马逊 Kinesis Producer Producer Producer Producer Producer Producer Producer performs performs performs per

    Helix Producer Plus V9.01

    配合Helix先进的功能,Realnetworks推出了第10代的流媒体压缩软件Helix Producer。Realnetworks全新改写代码的图形化专业流媒体文件制作工具。利用它,你可以轻松地实现RealAudio8、RealAudio9文件格式到实时文件的...

    Helix Producer Plus V9.01 附汉化

    Helix Producer Plus V9.01 拷贝gui.rpui到安装文件夹"Helix Producer Plus\resources"目录下替换同名文件(若想恢复E文版的可先备份该文件)

    pentaho-kafka-producer.zip

    kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。

    深入解析RocketMQ

    Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...

    spring-boot-activemq-producer

    spring-boot-activemq-producer 源码

    ProShow Producer v3.0.1992

    Proshow能够轻易地制作漂亮的幻灯片,只需将选择的图片拖入即可,还可以将自己的解说或cd音轨作为声音背景,提供超过280种幻灯变换...相对于Gold版,Producer版有更多的一些功能,主要是为了那些商业设计专家而准备的。

    producer-customer.c文件

    利用信号量实现的多线程之间的同步与互斥,详情看博客文章Linux多线程编程(二)---线程之间的同步与互斥进阶实验

    MP3Producer

    最为方便快捷的CD转MP3工具,占用资源小,运行稳定,破解版

    Helix Moile Server+Helix producer做直播

    想简单的做直播吗,这里给大家提供一个简单的方案。该文章主要介绍怎样搭建Helix Server以及简单的配置,和怎样配合Helix producer做直播详细方法。

    Helix Producer Plus v9.0.1 keyen

    Helix Producer Plus V9.01 realmedia专业转码器的算号器 不用资源分哦

    绿色版 OID Producer

    无需安装的绿色版OidProducer,

Global site tag (gtag.js) - Google Analytics