
Camel-Elasticsearch: create timestamped indices

One nice feature of the logstash-elasticsearch integration is that, by default, logstash will use timestamped indices when feeding data to elasticsearch.

This means that each day's data ends up in its own index (yesterday's data is in a different index from today's), which simplifies index management. For instance, suppose you only want to keep the last 30 days:

elasticsearch-remove-old-indices.sh -i 30
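The script's internals are not shown here. What makes this kind of cleanup easy is that the day is encoded in the index name. The following is only a minimal Java sketch of the selection step such a tool has to perform. It assumes daily index names of the form prefix + yyyy.MM.dd (like the mirth2- indices used later in this post) and reads "keep the last 30 days" as "delete anything dated before today minus 30 days". The class and method names are hypothetical, and the actual deletion through Elasticsearch is left out.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

public class IndexRetention {
    // lowercase yyyy = calendar year (uppercase YYYY would be the week-based year)
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /**
     * Returns the indices named prefix + yyyy.MM.dd whose date is older than
     * today minus keepDays. Names that do not match the pattern are ignored.
     */
    public static List<String> indicesToDelete(List<String> indices, String prefix,
                                               LocalDate today, int keepDays) {
        LocalDate cutoff = today.minusDays(keepDays);
        List<String> result = new ArrayList<>();
        for (String name : indices) {
            if (!name.startsWith(prefix)) {
                continue; // not one of our daily indices
            }
            try {
                LocalDate day = LocalDate.parse(name.substring(prefix.length()), DAY);
                if (day.isBefore(cutoff)) {
                    result.add(name);
                }
            } catch (DateTimeParseException e) {
                // prefix matches but the suffix is not a date: leave the index alone
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> indices = List.of("mirth2-2024.02.29", "mirth2-2024.03.01",
                "mirth2-2024.03.31", "mirth2-latest", "other-2020.01.01");
        System.out.println(indicesToDelete(indices, "mirth2-", LocalDate.of(2024, 3, 31), 30));
    }
}
```

Running main prints [mirth2-2024.02.29]: the 1 March index sits exactly on the cutoff and is kept, and names whose suffix does not parse as a date are never selected.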

The Apache Camel Elasticsearch component provides no such feature out of the box, but luckily it is quite easy to implement (once you know what to do /grin).

As a matter of fact, it is enough to set the proper header on the message: the elasticsearch component will then use that header's value as the index name. Unfortunately this is not documented anywhere, but it can be worked out by looking at the source. Once again: use the source, Luke.

So, let's suppose the route is something as simple as:

        <route autoStartup="true" id="processMirthMessages-route">
            <from uri="sql:{{sql.selectMessage}}?consumer.delay=5000&amp;consumer.onConsume={{sql.markMessage}}"/>
            <to uri="elasticsearch://mirth?operation=INDEX&amp;indexType=mmsg"/>
        </route>

Then all that is needed is to add a content enricher bean to the route:

        <route autoStartup="true" id="processMirthMessages-route">
            <from uri="sql:{{sql.selectMessage}}?consumer.delay=5000&amp;consumer.onConsume={{sql.markMessage}}"/>
            <bean method="process" ref="eSheaders"/>
            <to uri="elasticsearch://mirth?operation=INDEX&amp;indexType=mmsg"/>
        </route>

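The bean is also pretty simple:

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;

public class ESHeaders {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        DateFormat df = new SimpleDateFormat("yyyy.MM.dd"); // yyyy = calendar year; YYYY would be the week year
        in.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, "mirth2-" + df.format(new Date()));
    }
}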
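One detail worth double-checking in any such bean is the date pattern. In SimpleDateFormat, lowercase y is the calendar year, while uppercase Y is the week year, which can differ from the calendar year in the last and first days of December and January. The small standalone program below (class and method names are just for illustration) shows the effect for Monday 31 December 2018 under US week rules, where that day already belongs to week 1 of 2019. With YYYY, that day's documents would land in an index named as if it were December 2019.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;

public class WeekYearPitfall {
    // Formats with US week rules and in UTC so the output does not depend on the machine's settings
    public static String format(String pattern, Date date) {
        SimpleDateFormat df = new SimpleDateFormat(pattern, Locale.US);
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        return df.format(date);
    }

    // Noon UTC on the given calendar day (month is 1-based here)
    public static Date utcNoon(int year, int month, int day) {
        Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"), Locale.US);
        cal.clear();
        cal.set(year, month - 1, day, 12, 0, 0);
        return cal.getTime();
    }

    public static void main(String[] args) {
        Date d = utcNoon(2018, 12, 31); // a Monday
        System.out.println(format("yyyy.MM.dd", d)); // 2018.12.31 (calendar year)
        System.out.println(format("YYYY.MM.dd", d)); // 2019.12.31 (week year)
    }
}
```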

Update: take the index timestamp from the message itself.

If the data to be indexed contains, as it should, a @timestamp field, then the content enricher bean can be improved to use it as follows:

    // Replaces process() in ESHeaders; also needs import java.util.Map
    // and a logger field (assumed here, e.g. an SLF4J Logger).
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        String indexName = null;
        DateFormat df = new SimpleDateFormat("yyyy.MM.dd"); // yyyy, not YYYY (week year)
        try {
            Map<?, ?> body = (Map<?, ?>) in.getBody();
            if (body.containsKey("@timestamp")) {
                logger.trace("Computing indexName from @timestamp: " + body.get("@timestamp"));
                indexName = "mirth2-" + df.format((Date) body.get("@timestamp"));
            } else {
                indexName = "mirth2-" + df.format(new Date());
            }
        } catch (Exception e) {
            // e.g. body is not a Map, or @timestamp is not a java.util.Date
            logger.error("Cannot compute index name, falling back to default", e);
            indexName = "mirth2-" + df.format(new Date());
        }
        in.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, indexName);
    }
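As a design note, the try/catch version relies on a ClassCastException or NullPointerException to reach the fallback. The same decision can be made with explicit instanceof checks, as in the sketch below. It is written against plain Java so it can be tried outside Camel; in the bean, the returned string is what would be passed to in.setHeader(...). The class and method names are hypothetical. Formatting the day in UTC rather than in the JVM default time zone is also an assumption, worth checking against how your other daily indices are named. The epoch-millisecond conversion is used because java.sql.Date.toInstant() throws, whereas getTime() works for every java.util.Date subclass, including the java.sql.Timestamp values a SQL row map typically holds.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Map;

public class TimestampIndexName {
    // Daily suffix formatted in UTC (an assumption; the bean above uses the JVM default zone)
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    /**
     * Index name from the body's @timestamp when the body is a Map and the value
     * is a java.util.Date (or subclass); otherwise from the fallback time.
     */
    public static String indexNameFor(String prefix, Object body, Date fallback) {
        Object ts = (body instanceof Map) ? ((Map<?, ?>) body).get("@timestamp") : null;
        Date when = (ts instanceof Date) ? (Date) ts : fallback;
        // getTime() rather than toInstant(): java.sql.Date.toInstant() throws
        return prefix + DAY.format(Instant.ofEpochMilli(when.getTime()));
    }

    public static void main(String[] args) {
        Date ts = Date.from(Instant.parse("2018-12-31T23:30:00Z"));
        Date now = Date.from(Instant.parse("2019-01-02T10:00:00Z"));
        System.out.println(indexNameFor("mirth2-", Map.of("@timestamp", ts), now));
        System.out.println(indexNameFor("mirth2-", Map.of("msg", "no timestamp"), now));
        System.out.println(indexNameFor("mirth2-", "not a map", now));
    }
}
```

Running main prints mirth2-2018.12.31 followed by mirth2-2019.01.02 twice: only the first message carries a usable @timestamp.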
