#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>


#define NGX_SOCKETLOG_FACILITY_LOCAL7       23
#define NGX_SOCKETLOG_SEVERITY_INFO         6

#define NGX_REDIS_APPEND "*3" CRLF "$6" CRLF "APPEND" CRLF
#define NGX_REDIS_AUTH   "*2" CRLF "$4" CRLF "AUTH" CRLF

/* Name of the log format used when no "format=" parameter is given. */
#define NGX_DEF_FORMAT "main"

#define IF_DEBUG 0
#define IF_DEBUG_2 0


/*
 * Local declarations of the log-format types used by this module.
 * NOTE(review): these presumably mirror the private declarations of the
 * stock HTTP log module (ngx_http_log_module is referenced below via
 * "extern") and must stay layout-compatible with it -- confirm against the
 * nginx version being built.
 */
typedef struct ngx_http_log_op_s  ngx_http_log_op_t;

typedef u_char *(*ngx_http_log_op_run_pt) (ngx_http_request_t *r, u_char *buf,
    ngx_http_log_op_t *op);

typedef size_t (*ngx_http_log_op_getlen_pt) (ngx_http_request_t *r,
    uintptr_t data);

struct ngx_redislog_peer;

/* Function used by the write handler to push pending data to a peer. */
typedef void (*ngx_redislog_send_handler_pt)(struct ngx_redislog_peer*);

struct ngx_http_log_op_s {
    size_t                      len;
    ngx_http_log_op_getlen_pt   getlen;
    ngx_http_log_op_run_pt      run;
    uintptr_t                   data;
};

typedef struct {
    ngx_str_t                   name;
#if defined nginx_version && nginx_version >= 7018
    ngx_array_t                *flushes;
#endif
    ngx_array_t                *ops;        /* array of ngx_http_log_op_t */
} ngx_http_log_fmt_t;

typedef struct {
    ngx_array_t                 formats;    /* array of ngx_http_log_fmt_t */
    ngx_uint_t                  combined_used; /* unsigned  combined_used:1 */
} ngx_http_log_main_conf_t;

/* Static (configuration-time) description of one Redis peer. */
typedef struct {
    ngx_str_t                   name;
    struct sockaddr            *sockaddr;
    socklen_t                   socklen;
    ngx_msec_t                  write_timeout;
    ngx_msec_t                  read_timeout;
    ngx_msec_t                  connect_timeout;
    ngx_msec_t                  reconnect_timeout;
    ngx_msec_t                  flush_timeout;
    ngx_msec_t                  ping_timeout;
    ngx_bufs_t                  bufs;
    size_t                      recv_buf_size;
    ngx_str_t                   password;   /* pre-built AUTH request */
    unsigned                    authenticate:1;
} ngx_redislog_peer_conf_t;

typedef struct {
    ngx_array_t                *peers;      /* array of ngx_redislog_peer_conf_t */
} ngx_redislog_conf_t;

/* Per-process runtime state of one Redis peer. */
typedef struct ngx_redislog_peer {
    ngx_redislog_peer_conf_t   *conf;
    ngx_peer_connection_t       conn;
    ngx_event_t                 reconnect_timer;
    ngx_event_t                 flush_timer;
    ngx_event_t                 ping_timer;
    ngx_log_t                  *log;
    ngx_pool_t                 *pool;
    ngx_chain_t                *busy;       /* buffers holding unsent data */
    ngx_chain_t                *free;       /* empty buffers */
    ngx_buf_t                  *recv_buf;
    ngx_uint_t                  discarded;  /* messages dropped, not yet reported */
    ngx_uint_t                  reconnect_timeout;
    ngx_uint_t                  num_queued; /* requests awaiting a reply */
    ngx_uint_t                  state;      /* reply parser state */
    u_char                     *password_pos;
    ngx_redislog_send_handler_pt send_handler;
    unsigned                    connecting:1;
    unsigned                    authenticated:1;
    unsigned                    flush_timer_set:1;
} ngx_redislog_peer_t;

/* One "access_redislog" directive. */
typedef struct {
    ngx_str_t                   peer_name;
    ngx_uint_t                  peer_idx;
    ngx_http_log_fmt_t         *format;
    ngx_http_complex_value_t   *key;
    ngx_str_t                   command;
    ngx_str_t                   arg1;
    ngx_http_complex_value_t   *_if;
    ngx_http_complex_value_t   *ifnot;
    unsigned                    has_arg1;
} ngx_http_redislog_t;

typedef struct {
    ngx_array_t                *logs;       /* array of ngx_http_redislog_t */
    unsigned                    off;
} ngx_http_redislog_conf_t;


/* Runtime peers of this process: array of ngx_redislog_peer_t pointers. */
static ngx_array_t ngx_redislog_peers;


static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p);
static void ngx_http_redislog_append(ngx_redislog_peer_t *p, u_char *buf,
    size_t len);
static void ngx_http_redislog_send(ngx_redislog_peer_t *p);
static void ngx_redislog_flush_handler(ngx_event_t*);
static u_char *ngx_redislog_size(u_char*, u_char*, size_t);
static size_t ngx_redislog_size_len(size_t);
static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t*, ngx_buf_t*);
static void ngx_redislog_read_handler(ngx_event_t *rev);
static void ngx_redislog_idle_read_handler(ngx_event_t *rev);

static char *ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);
static char *ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);

static void *ngx_http_redislog_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent,
    void *child);

static ngx_int_t ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);

