197 lines
6.9 KiB
Python
197 lines
6.9 KiB
Python
|
|
import json
|
||
|
|
import time
|
||
|
|
import unittest
|
||
|
|
|
||
|
|
import mock
|
||
|
|
|
||
|
|
try:
|
||
|
|
from queue import Queue
|
||
|
|
except ImportError:
|
||
|
|
from Queue import Queue
|
||
|
|
|
||
|
|
from posthog.consumer import MAX_MSG_SIZE, Consumer
|
||
|
|
from posthog.request import APIError
|
||
|
|
from posthog.test.test_utils import TEST_API_KEY
|
||
|
|
|
||
|
|
|
||
|
|
class TestConsumer(unittest.TestCase):
|
||
|
|
def test_next(self):
|
||
|
|
q = Queue()
|
||
|
|
consumer = Consumer(q, "")
|
||
|
|
q.put(1)
|
||
|
|
next = consumer.next()
|
||
|
|
self.assertEqual(next, [1])
|
||
|
|
|
||
|
|
def test_next_limit(self):
|
||
|
|
q = Queue()
|
||
|
|
flush_at = 50
|
||
|
|
consumer = Consumer(q, "", flush_at)
|
||
|
|
for i in range(10000):
|
||
|
|
q.put(i)
|
||
|
|
next = consumer.next()
|
||
|
|
self.assertEqual(next, list(range(flush_at)))
|
||
|
|
|
||
|
|
def test_dropping_oversize_msg(self):
|
||
|
|
q = Queue()
|
||
|
|
consumer = Consumer(q, "")
|
||
|
|
oversize_msg = {"m": "x" * MAX_MSG_SIZE}
|
||
|
|
q.put(oversize_msg)
|
||
|
|
next = consumer.next()
|
||
|
|
self.assertEqual(next, [])
|
||
|
|
self.assertTrue(q.empty())
|
||
|
|
|
||
|
|
def test_upload(self):
|
||
|
|
q = Queue()
|
||
|
|
consumer = Consumer(q, TEST_API_KEY)
|
||
|
|
track = {"type": "track", "event": "python event", "distinct_id": "distinct_id"}
|
||
|
|
q.put(track)
|
||
|
|
success = consumer.upload()
|
||
|
|
self.assertTrue(success)
|
||
|
|
|
||
|
|
def test_flush_interval(self):
|
||
|
|
# Put _n_ items in the queue, pausing a little bit more than
|
||
|
|
# _flush_interval_ after each one.
|
||
|
|
# The consumer should upload _n_ times.
|
||
|
|
q = Queue()
|
||
|
|
flush_interval = 0.3
|
||
|
|
consumer = Consumer(q, TEST_API_KEY, flush_at=10, flush_interval=flush_interval)
|
||
|
|
with mock.patch("posthog.consumer.batch_post") as mock_post:
|
||
|
|
consumer.start()
|
||
|
|
for i in range(0, 3):
|
||
|
|
track = {
|
||
|
|
"type": "track",
|
||
|
|
"event": "python event %d" % i,
|
||
|
|
"distinct_id": "distinct_id",
|
||
|
|
}
|
||
|
|
q.put(track)
|
||
|
|
time.sleep(flush_interval * 1.1)
|
||
|
|
self.assertEqual(mock_post.call_count, 3)
|
||
|
|
|
||
|
|
def test_multiple_uploads_per_interval(self):
|
||
|
|
# Put _flush_at*2_ items in the queue at once, then pause for
|
||
|
|
# _flush_interval_. The consumer should upload 2 times.
|
||
|
|
q = Queue()
|
||
|
|
flush_interval = 0.5
|
||
|
|
flush_at = 10
|
||
|
|
consumer = Consumer(
|
||
|
|
q, TEST_API_KEY, flush_at=flush_at, flush_interval=flush_interval
|
||
|
|
)
|
||
|
|
with mock.patch("posthog.consumer.batch_post") as mock_post:
|
||
|
|
consumer.start()
|
||
|
|
for i in range(0, flush_at * 2):
|
||
|
|
track = {
|
||
|
|
"type": "track",
|
||
|
|
"event": "python event %d" % i,
|
||
|
|
"distinct_id": "distinct_id",
|
||
|
|
}
|
||
|
|
q.put(track)
|
||
|
|
time.sleep(flush_interval * 1.1)
|
||
|
|
self.assertEqual(mock_post.call_count, 2)
|
||
|
|
|
||
|
|
def test_request(self):
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
track = {"type": "track", "event": "python event", "distinct_id": "distinct_id"}
|
||
|
|
consumer.request([track])
|
||
|
|
|
||
|
|
def _test_request_retry(self, consumer, expected_exception, exception_count):
|
||
|
|
def mock_post(*args, **kwargs):
|
||
|
|
mock_post.call_count += 1
|
||
|
|
if mock_post.call_count <= exception_count:
|
||
|
|
raise expected_exception
|
||
|
|
|
||
|
|
mock_post.call_count = 0
|
||
|
|
|
||
|
|
with mock.patch(
|
||
|
|
"posthog.consumer.batch_post", mock.Mock(side_effect=mock_post)
|
||
|
|
):
|
||
|
|
track = {
|
||
|
|
"type": "track",
|
||
|
|
"event": "python event",
|
||
|
|
"distinct_id": "distinct_id",
|
||
|
|
}
|
||
|
|
# request() should succeed if the number of exceptions raised is
|
||
|
|
# less than the retries paramater.
|
||
|
|
if exception_count <= consumer.retries:
|
||
|
|
consumer.request([track])
|
||
|
|
else:
|
||
|
|
# if exceptions are raised more times than the retries
|
||
|
|
# parameter, we expect the exception to be returned to
|
||
|
|
# the caller.
|
||
|
|
try:
|
||
|
|
consumer.request([track])
|
||
|
|
except type(expected_exception) as exc:
|
||
|
|
self.assertEqual(exc, expected_exception)
|
||
|
|
else:
|
||
|
|
self.fail(
|
||
|
|
"request() should raise an exception if still failing after %d retries"
|
||
|
|
% consumer.retries
|
||
|
|
)
|
||
|
|
|
||
|
|
def test_request_retry(self):
|
||
|
|
# we should retry on general errors
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
self._test_request_retry(consumer, Exception("generic exception"), 2)
|
||
|
|
|
||
|
|
# we should retry on server errors
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
self._test_request_retry(consumer, APIError(500, "Internal Server Error"), 2)
|
||
|
|
|
||
|
|
# we should retry on HTTP 429 errors
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
self._test_request_retry(consumer, APIError(429, "Too Many Requests"), 2)
|
||
|
|
|
||
|
|
# we should NOT retry on other client errors
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
api_error = APIError(400, "Client Errors")
|
||
|
|
try:
|
||
|
|
self._test_request_retry(consumer, api_error, 1)
|
||
|
|
except APIError:
|
||
|
|
pass
|
||
|
|
else:
|
||
|
|
self.fail("request() should not retry on client errors")
|
||
|
|
|
||
|
|
# test for number of exceptions raise > retries value
|
||
|
|
consumer = Consumer(None, TEST_API_KEY, retries=3)
|
||
|
|
self._test_request_retry(consumer, APIError(500, "Internal Server Error"), 3)
|
||
|
|
|
||
|
|
def test_pause(self):
|
||
|
|
consumer = Consumer(None, TEST_API_KEY)
|
||
|
|
consumer.pause()
|
||
|
|
self.assertFalse(consumer.running)
|
||
|
|
|
||
|
|
def test_max_batch_size(self):
|
||
|
|
q = Queue()
|
||
|
|
consumer = Consumer(q, TEST_API_KEY, flush_at=100000, flush_interval=3)
|
||
|
|
properties = {}
|
||
|
|
for n in range(0, 500):
|
||
|
|
properties[str(n)] = "one_long_property_value_to_build_a_big_event"
|
||
|
|
track = {
|
||
|
|
"type": "track",
|
||
|
|
"event": "python event",
|
||
|
|
"distinct_id": "distinct_id",
|
||
|
|
"properties": properties,
|
||
|
|
}
|
||
|
|
msg_size = len(json.dumps(track).encode())
|
||
|
|
# Let's capture 8MB of data to trigger two batches
|
||
|
|
n_msgs = int(8_000_000 / msg_size)
|
||
|
|
|
||
|
|
def mock_post_fn(_, data, **kwargs):
|
||
|
|
res = mock.Mock()
|
||
|
|
res.status_code = 200
|
||
|
|
request_size = len(data.encode())
|
||
|
|
# Batches close after the first message bringing it bigger than BATCH_SIZE_LIMIT, let's add 10% of margin
|
||
|
|
self.assertTrue(
|
||
|
|
request_size < (5 * 1024 * 1024) * 1.1,
|
||
|
|
"batch size (%d) higher than limit" % request_size,
|
||
|
|
)
|
||
|
|
return res
|
||
|
|
|
||
|
|
with mock.patch(
|
||
|
|
"posthog.request._session.post", side_effect=mock_post_fn
|
||
|
|
) as mock_post:
|
||
|
|
consumer.start()
|
||
|
|
for _ in range(0, n_msgs + 2):
|
||
|
|
q.put(track)
|
||
|
|
q.join()
|
||
|
|
self.assertEqual(mock_post.call_count, 2)
|