class VirtualPaymentsBase(object):
    """Template for a "virtual_payments" autotest scenario.

    ``run_in_test_env`` drives the scenario: it prepares the database,
    publishes input records, fetches the resulting DB rows and then reads
    and checks the messages received by the cart, brt, hrs and bfam
    consumers.  Subclasses provide the scenario-specific steps by
    overriding the methods that raise ``NotImplementedError``.
    """

    def __init__(self, autotest_cfg):
        """Store the configuration and reset all per-run state.

        :param autotest_cfg: autotest configuration object; it is only
            stored on the instance here.
        """
        self.autotest_cfg = autotest_cfg
        self.table_name = "virtual_payments"
        self.db_records = []

        # Environment handles; most are filled in by run_in_test_env().
        self.rabbit_srv = None
        self.snmp_agent = None
        self.con = None
        self.pub = None
        self.test_env = None

        # Messages read back from each consumer during the run.
        self.sent_cart_records = []
        self.sent_hrs_records = []
        self.sent_brt_records = []
        self.sent_bfam_records = []

        # Consumers are taken from the test environment in run_in_test_env().
        self.cart_consumer = None
        self.hrs_consumer = None
        self.brt_consumer = None
        self.bfam_consumer = None

        # Random client id used to tag the records of this test instance.
        self.test_clnt_id = random.randint(1, 100000000)

    def test_name(self):
        """Return the name of the test; must be overridden."""
        raise NotImplementedError

    def publish_records(self):
        """Publish the input records of the scenario; must be overridden."""
        raise NotImplementedError

    def check_db_records(self):
        """Validate ``self.db_records``; must be overridden."""
        raise NotImplementedError

    def check_sent_cart_records(self):
        """Check the cart messages; by default none are expected."""
        utility.check_number_of_records(self.sent_cart_records, 0)

    def expect_cart_records(self):
        """Return the number of cart messages to wait for (default 0)."""
        return 0

    def check_sent_hrs_records(self):
        """Check the hrs messages; by default none are expected."""
        utility.check_number_of_records(self.sent_hrs_records, 0)

    def expect_hrs_records(self):
        """Return the number of hrs messages to wait for (default 0)."""
        return 0

    def check_sent_brt_records(self):
        """Validate ``self.sent_brt_records``; must be overridden."""
        raise NotImplementedError

    def expect_brt_records(self):
        """Return the number of brt messages to wait for; must be overridden."""
        raise NotImplementedError

    def check_sent_bfam_records(self):
        """Validate ``self.sent_bfam_records``; must be overridden."""
        raise NotImplementedError

    def expect_bfam_records(self):
        """Return the number of bfam messages to wait for; must be overridden."""
        raise NotImplementedError

    def db_records_has_been_fetched(self, db_records):
        """Return True when ``db_records`` contains at least one record.

        Passed to ``fetch_table_records`` as its completion predicate.
        """
        return len(db_records) > 0

    def prepare_db(self):
        """Bring the database into the initial state; must be overridden."""
        raise NotImplementedError

    def on_finish(self):
        """Hook called after all checks have run; does nothing by default."""
        pass

    @utility.log_run
    def run_in_test_env(self, test_env):
        """Run the whole scenario against ``test_env``.

        :param test_env: test environment providing ``snmp_agent``, ``con``,
            ``pub`` and the ``cart``/``hrs``/``brt``/``bfam`` consumers.
        """
        self.snmp_agent = test_env.snmp_agent
        self.con = test_env.con
        self.test_env = test_env
        self.pub = test_env.pub
        self.cart_consumer = test_env.cart_consumer
        self.hrs_consumer = test_env.hrs_consumer
        self.brt_consumer = test_env.brt_consumer
        self.bfam_consumer = test_env.bfam_consumer

        self.prepare_db()
        self.publish_records()

        # Only rows belonging to this test's client id are fetched.
        self.db_records = fetch_table_records(
            partial(db_functions.fetch_virtual_payments,
                    clnt_id=self.test_clnt_id),
            self.con,
            self.db_records_has_been_fetched)
        logging.info("checking db records")
        self.check_db_records()

        # NOTE(review): the first get_records() argument (10) is presumably
        # a timeout -- confirm against the consumer implementation.
        logging.info("checking cart records")
        self.sent_cart_records = self.cart_consumer.get_records(
            10, self.expect_cart_records())
        self.check_sent_cart_records()

        logging.info("checking brt records")
        self.sent_brt_records = self.brt_consumer.get_records(
            10, self.expect_brt_records())
        self.check_sent_brt_records()

        logging.info("checking hrs records")
        self.sent_hrs_records = self.hrs_consumer.get_records(
            10, self.expect_hrs_records())
        self.check_sent_hrs_records()

        logging.info("checking bfam records")
        self.sent_bfam_records = self.bfam_consumer.get_records(
            10, self.expect_bfam_records())
        self.check_sent_bfam_records()

        self.on_finish()
        logging.info("done")