static void *ngx_redislog_create_conf(ngx_cycle_t *cycle);

static ngx_int_t ngx_http_redislog_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf);
static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle);


/*
 * access_redislog off | <peer> <key> [format=] [command=] [arg1=] [if=] [ifnot=]
 *
 * NGX_CONF_1MORE is used so that all optional parameters can be combined in
 * a single directive; every previously accepted form is still accepted.
 */
static ngx_command_t ngx_http_redislog_commands[] = {

    { ngx_string("access_redislog"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF
                        |NGX_HTTP_SIF_CONF|NGX_HTTP_LIF_CONF
                        |NGX_HTTP_LMT_CONF|NGX_CONF_1MORE,
      ngx_http_redislog_set_log,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};


static ngx_http_module_t ngx_http_redislog_module_ctx = {
    ngx_http_redislog_add_variables,       /* preconfiguration */
    ngx_http_redislog_init,                /* postconfiguration */

    NULL,                                  /* create main configuration */
    NULL,                                  /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    ngx_http_redislog_create_loc_conf,     /* create location configuration */
    ngx_http_redislog_merge_loc_conf       /* merge location configuration */
};


extern ngx_module_t ngx_http_log_module;


ngx_module_t ngx_http_redislog_module = {
    NGX_MODULE_V1,
    &ngx_http_redislog_module_ctx,         /* module context */
    ngx_http_redislog_commands,            /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


/* redislog <name> <url> [password] */
static ngx_command_t ngx_redislog_commands[] = {

    { ngx_string("redislog"),
      NGX_MAIN_CONF|NGX_CONF_TAKE23,
      ngx_http_redislog_command,
      0,
      0,
      NULL },

      ngx_null_command
};


static ngx_core_module_t ngx_redislog_module_ctx = {
    ngx_string("redislog"),
    ngx_redislog_create_conf,
    NULL
};


ngx_module_t ngx_core_redislog_module = {
    NGX_MODULE_V1,
    &ngx_redislog_module_ctx,              /* module context */
    ngx_redislog_commands,                 /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    ngx_redislog_init_process,             /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


static ngx_http_variable_t ngx_http_redislog_variables[] = {

    { ngx_string("redislog_yyyy"), NULL,
      ngx_http_redislog_yyyy_variable, 0, 0, 0 },

    { ngx_string("redislog_yyyymm"), NULL,
      ngx_http_redislog_yyyymm_variable, 0, 0, 0 },

    { ngx_string("redislog_yyyymmdd"), NULL,
      ngx_http_redislog_yyyymmdd_variable, 0, 0, 0 },

    { ngx_string("redislog_yyyymmddhh"), NULL,
      ngx_http_redislog_yyyymmddhh_variable, 0, 0, 0 },

    { ngx_null_string, NULL, NULL, 0, 0, 0 }
};


/*
 * Log-phase handler: for every "access_redislog" entry of the location,
 * formats the log record, wraps it into a Redis multi-bulk request and
 * queues it on the configured peer.
 *
 * Request layouts produced (arguments are sent as bulk strings):
 *
 *   *3  <command> <key> <record>                  default
 *   *4  SETEX     <key> <arg1> <record>           command SETEX with arg1
 *   *5  EVALSHA   <arg1> 1 <key> <record>         command EVALSHA with arg1
 *
 * For commands starting with "APPEND" a LF is added to the record.
 * The announced argument count always matches the arguments actually
 * written, otherwise the reply stream of the connection gets out of sync.
 *
 * Returns NGX_OK, or NGX_ERROR when a complex value cannot be evaluated or
 * memory cannot be allocated.
 */
ngx_int_t
ngx_http_redislog_handler(ngx_http_request_t *r)
{
    u_char                    *line, *p;
    size_t                     len, record_len, record_size_len,
                               command_size_len, key_size_len, arg1_size_len;
    ngx_uint_t                 i, l, is_append, send_sha, send_ttl;
    ngx_str_t                  key, _if, ifnot;
    ngx_http_redislog_t       *log;
    ngx_http_log_op_t         *op;
    ngx_http_redislog_conf_t  *slcf;
    ngx_redislog_peer_t      **peer;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http redislog handler");

    slcf = ngx_http_get_module_loc_conf(r, ngx_http_redislog_module);

    ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "redislog conf=%p, off=%ud, logs=%p",
                   slcf, slcf->off, slcf->logs);

    if (slcf->off || slcf->logs == NULL) {
        return NGX_OK;
    }

    log = slcf->logs->elts;

    for (l = 0; l < slcf->logs->nelts; l++) {

#if defined nginx_version && nginx_version >= 7018
        ngx_http_script_flush_no_cacheable_variables(r, log[l].format->flushes);
#endif

        /* upper bound of the formatted record */
        len = 0;
        op = log[l].format->ops->elts;

        for (i = 0; i < log[l].format->ops->nelts; i++) {
            if (op[i].len == 0) {
                len += op[i].getlen(r, op[i].data);

            } else {
                len += op[i].len;
            }
        }

        /* ifnot=: skip unless the value is empty or "0" */
        if (log[l].ifnot != NULL) {
            if (ngx_http_complex_value(r, log[l].ifnot, &ifnot) != NGX_OK) {
                return NGX_ERROR;
            }

            if (ifnot.len && (ifnot.len != 1 || ifnot.data[0] != '0')) {
                continue;
            }
        }

        /* if=: skip when the value is empty or "0" */
        if (log[l]._if != NULL) {
            if (ngx_http_complex_value(r, log[l]._if, &_if) != NGX_OK) {
                return NGX_ERROR;
            }

            if (!_if.len || (_if.len == 1 && _if.data[0] == '0')) {
                continue;
            }
        }

        if (ngx_http_complex_value(r, log[l].key, &key) != NGX_OK) {
            return NGX_ERROR;
        }

        is_append = (ngx_strncmp(log[l].command.data, "APPEND", 6) == 0);

        send_sha = (log[l].has_arg1
                    && ngx_strncmp(log[l].command.data, "EVALSHA", 7) == 0);

        send_ttl = (log[l].has_arg1
                    && ngx_strncmp(log[l].command.data, "SETEX", 5) == 0);

        command_size_len = ngx_redislog_size_len(log[l].command.len);
        key_size_len = ngx_redislog_size_len(key.len);
        arg1_size_len = ngx_redislog_size_len(log[l].arg1.len);

        if (is_append) {
            len++;                                       /* trailing LF */
        }

        len += 2 + sizeof(CRLF) - 1                      /* "*N" */
               + 1 + command_size_len + sizeof(CRLF) - 1 /* "$<len>" */
               + log[l].command.len + sizeof(CRLF) - 1   /* command */
               + 1 + key_size_len + sizeof(CRLF) - 1     /* "$<len>" */
               + key.len + sizeof(CRLF) - 1              /* key */
               + 1 + NGX_OFF_T_LEN + sizeof(CRLF) - 1    /* "$<len>" */
               + sizeof(CRLF) - 1;                       /* after record */

        if (send_sha || send_ttl) {
            len += 1 + arg1_size_len + sizeof(CRLF) - 1
                   + log[l].arg1.len + sizeof(CRLF) - 1;
        }

        if (send_sha) {
            len += sizeof("$1" CRLF "1" CRLF) - 1;       /* numkeys */
        }

#if defined nginx_version && nginx_version >= 7003
        line = ngx_pnalloc(r->pool, len);
#else
        line = ngx_palloc(r->pool, len);
#endif
        if (line == NULL) {
            return NGX_ERROR;
        }

        /*
         * First pass: render the record at the start of the buffer only to
         * learn its exact length; it is overwritten by the header below.
         */
        p = line;

        for (i = 0; i < log[l].format->ops->nelts; i++) {
            p = op[i].run(r, p, &op[i]);
        }

        if (is_append) {
            *p++ = LF;
        }

        record_len = p - line;
        record_size_len = ngx_redislog_size_len(record_len);

        /* second pass: the real request */
        p = line;

        *p++ = '*';

        if (send_sha) {
            *p++ = '5';

        } else if (send_ttl) {
            *p++ = '4';

        } else {
            *p++ = '3';
        }

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        *p++ = '$';
        p = ngx_redislog_size(p, p + command_size_len, log[l].command.len);
        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
        p = ngx_copy(p, log[l].command.data, log[l].command.len);
        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        if (send_sha) {
            /* script hash followed by the number of keys ("1") */
            *p++ = '$';
            p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

            *p++ = '$';
            p = ngx_redislog_size(p, p + 1, 1);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, "1", 1);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
        }

        *p++ = '$';
        p = ngx_redislog_size(p, p + key_size_len, key.len);
        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
        p = ngx_copy(p, key.data, key.len);
        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        if (send_ttl) {
            *p++ = '$';
            p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
        }

        *p++ = '$';
        p = ngx_redislog_size(p, p + record_size_len, record_len);
        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        for (i = 0; i < log[l].format->ops->nelts; i++) {
            p = op[i].run(r, p, &op[i]);
        }

        if (is_append) {
            *p++ = LF;
        }

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        peer = ngx_redislog_peers.elts;
        peer += log[l].peer_idx;

        ngx_http_redislog_append(*peer, line, p - line);
    }

    return NGX_OK;
}


/*
 * Writes sz as decimal digits, right-aligned, into [p, q) and returns q.
 * The caller sizes the range with ngx_redislog_size_len(); if the range is
 * wider than the number, it is padded with leading zeros.
 */
static u_char *
ngx_redislog_size(u_char *p, u_char *q, size_t sz)
{
    u_char  *end = q;

    while (p != q) {
        *--q = (sz % 10 + '0');
        sz /= 10;
    }

    return end;
}


/*
 * Returns the number of decimal digits needed to print sz.
 * Zero needs one digit ("0"); returning 0 here would produce an empty
 * length field ("$" CRLF), which is not a valid bulk string header.
 */
static size_t
ngx_redislog_size_len(size_t sz)
{
    size_t  len = 1;

    while (sz >= 10) {
        sz /= 10;
        len++;
    }

    return len;
}


/*
 * Copies as much of p[0..*len) as fits into the free tail of buf.
 * *len is reduced by the number of bytes copied; the return value points
 * to the first byte that was not copied.
 */
static u_char *
ngx_redislog_buf_append(ngx_buf_t *buf, u_char *p, size_t *len)
{
    size_t  remaining = buf->end - buf->last;

    if (remaining > *len) {
        remaining = *len;
    }

    buf->last = ngx_copy(buf->last, p, remaining);
    *len -= remaining;

    return p + remaining;
}


/*
 * Queues one complete request on the peer.
 *
 * The message is copied into the tail of the last busy buffer and then into
 * as many free buffers as needed.  If the message does not fit, it is
 * dropped and counted in peer->discarded.  The flush timer is armed if it
 * is not running; once at least two buffers are busy the data is sent
 * immediately, provided the peer is currently connected.
 */
static void
ngx_http_redislog_append(ngx_redislog_peer_t *peer, u_char *buf, size_t len)
{
    u_char            *p;
    ngx_chain_t       *last, *q;
    size_t             remaining;
    ngx_uint_t         num_busy = 0;
    ngx_connection_t  *c;

    /*
     * Find last busy buffer
     */
    last = peer->busy;

    while (last != NULL && last->next != NULL) {
        last = last->next;
    }

    /*
     * See if message fits into remaining space
     */
    remaining = (last != NULL ? last->buf->end - last->buf->last : 0);

    q = peer->free;

    while (remaining <= len && q != NULL) {
        remaining += (q->buf->end - q->buf->last);
        q = q->next;
    }

    /*
     * No memory for this message, discard it
     */
    if (remaining < len) {
        peer->discarded++;
        return;
    }

    /*
     * Append message to the buffers
     */
    if (last != NULL) {
        p = ngx_redislog_buf_append(last->buf, buf, &len);

    } else {
        p = buf;
    }

    while (peer->free != NULL && len != 0) {
        q = peer->free;

        p = ngx_redislog_buf_append(q->buf, p, &len);

        /* move the buffer from the free list to the end of the busy list */
        peer->free = peer->free->next;
        q->next = NULL;

        if (last == NULL) {
            peer->busy = q;

        } else {
            last->next = q;
        }

        last = q;
    }

    peer->num_queued++;

    for (q = peer->busy; q != NULL; q = q->next) {
        num_busy++;
    }

    if (!peer->flush_timer_set) {
        peer->flush_timer.handler = ngx_redislog_flush_handler;
        peer->flush_timer.data = peer;
        peer->flush_timer.log = peer->conn.log;

        ngx_add_timer(&peer->flush_timer, peer->conf->flush_timeout);

        peer->flush_timer_set = 1;
    }

    if (num_busy >= 2) {
        c = peer->conn.connection;

        /*
         * ngx_redislog_reconnect_peer() resets the connection to NULL;
         * while disconnected the data simply stays queued.
         */
        if (c == NULL) {
            return;
        }

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "redislog num queued is now %ui, set read handler",
                       peer->num_queued);

        c->read->handler = ngx_redislog_read_handler;

        /*
         * Send it
         */
        ngx_http_redislog_send(peer);
    }
}


/*
 * Sends the busy chain to the peer if the connection exists and is
 * writable.  Fully sent buffers are returned to the free list.  On a write
 * error the connection is closed and a reconnect is scheduled; on a
 * partial write the write event (and write timeout) is armed.
 */
static void
ngx_http_redislog_send(ngx_redislog_peer_t *p)
{
    ngx_chain_t       *written;
    ngx_connection_t  *c;
    ngx_chain_t       *dummy = NULL;

    c = p->conn.connection;

    if (c == NULL || c->fd == -1) {
        return;
    }

    if (!c->write->ready) {
        return;
    }

    if (p->flush_timer_set) {
        ngx_del_timer(&p->flush_timer);
        p->flush_timer_set = 0;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "redislog send handler");

    if (p->busy == NULL) {
        return;
    }

    written = c->send_chain(c, p->busy, 0);

    if (written == NGX_CHAIN_ERROR) {
        ngx_log_error(NGX_LOG_ERR, c->log, 0, "redislog write error");
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(p);
        return;
    }

    ngx_chain_update_chains(p->pool, &p->free, &p->busy, &dummy, 0);

    if (written != NULL) {
        if (!c->write->ready && !c->write->timer_set) {
            ngx_add_timer(c->write, p->conf->write_timeout);
        }

        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_close_connection(c);
            ngx_redislog_reconnect_peer(p);
        }
    }
}


