`

jafka学习之LogManager

    博客分类:
  • mq
阅读更多
       今天终于要讲到LogManager了,在讲LogManager之前,我们还是先来看下几个基本概念和jafka的存储结构。
      

 
     下面是一个网友画的图:
       

       从这个里面可以看到,消息队列跟路径下,每个topic的每个分区都是一个目录,里面是一个一个的文件,都以jafka结尾
      那下来,我们看下类的实现。
      
public class LogManager implements PartitionChooser, Closeable {
    // 服务器配置
    final ServerConfig config;
    // 一个定时任务执行器
    private final Scheduler scheduler ;
    // log清理间隔
    final long logCleanupIntervalMs ;
    // 默认log清理间隔
    final long logCleanupDefaultAgeMs ;
    // 实现需要恢复,这个后面再讲
    final boolean needRecovery ;

    private final Logger logger = LoggerFactory.getLogger(LogManager.class);

    ///////////////////////////////////////////////////////////////////////
    final int numPartitions ;
    // 消息队列的跟路径
    final File logDir;
    // log的刷新频率
    final int flushInterval ;
    // log创建的写锁
    private final Object logCreationLock = new Object();

    final Random random = new Random();
    // 一个启动器的闭锁
    final CountDownLatch startupLatch;

    // 一个log的池,key为topic,value是partition和Log的对应
    private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();

    // log flush的后台任务执行器
    private final Scheduler logFlusherScheduler = new Scheduler(1, "jafka-logflusher-" , false);

    // topic的注册的task
    private final LinkedBlockingQueue<TopicTask> topicRegisterTasks = new LinkedBlockingQueue<TopicTask>();

    // topic注册task是否停止状态
    private volatile boolean stopTopicRegisterTasks = false;

    // log刷新频率的一个map,key为topic,value为刷新频率
    final Map<String, Integer> logFlushIntervalMap;

    // log保持的一个map,key为topic,value为时长
    final Map<String, Long> logRetentionMSMap;

    // log保留的size,这个后面详细解释
    final int logRetentionSize ;

    /////////////////////////////////////////////////////////////////////////
    // 记录broker、topic在zk的注册情况
    private ServerRegister serverRegister;

    // topic和分区个数的一个map
    private final Map<String, Integer> topicPartitionsMap;

    // log的回滚策略
    private RollingStrategy rollingStategy;

    // 最大的消息的size
    private final int maxMessageSize ;
   
    LogManager这类,主要做两件事情,一个是初始化的时候从目录中恢复log,一个是对运行时的相关命令做出响应。
  我们先来看下恢复log这个:
   
 public void load() throws IOException {
     // 滚动策略为fixedsize 的滚动策略
        if (this .rollingStategy == null) {
            this.rollingStategy = new FixedSizeRollingStrategy(config .getLogFileSize());
        }
        // 如果消息队列的根路径不存在,则创建
        if (!logDir .exists()) {
            logger.info( "No log directory found, creating '" + logDir.getAbsolutePath() + "'");
            logDir.mkdirs();
        }
        // 判断是否是目录并且可写
        if (!logDir .isDirectory() || !logDir.canRead()) {
            throw new IllegalArgumentException(logDir .getAbsolutePath() + " is not a readable log directory.");
        }
        // 获取这个里面的所有的子文件
        File[] subDirs = logDir.listFiles();
        if (subDirs != null) {
            for (File dir : subDirs) {
               // 跳过文件,因为在这个目录下,全部是topic-partition的文件夹
                if (!dir.isDirectory()) {
                    logger.warn( "Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
                } else {
                    logger.info( "Loading log from " + dir.getAbsolutePath());
                   
                    // 获取到目录的名称
                    final String topicNameAndPartition = dir.getName();
                    if (-1 == topicNameAndPartition.indexOf('-' )) {
                        throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                    }
                   
                    // 将目录名称按照-进行分割,得到一个 kv,然后key就是topic,value是partition
                    final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                    final String topic = topicPartion.k ;
                    final int partition = topicPartion.v;
                   
                    // 针对这个分区创建一个log对象
                    Log log = new Log(dir, partition, this.rollingStategy , flushInterval , needRecovery , maxMessageSize);

                    // 将新创建的log加入到map中
                    logs.putIfNotExists(topic, new Pool<Integer, Log>());
                    Pool<Integer, Log> parts = logs.get(topic);

                    parts.put(partition, log);
                   
                    // 如果时间的partition比配置中的大,则进行升级
                    int configPartition = getPartition(topic);
                    if (configPartition <= partition) {
                        topicPartitionsMap.put(topic, partition + 1);
                    }
                }
            }
        }

        /* Schedule the cleanup task to delete old logs */
        if (this .scheduler != null) {
            logger.debug( "starting log cleaner every " + logCleanupIntervalMs + " ms" );
            this.scheduler .scheduleWithRate(new Runnable() {

                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error( "cleanup log failed.", e);
                    }
                }

            }, 60 * 1000, logCleanupIntervalMs);
        }
        // 将topic的注册任务注册到 zk里面
        if (config .getEnableZookeeper()) {
            this.serverRegister = new ServerRegister(config, this );
            serverRegister.startup();
            TopicRegisterTask task = new TopicRegisterTask();
            task.setName( "jafka.topicregister");
            task.setDaemon( true);
            task.start();
        }
    }
    其他运行时的我就不做详细解释了,我都加了注释了,请大家对照着注释进行看下。
    
public class LogManager implements PartitionChooser, Closeable {
    // 服务器配置
    final ServerConfig config;
    // 一个定时任务执行器
    private final Scheduler scheduler ;
    // log清理间隔
    final long logCleanupIntervalMs ;
    // 默认log清理间隔
    final long logCleanupDefaultAgeMs ;
    // 实现需要恢复,这个后面再讲
    final boolean needRecovery ;

    private final Logger logger = LoggerFactory.getLogger(LogManager.class);

    ///////////////////////////////////////////////////////////////////////
    final int numPartitions ;
    // 消息队列的跟路径
    final File logDir;
    // log的刷新频率
    final int flushInterval ;
    // log创建的写锁
    private final Object logCreationLock = new Object();

    final Random random = new Random();
    // 一个启动器的闭锁
    final CountDownLatch startupLatch;

    // 一个log的池,key为topic,value是partition和Log的对应
    private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, Pool<Integer, Log>>();

    // log flush的后台任务执行器
    private final Scheduler logFlusherScheduler = new Scheduler(1, "jafka-logflusher-" , false);

    // topic的注册的task
    private final LinkedBlockingQueue<TopicTask> topicRegisterTasks = new LinkedBlockingQueue<TopicTask>();

    // topic注册task是否停止状态
    private volatile boolean stopTopicRegisterTasks = false;

    // log刷新频率的一个map,key为topic,value为刷新频率
    final Map<String, Integer> logFlushIntervalMap;

    // log保持的一个map,key为topic,value为时长
    final Map<String, Long> logRetentionMSMap;

    // log保留的size,这个后面详细解释
    final int logRetentionSize ;

    /////////////////////////////////////////////////////////////////////////
    // 记录broker、topic在zk的注册情况
    private ServerRegister serverRegister;

    // topic和分区个数的一个map
    private final Map<String, Integer> topicPartitionsMap;

    // log的回滚策略
    private RollingStrategy rollingStategy;

    // 最大的消息的size
    private final int maxMessageSize ;

    public LogManager(ServerConfig config, //
                      Scheduler scheduler, //
                      long logCleanupIntervalMs, //
                      long logCleanupDefaultAgeMs, //
                      boolean needRecovery) {
        super();
        this.config = config;
        this.maxMessageSize = config.getMaxMessageSize();
        this.scheduler = scheduler;
        //        this.time = time;
        this.logCleanupIntervalMs = logCleanupIntervalMs;
        this.logCleanupDefaultAgeMs = logCleanupDefaultAgeMs;
        this.needRecovery = needRecovery;
        //
        this.logDir = Utils.getCanonicalFile (new File(config.getLogDir()));
        this.numPartitions = config.getNumPartitions();
        this.flushInterval = config.getFlushInterval();
        this.topicPartitionsMap = config.getTopicPartitionsMap();
        this.startupLatch = config.getEnableZookeeper() ? new CountDownLatch(1) : null;
        this.logFlushIntervalMap = config.getFlushIntervalMap();
        this.logRetentionSize = config.getLogRetentionSize();
        this.logRetentionMSMap = getLogRetentionMSMap(config.getLogRetentionHoursMap());
        //
    }

    public void setRollingStategy(RollingStrategy rollingStategy) {
        this.rollingStategy = rollingStategy;
    }

    public void load() throws IOException {
     // 滚动策略为fixedsize 的滚动策略
        if (this .rollingStategy == null) {
            this.rollingStategy = new FixedSizeRollingStrategy(config .getLogFileSize());
        }
        // 如果消息队列的根路径不存在,则创建
        if (!logDir .exists()) {
            logger.info( "No log directory found, creating '" + logDir.getAbsolutePath() + "'");
            logDir.mkdirs();
        }
        // 判断是否是目录并且可写
        if (!logDir .isDirectory() || !logDir.canRead()) {
            throw new IllegalArgumentException(logDir .getAbsolutePath() + " is not a readable log directory.");
        }
        // 获取这个里面的所有的子文件
        File[] subDirs = logDir.listFiles();
        if (subDirs != null) {
            for (File dir : subDirs) {
               // 跳过文件,因为在这个目录下,全部是topic-partition的文件夹
                if (!dir.isDirectory()) {
                    logger.warn( "Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?");
                } else {
                    logger.info( "Loading log from " + dir.getAbsolutePath());
                   
                    // 获取到目录的名称
                    final String topicNameAndPartition = dir.getName();
                    if (-1 == topicNameAndPartition.indexOf('-' )) {
                        throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
                    }
                   
                    // 将目录名称按照-进行分割,得到一个 kv,然后key就是topic,value是partition
                    final KV<String, Integer> topicPartion = Utils.getTopicPartition(topicNameAndPartition);
                    final String topic = topicPartion.k ;
                    final int partition = topicPartion.v;
                   
                    // 针对这个分区创建一个log对象
                    Log log = new Log(dir, partition, this.rollingStategy , flushInterval , needRecovery , maxMessageSize);

                    // 将新创建的log加入到map中
                    logs.putIfNotExists(topic, new Pool<Integer, Log>());
                    Pool<Integer, Log> parts = logs.get(topic);

                    parts.put(partition, log);
                   
                    // 如果时间的partition比配置中的大,则进行升级
                    int configPartition = getPartition(topic);
                    if (configPartition <= partition) {
                        topicPartitionsMap.put(topic, partition + 1);
                    }
                }
            }
        }

        /* Schedule the cleanup task to delete old logs */
        if (this .scheduler != null) {
            logger.debug( "starting log cleaner every " + logCleanupIntervalMs + " ms" );
            this.scheduler .scheduleWithRate(new Runnable() {

                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error( "cleanup log failed.", e);
                    }
                }

            }, 60 * 1000, logCleanupIntervalMs);
        }
        // 将topic的注册任务注册到 zk里面
        if (config .getEnableZookeeper()) {
            this.serverRegister = new ServerRegister(config, this );
            serverRegister.startup();
            TopicRegisterTask task = new TopicRegisterTask();
            task.setName( "jafka.topicregister");
            task.setDaemon( true);
            task.start();
        }
    }

    private void registeredTaskLooply() {
        while (!stopTopicRegisterTasks ) {
            try {
                TopicTask task = topicRegisterTasks.take();
                if (task.type == TaskType.SHUTDOWN) break;
                serverRegister.processTask(task);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.debug( "stop topic register task");
    }

    class TopicRegisterTask extends Thread {

        @Override
        public void run() {
            registeredTaskLooply();
        }
    }

    // 将log保持的时间单位从小时换算到毫秒
    private Map<String, Long> getLogRetentionMSMap(Map<String, Integer> logRetentionHourMap) {
        Map<String, Long> ret = new HashMap<String, Long>();
        for (Map.Entry<String, Integer> e : logRetentionHourMap.entrySet()) {
            ret.put(e.getKey(), e.getValue() * 60 * 60 * 1000L);
        }
        return ret;
    }

    public void close() {
     // log的刷新器停止
        logFlusherScheduler.shutdown();
       
        // 将所有的log都关闭
        Iterator<Log> iter = getLogIterator();
        while (iter.hasNext()) {
            Closer. closeQuietly(iter.next(), logger);
        }
       
        // 构建一个topic shutdown的消息
        if (config .getEnableZookeeper()) {
            stopTopicRegisterTasks = true ;
            //wake up again and again
            topicRegisterTasks.add(new TopicTask(TaskType.SHUTDOWN , null));
            topicRegisterTasks.add(new TopicTask(TaskType.SHUTDOWN , null));
            Closer. closeQuietly(serverRegister);
        }
    }

    /**
     * Runs through the log removing segments older than a certain age
     *
     * @throws IOException
     */
    private void cleanupLogs() throws IOException {
        logger.trace( "Beginning log cleanup...");
        int total = 0;
        Iterator<Log> iter = getLogIterator();
        long startMs = System.currentTimeMillis();
       
        // 遍历所有的log执行对过期的log进行清理和过大的log进行清理的操作
        while (iter.hasNext()) {
            Log log = iter.next();
            total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log);
        }
        if (total > 0) {
            logger.warn( "Log cleanup completed. " + total + " files deleted in " + (System.currentTimeMillis () - startMs) / 1000 + " seconds" );
        } else {
            logger.trace( "Log cleanup completed. " + total + " files deleted in " + (System.currentTimeMillis() - startMs) / 1000 + " seconds");
        }
    }

    /**
     * Runs through the log removing segments until the size of the log is at least
     * logRetentionSize bytes in size
     *
     * @throws IOException
     */
    private int cleanupSegmentsToMaintainSize(final Log log) throws IOException {
     // 如果log大小永远有效或者还没有到清理的时机就不清理
        if (logRetentionSize < 0 || log.size() < logRetentionSize) return 0;

        // 对log文件执行过滤,过滤的条件是,是否log的大小超过了 retentionsize
        List<LogSegment> toBeDeleted = log.markDeletedWhile(new LogSegmentFilter() {

            long diff = log.size() - logRetentionSize;

            public boolean filter(LogSegment segment) {
                diff -= segment.size();
                return diff >= 0;
            }
        });
       
        // 将需要删除的 logsegment从log中删除
        return deleteSegments(log, toBeDeleted);
    }

    private int cleanupExpiredSegments(Log log) throws IOException {
     // 获取到当前时间
        final long startMs = System.currentTimeMillis ();
       
        // 获取到topic
        String topic = Utils.getTopicPartition(log.dir.getName()). k;
       
        // 获取到log清楚的毫秒数
        Long logCleanupThresholdMS = logRetentionMSMap.get(topic);
        if (logCleanupThresholdMS == null) {
            logCleanupThresholdMS = this.logCleanupDefaultAgeMs ;
        }
        final long expiredThrshold = logCleanupThresholdMS.longValue();
        List<LogSegment> toBeDeleted = log.markDeletedWhile(new LogSegmentFilter() {
            // 如果文件超时,则被过滤为需要删除的文件
            public boolean filter(LogSegment segment) {
                //check file which has not been modified in expiredThrshold millionseconds
                return startMs - segment.getFile().lastModified() > expiredThrshold;
            }
        });
       
        // 执行删除文件操作
        return deleteSegments(log, toBeDeleted);
    }

    /**
     * Attemps to delete all provided segments from a log and returns how many it was able to
     */
    private int deleteSegments(Log log, List<LogSegment> segments) {
        int total = 0;
        // 对于需要删除的LogSegment,关闭messageSet,删除原始文件
        for (LogSegment segment : segments) {
            boolean deleted = false;
            try {
                try {
                    segment.getMessageSet().close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
                if (!segment.getFile().delete()) {
                    deleted = true;
                } else {
                    total += 1;
                }
            } finally {
                logger.warn(String. format("DELETE_LOG[%s] %s => %s", log.name, segment.getFile().getAbsolutePath(),
                        deleted));
            }
        }
        return total;
    }

    /**
     * Register this broker in ZK for the first time.
     */
    public void startup() {
     // 注册broker,并创建topic的任务
        if (config .getEnableZookeeper()) {
            serverRegister.registerBrokerInZk();
            for (String topic : getAllTopics()) {
                serverRegister.processTask(new TopicTask(TaskType.CREATE , topic));
            }
            startupLatch.countDown();
        }
       
        // 初始化写log的刷新器
        logger.debug( "Starting log flusher every {} ms with the following overrides {}", config.getFlushSchedulerThreadRate(), logFlushIntervalMap);
        logFlusherScheduler.scheduleWithRate(new Runnable() {
            public void run() {
                flushAllLogs( false);
            }
        }, config.getFlushSchedulerThreadRate(), config.getFlushSchedulerThreadRate());
    }

    /**
     * flush all messages to disk
     *
     * @param force flush anyway(ignore flush interval)
     */
    public void flushAllLogs(final boolean force) {
        Iterator<Log> iter = getLogIterator();
        while (iter.hasNext()) {
            Log log = iter.next();
            try {
                boolean needFlush = force;
                // 如果不需要刷新
                if (!needFlush) {
                    long timeSinceLastFlush = System.currentTimeMillis() - log.getLastFlushedTime();
                    Integer logFlushInterval = logFlushIntervalMap.get(log.getTopicName());
                    if (logFlushInterval == null) {
                        logFlushInterval = config.getDefaultFlushIntervalMs();
                    }
                    final String flushLogFormat = "[%s] flush interval %d, last flushed %d, need flush? %s";
                    // 判断刷新时间间隔是否大于刷新频率,如果大于则其实为强制刷新
                    needFlush = timeSinceLastFlush >= logFlushInterval.intValue();
                    logger.trace(String.format(flushLogFormat, log.getTopicName(), logFlushInterval,
                            log.getLastFlushedTime(), needFlush));
                }
                if (needFlush) {
                    log.flush();
                }
            } catch (IOException ioe) {
                logger.error( "Error flushing topic " + log.getTopicName(), ioe);
                logger.error( "Halting due to unrecoverable I/O error while flushing logs: " + ioe.getMessage(), ioe);
                Runtime. getRuntime().halt(1);
            } catch (Exception e) {
                logger.error( "Error flushing topic " + log.getTopicName(), e);
            }
        }
    }

    private Collection<String> getAllTopics() {
        return logs .keySet();
    }

    private Iterator<Log> getLogIterator() {
        return new IteratorTemplate<Log>() {
            // 拿到一个topic里面的partition和Log的对应关系
            final Iterator<Pool<Integer, Log>> iterator = logs.values().iterator();

            Iterator<Log> logIter;

            @Override
            protected Log makeNext() {
                while (true ) {
                    if (logIter != null && logIter.hasNext()) {
                        return logIter .next();
                    }
                    if (!iterator .hasNext()) {
                        return allDone();
                    }
                    logIter = iterator.next().values().iterator();
                }
            }
        };
    }

    private void awaitStartup() {
        if (config .getEnableZookeeper()) {
            try {
                startupLatch.await();
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }

    // 给一个topic和partition获取到partition和log的对应关系
    private Pool<Integer, Log> getLogPool(String topic, int partition) {
        awaitStartup();
        if (topic.length() <= 0) {
            throw new IllegalArgumentException("topic name can't be empty");
        }
        // 这个里面是有一些判断,但是逻辑没啥
        Integer definePartition = this.topicPartitionsMap .get(topic);
        if (definePartition == null) {
            definePartition = numPartitions;
        }
        if (partition < 0 || partition >= definePartition.intValue()) {
            String msg = "Wrong partition [%d] for topic [%s], valid partitions [0,%d)";
            msg = format(msg, partition, topic, definePartition.intValue() - 1);
            logger.warn(msg);
            throw new InvalidPartitionException(msg);
        }
        return logs .get(topic);
    }

    /**
     * Get the log if exists or return null
     *
     * @param topic     topic name
     * @param partition partition index
     * @return a log for the topic or null if not exist
     */
    public ILog getLog(String topic, int partition) {
     // 验证topic是否非法
        TopicNameValidator. validate(topic);
        Pool<Integer, Log> p = getLogPool(topic, partition);
        return p == null ? null : p.get(partition);
    }

    /**
     * Create the log if it does not exist or return back exist log
     *
     * @param topic     the topic name
     * @param partition the partition id
     * @return read or create a log
     * @throws IOException any IOException
     */
    public ILog getOrCreateLog(String topic, int partition) throws IOException {
     // 如果要创建的partition比配置中的partition大,则抛出异常
        final int configPartitionNumber = getPartition(topic);
        if (partition >= configPartitionNumber) {
            throw new IOException("partition is bigger than the number of configuration: " + configPartitionNumber);
        }
        boolean hasNewTopic = false;
        // 获取到这个topic所对应partition的一个pool
        Pool<Integer, Log> parts = getLogPool(topic, partition);
        if (parts == null) {
           // 如果pool为空,则创建一个pool
            Pool<Integer, Log> found = logs.putIfNotExists(topic, new Pool<Integer, Log>());
            if (found == null) {
                hasNewTopic = true;
            }
            parts = logs.get(topic);
        }
        // 根据partiition获取到对应的log
        Log log = parts.get(partition);
        if (log == null ) {
           // 创建log对象
            log = createLog(topic, partition);
            // 将新创建的logput到parts中
            Log found = parts.putIfNotExists(partition, log);
            if (found != null) {
                Closer. closeQuietly(log, logger);
                log = found;
            } else {
                logger.info( format("Created log for [%s-%d], now create other logs if necessary", topic, partition));
                final int configPartitions = getPartition(topic);
                // 对于配置的partition都做一下检查
                for (int i = 0; i < configPartitions; i++) {
                    getOrCreateLog(topic, i);
                }
            }
        }
        // 如果发现有新创建的topic,则构造任务,并注册到 zk中
        if (hasNewTopic && config .getEnableZookeeper()) {
            topicRegisterTasks.add(new TopicTask(TaskType.CREATE , topic));
        }
        return log;
    }

    /**
     * create logs with given partition number
     *
     * @param topic        the topic name
     * @param partitions   partition number
     * @param forceEnlarge enlarge the partition number of log if smaller than runtime
     * @return the partition number of the log after enlarging
     */
    public int createLogs(String topic, final int partitions, final boolean forceEnlarge) {
        // 判断topic名称是否合法
     TopicNameValidator.validate(topic);
        synchronized (logCreationLock ) {
            final int configPartitions = getPartition(topic);
            if (configPartitions >= partitions || !forceEnlarge) {
                return configPartitions;
            }
            topicPartitionsMap.put(topic, partitions);
            if (config .getEnableZookeeper()) {
                if (getLogPool(topic, 0) != null) {//created already
                    topicRegisterTasks.add(new TopicTask(TaskType.ENLARGE , topic));
                } else {
                    topicRegisterTasks.add(new TopicTask(TaskType.CREATE , topic));
                }
            }
            return partitions;
        }
    }

    /**
     * delete topic who is never used
     * <p>
     * This will delete all log files and remove node data from zookeeper
     * </p>
     *
     * @param topic topic name
     * @return number of deleted partitions or - 1 if authentication failed
     */
    public int deleteLogs(String topic, String password) {
     // 验证密码是否合法
        if (!config .getAuthentication().auth(password)) {
            return -1;
        }
        int value = 0;
        synchronized (logCreationLock ) {
           // 获取到这个topic里面的所有的partition以及对应的log
            Pool<Integer, Log> parts = logs.remove(topic);
            if (parts != null) {
               // 对log挨个执行删除操作
                List<Log> deleteLogs = new ArrayList<Log>(parts.values());
                for (Log log : deleteLogs) {
                    log.delete();
                    value++;
                }
            }
            // 如果zk enable,则要发送DELETE的任务
            if (config .getEnableZookeeper()) {
                topicRegisterTasks.add(new TopicTask(TaskType.DELETE , topic));
            }
        }
        return value;
    }

    private Log createLog(String topic, int partition) throws IOException {
        synchronized (logCreationLock ) {
           // 创建对应的文件夹,并且构造一个Log对象
            File d = new File(logDir , topic + "-" + partition);
            d.mkdirs();
            return new Log(d, partition, this.rollingStategy , flushInterval , false, maxMessageSize);
        }
    }

    // 获取log的个数
    private int getPartition(String topic) {
        Integer p = topicPartitionsMap.get(topic);
        return p != null ? p.intValue() : this.numPartitions ;
    }

    /**
     * Pick a random partition from the given topic
     */
    public int choosePartition(String topic) {
        return random .nextInt(getPartition(topic));
    }

    /**
     * read offsets before given time
     *
     * @param offsetRequest the offset request
     * @return offsets before given time
     */
    public List<Long> getOffsets(OffsetRequest offsetRequest) {
     // 根据topic和partition定位到log对象
        ILog log = getLog(offsetRequest. topic, offsetRequest.partition);
        if (log != null ) {
           // 得到log对象的写指针
            return log.getOffsetsBefore(offsetRequest);
        }
        return ILog.EMPTY_OFFSETS ;
    }

    public Map<String, Integer> getTopicPartitionsMap() {
        return topicPartitionsMap ;
    }

}
 
  • 大小: 68.9 KB
  • 大小: 25.1 KB
0
0
分享到:
评论
1 楼 liguanqun811 2016-10-09  
感觉LogManager打开了所有的LogSegment(文件)的句柄,会受到系统的限制吧?

相关推荐

    Laravel开发-logmanager

    Laravel开发-logmanager Laravel 5界面用于预览、下载和删除Laravel日志文件。

    日志LogManager

    日志LogManager

    LogManager_Linux.rar_Linux日志_LogManager_linux 日志_linux 日志_日志

    一个具备日志分级,自动回滚机制的日志类封装,便于移植到自己的程序中,作为调试日志。

    PyPI 官网下载 | flask-logmanager-0.2.9.tar.gz

    资源来自pypi官网。 资源全名:flask-logmanager-0.2.9.tar.gz

    org.apache.log4j

    Log4j是Apache的一个开放源代码项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件、甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等;我们也可以控制每一条日志的输出格式...

    log4j需要jar

    项目继承log4j时需要的jar,没有会报错java.lang.NoClassDefFoundError: org/apache/log4j/LogManager

    Log4net详细说明使用

    log4net.ILog log = log4net.LogManager.GetLogger("testApp.Logging");//获取一个日志记录器 log.Info(DateTime.Now.ToString() + ": login success");//写入一条新log 这样就将信息同时输出到控制台和写入到文件名...

    logManager:模拟器日志管理器

    日志管理器模拟器日志管理器至现在为止,其中包括一些功能: 通过两种方式将数据放入数据库:按用户手动和按脚本自动。 跟踪所有数据,然后显示为表格和数字。关于echarts的一些变化如何改变线条的颜色?...

    logmanager:Magento 日志管理器

    Magento 日志管理器有时您在 Magento 商家的网站上工作,您必须从 Magento 扩展程序中筛选数千行日志才能找到有用的东西。 此扩展程序可让您:基本功能 启用或禁用商店中某些 3rd 方扩展的日志记录查看所有正确安装...

    LogManager:崩溃日志捕获上传以及处理

    (1)文件名称:手机名称_手机系统版本_环境_外部版本号_版本升级号_崩溃名称_用户ID.文本文件后缀如:华为_10.2_Debug_2.3.1_10_NSRangeException_1100112.txt(2)格式:纯文本格式(3)说明:版本升级号没有则不...

    wildfly-logstash:JBoss Wildfly的Logstash附加程序

    SocketAppender基于jboss-logmanager-ext( )。 两者都应为原创作品而功劳。 此模块已通过Wildfly 8至18测试。 用maven编译jar文件: mvn package 要创建包含以下模块的ZIP文件: mvn package -P zip 解压缩在$...

    udp-to-s3-logManager:用 NodeJs 编写的 UDP 到 AWS S3 日志管理应用程序。 它还支持执行操作,例如如果消息与正则表达式匹配则发送电子邮件

    udp 到 s3-logManager 用 NodeJs 编写的 UDP 到 AWS S3 日志管理应用程序。 此应用程序从 UDP 端口收集消息并将它们存储在 S3 中。 如果匹配正则表达式,它还支持对每条消息运行操作。 例如,您可以将应用程序配置...

    logCollect:Go语言实现的日志收集服务,由LogAgent、LogTransfer和LogManager三部分组成

    logCollect 日志收集服务 软件架构 说明 通过在运维平台上配置日志收集项,logAgent从etcd中获取要收集的日志信息从业务服务器读取日志信息,发往kafka,logTransfer负责从kafka读取日志,写入到Elasticsearch中,...

    fst-1.60.zip

    LogManager.zip,log4j的jdk logmanager实现log4j、log4j2和logback的jdk logmanager实现。SLF4JBridgehandler的替代品。

    log-manager:分层日志记录+多个附加程序-Ember.js日志记录的基础

    var logManager = new LogManager ( ) ; logManager . addAppenderTo ( 'router' , new ConsoleAppender ( ) ) ; logManager . addAppenderTo ( 'router.transitions' , new AjaxAppender ( ) ) . logManager . ...

    Unity Log dll

    Unity log dll , 使用方法 Logger.LogManager.EnableLog = true; Logger.LogManager.Log ("hello !");

    log-writer:日志记录器

    log-writer.js LogWriter {ログファイル出力} 安装: 例子: var LogWriter = require ( 'log-writer' ) ;...LogManager . setWiter ( writer ) ; var log = LogManager . getLogger ( ) ; log . trace ( msg , arg1

    Second Lilfe Log Manager-开源

    Second Life Log Manager,用于管理Second Life的聊天/即时消息日志。 指定的帐户/头像。 这是一个非常基本的SL Chat Log Manager。 您必须在SL Client中启用聊天记录。

    C#实现程序单例日志输出功能

    首先,在你的解决方案中,适当的目录中新建一个类,比如 LogManager: 编写如下代码: /// /// 日志管理 /// public class LogManager { private string _logDir; // 日志文件存放目录 private static ...

    Apache Log4j_1.2.17 完整依赖包

    Apache Log4j_1.2.17 完整依赖包,在jdk1.8.201中测试通过。使用教程https://www.tutorialspoint.com/springmvc/springmvc_log4j.htm

Global site tag (gtag.js) - Google Analytics