source: src/dispatch_key.c @ dbde51

Revision dbde51, 8.3 KB checked in by Tomash Brechko <tomash.brechko@…>, 3 years ago (diff)

Better distribution of ketama points.

Feed previous point instead of sequential index.

  • Property mode set to 100644
Line 
1/*
2  Copyright (C) 2007-2009 Tomash Brechko.  All rights reserved.
3
4  When used to build Perl module:
5
6  This library is free software; you can redistribute it and/or modify
7  it under the same terms as Perl itself, either Perl version 5.8.8
8  or, at your option, any later version of Perl 5 you may have
9  available.
10
11  When used as a standalone library:
12
13  This library is free software; you can redistribute it and/or modify
14  it under the terms of the GNU Lesser General Public License as
15  published by the Free Software Foundation; either version 2.1 of the
16  License, or (at your option) any later version.
17
18  This library is distributed in the hope that it will be useful, but
19  WITHOUT ANY WARRANTY; without even the implied warranty of
20  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21  Lesser General Public License for more details.
22*/
23
24#include "dispatch_key.h"
25#include "compute_crc32.h"
26#include <string.h>
27
28
/*
  Note on rounding: C89 (which we are trying to be compatible with)
  doesn't have a round-to-nearest function, only ceil() and floor(), so
  we add 0.5 to doubles before casting them to integers (the cast
  always truncates toward zero).  This yields round-half-up only for
  non-negative values, which all values rounded this way in this file
  are expected to be.
*/
35
36
/*
  Largest possible continuum point: the continuum covers the whole
  range of a 32-bit unsigned value (0 .. 0xffffffff).
*/
#define DISPATCH_MAX_POINT  0xffffffffU