/*
 * Send handler used right after connecting to a password-protected peer:
 * writes the pre-built AUTH request, resuming at peer->password_pos.  Once
 * the whole request is written, the regular send handler is installed and
 * invoked.
 */
static void
ngx_redislog_auth_send(ngx_redislog_peer_t *peer)
{
    ngx_connection_t  *c;
    ngx_str_t         *password;
    ssize_t            n;

    c = peer->conn.connection;

    if (c == NULL || c->fd == -1) {
        return;
    }

    password = &peer->conf->password;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "redislog auth send handler");

    n = c->send(c, peer->password_pos,
                password->len - (peer->password_pos - password->data));

    if (n > 0) {
        peer->password_pos += n;

        if (peer->password_pos >= (password->data + password->len)) {
            peer->send_handler = ngx_http_redislog_send;
            ngx_http_redislog_send(peer);
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
        return;
    }

    /* nothing written yet: wait for the socket to become writable */
    if (!c->write->timer_set) {
        ngx_add_timer(c->write, peer->conf->write_timeout);
    }

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
        return;
    }
}


/*
 * Flush timer handler: sends whatever is queued.  The connection may be
 * NULL here if the peer is waiting for a reconnect; in that case there is
 * no read event to touch and ngx_http_redislog_send() returns immediately.
 */
