Python and Pulsar
import pulsar
import time
from mytools import get_logger, topic
def receive():
logger = get_logger('pulsar')
logger.info('Consumer starting')
time.sleep(20)
logger.info('Consumer really starting')
try:
client = pulsar.Client('pulsar://my-pulsar:6650')
consumer = client.subscribe(topic, 'my-subscription')
except Exception:
logger.exception("Consumer could not connect to pulsar")
logger.info("Consumer connected")
while True:
msg = consumer.receive()
try:
logger.info("Received: {}: {}".format(msg.data(), msg.message_id()))
consumer.acknowledge(msg)
except Exception as err:
logger.error(f"Exception {err}")
receive()
version: '3.7'
services:
producer:
build: .
volumes:
- .:/opt
links:
- pulsar
#command: python producer.py
command: tail -f /dev/null
consumer:
build: .
volumes:
- .:/opt
links:
- pulsar
command: tail -f /dev/null
#command: python consumer.py
pulsar:
image: apachepulsar/pulsar:2.5.2
container_name: my-pulsar
expose:
- 8080
- 6650
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone"
FROM python:3.8
COPY requirements.txt /opt/
RUN pip3 install -r /opt/requirements.txt
WORKDIR /opt
First
Second
Third
Fourth
import logging
import os
def get_logger(name):
log_file = name + '.log'
log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
sh.setFormatter( log_format )
logger.addHandler(sh)
#if os.path.exists(log_file):
# os.unlink(log_file)
fh = logging.FileHandler(log_file)
fh.setLevel(logging.INFO)
fh.setFormatter( log_format )
logger.addHandler(fh)
return logger
topic = 'text'
import pulsar
import time
from mytools import get_logger, topic
def send():
logger = get_logger('pulsar')
logger.info("Producer starting")
time.sleep(20)
logger.info("Producer really starting")
filename = 'input.txt'
try:
client = pulsar.Client('pulsar://my-pulsar:6650')
producer = client.create_producer(topic)
except Exception:
logger.exception("Producer could not connect to pulsar")
logger.info("Producer connected")
with open(filename) as fh:
for row in fh:
logger.info(f"Sending {row}")
producer.send(row.encode('utf-8'))
time.sleep(1)
send()
pulsar-client
Run:
docker-compose up
and then check the pulsar.log file