/*
  One entry of the continuum stored in state->buckets.  The functions
  in this file expect the entries to be ordered by 'point' in
  non-decreasing order (dispatch_find_bucket() binary-searches them).
*/
struct continuum_point
{
  /* Position of this entry on the continuum.  */
  unsigned int point;
  /* Server index (as passed to dispatch_add_server()) owning the entry.  */
  int index;
};
45
46
47static
48struct continuum_point *
49dispatch_find_bucket(struct dispatch_state *state, unsigned int point)
50{
51  struct continuum_point *beg, *end, *left, *right;
52
53  beg = left = array_beg(state->buckets, struct continuum_point);
54  end = right = array_end(state->buckets, struct continuum_point);
55
56  while (left < right)
57    {
58      struct continuum_point *middle = left + (right - left) / 2;
59      if (middle->point < point)
60        {
61          left = middle + 1;
62        }
63      else if (middle->point > point)
64        {
65          right = middle;
66        }
67      else
68        {
69          /* Find the first point for this value.  */
70          while (middle != beg && (middle - 1)->point == point)
71            --middle;
72
73          return middle;
74        }
75    }
76
77  /* Wrap around.  */
78  if (left == end)
79    left = beg;
80
81  return left;
82}
83
84
85static inline
86int
87compatible_add_server(struct dispatch_state *state, double weight, int index)
88{
89  /*
90    For compatibility with Cache::Memcached we put each server in a
91    continuum so that it occupies the space proportional to its
92    weight.  See the comment in compatible_get_server().
93  */
94  double scale;
95  struct continuum_point *p;
96
97  if (array_extend(state->buckets, struct continuum_point,
98                   1, ARRAY_EXTEND_EXACT) == -1)
99    return -1;
100
101  state->total_weight += weight;
102  scale = weight / state->total_weight;
103  /*
104    Note that during iterative scaling below the rounding error
105    accumulates.  However the offset to the smaller values is alright
106    as long as it is smaller than the interval length, which is big
107    enough for sane number of servers (thousands) and relative weight
108    ratios.
109  */
110  for (array_each(state->buckets, struct continuum_point, p))
111    p->point -= (double) p->point * scale;
112
113  /* Here p points to array_end().  */
114  p->point = DISPATCH_MAX_POINT;
115  p->index = index;
116  array_push(state->buckets);
117
118  ++state->server_count;
119
120  return 0;
121}
122
123
124static inline
125int
126compatible_get_server(struct dispatch_state *state,
127                      const char *key, size_t key_len)
128{
129  /*
130    For compatibility with Cache::Memcached we do the following: first
131    we compute 'hash' the same way the original module does.  Since
132    that module puts 'weight' copies of each server into buckets
133    array, our '(unsigned int) (state->total_weight + 0.5)' is equal
134    to the number of such buckets (0.5 is there for proper rounding).
135    Then we scale 'point' to the continuum, and since each server
136    occupies the space proportional to its weight, we get the same
137    server index.
138  */
139  struct continuum_point *p;
140  unsigned int crc32 = compute_crc32_add(state->prefix_hash, key, key_len);
141  unsigned int hash = (crc32 >> 16) & 0x00007fffU;
142  unsigned int point = hash % (unsigned int) (state->total_weight + 0.5);
143
144  point = (double) point / state->total_weight * DISPATCH_MAX_POINT + 0.5;
145  /*
146    Shift point one step forward to possibly get from the border point
147    which belongs to the previous bucket.
148  */
149  point += 1;
150
151  p = dispatch_find_bucket(state, point);
152  return p->index;
153}
154
155
156static inline
157int
158ketama_crc32_add_server(struct dispatch_state *state,
159                        const char *host, size_t host_len,
160                        const char *port, size_t port_len,
161                        double weight, int index)
162{
163  static const char delim = '\0';
164  unsigned int crc32, point;
165  int count, i;
166
167  count = state->ketama_points * weight + 0.5;
168
169  if (array_extend(state->buckets, struct continuum_point,
170                   count, ARRAY_EXTEND_EXACT) == -1)
171    return -1;
172
173  crc32 = compute_crc32(host, host_len);
174  crc32 = compute_crc32_add(crc32, &delim, 1);
175  crc32 = compute_crc32_add(crc32, port, port_len);
176  point = 0;
177
178  for (i = 0; i < count; ++i)
179    {
180      char buf[4];
181      struct continuum_point *p;
182
183      /*
184        We want the same result on all platforms, so we hardcode size
185        of int as 4 8-bit bytes.
186      */
187      buf[0] = point & 0xff;
188      buf[1] = (point >> 8) & 0xff;
189      buf[2] = (point >> 16) & 0xff;
190      buf[3] = (point >> 24) & 0xff;
191
192      point = compute_crc32_add(crc32, buf, 4);
193
194      if (! array_empty(state->buckets))
195        {
196          struct continuum_point *end =
197            array_end(state->buckets, struct continuum_point);
198
199          p = dispatch_find_bucket(state, point);
200
201          /* Check if we wrapped around but actually have new max point.  */
202          if (p == array_beg(state->buckets, struct continuum_point)
203              && point > p->point)
204            {
205              p = end;
206            }
207          else
208            {
209              /*
210                Even if there's a server for the same point already,
211                we have to add ours, because the first one may be
212                removed later.  But we add ours after the old servers
213                for not to change key distribution.
214              */
215              while (p != end && p->point == point)
216                ++p;
217
218              /* Move the tail one position forward.  */
219              if (p != end)
220                memmove(p + 1, p, (end - p) * sizeof(*p));
221            }
222        }
223      else
224        {
225          p = array_beg(state->buckets, struct continuum_point);
226        }
227
228      p->point = point;
229      p->index = index;
230      array_push(state->buckets);
231    }
232
233  ++state->server_count;
234
235  return 0;
236}
237
238
239static inline
240int
241ketama_crc32_get_server(struct dispatch_state *state,
242                        const char *key, size_t key_len)
243{
244  unsigned int point = compute_crc32_add(state->prefix_hash, key, key_len);
245  struct continuum_point *p = dispatch_find_bucket(state, point);
246  return p->index;
247}
248
249
250void
251dispatch_init(struct dispatch_state *state)
252{
253  array_init(&state->buckets);
254  state->total_weight = 0.0;
255  state->ketama_points = 0;
256  state->prefix_hash = 0x0U;
257  state->server_count = 0;
258}
259
260
261void
262dispatch_destroy(struct dispatch_state *state)
263{
264  array_destroy(&state->buckets);
265}
266
267
268void
269dispatch_set_ketama_points(struct dispatch_state *state, int ketama_points)
270{
271  state->ketama_points = ketama_points;
272}
273
274
275void
276dispatch_set_prefix(struct dispatch_state *state,
277                    const char *prefix, size_t prefix_len)
278{
279  state->prefix_hash = compute_crc32(prefix, prefix_len);
280}
281
282
283int
284dispatch_add_server(struct dispatch_state *state,
285                    const char *host, size_t host_len,
286                    const char *port, size_t port_len,
287                    double weight, int index)
288{
289  if (state->ketama_points > 0)
290    return ketama_crc32_add_server(state, host, host_len, port, port_len,
291                                   weight, index);
292  else
293    return compatible_add_server(state, weight, index);
294}
295
296
297int
298dispatch_key(struct dispatch_state *state, const char *key, size_t key_len)
299{
300  if (state->server_count == 0)
301    return -1;
302
303  if (state->server_count == 1)
304    {
305      struct continuum_point *p =
306        array_beg(state->buckets, struct continuum_point);
307      return p->index;
308    }
309  else
310    {
311      if (state->ketama_points > 0)
312        return ketama_crc32_get_server(state, key, key_len);
313      else
314        return compatible_get_server(state, key, key_len);
315    }
316}
Note: See TracBrowser for help on using the repository browser.