static void
ngx_redislog_flush_handler(ngx_event_t *ev)
{
    ngx_redislog_peer_t  *peer = ev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
                   "redislog flush handler, set read handler");

    peer->flush_timer_set = 0;

    if (peer->conn.connection != NULL) {
        peer->conn.connection->read->handler = ngx_redislog_read_handler;
    }

    ngx_http_redislog_send(peer);
}


/*
 * Called once when a pending connect completes: stops the connect timeout,
 * resets the reconnect back-off and reports messages discarded meanwhile.
 */
static void
ngx_redislog_connected_handler(ngx_redislog_peer_t *peer)
{
    ngx_connection_t  *c;

    c = peer->conn.connection;

    /*
     * The read handler may already have removed the connect timer,
     * so only delete it while it is still armed.
     */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    /*
     * Once the connection has been established, we need to
     * reset the reconnect timeout to its initial value
     */
    peer->reconnect_timeout = peer->conf->reconnect_timeout;

    if (peer->discarded != 0) {
        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                      "redislog peer \"%V\" discarded %ui messages",
                      &peer->conf->name, peer->discarded);
        peer->discarded = 0;
    }
}


/*
 * Consumes replies from buf[pos, last).
 *
 * Parser state (peer->state): 0 = at the first byte of a reply,
 * 1 = inside the reply line, 2 = CR seen; LF returns to state 0.
 * A reply starting with '+', '-' or ':' either completes authentication
 * (when it is still pending) or acknowledges one queued request.
 *
 * Returns NGX_OK, or NGX_ERROR on authentication failure or when more
 * replies arrive than requests were queued.
 */
