2017-08-29

BigObject Python Demo of RabbitMQ RPC client-server mode



BigObject-Performance with 100x Speedup


RabbitMQ rpc mode:

Use RabbitMQ between the client application (AP) and BigObject.

BOrpc_client -->Request--> rpc_queue-->BOrpc_server-->odbc-->BO
BOrpc_client <--Response<-- callback queue<-- BOrpc_server<--odbc<--BO


Users can write a Python program that sends SQL commands to BigObject through RabbitMQ and receives the response back. The BOrpc_client.py demo program first sends a “select top 10 * from t1” request to the rpc_queue, and then sends “fetchall” and “fetchone” requests to read the result rows back.

The BOrpc_server.py program receives each request from the rpc_queue and sends it to BO. It also handles the “fetchall” and “fetchone” requests from the rpc_queue by sending the rows of the BO query result back to the callback queue.


Environment
Python

Install and configure components

  1. Install the package:
     # apt-get install rabbitmq-server
  2. Add the remoteguest user:
     # sudo rabbitmqctl add_user remoteguest remoteguest
  3. Permit configuration, write, and read access for the remoteguest user:
     # sudo rabbitmqctl set_permissions remoteguest ".*" ".*" ".*"

# A local connection can use the default guest/guest account,
# but a remote connection requires a new user such as remoteguest/remoteguest.

#*********************
#borpc_server.py
#*********************

import datetime
import time
import mysql.connector
import random

import pika

# Database side: connect to BigObject through the MySQL connector and keep
# one module-level cursor that every request handled by on_request reuses.
cnx = mysql.connector.connect(
    user='scott',
    password='tiger',
    host='192.168.1.163',
)
cursor = cnx.cursor(buffered=True)

# Broker side. For a RabbitMQ broker on this machine, use instead:
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# Remote broker, logging in as remoteguest/remoteguest on vhost "/" (%2f).
amqp_url = "amqp://remoteguest:remoteguest@192.168.1.163/%2f"
parameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Queue on which clients publish their requests.
channel.queue_declare(queue='rpc_queue')

# Running count of requests handled; incremented by on_request.
querycount = 0
def fetchone(linedata):
    """Serialize one result row as tab-terminated text.

    Args:
        linedata: One row as an iterable of column values (e.g. the tuple
            returned by ``cursor.fetchone()``), or ``None`` when there is
            no row.

    Returns:
        Every column value rendered as text and followed by a tab, so a
        non-empty row ends with a trailing tab. ``''`` when *linedata* is
        ``None`` or has no columns. A ``None`` column is rendered as the
        text ``None``.
    """
    if linedata is None:
        return ''
    # Column values are not always strings (numbers, dates, NULL -> None);
    # concatenating them directly raised TypeError. "%s" renders any value,
    # and the one-element tuple keeps a tuple-valued column from being
    # misread as a list of format arguments.
    return ''.join('%s\t' % (field,) for field in linedata)

def fetchall(data):
    """Serialize a collection of result rows, one text line per row.

    Args:
        data: An iterable of rows (e.g. the list returned by
            ``cursor.fetchall()``), or ``None``.

    Returns:
        The ``fetchone`` rendering of each row followed by a newline, all
        concatenated in order; ``''`` when *data* is ``None`` or empty.
    """
    if data is None:
        return ''
    return ''.join(fetchone(row) + '\n' for row in data)
def on_request(ch, method, props, body):
    """Handle one request from rpc_queue and publish the reply.

    The body is either one of the control words ``fetchone`` / ``fetchall``
    (read rows from the module-level cursor and return them as text) or any
    other text, which is executed on the cursor as a statement.

    Args:
        ch: Channel the request arrived on; used to publish the reply and
            to acknowledge the request.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        props: Request properties; ``reply_to`` names the queue to answer
            on and ``correlation_id`` is echoed back in the reply.
        body: The request payload (bytes or text).

    The reply body is the serialized rows, ``"OK"`` after a successful
    execute, or ``"Error: ..."`` when the connector raises
    ``mysql.connector.Error``.
    """
    global querycount

    # On Python 3 pika hands the payload over as bytes, and bytes never
    # compare equal to 'fetchone'/'fetchall', so the control words would be
    # executed as SQL. Decode first; a text body is left untouched.
    if not isinstance(body, str):
        body = body.decode('utf-8', 'replace')

    print(" [cmd] %s" % body)

    try:
        if body == 'fetchone':
            data = cursor.fetchone()
            response = fetchone(data)
        elif body == 'fetchall':
            data = cursor.fetchall()
            response = fetchall(data)
        else:
            # NOTE(review): the request text is executed verbatim, so anyone
            # who can publish to rpc_queue can run arbitrary statements.
            cursor.execute(body)
            response = "OK"
    except mysql.connector.Error as err:
        response = "Error: %s" % err

    querycount = querycount + 1

    # Reply on the caller's queue, echoing the correlation id so the client
    # can match this response to its request.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=response,
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Hand this worker at most one unacknowledged request at a time.
channel.basic_qos(prefetch_count=1)
# NOTE(review): callback-first is the pika 0.x signature; pika >= 1.0 expects
# basic_consume(queue='rpc_queue', on_message_callback=on_request).
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the server: leave the consume loop
    # cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker and database connections, which were otherwise
    # left open when the process ended.
    if connection.is_open:
        connection.close()
    cursor.close()
    cnx.close()


#*********************
#borpc_client.py
#*********************

import datetime
import time
import mysql.connector
import random

import pika
import uuid

class BORpcClient(object):
    """Blocking RPC client: publishes commands to rpc_queue and waits for
    the matching reply on its own callback queue."""

    def __init__(self):
        """Connect to the broker and start consuming the reply queue."""
        # For a broker on this machine, use instead:
        # pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        # Remote broker.
        broker_url = "amqp://remoteguest:remoteguest@192.168.1.163/%2f"
        self.connection = pika.BlockingConnection(pika.URLParameters(broker_url))
        self.channel = self.connection.channel()

        # Exclusive reply queue; its name is read back from the declare result.
        declared = self.channel.queue_declare(exclusive=True)
        self.callback_queue = declared.method.queue

        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.callback_queue,
        )

    def on_response(self, ch, method, props, body):
        """Store *body* as the response if it answers the pending request."""
        # Replies carrying a different correlation id are ignored.
        if self.corr_id != props.correlation_id:
            return
        self.response = body

    def call(self, cmd):
        """Send *cmd* to rpc_queue and block until its reply arrives.

        Returns the reply body exactly as delivered to on_response.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())

        request_props = pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=self.corr_id,
        )
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=request_props,
            body=cmd,
        )

        # Pump connection events until on_response fills in the reply.
        while self.response is None:
            self.connection.process_data_events()
        return self.response

# Demo driver: run one query, then read its rows back through the server.
BO_rpc = BORpcClient()
cmd = "select top 10 * from t1"

print(" [x] Requesting BO: %s" % cmd)
# Send the query first, then 'fetchall' to read the result rows.
for request in (cmd, 'fetchall'):
    response = BO_rpc.call(request)
    print(" [.] Response: %s" % response)

# Follow up with single-row reads; raise the range to issue more of them.
for _ in range(1):
    response = BO_rpc.call('fetchone')
    print(" [.] Response: %s" % response)



No comments:

Post a Comment