#include <algorithm>
#include <cstring>
#include <iostream>
#include <vector>

#include <ext/hash_map>
#include <ext/hash_set>

extern "C" {
   #include <aio.h>
   #include <fcntl.h>
   #include <pthread.h>
   #include <sys/time.h>
   #include <sys/stat.h>
}

#include <pcrecpp.h>

//Global constants
typedef std::pair<char const *, char const *> StringRef;

long const BUFSIZE = 10*1024*1024;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pcrecpp::RE re("^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$");
pcrecpp::RE re2("^http://www.tbray.org/ongoing/");

struct File {
   File(char const * filename) 
    : fd(open(filename, O_RDONLY))
   {
      if (fd == -1) {
         perror("Cannot open");
         exit(-1);
      }
   }
   
   ~File() {
      if (close(fd) != 0) {
         perror("close");
         exit(-1);
      }
   }
   
   off_t size() const {
      off_t size = lseek(fd, 0, SEEK_END);
      if (size == -1) {
         perror("lseek");
         exit(-1);
      }
      return size;
   }
   
   void set(off_t pos) const {
      if (lseek(fd, pos, SEEK_SET) != pos) {
         perror("lseek");
         exit(-1);
      }
   }

   int fd;
};

struct PtrHash {
   std::size_t operator() (char const * str) const {
      return reinterpret_cast<std::size_t>(str);
   }
};

struct StrHash {
   std::size_t operator() (char const * str) const {
      std::size_t h = 0;
      for (; *str != 0; ++str) {
         h = 5 * h + *str;
      }
      return h;
   }
};

struct StrRefHash {
   std::size_t operator() (StringRef str) const {
      std::size_t h = 0;
      for (char const * s = str.first; s != str.second; ++s) {
         h = 5 * h + *s;
      }
      return h;
   }
};

struct PtrEqual {
   bool operator() (char const * a, char const * b) const {
      return a == b;
   }
};

struct StrEqual {
   bool operator() (char const * a, char const * b) const {
      return strcmp(a, b) == 0;
   }
};

struct StrRefEqual {
   bool operator() (StringRef const & a, StringRef const & b) const {
      if (a.second - a.first != b.second - b.first) return false;
      
      char const * a_str = a.first;
      char const * b_str = b.first;
      char const * stop  = a.second;
      
      while (a_str != stop) {
         if (*a_str++ != *b_str++) return false;
      }
      return true;
   }
};

bool operator==(StringRef ref, char const * str) {
   char const * cur = ref.first;
   while (cur != ref.second and *str != '\0') {
      if (*cur != *str) return false;
      ++cur;
      ++str;
   }
   return (cur == ref.second and *str == 0);
}


typedef __gnu_cxx::hash_map<char const *, unsigned long long, PtrHash, PtrEqual> ThreadHash;
typedef __gnu_cxx::hash_map<char const *, unsigned long long, StrHash, StrEqual> Hash;
typedef __gnu_cxx::hash_set<StringRef, StrRefHash, StrRefEqual> Set;

Hash tot_s404s;
Hash tot_u_bytes;
Hash tot_u_hits;
Hash tot_clients;
Hash tot_refs;

void merge_hash(Hash & a, ThreadHash & b) {
   for (ThreadHash::const_iterator i = b.begin(); i != b.end(); ++i) {
      a[i->first] += i->second;
   }
}

struct Data
{
   Data(char const * filename, off_t start, off_t stop) 
    : file(filename),
      start(start),
      stop(stop)
   {
       buffer[0] = new char[BUFSIZE];
       buffer[1] = new char[BUFSIZE];
   }

   StringRef find_string(StringRef & line)
   {
      char const * start = line.first;
      while (start != line.second and *start == ' ') ++start;
      char const * end = start + 1;
      while (end != line.second and *end != ' ' and *end != '\n') ++end;
      line.first = end + 1;
      return std::make_pair(start, end);
   }
   
   StringRef locate_or_create(StringRef str) 
   {
      Set::const_iterator ret = strings.find(str);
      if (ret == strings.end()) {
         std::size_t len = str.second - str.first;
         char * data = static_cast<char *>(malloc(len + 1));
         memcpy(data, str.first, len);
         data[len] = 0;
         StringRef ref = std::make_pair(data, data + len);
         strings.insert(ref);
         return ref;
      } else {
         return *ret;
      }
   }

   void analyse_line(StringRef cur)
   {
      StringRef client = find_string(cur);

      find_string(cur);
      find_string(cur);
      find_string(cur);
      find_string(cur);
      StringRef cmd = find_string(cur);
      if (not (cmd == "\"GET")) return;
      StringRef url = find_string(cur);
      find_string(cur);
      StringRef code = find_string(cur);
      if (code == "404") {
         s404s[locate_or_create(url).first]++;
         return;
      }
      std::size_t size = 0;
      if (code == "304") {
         find_string(cur);
      } else if (code == "200") {
         StringRef size_str = find_string(cur);
         size = strtol(size_str.first, NULL, 10);
      } else {
         return;
      }

      StringRef url_ref = locate_or_create(url);
      u_bytes[url_ref.first] += size;
      if (re.FullMatch(url_ref.first)) {
         u_hits[url_ref.first]++;
         
         StringRef client_ref = locate_or_create(client);
         clients[client_ref.first]++;
         StringRef ref = find_string(cur);
         if (not (ref == "\"-\"" or ref ==  "\"\""))
         {
            if (not re2.PartialMatch(pcrecpp::StringPiece(ref.first + 1, 
                                                          (ref.second - ref.first - 2))))
            {
               StringRef ref_ref = locate_or_create(StringRef(ref.first + 1, ref.second - 1));
               refs[ref_ref.first]++;
            }
         }
      }
   }
   