static ngx_int_t
ngx_redislog_process_buf(ngx_redislog_peer_t *peer, ngx_buf_t *buf)
{
    u_char  *p, *q;

    p = buf->pos;
    q = buf->last;

    while (p != q) {

        if (!peer->state) {

            if (*p == '+' || *p == '-' || *p == ':') {

                if (peer->conf->authenticate && !peer->authenticated) {

                    if (*p == '-') {
                        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                                      "redis authentication failure");
                        return NGX_ERROR;
                    }

                    peer->authenticated = 1;

                } else {

                    if (peer->num_queued) {
                        peer->num_queued--;

                    } else {
                        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                                      "too many responses from redis");
                        return NGX_ERROR;
                    }
                }
            }

            if (*p == '-') {
                ngx_log_error(NGX_LOG_ERR, peer->log, 0, "redislog error");
            }

            peer->state++;

        } else {

            if (*p == LF) {
                peer->state = 0;

            } else if (peer->state == 1) {
                if (*p == CR) {
                    peer->state++;
                }
            }
        }

        p++;
    }

    buf->pos = p;

    return NGX_OK;
}


/*
 * Read handler used while replies are outstanding: reads and parses
 * replies until all queued requests are acknowledged, then switches the
 * connection to the idle read handler.  Any error closes the connection
 * and schedules a reconnect.
 */
static void
ngx_redislog_read_handler(ngx_event_t *rev)
{
    ngx_connection_t     *c;
    ngx_redislog_peer_t  *peer;
    ngx_buf_t            *buf;
    ssize_t               n, size;
    ngx_int_t             rc;

    c = rev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0, "redislog read handler");

    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (rev->timedout || c->error || c->close) {

        if (rev->timedout) {
            ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if (rev->error) {
            ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if (!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }

        return;
    }

    buf = peer->recv_buf;

    for ( ;; ) {

        /* fill the receive buffer */
        for ( ;; ) {

            if (buf->last == buf->end) {
                break;
            }

            size = buf->end - buf->last;

            n = c->recv(c, buf->last, size);

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "redislog peer recv %z", n);

            if (n == NGX_AGAIN) {
                break;
            }

            if (n == 0) {
                if (peer->num_queued != 0) {
                    ngx_log_error(NGX_LOG_INFO, c->log, 0,
                                  "redis closed the connection prematurely");
                }
            }

            if (n == 0 || n == NGX_ERROR) {
                c->error = 1;
                goto reconnect;
            }

            buf->last += n;
        }

        rc = ngx_redislog_process_buf(peer, buf);

        if (rc != NGX_OK) {
            goto reconnect;
        }

        buf->pos = buf->last = buf->start;

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "redislog num queued is now %ui", peer->num_queued);

        if (peer->num_queued == 0) {
            break;
        }

        if (!c->read->ready) {

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                goto reconnect;
            }

            if (!c->read->timer_set) {
                ngx_add_timer(c->read, peer->conf->read_timeout);
            }

            return;
        }
    }

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "redislog set idle read handler");

    c->read->handler = ngx_redislog_idle_read_handler;

    return;

reconnect:

    ngx_close_connection(c);
    ngx_redislog_reconnect_peer(peer);
}


/*
 * Read handler used while no replies are expected: only detects whether
 * the peer closed the connection (pending EOF with kqueue, otherwise a
 * one-byte MSG_PEEK recv) and reconnects in that case.
 */
static void
ngx_redislog_idle_read_handler(ngx_event_t *rev)
{
    ngx_connection_t     *c;
    ngx_redislog_peer_t  *peer;
    int                   n;
    char                  buf[1];
    ngx_err_t             err;

    c = rev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "redislog idle read handler");

    if (rev->timedout || c->error || c->close) {

        if (rev->timedout) {
            ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if (rev->error) {
            ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if (!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }

        return;
    }

#if (NGX_HAVE_KQUEUE)

    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {

        if (!rev->pending_eof) {
            goto no_error;
        }

        rev->eof = 1;
        c->error = 1;

        if (rev->kq_errno) {
            rev->error = 1;
        }

        goto reconnect;
    }

#endif

    n = recv(c->fd, buf, 1, MSG_PEEK);

    err = ngx_socket_errno;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, rev->log, err,
                   "redislog recv(): %d", n);

    if (n > 0) {
        goto no_error;
    }

    if (n == -1) {
        if (err == NGX_EAGAIN) {
            goto no_error;
        }

        rev->error = 1;

    } else {
        err = 0;
    }

    rev->eof = 1;
    c->error = 1;

    ngx_log_error(NGX_LOG_ERR, rev->log, err, "redislog connection error");

#if (NGX_HAVE_KQUEUE)

reconnect:

#endif

    ngx_close_connection(c);
    ngx_redislog_reconnect_peer(peer);

    return;

no_error:

    if (peer->connecting) {
        ngx_redislog_connected_handler(peer);
        peer->connecting = 0;
    }
}


