source: src/client.c @ 687f7d

Revision 687f7d, 45.6 KB checked in by Tomash Brechko <tomash.brechko@…>, 2 years ago (diff)

Revert "Fix SIGPIPE ignoring."

Turned out some orthodox systems do not have sigtimedwait(). Since
advanced systems that have sigtimedwait() also have MSG_NOSIGNAL and
thus do not use SIGPIPE suppression code in question, and other
systems have neither MSG_NOSIGNAL nor sigtimedwait(), the only option
is to revert to the previous suppression code. It works correctly
unless your program is multi-threaded.

This reverts commit 957de1c85a26ef090d3ac1dfa0d244fd1032556d.

  • Property mode set to 100644
Line 
1/*
2  Copyright (C) 2007-2010 Tomash Brechko.  All rights reserved.
3
4  When used to build Perl module:
5
6  This library is free software; you can redistribute it and/or modify
7  it under the same terms as Perl itself, either Perl version 5.8.8
8  or, at your option, any later version of Perl 5 you may have
9  available.
10
11  When used as a standalone library:
12
13  This library is free software; you can redistribute it and/or modify
14  it under the terms of the GNU Lesser General Public License as
15  published by the Free Software Foundation; either version 2.1 of the
16  License, or (at your option) any later version.
17
18  This library is distributed in the hope that it will be useful, but
19  WITHOUT ANY WARRANTY; without even the implied warranty of
20  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21  Lesser General Public License for more details.
22*/
23
24#include "client.h"
25#include "array.h"
26#include "connect.h"
27#include "parse_keyword.h"
28#include "dispatch_key.h"
29#include <stdlib.h>
30#include <string.h>
31#include <stdio.h>
32#ifndef WIN32
33#include "socket_posix.h"
34#include <sys/uio.h>
35#include <signal.h>
36#include <time.h>
37#include <netinet/in.h>
38#include <netinet/tcp.h>
39#else  /* WIN32 */
40#include "socket_win32.h"
41#endif  /* WIN32 */
42
43
44/* REPLY_BUF_SIZE should be large enough to contain first reply line.  */
45#define REPLY_BUF_SIZE  1536
46
47
48#define FLAGS_STUB  "4294967295"
49#define EXPTIME_STUB  "2147483647"
50#define DELAY_STUB  "4294967295"
51#define VALUE_SIZE_STUB  "18446744073709551615"
52#define CAS_STUB  "18446744073709551615"
53#define ARITH_STUB  "18446744073709551615"
54#define NOREPLY  "noreply"
55
56
57static const char eol[2] = "\r\n";
58
59
60typedef unsigned long long generation_type;
61
62
63struct value_state
64{
65  void *opaque;
66  void *ptr;
67  value_size_type size;
68  struct meta_object meta;
69};
70
71
72struct embedded_state
73{
74  void *opaque;
75  void *ptr;
76};
77
78
79struct command_state;
80typedef int (*parse_reply_func)(struct command_state *state);
81
82
83enum command_phase
84{
85  PHASE_RECEIVE,
86  PHASE_PARSE,
87  PHASE_VALUE,
88  PHASE_DONE
89};
90
91
92enum socket_mode_e { NOT_TCP = -1, TCP_LATENCY, TCP_THROUGHPUT };
93
94
95struct client;
96
97
98struct command_state
99{
100  struct client *client;
101  int fd;
102  struct pollfd *pollfd;
103  enum socket_mode_e socket_mode;
104  int noreply;
105  int last_cmd_noreply;
106
107  struct array iov_buf;
108  int str_step;
109
110  generation_type generation;
111
112  int phase;
113  int nowait_count;
114  int reply_count;
115
116  char *buf;
117  char *pos;
118  char *end;
119  char *eol;
120  int match;
121
122  struct iovec *iov;
123  int iov_count;
124  int write_offset;
125  struct iovec *key;
126  int key_count;
127  int index;
128  int index_head;
129  int index_tail;
130
131  parse_reply_func parse_reply;
132  struct result_object *object;
133
134  union
135  {
136    struct value_state value;
137    struct embedded_state embedded;
138  } u;
139};
140
141
142static inline
143int
144command_state_init(struct command_state *state,
145                   struct client *c, int noreply)
146{
147  state->client = c;
148  state->fd = -1;
149  state->noreply = noreply;
150  state->last_cmd_noreply = 0;
151
152  array_init(&state->iov_buf);
153
154  state->generation = 0;
155  state->nowait_count = 0;
156  state->buf = (char *) malloc(REPLY_BUF_SIZE);
157  if (! state->buf)
158    return -1;
159
160  state->pos = state->end = state->eol = state->buf;
161
162  return 0;
163}
164
165
166static inline
167void
168command_state_destroy(struct command_state *state)
169{
170  free(state->buf);
171
172  array_destroy(&state->iov_buf);
173
174  if (state->fd != -1)
175    close(state->fd);
176}
177
178
179static inline
180void
181command_state_reinit(struct command_state *state)
182{
183  if (state->fd != -1)
184    close(state->fd);
185
186  state->fd = -1;
187  state->last_cmd_noreply = 0;
188
189  array_clear(state->iov_buf);
190
191  state->generation = 0;
192  state->nowait_count = 0;
193
194  state->pos = state->end = state->eol = state->buf;
195}
196
197
198struct server
199{
200  char *host;
201  size_t host_len;
202  char *port;
203  int failure_count;
204  time_t failure_expires;
205  struct command_state cmd_state;
206};
207
208
209static inline
210int
211server_init(struct server *s, struct client *c,
212            const char *host, size_t host_len,
213            const char *port, size_t port_len, int noreply)
214{
215  if (port)
216    s->host = (char *) malloc(host_len + 1 + port_len + 1);
217  else
218    s->host = (char *) malloc(host_len + 1);
219
220  if (! s->host)
221    return MEMCACHED_FAILURE;
222
223  memcpy(s->host, host, host_len);
224  s->host[host_len] = '\0';
225  s->host_len = host_len;
226
227  if (port)
228    {
229      s->port = s->host + host_len + 1;
230      memcpy(s->port, port, port_len);
231      s->port[port_len] = '\0';
232    }
233  else
234    {
235      s->port = NULL;
236    }
237
238  s->failure_count = 0;
239  s->failure_expires = 0;
240
241  if (command_state_init(&s->cmd_state, c, noreply) != 0)
242    return MEMCACHED_FAILURE;
243
244  return MEMCACHED_SUCCESS;
245}
246
247
248static inline
249void
250server_destroy(struct server *s)
251{
252  free(s->host); /* This also frees port string.  */
253  command_state_destroy(&s->cmd_state);
254}
255
256
257static inline
258void
259server_reinit(struct server *s)
260{
261  s->failure_count = 0;
262  s->failure_expires = 0;
263
264  command_state_reinit(&s->cmd_state);
265}
266
267
268struct index_node
269{
270  int index;
271  int next;
272};
273
274
275struct client
276{
277  struct array pollfds;
278  struct array servers;
279
280  struct dispatch_state dispatch;
281
282  char *prefix;
283  size_t prefix_len;
284
285  int connect_timeout;          /* 1/1000 sec.  */
286  int io_timeout;               /* 1/1000 sec.  */
287  int max_failures;
288  int failure_timeout;          /* 1 sec.  */
289  int close_on_error;
290  int nowait;
291  int hash_namespace;
292
293  struct array index_list;
294  struct array str_buf;
295  int iov_max;
296
297  generation_type generation;
298
299  struct result_object *object;
300  int noreply;
301};
302
303
304static inline
305void
306command_state_reset(struct command_state *state, int str_step,
307                    parse_reply_func parse_reply)
308{
309  state->reply_count = 0;
310  state->str_step = str_step;
311  state->key_count = 0;
312  state->parse_reply = parse_reply;
313
314  state->phase = PHASE_RECEIVE;
315
316  array_clear(state->iov_buf);
317
318  state->write_offset = 0;
319  state->index_head = state->index_tail = -1;
320  state->generation = state->client->generation;
321
322#if 0 /* No need to initialize the following.  */
323  state->key = NULL;
324  state->index = 0;
325  state->match = NO_MATCH;
326  state->iov_count = 0;
327  state->iov = NULL;
328#endif
329}
330
331
332static inline
333int
334is_active(struct command_state *state)
335{
336  return (state->generation == state->client->generation);
337}
338
339
340static inline
341void
342deactivate(struct command_state *state)
343{
344  state->generation = state->client->generation - 1;
345}
346
347
348static inline
349int
350get_index(struct command_state *state)
351{
352  struct index_node *node = array_elem(state->client->index_list,
353                                       struct index_node, state->index_head);
354  return node->index;
355}
356
357
358static inline
359void
360next_index(struct command_state *state)
361{
362  struct index_node *node = array_elem(state->client->index_list,
363                                       struct index_node, state->index_head);
364  state->index_head = node->next;
365}
366
367
368struct client *
369client_init()
370{
371  struct client *c;
372
373#ifdef WIN32
374  if (win32_socket_library_acquire() != 0)
375    return NULL;
376#endif  /* WIN32 */
377
378  c = malloc(sizeof(struct client));
379  if (! c)
380    return NULL;
381
382  array_init(&c->pollfds);
383  array_init(&c->servers);
384  array_init(&c->index_list);
385  array_init(&c->str_buf);
386
387  dispatch_init(&c->dispatch);
388
389  c->connect_timeout = 250;
390  c->io_timeout = 1000;
391  c->prefix = " ";
392  c->prefix_len = 1;
393  c->max_failures = 0;
394  c->failure_timeout = 10;
395  c->close_on_error = 1;
396  c->nowait = 0;
397  c->hash_namespace = 0;
398
399  c->iov_max = get_iov_max();
400
401  c->generation = 1;            /* Different from initial command state.  */
402
403  c->object = NULL;
404  c->noreply = 0;
405
406  return c;
407}
408
409
410static
411int
412client_noreply_push(struct client *c);
413
414
415void
416client_destroy(struct client *c)
417{
418  struct server *s;
419
420  client_nowait_push(c);
421  client_noreply_push(c);
422
423  for (array_each(c->servers, struct server, s))
424    server_destroy(s);
425
426  dispatch_destroy(&c->dispatch);
427
428  array_destroy(&c->servers);
429  array_destroy(&c->pollfds);
430  array_destroy(&c->index_list);
431  array_destroy(&c->str_buf);
432
433  if (c->prefix_len > 1)
434    free(c->prefix);
435  free(c);
436
437#ifdef WIN32
438  win32_socket_library_release();
439#endif  /* WIN32 */
440}
441
442
443void
444client_reinit(struct client *c)
445{
446  struct server *s;
447
448  for (array_each(c->servers, struct server, s))
449    server_reinit(s);
450
451  array_clear(c->str_buf);
452  array_clear(c->index_list);
453
454  c->generation = 1;            /* Different from initial command state.  */
455  c->object = NULL;
456}
457
458
459int
460client_set_ketama_points(struct client *c, int ketama_points)
461{
462  /* Should be called before we added any server.  */
463  if (! array_empty(c->servers) || ketama_points < 0)
464    return MEMCACHED_FAILURE;
465
466  dispatch_set_ketama_points(&c->dispatch, ketama_points);
467
468  return MEMCACHED_SUCCESS;
469}
470
471
472void
473client_set_connect_timeout(struct client *c, int to)
474{
475  c->connect_timeout = (to > 0 ? to : -1);
476}
477
478
479void
480client_set_io_timeout(struct client *c, int to)
481{
482  c->io_timeout = (to > 0 ? to : -1);
483}
484
485
486void
487client_set_max_failures(struct client *c, int f)
488{
489  c->max_failures = f;
490}
491
492
493void
494client_set_failure_timeout(struct client *c, int to)
495{
496  c->failure_timeout = to;
497}
498
499
500void
501client_set_close_on_error(struct client *c, int enable)
502{
503  c->close_on_error = enable;
504}
505
506
507void
508client_set_nowait(struct client *c, int enable)
509{
510  c->nowait = enable;
511}
512
513
514void
515client_set_hash_namespace(struct client *c, int enable)
516{
517  c->hash_namespace = enable;
518}
519
520
521int
522client_add_server(struct client *c, const char *host, size_t host_len,
523                  const char *port, size_t port_len, double weight,
524                  int noreply)
525{
526  int res;
527
528  if (weight <= 0.0)
529    return MEMCACHED_FAILURE;
530
531  if (array_extend(c->pollfds, struct pollfd, 1, ARRAY_EXTEND_EXACT) == -1)
532    return MEMCACHED_FAILURE;
533
534  if (array_extend(c->servers, struct server, 1, ARRAY_EXTEND_EXACT) == -1)
535    return MEMCACHED_FAILURE;
536
537  res = server_init(array_end(c->servers, struct server), c,
538                    host, host_len, port, port_len, noreply);
539  if (res != MEMCACHED_SUCCESS)
540    return res;
541
542  res = dispatch_add_server(&c->dispatch, host, host_len, port, port_len,
543                            weight, array_size(c->servers));
544  if (res == -1)
545    return MEMCACHED_FAILURE;
546
547  array_push(c->pollfds);
548  array_push(c->servers);
549
550  return MEMCACHED_SUCCESS;
551}
552
553
554int
555client_set_prefix(struct client *c, const char *ns, size_t ns_len)
556{
557  char *s;
558
559  if (ns_len == 0)
560    {
561      if (c->prefix_len > 1)
562        {
563          free(c->prefix);
564          c->prefix = " ";
565          c->prefix_len = 1;
566        }
567
568      if (c->hash_namespace)
569        dispatch_set_prefix(&c->dispatch, "", 0);
570
571      return MEMCACHED_SUCCESS;
572    }
573
574  if (c->prefix_len == 1)
575    c->prefix = NULL;
576  s = (char *) realloc(c->prefix, 1 + ns_len + 1);
577  if (! s)
578    return MEMCACHED_FAILURE;
579
580  s[0] = ' ';
581  memcpy(s + 1, ns, ns_len);
582  s[ns_len + 1] = '\0';
583
584  c->prefix = s;
585  c->prefix_len = 1 + ns_len;
586
587  if (c->hash_namespace)
588    dispatch_set_prefix(&c->dispatch, ns, ns_len);
589
590  return MEMCACHED_SUCCESS;
591}
592
593
594const char *
595client_get_prefix(struct client *c, size_t *ns_len)
596{
597  *ns_len = c->prefix_len - 1;
598
599  return (c->prefix + 1);
600}
601
602
603static inline
604ssize_t
605read_restart(int fd, void *buf, size_t size)
606{
607  ssize_t res;
608
609  do
610    res = read(fd, buf, size);
611  while (res == -1 && errno == EINTR);
612
613  return res;
614}
615
616
617static inline
618ssize_t
619readv_restart(int fd, const struct iovec *iov, int count)
620{
621  ssize_t res;
622
623  do
624    res = readv(fd, iov, count);
625  while (res == -1 && errno == EINTR);
626
627  return res;
628}
629
630
631#ifndef MSG_NOSIGNAL
632
633static inline
634ssize_t
635writev_restart(int fd, const struct iovec *iov, int count)
636{
637  ssize_t res;
638
639  do
640    res = writev(fd, iov, count);
641  while (res == -1 && errno == EINTR);
642
643  return res;
644}
645
646#else  /* MSG_NOSIGNAL */
647
648static inline
649ssize_t
650writev_restart(int fd, const struct iovec *iov, int count)
651{
652  struct msghdr msg;
653  ssize_t res;
654
655  memset(&msg, 0, sizeof(msg));
656  msg.msg_iov = (struct iovec *) iov;
657  msg.msg_iovlen = count;
658
659  do
660    res = sendmsg(fd, &msg, MSG_NOSIGNAL);
661  while (res == -1 && errno == EINTR);
662
663  return res;
664}
665
666#endif /* MSG_NOSIGNAL */
667
668
669/*
670  parse_key() assumes that one key definitely matches.
671*/
672static
673int
674parse_key(struct command_state *state)
675{
676  char *key_pos;
677
678  /* Skip over the prefix.  */
679  state->pos += state->client->prefix_len - 1;
680
681  key_pos = (char *) state->key->iov_base;
682  while (state->key_count > 1)
683    {
684      char *key_end, *prefix_key;
685      size_t prefix_len;
686
687      key_end = (char *) state->key->iov_base + state->key->iov_len;
688      while (key_pos != key_end && *state->pos == *key_pos)
689        {
690          ++key_pos;
691          ++state->pos;
692        }
693
694      if (key_pos == key_end && *state->pos == ' ')
695        break;
696
697      prefix_key = (char *) state->key->iov_base;
698      prefix_len = key_pos - prefix_key;
699      /*
700        TODO: Below it might be faster to compare the tail of the key
701        before comparing the head.
702      */
703      do
704        {
705          next_index(state);
706          state->key += 2;
707        }
708      while (--state->key_count > 1
709             && (state->key->iov_len < prefix_len
710                 || memcmp(state->key->iov_base,
711                           prefix_key, prefix_len) != 0));
712
713      key_pos = (char *) state->key->iov_base + prefix_len;
714    }
715
716  if (state->key_count == 1)
717    {
718      while (*state->pos != ' ')
719        ++state->pos;
720    }
721
722  --state->key_count;
723  state->key += 2;
724  state->index = get_index(state);
725  next_index(state);
726
727  return MEMCACHED_SUCCESS;
728}
729
730
731static
732int
733read_value(struct command_state *state)
734{
735  value_size_type size;
736  size_t remains;
737
738  size = state->end - state->pos;
739  if (size > state->u.value.size)
740    size = state->u.value.size;
741  if (size > 0)
742    {
743      memcpy(state->u.value.ptr, state->pos, size);
744      state->u.value.size -= size;
745      state->u.value.ptr = (char *) state->u.value.ptr + size;
746      state->pos += size;
747    }
748
749  remains = state->end - state->pos;
750  if (remains < sizeof(eol))
751    {
752      struct iovec iov[2], *piov;
753
754      state->pos = memmove(state->buf, state->pos, remains);
755      state->end = state->buf + remains;
756
757      iov[0].iov_base = state->u.value.ptr;
758      iov[0].iov_len = state->u.value.size;
759      iov[1].iov_base = state->end;
760      iov[1].iov_len = REPLY_BUF_SIZE - remains;
761      piov = &iov[state->u.value.size > 0 ? 0 : 1];
762
763      do
764        {
765          ssize_t res;
766
767          res = readv_restart(state->fd, piov, iov + 2 - piov);
768          if (res <= 0)
769            {
770              state->u.value.ptr = iov[0].iov_base;
771              state->u.value.size = iov[0].iov_len;
772              state->end = iov[1].iov_base;
773
774              if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
775                return MEMCACHED_EAGAIN;
776
777              state->object->free(state->u.value.opaque);
778              return MEMCACHED_CLOSED;
779            }
780
781          if ((size_t) res >= piov->iov_len)
782            {
783              piov->iov_base = (char *) piov->iov_base + piov->iov_len;
784              res -= piov->iov_len;
785              piov->iov_len = 0;
786              ++piov;
787            }
788
789          piov->iov_len -= res;
790          piov->iov_base = (char *) piov->iov_base + res;
791        }
792      while ((size_t) ((char *) iov[1].iov_base - state->pos) < sizeof(eol));
793
794      state->end = iov[1].iov_base;
795    }
796
797  if (memcmp(state->pos, eol, sizeof(eol)) != 0)
798    {
799      state->object->free(state->u.value.opaque);
800      return MEMCACHED_UNKNOWN;
801    }
802  state->pos += sizeof(eol);
803  state->eol = state->pos;
804
805  state->object->store(state->object->arg, state->u.value.opaque,
806                       state->index, &state->u.value.meta);
807
808  return MEMCACHED_SUCCESS;
809}
810
811
812static inline
813int
814swallow_eol(struct command_state *state, int skip, int done)
815{
816  if (! skip && state->eol - state->pos != sizeof(eol))
817    return MEMCACHED_UNKNOWN;
818
819  state->pos = state->eol;
820
821  if (done)
822    state->phase = PHASE_DONE;
823
824  return MEMCACHED_SUCCESS;
825}
826
827
828static
829int
830parse_ull(struct command_state *state, unsigned long long *result)
831{
832  unsigned long long res = 0;
833  const char *beg;
834
835  while (*state->pos == ' ')
836    ++state->pos;
837
838  beg = state->pos;
839
840  while (1)
841    {
842      switch (*state->pos)
843        {
844        case '0': case '1': case '2': case '3': case '4':
845        case '5': case '6': case '7': case '8': case '9':
846          res = res * 10 + (*state->pos - '0');
847          ++state->pos;
848          break;
849
850        default:
851          *result = res;
852          return (beg != state->pos ? MEMCACHED_SUCCESS : MEMCACHED_UNKNOWN);
853        }
854    }
855}
856
857
858static
859int
860parse_get_reply(struct command_state *state)
861{
862  unsigned long long num;
863  int res;
864
865  switch (state->match)
866    {
867    case MATCH_END:
868      return swallow_eol(state, 0, 1);
869
870    default:
871      return MEMCACHED_UNKNOWN;
872
873    case MATCH_VALUE:
874      break;
875    }
876
877  while (*state->pos == ' ')
878    ++state->pos;
879
880  res = parse_key(state);
881  if (res != MEMCACHED_SUCCESS)
882    return res;
883
884  res = parse_ull(state, &num);
885  if (res != MEMCACHED_SUCCESS)
886    return res;
887  state->u.value.meta.flags = num;
888
889  res = parse_ull(state, &num);
890  if (res != MEMCACHED_SUCCESS)
891    return res;
892  state->u.value.size = num;
893
894  if (state->u.value.meta.use_cas)
895    {
896      res = parse_ull(state, &num);
897      if (res != MEMCACHED_SUCCESS)
898        return res;
899      state->u.value.meta.cas = num;
900    }
901
902  res = swallow_eol(state, 0, 0);
903  if (res != MEMCACHED_SUCCESS)
904    return res;
905
906  state->u.value.ptr = state->object->alloc(state->u.value.size,
907                                            &state->u.value.opaque);
908  if (! state->u.value.ptr)
909    return MEMCACHED_FAILURE;
910
911  state->phase = PHASE_VALUE;
912
913  return MEMCACHED_SUCCESS;
914}
915
916
917static inline
918void
919store_result(struct command_state *state, int res)
920{
921  int index = get_index(state);
922  next_index(state);
923  state->object->store(state->object->arg, (void *) (long) res, index, NULL);
924}
925
926
927static
928int
929parse_set_reply(struct command_state *state)
930{
931  switch (state->match)
932    {
933    case MATCH_STORED:
934      store_result(state, 1);
935      break;
936
937    case MATCH_NOT_STORED:
938    case MATCH_NOT_FOUND:
939    case MATCH_EXISTS:
940      store_result(state, 0);
941      break;
942
943    default:
944      return MEMCACHED_UNKNOWN;
945    }
946
947  return swallow_eol(state, 0, 1);
948}
949
950
951static
952int
953parse_delete_reply(struct command_state *state)
954{
955  switch (state->match)
956    {
957    case MATCH_DELETED:
958      store_result(state, 1);
959      break;
960
961    case MATCH_NOT_FOUND:
962      store_result(state, 0);
963      break;
964
965    default:
966      return MEMCACHED_UNKNOWN;
967    }
968
969  return swallow_eol(state, 0, 1);
970}
971
972
973static
974int
975parse_arith_reply(struct command_state *state)
976{
977  char *beg;
978  size_t len;
979  int zero;
980
981  state->index = get_index(state);
982  next_index(state);
983
984  switch (state->match)
985    {
986    case MATCH_NOT_FOUND:
987      /* On NOT_FOUND we store the defined empty string.  */
988      state->u.embedded.ptr =
989        state->object->alloc(0, &state->u.embedded.opaque);
990      if (! state->u.embedded.ptr)
991        return MEMCACHED_FAILURE;
992
993      state->object->store(state->object->arg, state->u.embedded.opaque,
994                           state->index, NULL);
995
996      return swallow_eol(state, 0, 1);
997
998    default:
999      return MEMCACHED_UNKNOWN;
1000
1001    case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1002    case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1003      break;
1004    }
1005
1006  beg = state->pos - 1;
1007  len = 0;
1008  while (len == 0)
1009    {
1010      switch (*state->pos)
1011        {
1012        case '0': case '1': case '2': case '3': case '4':
1013        case '5': case '6': case '7': case '8': case '9':
1014          ++state->pos;
1015          break;
1016
1017        default:
1018          len = state->pos - beg;
1019          break;
1020        }
1021    }
1022
1023  zero = (*beg == '0' && len == 1);
1024  if (zero)
1025    len = 3;
1026
1027  state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1028  if (! state->u.embedded.ptr)
1029    return MEMCACHED_FAILURE;
1030
1031  if (! zero)
1032    memcpy(state->u.embedded.ptr, beg, len);
1033  else
1034    memcpy(state->u.embedded.ptr, "0E0", 3);
1035
1036  state->object->store(state->object->arg, state->u.embedded.opaque,
1037                       state->index, NULL);
1038
1039  /* Value may be space padded.  */
1040  return swallow_eol(state, 1, 1);
1041}
1042
1043
1044static
1045int
1046parse_ok_reply(struct command_state *state)
1047{
1048  switch (state->match)
1049    {
1050    case MATCH_OK:
1051      store_result(state, 1);
1052      return swallow_eol(state, 0, 1);
1053
1054    default:
1055      return MEMCACHED_UNKNOWN;
1056    }
1057}
1058
1059
1060static
1061int
1062parse_version_reply(struct command_state *state)
1063{
1064  const char *beg;
1065  size_t len;
1066  int res;
1067
1068  state->index = get_index(state);
1069  next_index(state);
1070
1071  switch (state->match)
1072    {
1073    default:
1074      return MEMCACHED_UNKNOWN;
1075
1076    case MATCH_VERSION:
1077      break;
1078    }
1079
1080  while (*state->pos == ' ')
1081    ++state->pos;
1082
1083  beg = state->pos;
1084
1085  res = swallow_eol(state, 1, 1);
1086  if (res != MEMCACHED_SUCCESS)
1087    return res;
1088
1089  len = state->pos - sizeof(eol) - beg;
1090
1091  state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1092  if (! state->u.embedded.ptr)
1093    return MEMCACHED_FAILURE;
1094
1095  memcpy(state->u.embedded.ptr, beg, len);
1096
1097  state->object->store(state->object->arg, state->u.embedded.opaque,
1098                       state->index, NULL);
1099
1100  return MEMCACHED_SUCCESS;
1101}
1102
1103
1104static
1105int
1106parse_nowait_reply(struct command_state *state)
1107{
1108  int res;
1109
1110  /*
1111    Cast to enum parse_keyword_e to get compiler warning when some
1112    match result is not handled.
1113  */
1114  switch ((enum parse_keyword_e) state->match)
1115    {
1116    case MATCH_DELETED:
1117    case MATCH_OK:
1118    case MATCH_STORED:
1119    case MATCH_EXISTS:
1120    case MATCH_NOT_FOUND:
1121    case MATCH_NOT_STORED:
1122      return swallow_eol(state, 0, 1);
1123
1124    case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1125    case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1126    case MATCH_VERSION: /* see client_noreply_push().  */
1127      return swallow_eol(state, 1, 1);
1128
1129    case MATCH_ERROR:
1130      res = swallow_eol(state, 0, 1);
1131      return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1132
1133    case MATCH_CLIENT_ERROR:
1134    case MATCH_SERVER_ERROR:
1135      res = swallow_eol(state, 1, 1);
1136      return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1137
1138    case NO_MATCH:
1139    case MATCH_VALUE:
1140    case MATCH_END:
1141    case MATCH_STAT:
1142      return MEMCACHED_UNKNOWN;
1143    }
1144
1145  /* Never reach here.  */
1146  return MEMCACHED_UNKNOWN;
1147}
1148
1149
1150static
1151void
1152client_mark_failed(struct client *c, struct server *s)
1153{
1154  if (s->cmd_state.fd != -1)
1155    {
1156      close(s->cmd_state.fd);
1157      s->cmd_state.fd = -1;
1158      s->cmd_state.nowait_count = 0;
1159      s->cmd_state.pos = s->cmd_state.end = s->cmd_state.eol =
1160        s->cmd_state.buf;
1161    }
1162
1163  if (c->max_failures > 0)
1164    {
1165      time_t now = time(NULL);
1166      if (s->failure_expires < now)
1167        s->failure_count = 0;
1168      ++s->failure_count;
1169      /*
1170        Set timeout on first failure, and on max_failures.  The idea
1171        is that if max_failures had happened during failure_timeout,
1172        we do not retry in another failure_timeout seconds.  This is
1173        not entirely true: we remember the time of the first failure,
1174        but for exact accounting we would have to keep time of each
1175        failure.  However such exact measurement is not necessary.
1176      */
1177      if (s->failure_count == 1 || s->failure_count == c->max_failures)
1178        s->failure_expires = now + c->failure_timeout;
1179    }
1180}
1181
1182
1183static
1184int
1185send_request(struct command_state *state, struct server *s)
1186{
1187  while (state->iov_count > 0)
1188    {
1189      int count;
1190      ssize_t res;
1191      size_t len;
1192
1193      count = (state->iov_count < state->client->iov_max
1194               ? state->iov_count : state->client->iov_max);
1195
1196      state->iov->iov_base =
1197        (char *) state->iov->iov_base + state->write_offset;
1198      state->iov->iov_len -= state->write_offset;
1199      len = state->iov->iov_len;
1200
1201      res = writev_restart(state->fd, state->iov, count);
1202
1203      state->iov->iov_base =
1204        (char *) state->iov->iov_base - state->write_offset;
1205      state->iov->iov_len += state->write_offset;
1206
1207      if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
1208        return MEMCACHED_EAGAIN;
1209      if (res <= 0)
1210        {
1211          deactivate(state);
1212          client_mark_failed(state->client, s);
1213
1214          return MEMCACHED_CLOSED;
1215        }
1216
1217      while ((size_t) res >= len)
1218        {
1219          res -= len;
1220          ++state->iov;
1221          if (--state->iov_count == 0)
1222            break;
1223          len = state->iov->iov_len;
1224          state->write_offset = 0;
1225        }
1226      state->write_offset += res;
1227    }
1228
1229  if (state->reply_count == 0)
1230    deactivate(state);
1231
1232  return MEMCACHED_SUCCESS;
1233}
1234
1235
1236static
1237int
1238receive_reply(struct command_state *state)
1239{
1240  while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
1241    ++state->eol;
1242
1243  /*
1244    When buffer is empty, move to the beginning of it for better CPU
1245    cache utilization.
1246  */
1247  if (state->pos == state->end)
1248    state->pos = state->end = state->eol = state->buf;
1249
1250  while (state->eol == state->end)
1251    {
1252      size_t size;
1253      ssize_t res;
1254
1255      size = REPLY_BUF_SIZE - (state->end - state->buf);
1256      if (size == 0)
1257        {
1258          if (state->pos != state->buf)
1259            {
1260              size_t len = state->end - state->pos;
1261              state->pos = memmove(state->buf, state->pos, len);
1262              state->end -= REPLY_BUF_SIZE - len;
1263              state->eol -= REPLY_BUF_SIZE - len;
1264              size = REPLY_BUF_SIZE - len;
1265            }
1266          else
1267            {
1268              return MEMCACHED_UNKNOWN;
1269            }
1270        }
1271
1272      res = read_restart(state->fd, state->end, size);
1273      if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
1274        return MEMCACHED_EAGAIN;
1275      if (res <= 0)
1276        return MEMCACHED_CLOSED;
1277
1278      state->end += res;
1279
1280      while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
1281        ++state->eol;
1282    }
1283
1284  if ((size_t) (state->eol - state->buf) < sizeof(eol) - 1
1285      || memcmp(state->eol - (sizeof(eol) - 1), eol, sizeof(eol) - 1) != 0)
1286    return MEMCACHED_UNKNOWN;
1287
1288  ++state->eol;
1289
1290  return MEMCACHED_SUCCESS;
1291}
1292
1293
1294static
1295int
1296parse_reply(struct command_state *state)
1297{
1298  int res, skip;
1299
1300  switch (state->match)
1301    {
1302    case MATCH_ERROR:
1303    case MATCH_CLIENT_ERROR:
1304    case MATCH_SERVER_ERROR:
1305      skip = (state->match != MATCH_ERROR);
1306      res = swallow_eol(state, skip, 1);
1307
1308      return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1309
1310    default:
1311      if (state->nowait_count)
1312        return parse_nowait_reply(state);
1313      else
1314        return state->parse_reply(state);
1315
1316    case NO_MATCH:
1317      return MEMCACHED_UNKNOWN;
1318    }
1319}
1320
1321
1322static
1323int
1324process_reply(struct command_state *state, struct server *s)
1325{
1326  int res = 0;
1327
1328  while (1)
1329    {
1330      switch (state->phase)
1331        {
1332        case PHASE_RECEIVE:
1333          res = receive_reply(state);
1334          if (res != MEMCACHED_SUCCESS)
1335            break;
1336
1337          state->match = parse_keyword(&state->pos);
1338
1339          state->phase = PHASE_PARSE;
1340
1341          /* Fall into below.  */
1342
1343        case PHASE_PARSE:
1344          res = parse_reply(state);
1345          if (res != MEMCACHED_SUCCESS)
1346            break;
1347
1348          if (state->phase != PHASE_DONE)
1349            continue;
1350
1351          /* Fall into below.  */
1352
1353        case PHASE_DONE:
1354          res = MEMCACHED_SUCCESS;
1355
1356          break;
1357
1358        case PHASE_VALUE:
1359          res = read_value(state);
1360          if (res != MEMCACHED_SUCCESS)
1361            break;
1362
1363          state->phase = PHASE_RECEIVE;
1364          continue;
1365        }
1366
1367      switch (res)
1368        {
1369        case MEMCACHED_ERROR:
1370          if (! (state->client->close_on_error || state->noreply))
1371            break;
1372
1373          /* else fall into below.  */
1374
1375        case MEMCACHED_UNKNOWN:
1376        case MEMCACHED_CLOSED:
1377          deactivate(state);
1378          client_mark_failed(state->client, s);
1379
1380          /* Fall into below.  */
1381
1382        case MEMCACHED_EAGAIN:
1383          return res;
1384        }
1385
1386      if (state->nowait_count > 0)
1387        {
1388          --state->nowait_count;
1389        }
1390      else if (--state->reply_count == 0)
1391        {
1392          if (state->iov_count == 0)
1393            deactivate(state);
1394
1395          return res;
1396        }
1397
1398      state->phase = PHASE_RECEIVE;
1399    }
1400}
1401
1402
1403static inline
1404void
1405state_prepare(struct command_state *state)
1406{
1407  state->key = array_elem(state->iov_buf, struct iovec, 2);
1408  state->iov = array_beg(state->iov_buf, struct iovec);
1409  state->iov_count = array_size(state->iov_buf);
1410
1411  if (state->str_step > 0)
1412    {
1413      struct iovec *iov = state->iov;
1414      char *buf = array_beg(state->client->str_buf, char);
1415      int count = state->iov_count, step = state->str_step;
1416
1417      if (state->key_count > 0)
1418        {
1419          iov += 3;
1420          count -= 3;
1421        }
1422
1423      while (count > 0)
1424        {
1425          iov->iov_base = (void *) (buf + (long) (iov->iov_base));
1426          iov += step;
1427          count -= step;
1428        }
1429    }
1430}
1431
1432
1433int
1434client_execute(struct client *c)
1435{
1436  int first_iter = 1;
1437
1438#if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
1439  struct sigaction orig, ignore;
1440  int res;
1441
1442  ignore.sa_handler = SIG_IGN;
1443  sigemptyset(&ignore.sa_mask);
1444  ignore.sa_flags = 0;
1445  res = sigaction(SIGPIPE, &ignore, &orig);
1446  if (res == -1)
1447    return MEMCACHED_FAILURE;
1448#endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */
1449
1450  while (1)
1451    {
1452      struct server *s;
1453      struct pollfd *pollfd_beg, *pollfd;
1454      int res;
1455
1456      pollfd_beg = array_beg(c->pollfds, struct pollfd);
1457      pollfd = pollfd_beg;
1458
1459      for (array_each(c->servers, struct server, s))
1460        {
1461          int may_write, may_read;
1462          struct command_state *state = &s->cmd_state;
1463
1464          if (! is_active(state))
1465            continue;
1466
1467          if (first_iter)
1468            {
1469              state_prepare(state);
1470
1471              may_write = 1;
1472              may_read = (state->reply_count > 0
1473                          || state->nowait_count > 0);
1474            }
1475          else
1476            {
1477              const short revents = state->pollfd->revents;
1478
1479              may_write = revents & (POLLOUT | POLLERR | POLLHUP);
1480              may_read = revents & (POLLIN | POLLERR | POLLHUP);
1481            }
1482
1483          if (may_read || may_write)
1484            {
1485              if (may_write)
1486                {
1487                  int res;
1488
1489                  res = send_request(state, s);
1490                  if (res == MEMCACHED_CLOSED)
1491                    may_read = 0;
1492                }
1493
1494              if (may_read)
1495                process_reply(state, s);
1496
1497              if (! is_active(state))
1498                continue;
1499            }
1500
1501          pollfd->events = 0;
1502
1503          if (state->iov_count > 0)
1504            pollfd->events |= POLLOUT;
1505          if (state->reply_count > 0 || state->nowait_count > 0)
1506            pollfd->events |= POLLIN;
1507
1508          if (pollfd->events != 0)
1509            {
1510              pollfd->fd = state->fd;
1511              state->pollfd = pollfd;
1512              ++pollfd;
1513            }
1514        }
1515
1516      if (pollfd == pollfd_beg)
1517        break;
1518
1519      do
1520        res = poll(pollfd_beg, pollfd - pollfd_beg, c->io_timeout);
1521      while (res == -1 && errno == EINTR);
1522
1523      /*
1524        On error or timeout close all active connections.  Otherwise
1525        we might receive garbage on them later.
1526      */
1527      if (res <= 0)
1528        {
1529          for (array_each(c->servers, struct server, s))
1530            {
1531              struct command_state *state = &s->cmd_state;
1532
1533              if (is_active(state))
1534                {
1535                  /*
1536                    Ugly fix for possible memory leak.  FIXME:
1537                    requires redesign.
1538                  */
1539                  if (state->phase == PHASE_VALUE)
1540                    state->object->free(state->u.value.opaque);
1541
1542                  client_mark_failed(c, s);
1543                }
1544            }
1545
1546          break;
1547        }
1548
1549      first_iter = 0;
1550    }
1551
1552#if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
1553  /*
1554    Ignore return value of sigaction(), there's nothing we can do in
1555    the case of error.
1556  */
1557  sigaction(SIGPIPE, &orig, NULL);
1558#endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */
1559
1560  return MEMCACHED_SUCCESS;
1561}
1562
1563
1564/* Is the following required for any platform?  */
1565#if (! defined(IPPROTO_TCP) && defined(SOL_TCP))
1566#define IPPROTO_TCP  SOL_TCP
1567#endif
1568
1569
1570static inline
1571void
1572tcp_optimize_latency(struct command_state *state)
1573{
1574#ifdef TCP_NODELAY
1575  if (state->socket_mode == TCP_THROUGHPUT)
1576    {
1577      static const int enable = 1;
1578      setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1579                 (void *) &enable, sizeof(enable));
1580      state->socket_mode = TCP_LATENCY;
1581    }
1582#endif /* TCP_NODELAY */
1583}
1584
1585
1586static inline
1587void
1588tcp_optimize_throughput(struct command_state *state)
1589{
1590#ifdef TCP_NODELAY
1591  if (state->socket_mode == TCP_LATENCY)
1592    {
1593      static const int disable = 0;
1594      setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1595                 (void *) &disable, sizeof(disable));
1596      state->socket_mode = TCP_THROUGHPUT;
1597    }
1598#endif /* TCP_NODELAY */
1599}
1600
1601
1602static
1603int
1604get_server_fd(struct client *c, struct server *s)
1605{
1606  struct command_state *state;
1607
1608  /*
1609    Do not try to try reconnect if had max_failures and
1610    failure_expires time is not reached yet.
1611  */
1612  if (c->max_failures > 0 && s->failure_count >= c->max_failures)
1613    {
1614      if (time(NULL) <= s->failure_expires)
1615        return -1;
1616      else
1617        s->failure_count = 0;
1618    }
1619
1620  state = &s->cmd_state;
1621  if (state->fd == -1)
1622    {
1623      if (s->port)
1624        {
1625          state->fd = client_connect_inet(s->host, s->port,
1626                                          c->connect_timeout);
1627          /* This is to trigger actual reset.  */
1628          state->socket_mode = TCP_THROUGHPUT;
1629          if (state->fd != -1)
1630            tcp_optimize_latency(state);
1631        }
1632      else
1633        {
1634          state->fd = client_connect_unix(s->host, s->host_len);
1635          state->socket_mode = NOT_TCP;
1636        }
1637    }
1638
1639  if (state->fd == -1)
1640    client_mark_failed(c, s);
1641
1642  return state->fd;
1643}
1644
1645
1646static inline
1647void
1648iov_push(struct command_state *state, const void *buf, size_t buf_size)
1649{
1650  struct iovec *iov = array_end(state->iov_buf, struct iovec);
1651  iov->iov_base = (void *) buf;
1652  iov->iov_len = buf_size;
1653  array_push(state->iov_buf);
1654}
1655
1656
1657static
1658int
1659push_index(struct command_state *state, int index)
1660{
1661  struct index_node *node;
1662  struct client *c;
1663
1664  c = state->client;
1665  if (array_extend(c->index_list, struct index_node,
1666                   1, ARRAY_EXTEND_TWICE) == -1)
1667    return MEMCACHED_FAILURE;
1668
1669  if (state->index_tail != -1)
1670    array_elem(c->index_list, struct index_node, state->index_tail)->next =
1671      array_size(c->index_list);
1672  else
1673    state->index_head = array_size(c->index_list);
1674
1675  state->index_tail = array_size(c->index_list);
1676
1677  node = array_elem(c->index_list, struct index_node, state->index_tail);
1678  node->index = index;
1679  node->next = -1;
1680
1681  array_push(c->index_list);
1682
1683  return MEMCACHED_SUCCESS;
1684}
1685
1686
1687static
1688struct command_state *
1689init_state(struct command_state *state, int index, size_t request_size,
1690           size_t str_size, parse_reply_func parse_reply)
1691{
1692  if (! is_active(state))
1693    {
1694      if (state->client->noreply)
1695        {
1696          if (state->client->nowait || state->noreply)
1697            {
1698              parse_reply = NULL;
1699              tcp_optimize_throughput(state);
1700            }
1701
1702          state->last_cmd_noreply = state->noreply;
1703        }
1704      else
1705        {
1706          state->last_cmd_noreply = 0;
1707          tcp_optimize_latency(state);
1708        }
1709
1710      state->object = state->client->object;
1711      command_state_reset(state, (str_size > 0 ? request_size : 0),
1712                          parse_reply);
1713    }
1714
1715  if (array_extend(state->iov_buf, struct iovec,
1716                   request_size, ARRAY_EXTEND_EXACT) == -1)
1717    {
1718      deactivate(state);
1719      return NULL;
1720    }
1721
1722  if (str_size > 0
1723      && array_extend(state->client->str_buf, char,
1724                      str_size, ARRAY_EXTEND_TWICE) == -1)
1725    {
1726      deactivate(state);
1727      return NULL;
1728    }
1729
1730  if (push_index(state, index) != MEMCACHED_SUCCESS)
1731    {
1732      deactivate(state);
1733      return NULL;
1734    }
1735
1736  if (state->parse_reply)
1737    ++state->reply_count;
1738  else if (! state->last_cmd_noreply)
1739    ++state->nowait_count;
1740
1741  return state;
1742}
1743
1744
1745static
1746struct command_state *
1747get_state(struct client *c, int index, const char *key, size_t key_len,
1748          size_t request_size, size_t str_size,
1749          parse_reply_func parse_reply)
1750{
1751  struct server *s;
1752  int server_index, fd;
1753
1754  server_index = dispatch_key(&c->dispatch, key, key_len);
1755  if (server_index == -1)
1756    return NULL;
1757
1758  s = array_elem(c->servers, struct server, server_index);
1759
1760  fd = get_server_fd(c, s);
1761  if (fd == -1)
1762    return NULL;
1763
1764  return init_state(&s->cmd_state, index, request_size, str_size,
1765                    parse_reply);
1766}
1767
1768
1769static inline
1770const char *
1771get_noreply(struct command_state *state)
1772{
1773  if (state->noreply && state->client->noreply)
1774    return " " NOREPLY;
1775  else
1776    return "";
1777}
1778
1779
1780inline
1781void
1782client_reset(struct client *c, struct result_object *o, int noreply)
1783{
1784  array_clear(c->index_list);
1785  array_clear(c->str_buf);
1786
1787  ++c->generation;
1788  c->object = o;
1789  c->noreply = noreply;
1790}
1791
1792
1793#define STR_WITH_LEN(str) (str), (sizeof(str) - 1)
1794
1795
1796int
1797client_prepare_set(struct client *c, enum set_cmd_e cmd, int key_index,
1798                   const char *key, size_t key_len,
1799                   flags_type flags, exptime_type exptime,
1800                   const void *value, value_size_type value_size)
1801{
1802  static const size_t request_size = 6;
1803  static const size_t str_size =
1804    sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1805           " " NOREPLY "\r\n");
1806
1807  struct command_state *state;
1808
1809  state = get_state(c, key_index, key, key_len, request_size, str_size,
1810                    parse_set_reply);
1811  if (! state)
1812    return MEMCACHED_FAILURE;
1813
1814  ++state->key_count;
1815
1816  switch (cmd)
1817    {
1818    case CMD_SET:
1819      iov_push(state, STR_WITH_LEN("set"));
1820      break;
1821
1822    case CMD_ADD:
1823      iov_push(state, STR_WITH_LEN("add"));
1824      break;
1825
1826    case CMD_REPLACE:
1827      iov_push(state, STR_WITH_LEN("replace"));
1828      break;
1829
1830    case CMD_APPEND:
1831      iov_push(state, STR_WITH_LEN("append"));
1832      break;
1833
1834    case CMD_PREPEND:
1835      iov_push(state, STR_WITH_LEN("prepend"));
1836      break;
1837
1838    case CMD_CAS:
1839      /* This can't happen.  */
1840      return MEMCACHED_FAILURE;
1841    }
1842  iov_push(state, c->prefix, c->prefix_len);
1843  iov_push(state, key, key_len);
1844
1845  {
1846    char *buf = array_end(c->str_buf, char);
1847    size_t str_size =
1848      sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE "%s\r\n",
1849              flags, exptime, value_size, get_noreply(state));
1850    iov_push(state, (void *) (long) array_size(c->str_buf), str_size);
1851    array_append(c->str_buf, str_size);
1852  }
1853
1854  iov_push(state, value, value_size);
1855  iov_push(state, STR_WITH_LEN("\r\n"));
1856
1857  return MEMCACHED_SUCCESS;
1858}
1859
1860
1861int
1862client_prepare_cas(struct client *c, int key_index,
1863                   const char *key, size_t key_len,
1864                   cas_type cas, flags_type flags, exptime_type exptime,
1865                   const void *value, value_size_type value_size)
1866{
1867  static const size_t request_size = 6;
1868  static const size_t str_size =
1869    sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1870           " " CAS_STUB " " NOREPLY "\r\n");
1871
1872  struct command_state *state;
1873
1874  state = get_state(c, key_index, key, key_len, request_size, str_size,
1875                    parse_set_reply);
1876  if (! state)
1877    return MEMCACHED_FAILURE;
1878
1879  ++state->key_count;
1880
1881  iov_push(state, STR_WITH_LEN("cas"));
1882  iov_push(state, c->prefix, c->prefix_len);
1883  iov_push(state, key, key_len);
1884
1885  {
1886    char *buf = array_end(c->str_buf, char);
1887    size_t str_size =
1888      sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE
1889              " " FMT_CAS "%s\r\n", flags, exptime, value_size, cas,
1890              get_noreply(state));
1891    iov_push(state, (void *) (long) array_size(c->str_buf), str_size);
1892    array_append(c->str_buf, str_size);
1893  }
1894
1895  iov_push(state, value, value_size);
1896  iov_push(state, STR_WITH_LEN("\r\n"));
1897
1898  return MEMCACHED_SUCCESS;
1899}
1900
1901
1902int
1903client_prepare_get(struct client *c, enum get_cmd_e cmd, int key_index,
1904                   const char *key, size_t key_len)
1905{
1906  static const size_t request_size = 4;
1907
1908  struct command_state *state;
1909
1910  state = get_state(c, key_index, key, key_len, request_size, 0,
1911                    parse_get_reply);
1912  if (! state)
1913    return MEMCACHED_FAILURE;
1914
1915  ++state->key_count;
1916
1917  if (! array_empty(state->iov_buf))
1918    {
1919      /* Pop off trailing \r\n because we are about to add another key.  */
1920      array_pop(state->iov_buf);
1921
1922      /* get can't be in noreply mode, so reply_count is positive.  */
1923      --state->reply_count;
1924    }
1925  else
1926    {
1927      switch (cmd)
1928        {
1929        case CMD_GET:
1930          state->u.value.meta.use_cas = 0;
1931          iov_push(state, STR_WITH_LEN("get"));
1932          break;
1933
1934        case CMD_GETS:
1935          state->u.value.meta.use_cas = 1;
1936          iov_push(state, STR_WITH_LEN("gets"));
1937          break;
1938        }
1939    }
1940
1941  iov_push(state, c->prefix, c->prefix_len);
1942  iov_push(state, key, key_len);
1943  iov_push(state, STR_WITH_LEN("\r\n"));
1944
1945  return MEMCACHED_SUCCESS;
1946}
1947
1948
1949int
1950client_prepare_incr(struct client *c, enum arith_cmd_e cmd, int key_index,
1951                    const char *key, size_t key_len, arith_type arg)
1952{
1953  static const size_t request_size = 4;
1954  static const size_t str_size = sizeof(" " ARITH_STUB " " NOREPLY "\r\n");
1955
1956  struct command_state *state;
1957
1958  state = get_state(c, key_index, key, key_len, request_size, str_size,
1959                    parse_arith_reply);
1960  if (! state)
1961    return MEMCACHED_FAILURE;
1962
1963  ++state->key_count;
1964
1965  switch (cmd)
1966    {
1967    case CMD_INCR:
1968      iov_push(state, STR_WITH_LEN("incr"));
1969      break;
1970
1971    case CMD_DECR:
1972      iov_push(state, STR_WITH_LEN("decr"));
1973      break;
1974    }
1975  iov_push(state, c->prefix, c->prefix_len);
1976  iov_push(state, key, key_len);
1977
1978  {
1979    char *buf = array_end(c->str_buf, char);
1980    size_t str_size =
1981      sprintf(buf, " " FMT_ARITH "%s\r\n", arg, get_noreply(state));
1982    iov_push(state, (void *) (long) array_size(c->str_buf), str_size);
1983    array_append(c->str_buf, str_size);
1984  }
1985
1986  return MEMCACHED_SUCCESS;
1987}
1988
1989
1990int
1991client_prepare_delete(struct client *c, int key_index,
1992                      const char *key, size_t key_len)
1993{
1994  static const size_t request_size = 4;
1995  static const size_t str_size = sizeof(" " NOREPLY "\r\n");
1996
1997  struct command_state *state;
1998
1999  state = get_state(c, key_index, key, key_len, request_size, str_size,
2000                    parse_delete_reply);
2001  if (! state)
2002    return MEMCACHED_FAILURE;
2003
2004  ++state->key_count;
2005
2006  iov_push(state, STR_WITH_LEN("delete"));
2007  iov_push(state, c->prefix, c->prefix_len);
2008  iov_push(state, key, key_len);
2009
2010  {
2011    char *buf = array_end(c->str_buf, char);
2012    size_t str_size = sprintf(buf, "%s\r\n", get_noreply(state));
2013    iov_push(state, (void *) (long) array_size(c->str_buf), str_size);
2014    array_append(c->str_buf, str_size);
2015  }
2016
2017  return MEMCACHED_SUCCESS;
2018}
2019
2020
2021int
2022client_flush_all(struct client *c, delay_type delay,
2023                 struct result_object *o, int noreply)
2024{
2025  static const size_t request_size = 1;
2026  static const size_t str_size =
2027    sizeof("flush_all " DELAY_STUB " " NOREPLY "\r\n");
2028
2029  struct server *s;
2030  double ddelay = delay, delay_step = 0.0;
2031  int i;
2032
2033  client_reset(c, o, noreply);
2034
2035  if (array_size(c->servers) > 1)
2036    delay_step = ddelay / (array_size(c->servers) - 1);
2037  ddelay += delay_step;
2038
2039  for (i = 0, array_each(c->servers, struct server, s), ++i)
2040    {
2041      struct command_state *state;
2042      int fd;
2043
2044      ddelay -= delay_step;
2045
2046      fd = get_server_fd(c, s);
2047      if (fd == -1)
2048        continue;
2049
2050      state = init_state(&s->cmd_state, i, request_size, str_size,
2051                         parse_ok_reply);
2052      if (! state)
2053        continue;
2054
2055      {
2056        char *buf = array_end(c->str_buf, char);
2057        size_t str_size =
2058          sprintf(buf, "flush_all " FMT_DELAY "%s\r\n",
2059                  (delay_type) (ddelay + 0.5), get_noreply(state));
2060        iov_push(state, (void *) (long) array_size(c->str_buf), str_size);
2061        array_append(c->str_buf, str_size);
2062      }
2063    }
2064
2065  return client_execute(c);
2066}
2067
2068
2069int
2070client_nowait_push(struct client *c)
2071{
2072  struct server *s;
2073
2074  if (! c->nowait)
2075    return MEMCACHED_SUCCESS;
2076
2077  client_reset(c, NULL, 0);
2078
2079  for (array_each(c->servers, struct server, s))
2080    {
2081      struct command_state *state;
2082      int fd;
2083
2084      state = &s->cmd_state;
2085      if (state->nowait_count == 0)
2086        continue;
2087
2088      fd = get_server_fd(c, s);
2089      if (fd == -1)
2090        continue;
2091
2092      /*
2093        In order to wait the final pending reply we pretend that one
2094        command was never a nowait command, and set parse function to
2095        parse_nowait_reply.
2096      */
2097      --state->nowait_count;
2098      command_state_reset(state, 0, parse_nowait_reply);
2099      tcp_optimize_latency(state);
2100      ++state->reply_count;
2101    }
2102
2103  return client_execute(c);
2104}
2105
2106
2107int
2108client_server_versions(struct client *c, struct result_object *o)
2109{
2110  static const size_t request_size = 1;
2111
2112  struct server *s;
2113  int i;
2114
2115  client_reset(c, o, 0);
2116
2117  for (i = 0, array_each(c->servers, struct server, s), ++i)
2118    {
2119      struct command_state *state;
2120      int fd;
2121
2122      fd = get_server_fd(c, s);
2123      if (fd == -1)
2124        continue;
2125
2126      state = init_state(&s->cmd_state, i, request_size, 0,
2127                         parse_version_reply);
2128      if (! state)
2129        continue;
2130
2131      iov_push(state, STR_WITH_LEN("version\r\n"));
2132    }
2133
2134  return client_execute(c);
2135}
2136
2137
2138/*
2139  When noreply mode is enabled the client may send the last noreply
2140  request and close the connection.  The server will see that the
2141  connection is closed, and will discard all previously read data
2142  without processing it.  To avoid this, we send "version" command and
2143  wait for the reply (discarding it).
2144*/
2145static
2146int
2147client_noreply_push(struct client *c)
2148{
2149  static const size_t request_size = 1;
2150
2151  struct server *s;
2152  int i;
2153
2154  client_reset(c, NULL, 0);
2155
2156  for (i = 0, array_each(c->servers, struct server, s), ++i)
2157    {
2158      struct command_state *state = &s->cmd_state;
2159      int fd;
2160
2161      if (! state->last_cmd_noreply)
2162        continue;
2163
2164      fd = get_server_fd(c, s);
2165      if (fd == -1)
2166        continue;
2167
2168      state = init_state(state, i, request_size, 0, parse_nowait_reply);
2169      if (! state)
2170        continue;
2171
2172      iov_push(state, STR_WITH_LEN("version\r\n"));
2173    }
2174
2175  return client_execute(c);
2176}
Note: See TracBrowser for help on using the repository browser.