#!/usr/bin/perl

use strict;
use warnings;

use FindBin;
use lib "$FindBin::Bin/../../lib";
use Wigo::Probe qw/:all/;

use List::Util qw/sum/;

use LWP::UserAgent;
# Shared HTTP client used for every namenode /jmx request below. The timeout
# (in seconds) bounds how long a single request may wait on a slow host.
my $UA = LWP::UserAgent->new;
$UA->timeout(5);

use JSON;

###
# DEFAULT CONFIG
###

# Default configuration handed to init(). 'servers' maps a display name to a
# hash whose 'host' entry is the "address:port" of the namenode web interface.
# No server is enabled by default; the effective list is read back through
# config() (presumably after init() merges external settings -- confirm in
# Wigo::Probe).
my $conf = {
    'servers' => {
        # Example entry:
        #'master1' => {
        #    'host' => '127.0.0.1:50070',
        #},
    },
};

init( 'config', $conf );

# Nothing to poll: report it and hand the value 13 to output().
# NOTE(review): output() presumably ends the probe run here -- confirm.
my $server_count = keys %{ config->{'servers'} };
if ( $server_count == 0 )
{
    message( "No HDFS namenode to monitor" );
    output( 13 );
}

# FSNamesystem block counters that indicate a problem when non-zero, each
# paired with the level handed to raise() for it.
my @block_alerts = (
    [ 'MissingBlocks',         300 ],
    [ 'CorruptBlocks',         300 ],
    [ 'UnderReplicatedBlocks', 200 ],
);

# Byte divisors used to render sizes for humans.
my $BYTES_PER_GB = 1024 ** 3;
my $BYTES_PER_TB = 1024 ** 4;

# One summary fragment per server that reported a problem; stays empty when
# every namenode answered and no alert counter was set.
my @messages;

# Poll the /jmx endpoint of every configured namenode, copy the interesting
# bean attributes into detail() / add_metric(), and raise() on failures.
for my $name ( keys %{ config->{'servers'} } )
{
    my $server = config->{'servers'}->{$name};

    my $response = $UA->get( 'http://'.$server->{'host'}.'/jmx' );

    detail->{$name}->{'host'}   = $server->{'host'};
    detail->{$name}->{'status'} = "UP";

    # Connection failure or HTTP error status: report the server as DOWN and
    # keep the status line for diagnosis.
    if ( ! $response->is_success )
    {
        raise( 300 );
        push @messages, "$name DOWN";
        detail->{$name}->{'status'} = $response->status_line;
        next;
    }

    # decode_json() wants raw UTF-8 bytes, so only undo the transfer encoding
    # and leave charset decoding to the JSON parser.
    my $data         = eval { decode_json( $response->decoded_content( charset => 'none' ) ) };
    my $decode_error = $@;

    # Reject both parse failures and valid JSON whose top level is not an
    # object (ref() returns '' for undef and for plain scalars).
    if ( $decode_error or ref( $data ) ne 'HASH' )
    {
        raise( 300 );
        push @messages, "$name ERROR";
        detail->{$name}->{'status'} = "Can't decode json response : "
            . ( $decode_error || 'top-level value is not an object' );
        next;
    }

    # Tolerate a missing or malformed 'beans' entry instead of dying on it.
    my $beans = ref( $data->{'beans'} ) eq 'ARRAY' ? $data->{'beans'} : [];

    my @errors;
    foreach my $bean ( @{$beans} )
    {
        next unless ref( $bean ) eq 'HASH';
        my $bean_name = defined $bean->{'name'} ? $bean->{'name'} : '';

        if ( $bean_name eq 'java.lang:type=Memory' )
        {
            my $heap = $bean->{'HeapMemoryUsage'};
            next unless ref( $heap ) eq 'HASH';

            # Human-readable figures are computed once, not once per key.
            if ( defined $heap->{'used'} )
            {
                detail->{$name}->{'memory_used'} = sprintf( "%.2f GB", $heap->{'used'} / $BYTES_PER_GB );
            }
            if ( defined $heap->{'max'} )
            {
                detail->{$name}->{'memory_max'} = sprintf( "%.2f GB", $heap->{'max'} / $BYTES_PER_GB );
            }

            # Every attribute of the heap usage record becomes its own metric.
            foreach my $type ( keys %{$heap} )
            {
                add_metric { 'Tags' => { 'server' => $name, 'metric' => 'HeapMemoryUsage', 'type' => $type }, 'Value' => $heap->{$type} };
            }
        }
        elsif ( $bean_name eq 'Hadoop:service=NameNode,name=NameNodeStatus' )
        {
            detail->{$name}->{'state'} = $bean->{'State'};
        }
        elsif ( $bean_name eq 'Hadoop:service=NameNode,name=FSNamesystem' )
        {
            # Capacities are only shown when present and non-zero.
            if ( $bean->{'CapacityUsed'} )
            {
                detail->{$name}->{'CapacityUsed'} = sprintf( "%.3f TB", $bean->{'CapacityUsed'} / $BYTES_PER_TB );
            }
            if ( $bean->{'CapacityTotal'} )
            {
                detail->{$name}->{'CapacityTotal'} = sprintf( "%.3f TB", $bean->{'CapacityTotal'} / $BYTES_PER_TB );
            }
            detail->{$name}->{'FilesTotal'}  = $bean->{'FilesTotal'};
            detail->{$name}->{'BlocksTotal'} = $bean->{'BlocksTotal'};

            # Record each alert counter; a non-zero value raises its level
            # and contributes "<count> <counter>" to this server's message.
            foreach my $alert ( @block_alerts )
            {
                my ( $counter, $level ) = @{$alert};
                my $count = $bean->{$counter};

                detail->{$name}->{$counter} = $count;
                next unless $count;

                raise( $level );
                push @errors, sprintf( "%d %s", $count, $counter );
            }

            for my $metric ( qw /CapacityUsed CapacityTotal FilesTotal BlocksTotal MissingBlocks CorruptBlocks UnderReplicatedBlocks / )
            {
                add_metric { 'Tags' => { 'server' => $name, 'metric' => $metric }, 'Value' => $bean->{$metric} };
            }
        }
    }

    if ( scalar @errors )
    {
        push @messages, "$name : " . join(', ', @errors);
    }
}

# Build the probe message. When no problem fragment was collected, fall back
# to one "UP" line per entry in detail() showing its used / total capacity.
if ( ! @messages )
{
    @messages = map {
        sprintf( "%s : name node is UP ( %s / %s )", $_, detail->{$_}->{'CapacityUsed'}, detail->{$_}->{'CapacityTotal'} )
    } keys %{ detail() };
}

# Problem fragments or UP lines, joined the same way in both cases.
message( join( ' , ', @messages ) );

# Hand the collected result to Wigo with argument 0.
# NOTE(review): 0 is presumably the process exit code -- confirm in Wigo::Probe.
output( 0 );