/*
 * Write event handler: completes a pending connect if necessary and then
 * calls the current send handler (AUTH first, regular data afterwards).
 */
static void
ngx_redislog_write_handler(ngx_event_t *wev)
{
    ngx_connection_t     *c;
    ngx_redislog_peer_t  *peer;

    c = wev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0, "redislog write handler");

    if (wev->timedout || c->error || c->close) {

        if (wev->timedout) {
            ngx_log_error(NGX_LOG_ERR, wev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if (wev->error) {
            ngx_log_error(NGX_LOG_ERR, wev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if (!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }

        return;
    }

    if (peer->connecting) {
        ngx_redislog_connected_handler(peer);
        peer->connecting = 0;
    }

    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    peer->send_handler(peer);
}


/*
 * Starts a connection to the peer and installs the event handlers.
 * The connect timeout is armed on the read event.
 *
 * Returns NGX_OK if the connect was started, NGX_ERROR otherwise.
 */
static ngx_int_t
ngx_redislog_connect_peer(ngx_redislog_peer_t *peer)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    ngx_log_error(NGX_LOG_INFO, peer->log, 0,
                  "redislog connect peer \"%V\"", &peer->conf->name);

    peer->conn.sockaddr = peer->conf->sockaddr;
    peer->conn.socklen = peer->conf->socklen;
    peer->conn.name = &peer->conf->name;
    peer->conn.get = ngx_event_get_peer;
    peer->conn.log = peer->log;
    peer->conn.log_error = NGX_ERROR_ERR;

    rc = ngx_event_connect_peer(&peer->conn);

    if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
        if (peer->conn.connection) {
            ngx_close_connection(peer->conn.connection);
        }

        return NGX_ERROR;
    }

    c = peer->conn.connection;

    c->data = peer;
    c->pool = peer->pool;

    /* a new connection always starts unauthenticated */
    peer->password_pos = peer->conf->password.data;
    peer->authenticated = 0;

    c->read->handler = ngx_redislog_read_handler;
    c->write->handler = ngx_redislog_write_handler;

    peer->send_handler = peer->conf->authenticate ? ngx_redislog_auth_send
                                                  : ngx_http_redislog_send;

    ngx_add_timer(c->read, peer->conf->connect_timeout);

    peer->connecting = 1;

    return NGX_OK;
}


/* Reconnect timer handler: retries the connect, rescheduling on failure. */
static void
ngx_redislog_connect_handler(ngx_event_t *ev)
{
    ngx_int_t             rc;
    ngx_redislog_peer_t  *peer = ev->data;

    rc = ngx_redislog_connect_peer(peer);

    if (rc != NGX_OK) {
        ngx_redislog_reconnect_peer(peer);
    }
}


/*
 * Marks the peer as disconnected and schedules a reconnect after the
 * current reconnect timeout, which is doubled for the next attempt
 * (it is reset by ngx_redislog_connected_handler()).  Also reports
 * messages discarded so far.
 */
static void
ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p)
{
    p->conn.connection = NULL;

    p->reconnect_timer.handler = ngx_redislog_connect_handler;
    p->reconnect_timer.data = p;
    p->reconnect_timer.log = p->conn.log;

    ngx_add_timer(&p->reconnect_timer, p->reconnect_timeout);

    p->reconnect_timeout *= 2;

    if (p->discarded != 0) {
        ngx_log_error(NGX_LOG_ERR, p->log, 0,
                      "redislog peer \"%V\" discarded %ui messages",
                      &p->conf->name, p->discarded);
        p->discarded = 0;
    }
}


/*
 * $redislog_yyyy: first four bytes of the cached ISO 8601 log time.
 *
 * The four date variables below copy fixed offsets (0, 5, 8, 11) out of
 * ngx_cached_http_log_iso8601 -- assumes a "YYYY-MM-DDThh..." layout;
 * verify against the nginx version in use.
 */
static ngx_int_t
ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char  *line;

    line = ngx_palloc(r->pool, sizeof("yyyy") - 1);
    if (line == NULL) {
        return NGX_ERROR;
    }

    (void) ngx_copy(line, ngx_cached_http_log_iso8601.data,
                    sizeof("yyyy") - 1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;
    v->data = line;
    v->len = sizeof("yyyy") - 1;

    return NGX_OK;
}


/* $redislog_yyyymm: year and month of the cached log time. */
static ngx_int_t
ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char  *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymm") - 1);
    if (line == NULL) {
        return NGX_ERROR;
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy") - 1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 5,
                    sizeof("mm") - 1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;
    v->data = line;
    v->len = sizeof("yyyymm") - 1;

    return NGX_OK;
}


