#include <algorithm>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <cstdlib>
#include <ext/hash_map>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

extern "C" {
#include <pthread.h>
}

#include <pcrecpp.h>

using namespace boost;
using namespace boost::algorithm;

// Serialises the merge of each worker's private tables into the global
// tot_* tables (locked and unlocked in run()).
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Matched in full against the request URI: accepts paths of the form
// /ongoing/When/DDDx/DDDD/DD/DD/<name>, where D is a digit and <name>
// contains neither a space nor a dot.
pcrecpp::RE re("^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$");
// Partially matched against the referrer: referrers that start with the
// site's own /ongoing/ prefix are not counted.  The dots in the host name
// are unescaped, so each one matches any single character.
pcrecpp::RE re2("^http://www.tbray.org/ongoing/");

// Hash functor that lets std::string be used as a hash_map key by
// delegating to the C-string hash shipped with the GNU extension headers.
struct StringHash {
   std::size_t operator() (std::string const & str) const {
      // Only the characters up to the first NUL take part in the hash,
      // because the underlying hasher works on a C string.
      __gnu_cxx::hash<char const *> hasher;
      char const * const text = str.c_str();
      return hasher(text);
   }
};

// Counter table: maps a string key to an unsigned 64-bit tally, hashed
// with StringHash.
typedef __gnu_cxx::hash_map<std::string, unsigned long long, StringHash> Hash;

// Program-wide totals.  Each worker merges its private tables into these
// in run() while holding `mutex`; main() reports them after joining the
// workers.
Hash tot_s404s, tot_u_bytes, tot_u_hits, tot_clients, tot_refs;

// Adds every counter of `b` onto the counter with the same key in `a`.
// Keys that `a` does not hold yet are created (starting from zero) by the
// subscript operator before the addition.
void merge_hash(Hash & a, Hash const & b) {
   Hash::const_iterator entry = b.begin();
   Hash::const_iterator const last = b.end();
   while (entry != last) {
      unsigned long long & total = a[entry->first];
      total += entry->second;
      ++entry;
   }
}

// Work item for one worker thread: a private stream on the log file, the
// byte range [start, stop] the worker is responsible for, and the private
// counter tables it fills while scanning that range.
struct Data {

   pthread_t      tid;          // filled in by pthread_create()
   std::ifstream  file;         // private stream, so seeks do not interfere
   std::streampos start, stop;  // byte offsets delimiting this chunk
   Hash           s404s, u_bytes, u_hits, clients, refs;

   // Opens `filename` and records the chunk boundaries.  Terminates the
   // process with a diagnostic when the file cannot be opened.
   Data(char const * filename, off_t start, off_t stop)
    : file(filename),
      start(start),
      stop(stop)
   {
      if (not file) {
         std::cerr << "Cannot open " << filename << '\n';
         exit(-1);
      }
   }

   // Tallies one log line into the private tables.
   //
   // The line is split on single spaces; the fields used are:
   //   [0] client, [5] quoted method, [6] URI, [8] status,
   //   [9] response size, [10] quoted referrer.
   // Lines that are too short to contain the fields they need are
   // ignored instead of being indexed out of range.
   void analyse_line(std::string const & line)
   {
      std::vector<std::string> parts;
      split(parts, line, is_any_of(" "));

      // Method, URI and status are needed by every branch below.
      if (parts.size() < 9) return;
      if (parts[5] != "\"GET") return;

      std::string const & uri    = parts[6];
      std::string const & status = parts[8];

      if (status == "404") {
         s404s[uri]++;
         return;
      }

      std::size_t size = 0;
      if (status == "200") {
         if (parts.size() < 10) return;   // truncated line: no size field
         long const reported = strtol(parts[9].c_str(), NULL, 10);
         // A negative value would wrap to a huge unsigned count.
         if (reported > 0) {
            size = static_cast<std::size_t>(reported);
         }
      } else if (status != "304") {
         return;                          // only 200 and 304 are counted
      }

      u_bytes[uri] += size;

      if (not re.FullMatch(uri)) return;

      u_hits[uri]++;
      clients[parts[0]]++;

      // The referrer is optional: count the hit even when it is missing.
      if (parts.size() < 11) return;
      std::string const & quoted = parts[10];
      // Anything shorter than two characters cannot be a quoted value
      // (and an empty field would make substr(1, ...) throw).
      if (quoted.size() < 2) return;
      if (quoted == "\"-\"" or quoted == "\"\"") return;

      // Strip the first and last character (the surrounding quotes).
      std::string const ref = quoted.substr(1, quoted.size() - 2);
      if (not re2.PartialMatch(ref)) {
         refs[ref]++;
      }
   }

