Apache Kafka Logs: A Comprehensive Guide to Log Data Structure & Management


Data is not what it was a decade ago, and turning it into usable information is harder than it might seem. Apache Kafka, originally created at LinkedIn and now developed as an open-source project, is an excellent free tool for managing data and putting it to good use. In today’s digital world data matters a great deal: it shapes our perception of reality.

Apache Kafka was built to solve exactly this problem. It is a real-time data streaming platform (latencies can be under 10 ms) that lets users store, analyze, and read data received from one or more sources (producers). In essence, it distributes data to the right channels as quickly as possible. For example, in a cricket match there is a source (producer) that tracks the real-time score and passes it to the channels. The channels act like brokers, which then supply the information to the end consumers in the most efficient way. Apache Kafka is the medium in which all of this happens.

  • Apache Kafka can be scaled easily, without significant system-level downtime.
  • Apache Kafka is fault-tolerant because it uses multiple brokers to transmit data: if one broker goes offline, another broker holds a replica of the same data.
  • Security tools such as Kerberos can be used when building applications on top of Kafka.
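
To make the fault-tolerance point concrete, here is a toy model of replication. It is only a sketch and not Kafka code: the `Broker` class and the `publish` and `fetch` helpers are hypothetical names invented for this illustration, and real Kafka replication (leaders, followers, in-sync replicas) is considerably more involved.

```python
class Broker:
    """Toy in-memory stand-in for a broker (hypothetical, not a Kafka API)."""

    def __init__(self, name):
        self.name = name
        self.online = True
        self.records = []  # this broker's copy of the topic's records


def publish(brokers, record):
    """Write the record to every online broker, mimicking replication."""
    for broker in brokers:
        if broker.online:
            broker.records.append(record)


def fetch(brokers):
    """Read from the first broker that is still online."""
    for broker in brokers:
        if broker.online:
            return list(broker.records)
    raise RuntimeError("no broker available")


brokers = [Broker("broker-1"), Broker("broker-2")]
publish(brokers, "score: 120/3")
publish(brokers, "score: 124/3")

brokers[0].online = False  # simulate one broker going offline
surviving_copy = fetch(brokers)
```

Even with the first broker offline, `surviving_copy` still holds both score updates, because the second broker kept its own copy.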


What Are Apache Kafka Logs?

  • Apache Kafka logs are collections of data segments stored on disk. The segments of each partition live in a directory whose name has the form topic-partition.
  • Apache Kafka can also act as an external commit log for a distributed system. The log replicates data between nodes, so data can be restored by replaying the log whenever needed, not just read.
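
The commit-log idea can be sketched in a few lines. The example below is an illustration under simple assumptions, not Kafka's implementation: the log is a plain Python list of key-value pairs, a `None` value is treated as a delete, and the `replay` function is a hypothetical helper.

```python
def replay(log):
    """Rebuild a key-value state by applying log entries in order."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # a None value is treated as a delete
        else:
            state[key] = value
    return state


commit_log = [
    ("user-1", "online"),
    ("user-2", "online"),
    ("user-1", "offline"),
    ("user-2", None),
]

restored = replay(commit_log)
```

A node that lost its in-memory state could rebuild it by replaying the same entries from the start, which is what makes the log useful for restoring data and not only for reading it.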


Things to Remember While Working with Apache Kafka Logs

  • Avoid logging redundant data or data that is only useful for operational purposes.
  • Typically you create log entries at the start and at the end of a module's startup or shutdown, but you can also create special logs. Special logs can mark the start and the end of a specific phase, for example while carrying out an update.
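
One way to log the start and end of a phase is a small context manager. This is a minimal sketch using Python's standard `logging` module. The logger name `update-job`, the `logged_phase` helper, and the `ListHandler` class are assumptions made for the example and do not come from Kafka.

```python
import logging
from contextlib import contextmanager


class ListHandler(logging.Handler):
    """Keeps log messages in memory so the entries can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("update-job")  # hypothetical logger name
logger.setLevel(logging.INFO)
handler = ListHandler()
logger.addHandler(handler)


@contextmanager
def logged_phase(name):
    """Log one entry when a phase starts and one when it ends."""
    logger.info("phase %s started", name)
    try:
        yield
    finally:
        logger.info("phase %s finished", name)


with logged_phase("schema-update"):
    migrated_rows = sum(range(5))  # placeholder for the real work
```

Because the closing entry is written in a `finally` block, it is emitted even if the phase fails, so every start entry has a matching end entry.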

How to enable Logs in Apache Kafka?

Here is a log4j configuration you can add to your application's logging properties to start sending its logs to Apache Kafka:

# Enable both file and Kafka-based logging
log4j.rootLogger=INFO, file, kafka
log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092

# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Once you have done this, you should be able to find the logs in JSON (JavaScript Object Notation) format.
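
Since the events are JSON, each one can be parsed with any JSON library. The sketch below filters error events per container. The two sample events are invented for illustration, and the field names `level` and `message` are assumptions about what the layout emits, so check them against your actual output. Only `yarnContainerId` comes from the configuration above.

```python
import json

# Invented sample messages; real events may carry different fields.
raw_events = [
    '{"level": "INFO", "message": "job started", "yarnContainerId": "container_01"}',
    '{"level": "ERROR", "message": "checkpoint failed", "yarnContainerId": "container_02"}',
]


def errors_by_container(lines):
    """Return (container id, message) pairs for ERROR-level events."""
    result = []
    for line in lines:
        event = json.loads(line)
        if event.get("level") == "ERROR":
            result.append((event.get("yarnContainerId"), event.get("message")))
    return result


failed = errors_by_container(raw_events)
```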

Various Operations & Commands Associated with Apache Kafka Logs

Apache Kafka carries out many operations on its logs in the background. The Scala signatures below appear to come from Kafka's internal Log class and show the main operations, grouped by purpose.

  1. Log segments: the log keeps its segments in a sorted map keyed by offset

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
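
A navigable map makes it cheap to find the segment that contains a given offset: it is the entry with the greatest key less than or equal to that offset (a "floor" lookup). Here is a minimal sketch of the same idea using a sorted list and `bisect`. The base offsets are made-up values and `segment_for` is a hypothetical helper, not part of Kafka.

```python
import bisect

# Base offsets of three hypothetical segments, kept sorted like a navigable map.
base_offsets = [0, 1000, 2500]


def segment_for(offset):
    """Return the greatest base offset <= offset (a floor lookup), or None."""
    index = bisect.bisect_right(base_offsets, offset) - 1
    return base_offsets[index] if index >= 0 else None


found = segment_for(1700)
```

Here offset 1700 falls in the segment that starts at 1000, because the next segment only begins at 2500.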

  2. Create a new log instance

apply(
  dir: File,
  config: LogConfig,
  logStartOffset: Long,
  recoveryPoint: Long,
  scheduler: Scheduler,
  brokerTopicStats: BrokerTopicStats,
  time: Time = Time.SYSTEM,
  maxProducerIdExpirationMs: Int,
  producerIdExpirationCheckIntervalMs: Int,
  logDirFailureChannel: LogDirFailureChannel): Log

  3. Reading records

addAbortedTransactions(
  startOffset: Long,
  segmentEntry: JEntry[JLong, LogSegment],
  fetchInfo: FetchDataInfo): FetchDataInfo

read(
  startOffset: Long,
  maxLength: Int,
  maxOffset: Option[Long],
  minOneMessage: Boolean,
  includeAbortedTxns: Boolean): FetchDataInfo
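
To show what the `startOffset`, `maxLength`, and `minOneMessage` parameters suggest, here is a toy read over an in-memory list. It is a sketch under simplifying assumptions: records are `(offset, payload)` pairs, sizes are payload lengths, and transactions and `maxOffset` are ignored. It is not how Kafka's `read` is implemented.

```python
def read(records, start_offset, max_length, min_one_message=False):
    """Return records from start_offset whose total size fits in max_length bytes.

    records is a list of (offset, payload) pairs sorted by offset.
    """
    fetched, used = [], 0
    for offset, payload in records:
        if offset < start_offset:
            continue
        size = len(payload)
        if used + size > max_length:
            # Optionally return one oversized record so a reader is never stuck.
            if min_one_message and not fetched:
                fetched.append((offset, payload))
            break
        fetched.append((offset, payload))
        used += size
    return fetched


log_records = [(0, b"aaaa"), (1, b"bbbb"), (2, b"cccc"), (3, b"dddd")]
batch = read(log_records, start_offset=1, max_length=8)
```

With an 8-byte budget starting at offset 1, the batch contains the records at offsets 1 and 2.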

  4. Appending records

maybeRoll(
  messagesSize: Int,
  appendInfo: LogAppendInfo): LogSegment

append(
  records: MemoryRecords,
  isFromClient: Boolean,
  interBrokerProtocolVersion: ApiVersion,
  assignOffsets: Boolean,
  leaderEpoch: Int): LogAppendInfo
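
The relationship between appending and rolling can be sketched with a toy log. The `ToyLog` class below is purely illustrative: its names and its size-only roll rule are assumptions for this example. A real broker may also roll segments for other reasons, such as segment age.

```python
class ToyLog:
    """Simplified segmented log; the names here are illustrative, not Kafka's."""

    def __init__(self, max_segment_bytes):
        self.max_segment_bytes = max_segment_bytes
        self.next_offset = 0
        self.segments = {0: []}  # base offset -> list of (offset, payload)
        self.active_base = 0
        self.active_bytes = 0

    def maybe_roll(self, message_size):
        """Start a new segment if the message would overflow the active one."""
        if self.active_bytes + message_size > self.max_segment_bytes:
            self.active_base = self.next_offset
            self.segments[self.active_base] = []
            self.active_bytes = 0

    def append(self, payload):
        """Assign the next offset and append to the active segment."""
        self.maybe_roll(len(payload))
        offset = self.next_offset
        self.segments[self.active_base].append((offset, payload))
        self.active_bytes += len(payload)
        self.next_offset += 1
        return offset


log = ToyLog(max_segment_bytes=10)
for payload in [b"12345", b"12345", b"123", b"1234567"]:
    log.append(payload)
```

The first two payloads fill the segment with base offset 0, so the third append rolls to a new segment whose base offset is 2, the offset of its first record.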

  5. Clean segments and build an offset map

collectAbortedTransactions(
  startOffset: Long,
  upperBoundOffset: Long): List[AbortedTxn]

collectAbortedTransactions(
  startOffset: Long,
  upperBoundOffset: Long,
  startingSegmentEntry: JEntry[JLong, LogSegment],
  accumulator: List[AbortedTxn] => Unit): Unit

  6. Deleting segments

roll(
  expectedNextOffset: Option[Long] = None): LogSegment

asyncDeleteSegment(segment: LogSegment): Unit

deleteSeg(): Unit

  7. Create a new log file

logFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

  8. Open a new log segment

offsetIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

timeIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

transactionIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File
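
All of these file helpers take a directory and an offset, because the files of one segment share a name derived from the segment's base offset, zero-padded to 20 digits. The sketch below reproduces that naming in Python. The `segment_files` helper and the example directory are hypothetical, and the optional `suffix` parameter from the signatures is left out for brevity.

```python
from pathlib import Path


def segment_files(directory, base_offset):
    """Build the four file paths that share one segment's base offset."""
    prefix = f"{base_offset:020d}"  # zero-padded to 20 digits
    return {
        kind: Path(directory) / f"{prefix}{suffix}"
        for kind, suffix in [
            ("log", ".log"),
            ("offset_index", ".index"),
            ("time_index", ".timeindex"),
            ("transaction_index", ".txnindex"),
        ]
    }


# Hypothetical log directory for partition 0 of the flink.logs topic.
files = segment_files("/tmp/kafka-logs/flink.logs-0", 1000)
```

Because the names are zero-padded, sorting the files alphabetically also sorts them by offset.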

  9. Close a log

close(): Unit

  10. Recover and rebuild segments

recoverSegment(
  segment: LogSegment,
  leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(
  lastOffset: Long,
  reloadFromCleanShutdown: Boolean,
  producerStateManager: ProducerStateManager): Unit

  11. Add or convert segments

addSegment(
  segment: LogSegment): LogSegment

convertToOffsetMetadata(
  offset: Long): Option[LogOffsetMetadata]

  12. Load a partition

parseTopicPartitionName(dir: File): TopicPartition
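
Parsing a partition directory name comes down to splitting it at the last dash, since topic names can themselves contain dashes. This is a simplified sketch: `parse_topic_partition` is a hypothetical helper, and it ignores special cases a real broker has to handle, such as directories that are marked for deletion.

```python
def parse_topic_partition(dir_name):
    """Split 'topic-partition' at the last dash; topic names may contain dashes."""
    topic, dash, partition = dir_name.rpartition("-")
    if not dash or not topic or not partition.isdigit():
        raise ValueError(f"not a topic-partition directory name: {dir_name!r}")
    return topic, int(partition)


parsed = parse_topic_partition("flink.logs-3")
```

A name such as `my-topic-12` is parsed as topic `my-topic`, partition 12, because only the last dash is treated as the separator.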

  13. Modify the configuration

updateConfig(
  updatedKeys: Set[String],
  newConfig: LogConfig): Unit

  14. Truncate the log

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit
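
The two truncation signatures suggest two behaviours: cutting the log back to a target offset, and discarding everything to restart at a new offset. The sketch below mimics both on an in-memory list. The function names mirror the signatures, but the bodies are assumptions for illustration and not Kafka's implementation.

```python
def truncate_to(records, target_offset):
    """Drop every record whose offset is >= target_offset.

    Returns True if anything was removed, False otherwise.
    """
    kept = [(offset, payload) for offset, payload in records if offset < target_offset]
    changed = len(kept) != len(records)
    records[:] = kept  # truncate in place
    return changed


def truncate_fully_and_start_at(records, new_offset):
    """Discard all records and return the offset the next append should use."""
    records.clear()
    return new_offset


toy_log = [(0, b"a"), (1, b"b"), (2, b"c"), (3, b"d")]
removed_any = truncate_to(toy_log, 2)
```

After truncating to offset 2, only the records at offsets 0 and 1 remain, and the next record written would take offset 2 again.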