/* $redislog_yyyymmdd: year, month and day of the cached log time. */
static ngx_int_t
ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char  *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymmdd") - 1);
    if (line == NULL) {
        return NGX_ERROR;
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy") - 1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm") - 1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 8,
                    sizeof("dd") - 1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;
    v->data = line;
    v->len = sizeof("yyyymmdd") - 1;

    return NGX_OK;
}


/* $redislog_yyyymmddhh: year, month, day and hour of the cached log time. */
static ngx_int_t
ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char  *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymmddhh") - 1);
    if (line == NULL) {
        return NGX_ERROR;
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy") - 1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm") - 1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd") - 1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 11,
                    sizeof("hh") - 1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;
    v->data = line;
    v->len = sizeof("yyyymmddhh") - 1;

    return NGX_OK;
}


/* Allocates a zeroed location configuration (no logs, not switched off). */
static void *
ngx_http_redislog_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_redislog_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_redislog_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    return conf;
}


/*
 * A location that defines its own logs, or is switched off, keeps its
 * settings; otherwise it inherits both from the enclosing level.
 */
static char *
ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_redislog_conf_t  *prev = parent;
    ngx_http_redislog_conf_t  *conf = child;

    if (conf->logs || conf->off) {
        return NGX_CONF_OK;
    }

    conf->logs = prev->logs;
    conf->off = prev->off;

    return NGX_CONF_OK;
}


/* Allocates the zeroed core-module configuration (no peers yet). */
static void *
ngx_redislog_create_conf(ngx_cycle_t *cycle)
{
    ngx_redislog_conf_t  *slcf;

    slcf = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_conf_t));
    if (slcf == NULL) {
        return NULL;
    }

    return slcf;
}