   // Scans this chunk of the file line by line.
   //
   // Unless the chunk starts at offset 0, everything up to and including
   // the first newline at or after `start` is discarded.  A line is then
   // analysed whenever the read position before reading it is <= stop,
   // so the last line analysed may extend past `stop`.
   void calc_sum()
   {
      file.seekg(start);
      std::string line;

      if (start > 0) {
         std::getline(file, line);
      }

      // Seek or skip failed (e.g. the chunk starts at or beyond EOF).
      if (not file) return;

      while (file.tellg() <= stop and std::getline(file, line)) {
         analyse_line(line);
      }
   }
};


extern "C" void * run(void * arg) 
{
   Data * data = static_cast<Data *>(arg);
   data->calc_sum();

   pthread_mutex_lock(&mutex);
   merge_hash(tot_s404s, data->s404s);
   merge_hash(tot_u_bytes, data->u_bytes);
   merge_hash(tot_u_hits, data->u_hits);
   merge_hash(tot_clients, data->clients);
   merge_hash(tot_refs, data->refs);
   pthread_mutex_unlock(&mutex);

   return NULL;
}


// Ordering predicate for (key, count) pairs: true when `a` has the larger
// count, so sorting with it puts the highest counts first.  The keys are
// not compared.
bool cmp(std::pair<std::string, unsigned long long> const & a,
         std::pair<std::string, unsigned long long> const & b)
{
   return b.second < a.second;
}

// Prints a "Top <str>:" heading followed by up to ten entries of `set`
// with the largest counts, highest first, and then a blank line.
// With `schrink` set, counts are printed divided by 1024*1024 with an
// "M" suffix; otherwise they are printed as plain integers.
void report(Hash & set, char const * str, bool schrink = false)
{
   typedef std::pair<std::string, unsigned long long> Entry;
   typedef std::vector<Entry>                         Entries;

   std::cout << "Top " << str << ":\n";

   // partial_sort_copy returns the end of the range it actually filled,
   // i.e. begin + min(10, set.size()).
   Entries top(10);
   Entries::const_iterator const filled_end =
      std::partial_sort_copy(set.begin(), set.end(),
                             top.begin(), top.end(), cmp);

   for (Entries::const_iterator it = top.begin(); it != filled_end; ++it) {
      char const * const key = it->first.c_str();
      if (not schrink) {
         fprintf(stdout, " %10llu: %s\n", it->second, key);
      } else {
         double const megabytes = it->second / (1024.0 * 1024.0);
         fprintf(stdout, " %9.1fM: %s\n", megabytes, key);
      }
   }
   std::cout << std::endl;
}

int main(int argc, char ** argv)
{
   if (argc != 3) {
      std::cout << "Wrong number of arguments: " << argv[0]
                << " filename  num_threads\n";
      exit (-1);
   }

   int num_threads = std::max(1, atoi(argv[2]));
   
   std::ifstream file(argv[1]);
   if (not file) {
      exit(-1);
   }
   file.seekg(0, std::ios_base::end);
   std::streampos file_size = file.tellg();
   
   std::streampos chunk_size = file_size / num_threads + 1;

   std::vector<Data *> datas;
   for (int i = 0; i != num_threads; ++i) {
      datas.push_back(new Data(argv[1], i * chunk_size, 
                               std::min(std::streampos((i + 1) * chunk_size), file_size)));
      pthread_create(&(datas[i]->tid), 0, run, datas[i]);
   }
   for (int i = 0; i != num_threads; ++i) {
      pthread_join(datas[i]->tid, NULL);
   }

   std::cout << tot_u_hits.size() << " resources, "
             << tot_s404s.size() << " 404s, "
             << tot_clients.size() << " clients\n\n";

   report(tot_u_hits,  "URIs by hit");
   report(tot_u_bytes, "URIs by bytes", true);
   report(tot_s404s,   "404s");
   report(tot_clients, "client addresses");
   report(tot_refs,    "referrers");
}