Managing Log Files with Flume and Hive

So here is the situation: you have been told to come up with some kind of solution to capture, retain, and process all of the company's log files.  Of course, the business users want to be able to query the data in any imaginable way, but they can't define any concrete use cases just yet.  Does this sound like what you need to figure out?  If so, you have come to the right place.

The first thing you consider is that the log file data will grow and grow and grow…  So you are going to need something that will scale in an economical way.  The volume of data and the lack of clear requirements make a data warehouse a poor fit.  Naturally, you come to Hadoop.  Great, that is the first step; now how do you get all those log files into Hadoop?

Certainly, Flume should be the perfect fit here.  After all, Flume was designed around ingesting log files.  So you start doing some web searches, and you come across some information, but the problem is that none of the examples you find are exactly what you need.  There is the over-used Twitter example, and there are some other JSON examples, but what you want is just plain and simple log files.  There just are not that many good examples for that.  Hopefully, this article will help with that.

How do you manage log files with Flume and Hive?  That is what this article will go over.  It will provide the details of ingesting simple log files into HDFS and then using Hive to make that data easily accessible.  The example uses log files from Juniper NetScreen firewalls, but the general approach can be tweaked to work with whatever log files you need to ingest and process.

Source Code

Everything that will be discussed is available on GitHub in the

flume-logs repository

Prerequisites

Since I don’t want this article to be unbearably long, I am going to make some assumptions:

  • You already have a cluster installed and configured.  I am using CDH 4.5, but nothing here is distribution specific, and it should all work with any Hadoop distribution.
  • You already have a general understanding of Flume.  I won’t go over the general concepts and general setup.  There are much better articles out there that have already taken care of that. 
  • You already have a general understanding of Hive.  Similar to the situation with Flume, I want to concentrate on telling you something you don’t already know. 
  • You already have an understanding of regular expressions. 

Solution Overview

The solution starts with the Flume agent receiving events.  For this example, I will use netcat for the sake of simplicity, but in real life you will probably use some variant of syslog.  Events will be received at the agent source, and the only thing we really need to do there is make sure each event has a timestamp in its header, so we will use the timestamp interceptor.

Next, for the channel, we will use a memory channel.  Again, this is just for the sake of simplicity.  In a production setting, you may want to make a different choice depending upon your needs and circumstances.  Otherwise, there will be nothing special happening in the channel.

Finally, we get to the sink, and this is where it starts to get interesting.  We already know that we want to send our data to HDFS, so we will use an HDFS sink.  However, we also know that we are going to be using Hive to make the data more accessible.  Thus, on the Hive side we are going to want to partition our data. 

The most obvious way to do that is to partition based upon date and time.  This makes sense because it divides the log entries into neat buckets based on time, and you can then use HiveQL to pull out information for particular time ranges.  This impacts how we write the data: we will need to use the time-based escape sequences on the Flume side to write the entries, and then on the Hive side we need to define matching partitions.

However, the complexity goes a little deeper, because Hive also requires a schema.  So we will need to extract the fields that we think will be important from the log entries and place them into columns, so that Hive will be able to read them.  There are a number of strategies that could be used here, but for this example, I will just extract some barebones connection information.

Taking the thinking to the next level, it becomes clear that there are a number of possibilities here.  Different kinds of firewalls might have slightly different formats, and so forth.  In the simple case, we just need to extract the data, place it into column format, and write it out.  But there is also the possibility that we may need to reorder some of the extracted data to line up with the columns in the schema.  For example, you may find that, while parsing, you extract the last column in the schema as the first piece of data, so you would need to save that piece of data and write it later.

Yet another thing to consider on the Hive side is the way the data gets written into HDFS.  By default, Flume will write the data as a sequence file, which may or may not work for you.  In my case, I needed to keep those log files as simple and as easy to access and read as possible, which meant they needed to be stored as CSV files.  Thus, it was necessary to create a custom serializer that would extract, reorder and write the data in CSV format.

Flume Configuration

The Flume configuration is shown below and it is also in the conf directory of the GitHub project.  There are a few points that are worth mentioning.  The source is not particularly interesting or complicated.  It is pretty much your basic netcat source. 

The sink, however, does have some things worth pointing out.  The first is that it assumes you have created the directory (/user/flume/logs) for the flume user, and you will also need to give this directory relaxed permissions to allow access.

The writeFormat = Text and fileType = DataStream settings are needed to write the files to HDFS as plain text rather than sequence files.  Also, the custom serializer, CSVSerializer, will extract the data using the regular expression specified with the regex property, which in this case captures just basic connection information.  The regexorder property is used to reorder the regex groups into the desired output order, so that the output lines up with the Hive schema.  Notice that it was necessary to define the serializer using the fully qualified class name and to specify the embedded Builder class, which will be explained in the source code section.

Finally, the rolling-related properties are intended to create a single file on HDFS for each hour, with the intention of reducing the number of files.  Naturally, these will need to be tuned to your conditions, or you can decrease the granularity and go to daily partitions.  Flume has a very bad habit of creating a lot of small files if you do not manage the configuration properly, and it is definitely something to be aware of and watch for.

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.sources.source1.type     = netcat
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.interceptors.i1.preserveExisting = true
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.path = /user/flume/logs/year=%Y/month=%m/day=%d/hour=%H
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.rollCount = 10000
tier1.sinks.sink1.hdfs.batchSize = 10000
tier1.sinks.sink1.serializer = com.freitas.flume.serializer.CSVSerializer$Builder
tier1.sinks.sink1.serializer.format = CSV
tier1.sinks.sink1.serializer.regex = .* proto=(\\d+) .* src=(.*) dst=(.*) src_port=(\\d+) dst_port=(\\d+).*
tier1.sinks.sink1.serializer.regexorder = 5 1 2 3 4

tier1.sinks.sink1.channel      = channel1
tier1.channels.channel1.capacity = 100
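
To make the regex and regexorder settings concrete, here is a hypothetical NetScreen-style line (the field values are made up for illustration) and the CSV record the serializer would produce for it.  This assumes that regexorder maps each regex group, in order, to its output column, and that the serializer prefixes each record with the event timestamp from the header to fill the action_time column; both of these details are my reading of the serializer and are discussed in the next section.

action=Permit proto=6 policy_id=1 src=192.168.1.10 dst=10.0.0.5 src_port=45678 dst_port=80

becomes

<timestamp_ms>,192.168.1.10,10.0.0.5,45678,80,6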

Custom Serializer

The custom serializer (CSVSerializer.java) is really not that big of a deal.  At first blush, it seemed intimidating, but as I got into it, I found it to be pretty easy.  I was able to follow the existing Flume source and leverage bits and pieces from other stuff in there.  The class needs to implement the EventSerializer interface.  I really only needed to define the write() method, which does the bulk of the work.  I also needed to make the constructor private and create a nested Builder class to instantiate an instance.  This is just how Flume does it, and we really can't argue with it.

In terms of the logic, it is actually remarkably simple.  The constructor retrieves the configuration properties, sets up the regex to use, and creates a hash indexed by the order in which the regex groups need to be written out.  Then, in the write() method, it runs the regex on each line, extracts the groups, puts them into the hash indexed by the desired output ordering, and finally reads through the hash and writes everything out with comma separators.
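
Here is a condensed sketch of how such a serializer can be put together.  It is not the exact code from the repository; in particular, the way unmatched lines are skipped and the way the action_time column is filled in from the timestamp header are my assumptions about the details.

package com.freitas.flume.serializer;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

public class CSVSerializer implements EventSerializer {

  private final OutputStream out;
  private final Pattern regex;
  private final int[] outputOrder;   // for each regex group, its column position in the output

  private CSVSerializer(OutputStream out, Context context) {
    this.out = out;
    this.regex = Pattern.compile(context.getString("regex", "(.*)"));
    String[] order = context.getString("regexorder", "1").split(" ");
    this.outputOrder = new int[order.length];
    for (int i = 0; i < order.length; i++) {
      outputOrder[i] = Integer.parseInt(order[i]);
    }
  }

  @Override
  public void write(Event event) throws IOException {
    String line = new String(event.getBody());
    Matcher m = regex.matcher(line);
    if (!m.find()) {
      return;   // skip lines that do not match (a real implementation might log or count them)
    }
    // Index each captured group by the column position it should occupy in the output.
    Map<Integer, String> columns = new TreeMap<Integer, String>();
    for (int group = 1; group <= m.groupCount(); group++) {
      columns.put(outputOrder[group - 1], m.group(group));
    }
    // Assumption: the action_time column comes from the timestamp header set by the interceptor.
    StringBuilder row = new StringBuilder(event.getHeaders().get("timestamp"));
    for (String value : columns.values()) {
      row.append(',').append(value);
    }
    row.append('\n');
    out.write(row.toString().getBytes());
  }

  @Override
  public void afterCreate() throws IOException { }

  @Override
  public void afterReopen() throws IOException { }

  @Override
  public void beforeClose() throws IOException { }

  @Override
  public void flush() throws IOException {
    out.flush();
  }

  @Override
  public boolean supportsReopen() {
    return false;
  }

  // Flume instantiates serializers through a nested Builder, which is why the
  // configuration names the class as CSVSerializer$Builder.
  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new CSVSerializer(out, context);
    }
  }
}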

Deploying the Serializer

The deployment of the custom serializer is not completely obvious and deserves some discussion.  The code needs to be packaged as a Jar file, the Jar file needs to be put in a specific location, and then Flume will be able to find and use it.  Flume looks for plugins under plugins.d, each in its own directory with a lib subdirectory (here, plugins.d/flume-logs/lib).  You need to do this regardless of the number of classes; here the Jar file has only the one class, but the same steps need to be followed.  I chose to use a Maven project to make the generation of the Jar file easy, but you can use Ant or whatever build method you prefer.  Once you have the Jar, you will need to do something like:

cd /var/lib/flume-ng
mkdir -p plugins.d/flume-logs/lib
chown -R flume plugins.d
chgrp -R flume plugins.d
cp flume-logs-1.0.0.jar /var/lib/flume-ng/plugins.d/flume-logs/lib

Hive Configuration

On the Hive side, you will need to launch the Hive shell and create an external table with a schema that matches the output generated by the serializer.  In truth, you would probably have started by defining the desired schema first, but it was just easier for me to organize the article this way.  Notice that the table is partitioned by date and time, which matches the directories created by Flume.  Using an external table works out well, because you don't need to do any time-consuming loading of the data.  It is already right where you need it to be, and Hive can just start using it.

CREATE EXTERNAL TABLE networkData (
  action_time BIGINT,
  src_ip STRING,
  dest_ip STRING,
  src_port STRING,
  dest_port STRING,
  protocol STRING
)
PARTITIONED BY (year int, month int, day int, hour int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/flume/logs/';

Now, this is the one thing that I am least fond of in this solution.  It will be necessary to execute an ALTER TABLE to add each new partition in order for Hive to be able to see the data.  This is not really a show stopper.  It could be handled with a crontab script or something like that (see the sketch after the example below), but it is something that will need to be done.  It is just the way Hive works.

ALTER TABLE networkData
ADD PARTITION (year = 2014, month = 01, day = 28, hour = 14);
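
Here is a minimal sketch of how that crontab job might look, assuming the hive command-line client is available on the host.  Run hourly, it registers the partition for the current hour.

#!/bin/sh
# Hypothetical hourly cron job: register the Hive partition for the current hour.
YEAR=$(date +%Y)
MONTH=$(date +%m)
DAY=$(date +%d)
HOUR=$(date +%H)
hive -e "ALTER TABLE networkData ADD IF NOT EXISTS PARTITION (year = ${YEAR}, month = ${MONTH}, day = ${DAY}, hour = ${HOUR});"

Once the partition exists, queries that filter on the partition columns (for example, WHERE year = 2014 AND month = 01 AND day = 28 AND hour = 14) will only read that hour's directory, which is the whole point of partitioning this way.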

Generating Events

Okay, so now everything has been configured.  The only thing left is to generate some data and start running some HiveQL queries.  For this, I wrote a Python script to generate some events and pump them into netcat.  If you happen to have an active syslog generator, then feel free to use that.  But for me, I needed something generating data to work through the development process.

The script can be found in the generator directory and would be invoked with something like:

./gen_events.py 2>&1 | nc 127.0.0.1 9999

It will also create a replay log file, if you want to replay the events for some reason. 
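
If you do not want to clone the repository, a bare-bones generator along the following lines is enough to exercise the pipeline.  This is only a sketch: the field layout is made up just to satisfy the serializer regex shown earlier, and the real gen_events.py produces richer NetScreen-style events in addition to the replay log.

#!/usr/bin/env python
# Sketch of a minimal event generator: emits firewall-style lines that the
# serializer regex shown earlier can parse.  Pipe the output into netcat.
import random
import sys
import time

def gen_event():
    proto = random.choice([6, 17])                  # TCP or UDP
    src = "192.168.1.%d" % random.randint(2, 254)
    dst = "10.0.0.%d" % random.randint(2, 254)
    src_port = random.randint(1024, 65535)
    dst_port = random.choice([22, 25, 53, 80, 443])
    return ("NetScreen device_id=fw01 (traffic): action=Permit proto=%d "
            "policy_id=1 src=%s dst=%s src_port=%d dst_port=%d" %
            (proto, src, dst, src_port, dst_port))

if __name__ == "__main__":
    while True:
        print(gen_event())
        sys.stdout.flush()                          # make sure lines reach netcat promptly
        time.sleep(1)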

Future Considerations

As you can see, this example saves the log files down to the hour.  This will only work for short-term data that needs to be referenced often.  The problem here is the number of files.  As I am sure you already know, the name node can only support a limited number of files.  Saving a file for every hour of every day can very quickly add up to a lot of files.  At some point, it will be necessary to aggregate some of this data into fewer files, which will make it just a little harder to reference.  Just something to think about.
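
One way to handle that, staying within Hive, is to periodically roll the hourly partitions up into a coarser table.  The sketch below is only an illustration: the networkDataDaily table name and its location are hypothetical, and depending on your Hive settings you may also need the small-file merge options for this to actually reduce the file count.

CREATE EXTERNAL TABLE networkDataDaily (
  action_time BIGINT,
  src_ip STRING,
  dest_ip STRING,
  src_port STRING,
  dest_port STRING,
  protocol STRING
)
PARTITIONED BY (year int, month int, day int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/flume/logs_daily/';

INSERT OVERWRITE TABLE networkDataDaily PARTITION (year = 2014, month = 01, day = 28)
SELECT action_time, src_ip, dest_ip, src_port, dest_port, protocol
FROM networkData
WHERE year = 2014 AND month = 01 AND day = 28;

After the roll-up, the old hourly partitions can be dropped with ALTER TABLE ... DROP PARTITION; since the table is external, the underlying hourly files would also need to be removed from HDFS.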

Once you have the data in Hive, you will need to consider the next level of detail: what to do with all this interesting data.  Depending upon who will view the data and how it will be used, you will need to set something up for your users.  In general, you can expect that your users will not be typical Hadoop nerds like yourself.  Instead, they will be business-oriented users, and you will need to make things easier for them.  Certainly, Hive will allow for some simple processing to be done, but what happens when your users need to go to the next level?  I would venture to suggest Pig as a good option.  Pig is easier than writing MR jobs, but still provides a rich set of tools to do quite a bit of complex processing.  You may find that you only need to invest in a relatively small collection of Pig scripts that satisfy most of your users' needs.  Further, some of the more savvy users will be able to start creating their own Pig scripts pretty quickly.

Conclusion

I hope you have found this article useful and that it has helped to clear up some of the mystery of managing log files with Flume and Hive.  Once you break it down into a step-by-step process, it really is not that difficult; knowing the steps is the trick.

Best of luck to you in your Big Data adventure!