#!/usr/bin/perl

use strict;
use warnings;

use FindBin;
use lib "$FindBin::Bin/../../lib";
use Wigo::Probe qw/:all/;

use List::Util qw/sum/;

use LWP::UserAgent;
# HTTP client shared by every request of this probe, with a 5 second timeout.
my $UA = LWP::UserAgent->new( timeout => 5 );

use JSON;
use Time::HiRes qw/time/;

###
# DEFAULT CONFIG
###

# Defaults handed to init(). `servers` maps a display name to a hash whose
# `host` entry is the "address:port" of a master, as in the commented sample.
my $conf = {
    'servers' => {
        #'master1' => {
        #    'host' => '127.0.0.1:60010',
        #},
    }
};

init( config => $conf );

# Without any configured server there is nothing to poll: say so and hand
# code 13 to output.
unless ( keys %{ config()->{'servers'} } )
{
    message "No HBase master to monitor";
    output 13;
}

# Timestamp of this run. It is stored in the state handed to persist later so
# that a following run can compute the elapsed time.
my $now = time;
my $new = {
    'time' => $now,
};

# Make sure a state hash exists before reading from it.
persist or persist({});

# Seconds elapsed since the stored timestamp, or undef when none is stored.
# Declared unconditionally on purpose: `my $x = ... if COND` has undefined
# behaviour in Perl, so the condition lives in the expression instead.
my $delta_time = persist->{'time'} ? $now - persist->{'time'} : undef;

# Problem summaries collected while polling; used afterwards to build the
# probe message.
my @messages;