/* Registers the $redislog_* variables listed in ngx_http_redislog_variables. */
static ngx_int_t
ngx_http_redislog_add_variables(ngx_conf_t *cf)
{
    ngx_http_variable_t  *var, *v;

    for (v = ngx_http_redislog_variables; v->name.len; v++) {
        var = ngx_http_add_variable(cf, &v->name, v->flags);
        if (var == NULL) {
            return NGX_ERROR;
        }

        var->get_handler = v->get_handler;
        var->data = v->data;
    }

    return NGX_OK;
}
static ngx_int_t ngx_http_redislog_find_peer_by_name(ngx_conf_t *cf, ngx_str_t *name) { ngx_redislog_conf_t *slcf; ngx_redislog_peer_conf_t *pc; ngx_uint_t i; slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module); pc = slcf->peers->elts; for(i = 0; i < slcf->peers->nelts; i++) { if(pc[i].name.len == name->len && ngx_strncmp(pc[i].name.data, name->data, name->len) == 0) { return i; } } return NGX_DECLINED; } static char * ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_http_redislog_conf_t *slcf = conf; ngx_uint_t i; ngx_str_t *value, name, command, arg1, _if; //ngx_str_t arg_num; ngx_http_redislog_t *log; ngx_http_log_fmt_t *fmt; ngx_http_log_main_conf_t *lmcf; ngx_int_t rc; ngx_http_compile_complex_value_t ccv; unsigned format_set; format_set = 0; value = cf->args->elts; if (ngx_strcmp(value[1].data, "off") == 0) { slcf->off = 1; return NGX_CONF_OK; } slcf->off = 0; if (slcf->logs == NULL) { slcf->logs = ngx_array_create(cf->pool, 2, sizeof(ngx_http_redislog_t)); if (slcf->logs == NULL) { return NGX_CONF_ERROR; } } lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module); if(lmcf == NULL) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "redislog module requires log module to be compiled in"); return NGX_CONF_ERROR; } log = ngx_array_push(slcf->logs); if (log == NULL) { return NGX_CONF_ERROR; } ngx_memzero(log, sizeof(ngx_http_redislog_t)); log->peer_name = value[1]; rc = ngx_http_redislog_find_peer_by_name(cf, &log->peer_name); if(rc == NGX_DECLINED) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "redislog peer %V is not defined", &log->peer_name); return NGX_CONF_ERROR; } log->peer_idx = rc; /* * Create and compile key */ log->key = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t)); if(log->key == NULL) { return NGX_CONF_ERROR; } ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t)); ccv.cf = cf; ccv.value = &value[2]; ccv.complex_value = log->key; 
if(ngx_http_compile_complex_value(&ccv) != NGX_OK) { return NGX_CONF_ERROR; } ngx_str_set(&command, "APPEND"); //ngx_str_set(&arg1, ""); ngx_str_set(&arg1, "test1"); //ngx_str_set(&arg_num, "0"); ngx_str_set(&name, "main"); if (cf->args->nelts >= 4) { for (i = 3; i < cf->args->nelts; i++) { if (ngx_strncmp(value[i].data, "format=", 7) == 0) { format_set = 1; name = value[i]; name.len -= 7; name.data += 7; if (ngx_strcmp(name.data, "combined") == 0) { lmcf->combined_used = 1; } continue; } if (ngx_strncmp(value[i].data, "command=", 8) == 0) { command = value[i]; command.len -= 8; command.data += 8; continue; } if (ngx_strncmp(value[i].data, "arg1=", 5) == 0) { arg1 = value[i]; arg1.len -= 5; arg1.data += 5; log->has_arg1 = 1; continue; } if (ngx_strncmp(value[i].data, "if=", 3) == 0) { if(log->_if != NULL) { continue; } _if = value[i]; _if.len -= 3; _if.data += 3; /* * Create and compile if script */ log->_if = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t)); if(log->_if == NULL) { return NGX_CONF_ERROR; } ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t)); ccv.cf = cf; ccv.value = &_if; ccv.complex_value = log->_if; if(ngx_http_compile_complex_value(&ccv) != NGX_OK) { return NGX_CONF_ERROR; } continue; } if (ngx_strncmp(value[i].data, "ifnot=", 6) == 0) { if(log->ifnot != NULL) { continue; } _if = value[i]; _if.len -= 6; _if.data += 6; /* * Create and compile if script */ log->ifnot = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t)); if(log->ifnot == NULL) { return NGX_CONF_ERROR; } ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t)); ccv.cf = cf; ccv.value = &_if; ccv.complex_value = log->ifnot; if(ngx_http_compile_complex_value(&ccv) != NGX_OK) { return NGX_CONF_ERROR; } continue; } ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid parameter \"%V\"", &value[i]); return NGX_CONF_ERROR; } } if(!format_set) { name.len = sizeof(NGX_DEF_FORMAT) - 1; name.data = (u_char *) NGX_DEF_FORMAT; lmcf->combined_used = 1; } log->command = command; 
if(log->has_arg1) { log->arg1 = arg1; } fmt = lmcf->formats.elts; for (i = 0; i < lmcf->formats.nelts; i++) { if (fmt[i].name.len == name.len && ngx_strcasecmp(fmt[i].name.data, name.data) == 0) { log->format = &fmt[i]; goto done; } } ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "unknown log format \"%V\"", &name); return NGX_CONF_ERROR; done: return NGX_CONF_OK; } static char * ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_str_t *value; ngx_redislog_conf_t *slcf; ngx_url_t u; ngx_redislog_peer_conf_t *peer; u_char *p; size_t pass_size_len; slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module); value = cf->args->elts; ngx_memzero(&u, sizeof(ngx_url_t)); u.url = value[2]; u.default_port = 6379; u.no_resolve = 0; if(ngx_parse_url(cf->pool, &u) != NGX_OK) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%V: %s", &u.host, u.err); return NGX_CONF_ERROR; } if(slcf->peers == NULL) { slcf->peers = ngx_array_create(cf->pool, 2, sizeof(ngx_redislog_peer_conf_t)); if (slcf->peers == NULL) { return NGX_CONF_ERROR; } } peer = ngx_array_push(slcf->peers); if(peer == NULL) { return NGX_CONF_ERROR; } peer->name = value[1]; peer->sockaddr = u.addrs[0].sockaddr; peer->socklen = u.addrs[0].socklen; if(cf->args->nelts >= 4) { /* * Alloc space for authentication packet and create it */ pass_size_len = ngx_redislog_size_len(value[3].len); peer->password.len = sizeof(NGX_REDIS_AUTH)-1 + 1 + pass_size_len + sizeof(CRLF)-1 + value[3].len + sizeof(CRLF)-1; peer->password.data = ngx_palloc(cf->pool, peer->password.len); if(peer->password.data == NULL) { return NGX_CONF_ERROR; } p = ngx_copy(peer->password.data, NGX_REDIS_AUTH, sizeof(NGX_REDIS_AUTH)-1); *p++ = '$'; p = ngx_redislog_size(p, p + pass_size_len, value[3].len); p = ngx_copy(p, CRLF, sizeof(CRLF)-1); p = ngx_copy(p, value[3].data, value[3].len); p = ngx_copy(p, CRLF, sizeof(CRLF)-1); peer->authenticate = 1; } peer->write_timeout = 30000; peer->read_timeout = 30000; 
peer->connect_timeout = 30000; peer->reconnect_timeout = 5000; peer->flush_timeout = 2000; peer->ping_timeout = 30000; peer->bufs.num = 200; peer->bufs.size = 2048; peer->recv_buf_size = 1024; return NGX_CONF_OK; } static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf) { ngx_http_core_main_conf_t *cmcf; ngx_http_handler_pt *h; cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module); h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers); if (h == NULL) { return NGX_ERROR; } *h = ngx_http_redislog_handler; return NGX_OK; } static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle) { ngx_int_t rc; ngx_redislog_conf_t *slcf; ngx_uint_t i; ngx_redislog_peer_conf_t *pc; ngx_redislog_peer_t *peer, **ppeer; slcf = (ngx_redislog_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_redislog_module); if(slcf->peers == NULL || slcf->peers->nelts == 0) { return NGX_OK; } rc = ngx_array_init(&ngx_redislog_peers, cycle->pool, slcf->peers->nelts, sizeof(ngx_redislog_peer_t*)); if(rc != NGX_OK) { return rc; } pc = slcf->peers->elts; for(i = 0; i < slcf->peers->nelts; i++) { ppeer = ngx_array_push(&ngx_redislog_peers); if(ppeer == NULL) { return NGX_ERROR; } peer = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_peer_t)); if(peer == NULL) { return NGX_ERROR; } peer->free = ngx_create_chain_of_bufs(cycle->pool, &pc[i].bufs); if(peer->free == NULL) { return NGX_ERROR; } peer->recv_buf = ngx_create_temp_buf(cycle->pool, pc[i].recv_buf_size); if(peer->recv_buf == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } *ppeer = peer; peer->pool = cycle->pool; peer->conf = &pc[i]; peer->log = cycle->log; peer->reconnect_timeout = pc[i].reconnect_timeout; ngx_redislog_connect_peer(peer); } return NGX_OK; }