2017-08-30

BigObject Python Demo of RabbitMQ Topics Pub/Sub mode



RabbitMQ Topics mode:


Publishers can publish messages with different topics to the message queue (MQ). A subscriber can subscribe to multiple topics from the MQ.

The bo_emit_stooq.py program sends all instruments from BigObject, using each instrument name as the topic, and the bo_receive_stooq.py program receives only messages with the 'ibm.us' topic.
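To make the topic idea concrete, here is a minimal pure-Python sketch of how a topic exchange decides whether a binding key matches a routing key. In AMQP topic bindings, `*` stands for exactly one dot-separated word and `#` for zero or more words. The function name `topic_matches` and the instrument 'aapl.us' are assumptions made up for illustration; the real matching happens inside RabbitMQ, not in client code.

```python
def topic_matches(binding_key, routing_key):
    """Return True if an AMQP-style topic binding key matches a routing key."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        # Pattern exhausted: it is a match only if all words were consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' matches exactly one word; anything else must match literally.
        if pattern[p] == '*' or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == '__main__':
    # 'aapl.us' is a hypothetical instrument used only for this demo.
    for key in ('ibm.us', '*.us', '#'):
        print(key, topic_matches(key, 'ibm.us'), topic_matches(key, 'aapl.us'))
```

With the binding key 'ibm.us' used in this demo, only the IBM messages match. A key such as '*.us' would presumably match every instrument whose name ends in '.us'.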


Environment
Python

Install and configure components

  1. Install the package:
     # apt-get install rabbitmq-server
  2. Add the remoteguest user:
     # sudo rabbitmqctl add_user remoteguest remoteguest
  3. Permit configuration, write, and read access for the remoteguest user:
     # sudo rabbitmqctl set_permissions remoteguest ".*" ".*" ".*"

# A local connection can use the default user guest/guest,
# but a remote connection needs a new user, here remoteguest/remoteguest.
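The scripts below connect with an AMQP URL that ends in `%2f`, which is the percent-encoded form of the default virtual host "/". As a rough sketch, the URL can be assembled like this. The helper name `amqp_url` is an assumption for illustration, and the snippet uses Python 3's `urllib.parse`.

```python
from urllib.parse import quote


def amqp_url(user, password, host, vhost='/'):
    """Build an AMQP URL, percent-encoding the user, password and vhost."""
    # safe='' forces '/' to be encoded, so the default vhost becomes %2F.
    return 'amqp://{}:{}@{}/{}'.format(
        quote(user, safe=''),
        quote(password, safe=''),
        host,
        quote(vhost, safe=''),
    )


if __name__ == '__main__':
    print(amqp_url('remoteguest', 'remoteguest', '192.168.1.163'))
```

The result uses an uppercase `%2F`. Percent-encoding is case-insensitive, so it is equivalent to the `%2f` written in the scripts.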

#**************************
# bo_receive_stooq.py
#**************************
#!/usr/bin/env python
import pika
import sys

#localhost
#connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#localhost

#remote server
parameters = pika.URLParameters("amqp://remoteguest:remoteguest@192.168.1.163/%2f")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key='ibm.us')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
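The receiver above binds its queue to a single topic. Since a subscriber can listen to several topics, one possible way to do that is to call `queue_bind` once per routing key. The sketch below assumes a hypothetical helper `bind_topics` and a stand-in class `RecordingChannel` that only records calls, so the snippet runs without a broker. With a real pika channel you would pass `channel` and `queue_name` from the script instead.

```python
def bind_topics(channel, exchange, queue_name, routing_keys):
    """Bind one queue to an exchange once per routing key; return the keys bound."""
    bound = []
    for key in routing_keys:
        channel.queue_bind(exchange=exchange,
                           queue=queue_name,
                           routing_key=key)
        bound.append(key)
    return bound


class RecordingChannel(object):
    """Hypothetical stand-in for a pika channel; it only records queue_bind calls."""

    def __init__(self):
        self.bindings = []

    def queue_bind(self, exchange, queue, routing_key):
        self.bindings.append((exchange, queue, routing_key))


if __name__ == '__main__':
    demo = RecordingChannel()
    # 'aapl.us' is a made-up second topic for illustration.
    bind_topics(demo, 'topic_logs', 'demo_queue', ['ibm.us', 'aapl.us'])
    print(demo.bindings)
```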




#***********************
# bo_emit_stooq.py
#***********************

#!/usr/bin/env python
import datetime
import time
import mysql.connector
import random
import pickle
import os
from time import sleep

import pika
import sys

#localhost
#connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#localhost

#remote server
parameters = pika.URLParameters("amqp://remoteguest:remoteguest@192.168.1.163/%2f")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

cnx = mysql.connector.connect(user='scott', password='tiger',host='192.168.1.163')
cursor = cnx.cursor()

sql="select rowid() as rowid,Date,Inst,Open,High,Low,Close from stooqseq"
cursor.execute(sql)
# Fetch a single row using fetchone() method.
#data = cursor.fetchone()
cnt=0
for (rowid,Date,Inst,Open,High,Low,Close) in cursor:
    # Print every 10th row to show progress
    if cnt%10==0:
        print("%s %s %s %s %s %s %s" % (rowid,Date,Inst,Open,High,Low,Close))
    # The instrument name (e.g. 'ibm.us') is used as the routing key
    routing_key = Inst.encode('ascii')
    message = str(rowid)+','+ Date+','+Inst+','+str(Open)+','+str(High)+','+str(Low)+','+str(Close)
    message=message.encode('ascii')
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    #print(" [x] Sent %r:%r" % (routing_key, message))
    cnt=cnt+1
    #sleep(0.1)

# Make sure data is committed to the database
cnx.commit()

cursor.close()
cnx.close()




connection.close()
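The emitter sends each row as a comma-separated string in the order rowid, Date, Inst, Open, High, Low, Close. On the receiving side, the callback body could be turned back into fields with something like the sketch below. The names `Quote` and `parse_quote` are assumptions for illustration, and the code assumes that rowid is an integer, that the prices are numeric, and that no field contains a comma. The sample values are made up.

```python
from collections import namedtuple

# Field order mirrors the message built in bo_emit_stooq.py.
Quote = namedtuple('Quote', 'rowid date inst open high low close')


def parse_quote(body):
    """Parse a 'rowid,Date,Inst,Open,High,Low,Close' message into a Quote."""
    # The message body may arrive as bytes; decode it before splitting.
    text = body.decode('ascii') if isinstance(body, bytes) else body
    rowid, date, inst, open_, high, low, close = text.split(',')
    return Quote(int(rowid), date, inst,
                 float(open_), float(high), float(low), float(close))


if __name__ == '__main__':
    # Made-up sample message, not real market data.
    print(parse_quote(b'1,2017-08-30,ibm.us,142.5,143.0,141.9,142.6'))
```

Inside the receiver's callback this would be called as `parse_quote(body)`.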