# Query the /jmx endpoint of every configured server and turn the beans of
# interest into details, metrics and alerts. Names are sorted so that the
# order of collected messages is stable from one run to the next.
for my $name ( sort keys %{ config->{'servers'} } )
{
    my $server = config->{'servers'}->{$name};

    my $response = $UA->get( 'http://'.$server->{'host'}.'/jmx' );

    detail->{$name}->{'host'} = $server->{'host'};

    # Optimistic default, overwritten below when something goes wrong.
    detail->{$name}->{'status'} = "UP";

    if ( ! $response->is_success )
    {
        raise 300;
        push @messages, "$name DOWN";
        detail->{$name}->{'status'} = $response->status_line;
        next;
    }

    my $data;
    eval
    {
        $data = decode_json($response->decoded_content);
    };
    # Copy $@ immediately: any later call that itself uses eval would reset it.
    my $decode_error = $@;

    # Written as `ref $data ne 'HASH'` because `! ref $data eq 'HASH'` parses
    # as `(! ref $data) eq 'HASH'`, which can never be true.
    if ( $decode_error or !$data or ref $data ne 'HASH' )
    {
        raise 300;
        push @messages, "$name ERROR";
        detail->{$name}->{'status'} = "Can't decode json response : $decode_error";
        next;
    }

    # A document without a usable `beans` list simply produces no metrics.
    my $beans = ref( $data->{'beans'} ) eq 'ARRAY' ? $data->{'beans'} : [];

    foreach my $bean ( @{$beans} )
    {
        # Entries without a name cannot match any of the cases below.
        my $bean_name = ref( $bean ) eq 'HASH' ? $bean->{'name'} : undef;
        next unless defined $bean_name;

        # JVM heap usage.
        if ( $bean_name eq 'java.lang:type=Memory' )
        {
            my $heap = $bean->{'HeapMemoryUsage'};
            if ( ref( $heap ) eq 'HASH' and %{$heap} )
            {
                # These two do not depend on the key being iterated, so they
                # are computed once rather than once per key.
                detail->{$name}->{'memory_used'} = sprintf "%.2f GB", $heap->{'used'} / 1024 / 1024 / 1024;
                detail->{$name}->{'memory_max'}  = sprintf "%.2f GB", $heap->{'max'} / 1024 / 1024 / 1024;

                # One metric per HeapMemoryUsage key.
                foreach my $type ( keys %{$heap} )
                {
                    add_metric { 'Tags' => { 'server' => $name, 'metric' => 'HeapMemoryUsage', 'type' => $type }, 'Value' => $heap->{$type} };
                }
            }
        }

        # Master bean exposing region servers as lists.
        elsif ( $bean_name eq 'hadoop:service=Master,name=Master' )
        {
            detail->{$name}->{'active'} = $bean->{'IsActiveMaster'};

            detail->{$name}->{'region_server_down'} = $bean->{'DeadRegionServers'};

            # Missing or malformed lists count as empty instead of aborting
            # the probe on an invalid dereference.
            my $dead_list = ref( $bean->{'DeadRegionServers'} ) eq 'ARRAY' ? $bean->{'DeadRegionServers'} : [];
            my $live_list = ref( $bean->{'RegionServers'} ) eq 'ARRAY' ? $bean->{'RegionServers'} : [];

            my $regionServerDown = scalar @{$dead_list};
            if ( $regionServerDown > 0 )
            {
                raise 300 + $regionServerDown;
                push @messages, sprintf "%s : %d region server DOWN", ($name,$regionServerDown);
            }
            detail->{$name}->{'num_region_server_down'} = $regionServerDown;

            my $regionServer = scalar @{$live_list};
            # sum() returns undef for an empty list; report 0 regions then.
            my $regions = sum( map { $_->{'value'}->{'numberOfRegions'} } @{$live_list} ) || 0;

            detail->{$name}->{'regions'} = $regions;
            detail->{$name}->{'region_server'} = $regionServer;
            detail->{$name}->{'average_load'} = sprintf "%.3f", $bean->{'AverageLoad'};

            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'regions' }, 'Value' => $regions };
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'region_server' }, 'Value' => $regionServer };
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'region_server_down' }, 'Value' => $regionServerDown };
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'average_load' }, 'Value' => $bean->{'AverageLoad'} };
        }

        # Statistics bean; its cluster_requests value is reported as-is.
        elsif ( $bean_name eq 'hadoop:service=Master,name=MasterStatistics' )
        {
            detail->{$name}->{'cluster_requests'} = sprintf "%.3f req/s", $bean->{'cluster_requests'};
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'cluster_requests' }, 'Value' => $bean->{'cluster_requests'} };
        }

        # Master bean exposing region server counts and a request counter.
        elsif ( $bean_name eq 'Hadoop:service=HBase,name=Master,sub=Server' )
        {
            detail->{$name}->{'active'} = $bean->{'tag.isActiveMaster'};
            detail->{$name}->{'region_server'} = $bean->{'numRegionServers'};
            detail->{$name}->{'average_load'} = sprintf "%.3f", $bean->{'averageLoad'};
            detail->{$name}->{'region_server_down'} = $bean->{'tag.deadRegionServers'};

            my $regionServerDown = $bean->{'numDeadRegionServers'};
            if ( $regionServerDown > 0 )
            {
                raise 300 + $regionServerDown;
                push @messages, sprintf "%s : %d region server DOWN", ($name, $regionServerDown);
            }
            detail->{$name}->{'num_region_server_down'} = $regionServerDown;

            # The counter is stored per server name: a single shared slot
            # would mix the counters of different servers as soon as more
            # than one is configured.
            my $requests = $bean->{'clusterRequests'};
            $new->{'reqs'}->{$name} = $requests;

            # State written in the older layout holds one bare number under
            # 'reqs'; it cannot be attributed to a server, so it is ignored.
            my $stored   = persist->{'reqs'};
            my $previous = ref( $stored ) eq 'HASH' ? $stored->{$name} : undef;

            # The rate is the counter difference over the elapsed time. It is
            # skipped when the counter went backwards, which would otherwise
            # yield a negative rate.
            if ( $delta_time and defined $requests and defined $previous and $requests >= $previous )
            {
                my $reqs = ( $requests - $previous ) / $delta_time;
                detail->{$name}->{'cluster_requests'} = sprintf "%.3f req/s", $reqs;
                add_metric { 'Tags' => { 'server' => $name, 'metric' => 'cluster_requests' }, 'Value' => $reqs };
            }

            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'region_server' }, 'Value' => $bean->{'numRegionServers'} };
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'region_server_down' }, 'Value' => $regionServerDown };
            add_metric { 'Tags' => { 'server' => $name, 'metric' => 'average_load' }, 'Value' => $bean->{'averageLoad'} };
        }
    }
}

# Store this run's state once; nothing below modifies $new.
persist $new;

# Collected problems take precedence: when there are any, they alone make up
# the message. Otherwise one "OK" summary is built per entry in detail(), in
# sorted name order so the message text is stable between runs.
if ( ! scalar @messages )
{
    foreach my $name ( sort keys %{ detail() } )
    {
        my $info = detail->{$name};

        my $msg = sprintf( "%s OK", ( $name ) );
        if ( $info->{'region_server'} )
        {
            $msg .= sprintf(" %d region server", ( $info->{'region_server'} ));
        }
        if ( $info->{'average_load'} )
        {
            $msg .= sprintf(" ( load %.2f )", ( $info->{'average_load'} ));
        }
        if ( $info->{'cluster_requests'} )
        {
            $msg .= sprintf( " ( %s )", ( $info->{'cluster_requests'} ));
        }
        push @messages, $msg;
    }
}

message join ' , ' , @messages;

output 0;