00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "inspircd.h"
00015 #include <tds.h>
00016 #include <tdsconvert.h>
00017 #include "users.h"
00018 #include "channels.h"
00019 #include "modules.h"
00020
00021 #include "m_sqlv2.h"
00022
00023
00024
00025
00026
00027 class SQLConn;
00028 class MsSQLResult;
00029 class ResultNotifier;
00030 class MsSQLListener;
00031 class ModuleMsSQL;
00032
00033 typedef std::map<std::string, SQLConn*> ConnMap;
00034 typedef std::deque<MsSQLResult*> ResultQueue;
00035
00036 ResultNotifier* notifier = NULL;
00037 MsSQLListener* listener = NULL;
00038 int QueueFD = -1;
00039
00040 ConnMap connections;
00041 Mutex* QueueMutex;
00042 Mutex* ResultsMutex;
00043 Mutex* LoggingMutex;
00044
00045 class QueryThread : public Thread
00046 {
00047 private:
00048 ModuleMsSQL* Parent;
00049 InspIRCd* ServerInstance;
00050 public:
00051 QueryThread(InspIRCd* si, ModuleMsSQL* mod)
00052 : Thread(), Parent(mod), ServerInstance(si)
00053 {
00054 }
00055 ~QueryThread() { }
00056 virtual void Run();
00057 };
00058
00059 class ResultNotifier : public BufferedSocket
00060 {
00061 ModuleMsSQL* mod;
00062
00063 public:
00064 ResultNotifier(ModuleMsSQL* m, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), mod(m)
00065 {
00066 }
00067
00068 virtual bool OnDataReady()
00069 {
00070 char data = 0;
00071 if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0)
00072 {
00073 Dispatch();
00074 return true;
00075 }
00076 return false;
00077 }
00078
00079 void Dispatch();
00080 };
00081
00082 class MsSQLListener : public ListenSocketBase
00083 {
00084 ModuleMsSQL* Parent;
00085 irc::sockets::insp_sockaddr sock_us;
00086 socklen_t uslen;
00087 FileReader* index;
00088
00089 public:
00090 MsSQLListener(ModuleMsSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P)
00091 {
00092 uslen = sizeof(sock_us);
00093 if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
00094 {
00095 throw ModuleException("Could not getsockname() to find out port number for ITC port");
00096 }
00097 }
00098
00099 virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip)
00100 {
00101 new ResultNotifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str());
00102 }
00103
00104
00105 int GetPort()
00106 {
00107 #ifdef IPV6
00108 return ntohs(sock_us.sin6_port);
00109 #else
00110 return ntohs(sock_us.sin_port);
00111 #endif
00112 }
00113 };
00114
00115
00116 class MsSQLResult : public SQLresult
00117 {
00118 private:
00119 int currentrow;
00120 int rows;
00121 int cols;
00122
00123 std::vector<std::string> colnames;
00124 std::vector<SQLfieldList> fieldlists;
00125 SQLfieldList emptyfieldlist;
00126
00127 SQLfieldList* fieldlist;
00128 SQLfieldMap* fieldmap;
00129
00130 public:
00131 MsSQLResult(Module* self, Module* to, unsigned int rid)
00132 : SQLresult(self, to, rid), currentrow(0), rows(0), cols(0), fieldlist(NULL), fieldmap(NULL)
00133 {
00134 }
00135
00136 ~MsSQLResult()
00137 {
00138 }
00139
00140 void AddRow(int colsnum, char **dat, char **colname)
00141 {
00142 colnames.clear();
00143 cols = colsnum;
00144 for (int i = 0; i < colsnum; i++)
00145 {
00146 fieldlists.resize(fieldlists.size()+1);
00147 colnames.push_back(colname[i]);
00148 SQLfield sf(dat[i] ? dat[i] : "", dat[i] ? false : true);
00149 fieldlists[rows].push_back(sf);
00150 }
00151 rows++;
00152 }
00153
00154 void UpdateAffectedCount()
00155 {
00156 rows++;
00157 }
00158
00159 virtual int Rows()
00160 {
00161 return rows;
00162 }
00163
00164 virtual int Cols()
00165 {
00166 return cols;
00167 }
00168
00169 virtual std::string ColName(int column)
00170 {
00171 if (column < (int)colnames.size())
00172 {
00173 return colnames[column];
00174 }
00175 else
00176 {
00177 throw SQLbadColName();
00178 }
00179 return "";
00180 }
00181
00182 virtual int ColNum(const std::string &column)
00183 {
00184 for (unsigned int i = 0; i < colnames.size(); i++)
00185 {
00186 if (column == colnames[i])
00187 return i;
00188 }
00189 throw SQLbadColName();
00190 return 0;
00191 }
00192
00193 virtual SQLfield GetValue(int row, int column)
00194 {
00195 if ((row >= 0) && (row < rows) && (column >= 0) && (column < Cols()))
00196 {
00197 return fieldlists[row][column];
00198 }
00199
00200 throw SQLbadColName();
00201
00202
00203 return SQLfield("",true);
00204 }
00205
00206 virtual SQLfieldList& GetRow()
00207 {
00208 if (currentrow < rows)
00209 return fieldlists[currentrow];
00210 else
00211 return emptyfieldlist;
00212 }
00213
00214 virtual SQLfieldMap& GetRowMap()
00215 {
00216
00217
00218
00219 if(fieldmap)
00220 {
00221 fieldmap->clear();
00222 }
00223 else
00224 {
00225 fieldmap = new SQLfieldMap;
00226 }
00227
00228 if (currentrow < rows)
00229 {
00230 for (int i = 0; i < Cols(); i++)
00231 {
00232 fieldmap->insert(std::make_pair(ColName(i), GetValue(currentrow, i)));
00233 }
00234 currentrow++;
00235 }
00236
00237 return *fieldmap;
00238 }
00239
00240 virtual SQLfieldList* GetRowPtr()
00241 {
00242 fieldlist = new SQLfieldList();
00243
00244 if (currentrow < rows)
00245 {
00246 for (int i = 0; i < Rows(); i++)
00247 {
00248 fieldlist->push_back(fieldlists[currentrow][i]);
00249 }
00250 currentrow++;
00251 }
00252 return fieldlist;
00253 }
00254
00255 virtual SQLfieldMap* GetRowMapPtr()
00256 {
00257 fieldmap = new SQLfieldMap();
00258
00259 if (currentrow < rows)
00260 {
00261 for (int i = 0; i < Cols(); i++)
00262 {
00263 fieldmap->insert(std::make_pair(colnames[i],GetValue(currentrow, i)));
00264 }
00265 currentrow++;
00266 }
00267
00268 return fieldmap;
00269 }
00270
00271 virtual void Free(SQLfieldMap* fm)
00272 {
00273 delete fm;
00274 }
00275
00276 virtual void Free(SQLfieldList* fl)
00277 {
00278 delete fl;
00279 }
00280
00281
00282 };
00283
00284 class SQLConn : public classbase
00285 {
00286 private:
00287 ResultQueue results;
00288 InspIRCd* ServerInstance;
00289 Module* mod;
00290 SQLhost host;
00291 TDSLOGIN* login;
00292 TDSSOCKET* sock;
00293 TDSCONTEXT* context;
00294
00295 public:
00296 QueryQueue queue;
00297
00298 SQLConn(InspIRCd* SI, Module* m, const SQLhost& hi)
00299 : ServerInstance(SI), mod(m), host(hi), login(NULL), sock(NULL), context(NULL)
00300 {
00301 if (OpenDB())
00302 {
00303 std::string query("USE " + host.name);
00304 if (tds_submit_query(sock, query.c_str()) == TDS_SUCCEED)
00305 {
00306 if (tds_process_simple_query(sock) != TDS_SUCCEED)
00307 {
00308 LoggingMutex->Lock();
00309 ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
00310 LoggingMutex->Unlock();
00311 CloseDB();
00312 }
00313 }
00314 else
00315 {
00316 LoggingMutex->Lock();
00317 ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
00318 LoggingMutex->Unlock();
00319 CloseDB();
00320 }
00321 }
00322 else
00323 {
00324 LoggingMutex->Lock();
00325 ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not connect to DB with id: " + host.id);
00326 LoggingMutex->Unlock();
00327 CloseDB();
00328 }
00329 }
00330
00331 ~SQLConn()
00332 {
00333 CloseDB();
00334 }
00335
00336 SQLerror Query(SQLrequest &req)
00337 {
00338 if (!sock)
00339 return SQLerror(SQL_BAD_CONN, "Socket was NULL, check if SQL server is running.");
00340
00341
00342 char* query;
00343
00344
00345 char* queryend;
00346
00347
00348 unsigned long paramlen;
00349
00350
00351 unsigned long querylength = 0;
00352
00353 paramlen = 0;
00354 for(ParamL::iterator i = req.query.p.begin(); i != req.query.p.end(); i++)
00355 {
00356 paramlen += i->size();
00357 }
00358
00359
00360
00361
00362
00363
00364 query = new char[req.query.q.length() + (paramlen*2) + 1];
00365 queryend = query;
00366
00367 for(unsigned long i = 0; i < req.query.q.length(); i++)
00368 {
00369 if(req.query.q[i] == '?')
00370 {
00371 if(req.query.p.size())
00372 {
00373
00374
00375 char* escaped = new char[(req.query.p.front().length() * 2) + 1];
00376 char* escend = escaped;
00377 for (std::string::iterator p = req.query.p.front().begin(); p < req.query.p.front().end(); p++)
00378 {
00379 if (*p == '\'')
00380 {
00381 *escend = *p;
00382 escend++;
00383 *escend = *p;
00384 }
00385 *escend = *p;
00386 escend++;
00387 }
00388 *escend = 0;
00389
00390 for (char* n = escaped; *n; n++)
00391 {
00392 *queryend = *n;
00393 queryend++;
00394 }
00395 delete[] escaped;
00396 req.query.p.pop_front();
00397 }
00398 else
00399 break;
00400 }
00401 else
00402 {
00403 *queryend = req.query.q[i];
00404 queryend++;
00405 }
00406 querylength++;
00407 }
00408 *queryend = 0;
00409 req.query.q = query;
00410
00411 MsSQLResult* res = new MsSQLResult((Module*)mod, req.GetSource(), req.id);
00412 res->dbid = host.id;
00413 res->query = req.query.q;
00414
00415 char* msquery = strdup(req.query.q.data());
00416 LoggingMutex->Lock();
00417 ServerInstance->Logs->Log("m_mssql",DEBUG,"doing Query: %s",msquery);
00418 LoggingMutex->Unlock();
00419 if (tds_submit_query(sock, msquery) != TDS_SUCCEED)
00420 {
00421 std::string error("failed to execute: "+std::string(req.query.q.data()));
00422 delete[] query;
00423 delete res;
00424 free(msquery);
00425 return SQLerror(SQL_QSEND_FAIL, error);
00426 }
00427 delete[] query;
00428 free(msquery);
00429
00430 int tds_res;
00431 while (tds_process_tokens(sock, &tds_res, NULL, TDS_TOKEN_RESULTS) == TDS_SUCCEED)
00432 {
00433
00434
00435 switch (tds_res)
00436 {
00437 case TDS_ROWFMT_RESULT:
00438 break;
00439
00440 case TDS_DONE_RESULT:
00441 if (sock->rows_affected > -1)
00442 {
00443 for (int c = 0; c < sock->rows_affected; c++) res->UpdateAffectedCount();
00444 continue;
00445 }
00446 break;
00447
00448 case TDS_ROW_RESULT:
00449 while (tds_process_tokens(sock, &tds_res, NULL, TDS_STOPAT_ROWFMT|TDS_RETURN_DONE|TDS_RETURN_ROW) == TDS_SUCCEED)
00450 {
00451 if (tds_res != TDS_ROW_RESULT)
00452 break;
00453
00454 if (!sock->current_results)
00455 continue;
00456
00457 if (sock->res_info->row_count > 0)
00458 {
00459 int cols = sock->res_info->num_cols;
00460 char** name = new char*[MAXBUF];
00461 char** data = new char*[MAXBUF];
00462 for (int j=0; j<cols; j++)
00463 {
00464 TDSCOLUMN* col = sock->current_results->columns[j];
00465 name[j] = col->column_name;
00466
00467 int ctype;
00468 int srclen;
00469 unsigned char* src;
00470 CONV_RESULT dres;
00471 ctype = tds_get_conversion_type(col->column_type, col->column_size);
00472 src = &(sock->current_results->current_row[col->column_offset]);
00473 srclen = col->column_cur_size;
00474 tds_convert(sock->tds_ctx, ctype, (TDS_CHAR *) src, srclen, SYBCHAR, &dres);
00475 data[j] = (char*)dres.ib;
00476 }
00477 ResultReady(res, cols, data, name);
00478 }
00479 }
00480 break;
00481
00482 default:
00483 break;
00484 }
00485 }
00486 ResultsMutex->Lock();
00487 results.push_back(res);
00488 ResultsMutex->Unlock();
00489 SendNotify();
00490 return SQLerror();
00491 }
00492
00493 static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
00494 {
00495 SQLConn* sc = (SQLConn*)pContext->parent;
00496 LoggingMutex->Lock();
00497 sc->ServerInstance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
00498 LoggingMutex->Unlock();
00499 return 0;
00500 }
00501
00502 static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
00503 {
00504 SQLConn* sc = (SQLConn*)pContext->parent;
00505 LoggingMutex->Lock();
00506 sc->ServerInstance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
00507 LoggingMutex->Unlock();
00508 return 0;
00509 }
00510
00511 void ResultReady(MsSQLResult *res, int cols, char **data, char **colnames)
00512 {
00513 res->AddRow(cols, data, colnames);
00514 }
00515
00516 void AffectedReady(MsSQLResult *res)
00517 {
00518 res->UpdateAffectedCount();
00519 }
00520
00521 bool OpenDB()
00522 {
00523 CloseDB();
00524
00525 TDSCONNECTION* conn = NULL;
00526
00527 login = tds_alloc_login();
00528 tds_set_app(login, "TSQL");
00529 tds_set_library(login,"TDS-Library");
00530 tds_set_host(login, "");
00531 tds_set_server(login, host.host.c_str());
00532 tds_set_server_addr(login, host.host.c_str());
00533 tds_set_user(login, host.user.c_str());
00534 tds_set_passwd(login, host.pass.c_str());
00535 tds_set_port(login, host.port);
00536 tds_set_packet(login, 512);
00537
00538 context = tds_alloc_context(this);
00539 context->msg_handler = HandleMessage;
00540 context->err_handler = HandleError;
00541
00542 sock = tds_alloc_socket(context, 512);
00543 tds_set_parent(sock, NULL);
00544
00545 conn = tds_read_config_info(NULL, login, context->locale);
00546
00547 if (tds_connect(sock, conn) == TDS_SUCCEED)
00548 {
00549 tds_free_connection(conn);
00550 return 1;
00551 }
00552 tds_free_connection(conn);
00553 return 0;
00554 }
00555
00556 void CloseDB()
00557 {
00558 if (sock)
00559 {
00560 tds_free_socket(sock);
00561 sock = NULL;
00562 }
00563 if (context)
00564 {
00565 tds_free_context(context);
00566 context = NULL;
00567 }
00568 if (login)
00569 {
00570 tds_free_login(login);
00571 login = NULL;
00572 }
00573 }
00574
00575 SQLhost GetConfHost()
00576 {
00577 return host;
00578 }
00579
00580 void SendResults()
00581 {
00582 while (results.size())
00583 {
00584 MsSQLResult* res = results[0];
00585 ResultsMutex->Lock();
00586 if (res->GetDest())
00587 {
00588 res->Send();
00589 }
00590 else
00591 {
00592
00593
00594
00595
00596 delete res;
00597 }
00598 results.pop_front();
00599 ResultsMutex->Unlock();
00600 }
00601 }
00602
00603 void ClearResults()
00604 {
00605 while (results.size())
00606 {
00607 MsSQLResult* res = results[0];
00608 delete res;
00609 results.pop_front();
00610 }
00611 }
00612
00613 void SendNotify()
00614 {
00615 if (QueueFD < 0)
00616 {
00617 if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1)
00618 {
00619
00620 return;
00621 }
00622
00623 irc::sockets::insp_sockaddr addr;
00624
00625 #ifdef IPV6
00626 irc::sockets::insp_aton("::1", &addr.sin6_addr);
00627 addr.sin6_family = AF_FAMILY;
00628 addr.sin6_port = htons(listener->GetPort());
00629 #else
00630 irc::sockets::insp_inaddr ia;
00631 irc::sockets::insp_aton("127.0.0.1", &ia);
00632 addr.sin_family = AF_FAMILY;
00633 addr.sin_addr = ia;
00634 addr.sin_port = htons(listener->GetPort());
00635 #endif
00636
00637 if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
00638 {
00639
00640 return;
00641 }
00642 }
00643 char id = 0;
00644 send(QueueFD, &id, 1, 0);
00645 }
00646
00647 void DoLeadingQuery()
00648 {
00649 SQLrequest& req = queue.front();
00650 req.error = Query(req);
00651 }
00652
00653 };
00654
00655
00656 class ModuleMsSQL : public Module
00657 {
00658 private:
00659 unsigned long currid;
00660 QueryThread* queryDispatcher;
00661
00662 public:
00663 ModuleMsSQL(InspIRCd* Me)
00664 : Module(Me), currid(0)
00665 {
00666 LoggingMutex = ServerInstance->Mutexes->CreateMutex();
00667 ResultsMutex = ServerInstance->Mutexes->CreateMutex();
00668 QueueMutex = ServerInstance->Mutexes->CreateMutex();
00669
00670 ServerInstance->Modules->UseInterface("SQLutils");
00671
00672 if (!ServerInstance->Modules->PublishFeature("SQL", this))
00673 {
00674 throw ModuleException("m_mssql: Unable to publish feature 'SQL'");
00675 }
00676
00677
00678 #ifdef IPV6
00679 listener = new MsSQLListener(this, ServerInstance, 0, "::1");
00680 #else
00681 listener = new MsSQLListener(this, ServerInstance, 0, "127.0.0.1");
00682 #endif
00683
00684 if (listener->GetFd() == -1)
00685 {
00686 ServerInstance->Modules->DoneWithInterface("SQLutils");
00687 throw ModuleException("m_mssql: unable to create ITC pipe");
00688 }
00689 else
00690 {
00691 LoggingMutex->Lock();
00692 ServerInstance->Logs->Log("m_mssql", DEBUG, "MsSQL: Interthread comms port is %d", listener->GetPort());
00693 LoggingMutex->Unlock();
00694 }
00695
00696 ReadConf();
00697
00698 queryDispatcher = new QueryThread(ServerInstance, this);
00699 ServerInstance->Threads->Create(queryDispatcher);
00700
00701 ServerInstance->Modules->PublishInterface("SQL", this);
00702 Implementation eventlist[] = { I_OnRequest, I_OnRehash };
00703 ServerInstance->Modules->Attach(eventlist, this, 2);
00704 }
00705
00706 virtual ~ModuleMsSQL()
00707 {
00708 delete queryDispatcher;
00709 ClearQueue();
00710 ClearAllConnections();
00711
00712 ServerInstance->SE->DelFd(listener);
00713 ServerInstance->BufferedSocketCull();
00714
00715 if (QueueFD >= 0)
00716 {
00717 shutdown(QueueFD, 2);
00718 close(QueueFD);
00719 }
00720
00721 if (notifier)
00722 {
00723 ServerInstance->SE->DelFd(notifier);
00724 notifier->Close();
00725 ServerInstance->BufferedSocketCull();
00726 }
00727
00728 ServerInstance->Modules->UnpublishInterface("SQL", this);
00729 ServerInstance->Modules->UnpublishFeature("SQL");
00730 ServerInstance->Modules->DoneWithInterface("SQLutils");
00731
00732 delete LoggingMutex;
00733 delete ResultsMutex;
00734 delete QueueMutex;
00735 }
00736
00737
00738 void SendQueue()
00739 {
00740 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
00741 {
00742 iter->second->SendResults();
00743 }
00744 }
00745
00746 void ClearQueue()
00747 {
00748 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
00749 {
00750 iter->second->ClearResults();
00751 }
00752 }
00753
00754 bool HasHost(const SQLhost &host)
00755 {
00756 for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
00757 {
00758 if (host == iter->second->GetConfHost())
00759 return true;
00760 }
00761 return false;
00762 }
00763
00764 bool HostInConf(const SQLhost &h)
00765 {
00766 ConfigReader conf(ServerInstance);
00767 for(int i = 0; i < conf.Enumerate("database"); i++)
00768 {
00769 SQLhost host;
00770 host.id = conf.ReadValue("database", "id", i);
00771 host.host = conf.ReadValue("database", "hostname", i);
00772 host.port = conf.ReadInteger("database", "port", "1433", i, true);
00773 host.name = conf.ReadValue("database", "name", i);
00774 host.user = conf.ReadValue("database", "username", i);
00775 host.pass = conf.ReadValue("database", "password", i);
00776 if (h == host)
00777 return true;
00778 }
00779 return false;
00780 }
00781
00782 void ReadConf()
00783 {
00784 ClearOldConnections();
00785
00786 ConfigReader conf(ServerInstance);
00787 for(int i = 0; i < conf.Enumerate("database"); i++)
00788 {
00789 SQLhost host;
00790
00791 host.id = conf.ReadValue("database", "id", i);
00792 host.host = conf.ReadValue("database", "hostname", i);
00793 host.port = conf.ReadInteger("database", "port", "1433", i, true);
00794 host.name = conf.ReadValue("database", "name", i);
00795 host.user = conf.ReadValue("database", "username", i);
00796 host.pass = conf.ReadValue("database", "password", i);
00797
00798 if (HasHost(host))
00799 continue;
00800
00801 this->AddConn(host);
00802 }
00803 }
00804
00805 void AddConn(const SQLhost& hi)
00806 {
00807 if (HasHost(hi))
00808 {
00809 LoggingMutex->Lock();
00810 ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str());
00811 LoggingMutex->Unlock();
00812 return;
00813 }
00814
00815 SQLConn* newconn;
00816
00817 newconn = new SQLConn(ServerInstance, this, hi);
00818
00819 connections.insert(std::make_pair(hi.id, newconn));
00820 }
00821
00822 void ClearOldConnections()
00823 {
00824 ConnMap::iterator iter,safei;
00825 for (iter = connections.begin(); iter != connections.end(); iter++)
00826 {
00827 if (!HostInConf(iter->second->GetConfHost()))
00828 {
00829 delete iter->second;
00830 safei = iter;
00831 --iter;
00832 connections.erase(safei);
00833 }
00834 }
00835 }
00836
00837 void ClearAllConnections()
00838 {
00839 ConnMap::iterator i;
00840 while ((i = connections.begin()) != connections.end())
00841 {
00842 connections.erase(i);
00843 delete i->second;
00844 }
00845 }
00846
00847 virtual void OnRehash(User* user, const std::string ¶meter)
00848 {
00849 QueueMutex->Lock();
00850 ReadConf();
00851 QueueMutex->Unlock();
00852 }
00853
00854 virtual const char* OnRequest(Request* request)
00855 {
00856 if(strcmp(SQLREQID, request->GetId()) == 0)
00857 {
00858 SQLrequest* req = (SQLrequest*)request;
00859
00860 QueueMutex->Lock();
00861
00862 ConnMap::iterator iter;
00863
00864 const char* returnval = NULL;
00865
00866 if((iter = connections.find(req->dbid)) != connections.end())
00867 {
00868 req->id = NewID();
00869 iter->second->queue.push(*req);
00870 returnval= SQLSUCCESS;
00871 }
00872 else
00873 {
00874 req->error.Id(SQL_BAD_DBID);
00875 }
00876
00877 QueueMutex->Unlock();
00878
00879 return returnval;
00880 }
00881 return NULL;
00882 }
00883
00884 unsigned long NewID()
00885 {
00886 if (currid+1 == 0)
00887 currid++;
00888
00889 return ++currid;
00890 }
00891
00892 virtual Version GetVersion()
00893 {
00894 return Version("$Id: m_mssql.cpp 10579 2008-09-21 12:56:03Z w00t $", VF_VENDOR | VF_SERVICEPROVIDER, API_VERSION);
00895 }
00896
00897 };
00898
00899 void ResultNotifier::Dispatch()
00900 {
00901 mod->SendQueue();
00902 }
00903
00904 void QueryThread::Run()
00905 {
00906 while (this->GetExitFlag() == false)
00907 {
00908 SQLConn* conn = NULL;
00909 QueueMutex->Lock();
00910 for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
00911 {
00912 if (i->second->queue.totalsize())
00913 {
00914 conn = i->second;
00915 break;
00916 }
00917 }
00918 QueueMutex->Unlock();
00919 if (conn)
00920 {
00921 conn->DoLeadingQuery();
00922 QueueMutex->Lock();
00923 conn->queue.pop();
00924 QueueMutex->Unlock();
00925 }
00926 usleep(1000);
00927 }
00928 }
00929
00930 MODULE_INIT(ModuleMsSQL)