   void calc_sum() 
   {
      int buf = 0;
      memset(&my_aiocb, 0, sizeof(my_aiocb));
      my_aiocb.aio_buf    = buffer[buf];
      my_aiocb.aio_fildes = file.fd;
      my_aiocb.aio_nbytes = BUFSIZE;
      my_aiocb.aio_offset = start;
      
      if (aio_read64(&my_aiocb) != 0) {
         perror("aio_read");
         exit(-1);
      }
   
      bool ignore_first = (start != 0);
      struct aiocb64 * my_ptr = &my_aiocb;
   
      std::vector<char> line;
      while (true) {
         int first = 0;
         if (aio_suspend64(&my_ptr, 1, NULL) != 0) {
            perror("aio_suspend");
            exit(-1);
         }
   
         if (aio_error64(&my_aiocb) != 0) {
            perror("aio_error64");
            exit(-1);
         }
         
         off_t len = aio_return64(&my_aiocb);
         if (len == 0) {
            if (line.size() > 0 and not ignore_first) 
               analyse_line(std::make_pair(&line[0], &line[0] + line.size()));
            return;
         }
         
         my_aiocb.aio_buf    = buffer[1 - buf];
         my_aiocb.aio_offset = start + BUFSIZE;
         if (aio_read64(&my_aiocb) != 0) {
            perror("aio_read");
            exit(-1);
         }
   
         for (int i = 0; i != len; ++i) {
            if (buffer[buf][i] == '\n') {
               if (ignore_first) {
                  ignore_first = false;
               } else {
                  if (line.size() > 0) {
                     line.insert(line.end(), buffer[buf] + first, buffer[buf] + i + 1);
                     analyse_line(std::make_pair(&line[0], &line[0] + line.size()));
                     line.clear();
                  } else {
                     analyse_line(std::make_pair(buffer[buf] + first, buffer[buf] + i + 1));
                  }
               }
               if (start + i > stop) {
                  return;
               }
               first = i + 1;
            }
         }
         if (first != len) {
            line.insert(line.end(), buffer[buf] + first, buffer[buf] + len);
         }
         start += len;
         buf = 1 - buf;
      }
   }

   pthread_t tid;
   File file;
   off_t start, stop;
   struct aiocb64 my_aiocb;
   char * buffer[2];

   Set  strings;
   ThreadHash s404s, u_bytes, u_hits, clients, refs;
};

extern "C" void * run(void * arg)
{  
   Data * data = static_cast<Data *>(arg);
   data->calc_sum();

   pthread_mutex_lock(&mutex);
   merge_hash(tot_s404s, data->s404s);
   merge_hash(tot_u_bytes, data->u_bytes);
   merge_hash(tot_u_hits, data->u_hits);
   merge_hash(tot_clients, data->clients);
   merge_hash(tot_refs, data->refs);
   pthread_mutex_unlock(&mutex);

   return NULL;
}

bool cmp(std::pair<char const *, unsigned long long> const & a,
         std::pair<char const *, unsigned long long> const & b)
{
   return a.second > b.second;
}

void report(Hash & set, char const * str, bool schrink = false)
{
   std::cout << "Top " << str << ":\n";
   std::vector<std::pair<char const *, unsigned long long> > vec(10);
   std::partial_sort_copy(set.begin(), set.end(),
                          vec.begin(), vec.end(), cmp);

   for (std::size_t i = 0; i < vec.size() and i < set.size(); ++i) {
      if (schrink) {
         fprintf(stdout, " %9.1fM: %s\n", vec[i].second / (1024.0 * 1024.0),
                                          vec[i].first);
      } else {
         fprintf(stdout, " %10llu: %s\n", vec[i].second,
                                          vec[i].first);
      }
   }
   std::cout << std::endl;
}

int main(int argc, char ** argv)
{
   if (argc != 3) {
      std::cout << "Wrong number of arguments: " << argv[0]
                << " filename  num_threads\n";
      exit (-1);
   }

   int num_threads = std::max(1, atoi(argv[2]));
   
   File file(argv[1]);
   off_t file_size = file.size();
   
   off_t chunk_size = file_size / num_threads;
   while (chunk_size % 8192 != 0) ++chunk_size;
   
   std::vector<Data *> datas;
   for (int i = 0; i != num_threads; ++i) {
      datas.push_back(new Data(argv[1], i * chunk_size, 
                               std::min((i + 1) * chunk_size, file_size)));
      pthread_create(&(datas[i]->tid), 0, run, datas[i]);
   }
   for (int i = 0; i != num_threads; ++i) {
      pthread_join(datas[i]->tid, NULL);
   }

   std::cout << tot_u_hits.size() << " resources, "
             << tot_s404s.size() << " 404s, "
             << tot_clients.size() << " clients\n\n";

   report(tot_u_hits,  "URIs by hit");
   report(tot_u_bytes, "URIs by bytes", true);
   report(tot_s404s,   "404s");
   report(tot_clients, "client addresses");
   report(tot_refs,    "referrers");
}