class VirtualPaymentsWithShard(VirtualPaymentsBase):
    """Scenario publishing one virtual payment on a shard routing key.

    One record is published on ``self.routing_key``; the test then expects
    exactly one DB row, one brt message and one bfam message (cart and hrs
    keep the base-class expectation of zero messages).
    """

    def __init__(self, autotest_cfg):
        """Initialise the base state and the routing keys of the scenario."""
        VirtualPaymentsBase.__init__(self, autotest_cfg)
        self.routing_key = "ps.ocsdb_tevt.virtual_payments.100"
        self.brt_routing_key = "ps.ocsdb.virtual_payments"
        self.bfam_routing_key = "ps.ocsdb_bfam.confirm_virt"

    def test_name(self):
        """Return the name of this test."""
        return "BisrtAddon.VirtualPaymentsWithShard"

    def prepare_db(self):
        """Delete the virtual payments of the test client and commit."""
        cur = self.con.cursor()
        try:
            # test_clnt_id is a locally generated integer, not external input.
            cur.execute("delete from virtual_payments t "
                        "where t.clnt_clnt_id = {clnt_id}".format(
                            clnt_id=self.test_clnt_id))
            self.con.commit()
        finally:
            # Release the cursor even when the statement or commit fails.
            cur.close()

    def publish_records(self):
        """Publish a single virtual payment record as a JSON list."""
        record = {
            'last_record': 1,
            'virt_id': self.test_clnt_id,
            'vrtp_vrtp_id': 1,
            'clnt_clnt_id': self.test_clnt_id,
            'amount_r': 123.4,
            'exp_date': '20900102',
            'virtual_date': '20690203',
            'amount_': 12.3,
            'vrnt_vrnt_id': 2,
            'vrct_vrct_id': 3,
            'start_date': '20160203',
            'end_date': '20890405',
            'navi_date': '20170405',
        }
        message_str = json.dumps([record], indent=4)
        logging.info(message_str)
        self.pub.publish(self.routing_key, message_str)

    def check_db_records(self):
        """Expect exactly one DB row matching the published record."""
        utility.check_number_of_records(self.db_records, 1)
        # Column order presumably follows
        # db_functions.fetch_virtual_payments -- verify when changing.
        expected_recs = [(
            self.test_clnt_id,
            1,
            self.test_clnt_id,
            123.4,
            datetime(2090, 1, 2),
            datetime(2069, 2, 3),
            12.3,
            None,
            2,
            None, None, None, None, None,
            3,
            datetime(2016, 2, 3),
            datetime(2089, 4, 5),
            datetime(2017, 4, 5),
            None, None, None, None, None, None,
        )]
        compare_db_records(self.db_records, expected_recs)

    def expect_brt_records(self):
        """Return the number of brt messages to wait for (1)."""
        return 1

    def check_sent_brt_records(self):
        """Expect one brt message carrying the published payment fields."""
        utility.check_number_of_records(self.sent_brt_records, 1)
        a_message = self.sent_brt_records[0]
        check_message_routing_key(a_message, self.brt_routing_key)
        check_message_header_type(a_message, self.brt_routing_key)

        a_record = a_message['record']
        check_amqp_field(a_record, 'clnt_id', self.test_clnt_id)
        check_amqp_field(a_record, 'virt_id', self.test_clnt_id)
        check_amqp_field(a_record, 'vrtp_id', 1)
        check_amqp_field(a_record, 'vrct_id', 3)
        check_amqp_field_not_present(a_record, 'bltp_id')
        check_amqp_field_not_present(a_record, 'chrg_id')
        check_amqp_field(a_record, 'amount', 12.3)
        check_amqp_field(a_record, 'amount_r', 123.4)
        check_amqp_field(a_record, "start_date", '2016-02-03')
        check_amqp_field(a_record, "end_date", '2089-04-05')
        check_amqp_field(a_record, "deleted", False)

    def expect_bfam_records(self):
        """Return the number of bfam messages to wait for (1)."""
        return 1

    def check_sent_bfam_records(self):
        """Expect one bfam message whose ``virt_id`` is the test client id."""
        utility.check_number_of_records(self.sent_bfam_records, 1)
        a_message = self.sent_bfam_records[0]
        check_message_routing_key(a_message, self.bfam_routing_key)
        check_message_header_type(a_message, self.bfam_routing_key)

        a_record = a_message['record']
        # NOTE(review): this call is qualified with ``utility.`` while the
        # brt check uses the bare name -- confirm both resolve to the same
        # helper.
        utility.check_amqp_field(a_record, 'virt_id', self.test_clnt_id)