Python and Pulsar
examples/python-pulsar-cli/consumer.py
"""Pulsar consumer example: log and acknowledge every message on the shared topic."""
import time

import pulsar

from mytools import get_logger, topic


def receive():
    """Subscribe to ``topic`` and log every message received, forever.

    Connects to the broker at ``pulsar://my-pulsar:6650`` using the
    subscription name ``my-subscription``. Each message is logged and then
    acknowledged. The loop only ends when an exception (including
    ``KeyboardInterrupt``) propagates out of ``consumer.receive()``.

    Raises:
        Exception: whatever the Pulsar client raised while connecting or
            subscribing. The error is logged first and then re-raised, so the
            function never continues with a missing consumer.
    """
    logger = get_logger('pulsar')
    logger.info('Consumer starting')
    # Presumably gives the broker container time to come up before connecting.
    time.sleep(20)
    logger.info('Consumer really starting')

    client = None
    try:
        try:
            client = pulsar.Client('pulsar://my-pulsar:6650')
            consumer = client.subscribe(topic, 'my-subscription')
        except Exception:
            logger.exception("Consumer could not connect to pulsar")
            # Without a consumer there is nothing to do; falling through would
            # only hide the real error behind an UnboundLocalError.
            raise
        logger.info("Consumer connected")

        while True:
            msg = consumer.receive()
            try:
                logger.info("Received: {}: {}".format(msg.data(), msg.message_id()))
                consumer.acknowledge(msg)
            except Exception as err:
                # Best effort: report the failure and keep consuming. The
                # message stays unacknowledged in this case.
                logger.error(f"Exception {err}")
    finally:
        # Release the connection however the function exits.
        if client is not None:
            client.close()


if __name__ == "__main__":
    receive()
examples/python-pulsar-cli/docker-compose.yml
# Compose stack for the example: a producer and a consumer container built
# from the local Dockerfile, plus a standalone Pulsar broker.
version: '3.7'
services:
  producer:
    build: .
    volumes:
      # Mount the project directory at /opt (the image's WORKDIR), so the
      # scripts and any files they write are shared with the host.
      - .:/opt
    links:
      - pulsar
    # Swap the two command lines below to run the producer automatically.
    #command: python producer.py
    # `tail -f /dev/null` just keeps the container alive without doing work.
    command: tail -f /dev/null
  consumer:
    build: .
    volumes:
      - .:/opt
    links:
      - pulsar
    # `tail -f /dev/null` just keeps the container alive without doing work.
    command: tail -f /dev/null
    # Swap the two command lines to run the consumer automatically.
    #command: python consumer.py
  pulsar:
    image: apachepulsar/pulsar:2.5.2
    # producer.py and consumer.py connect to pulsar://my-pulsar:6650, so this
    # name must stay in sync with the URL used there.
    container_name: my-pulsar
    expose:
      # 6650 is the port the Python clients connect to; 8080 is presumably
      # the broker's HTTP port (not used by the example scripts).
      - 8080
      - 6650
    # Apply environment-based overrides to the standalone config, then start
    # the broker in standalone mode.
    command: >
      /bin/bash -c
      "bin/apply-config-from-env.py conf/standalone.conf
      && bin/pulsar standalone"
examples/python-pulsar-cli/Dockerfile
# Image shared by the producer and consumer services.
FROM python:3.8
# Copy only the dependency list so the packages can be installed at build time.
COPY requirements.txt /opt/
RUN pip3 install -r /opt/requirements.txt
# docker-compose.yml mounts the project directory at /opt, so commands run
# from here see the example scripts.
WORKDIR /opt
examples/python-pulsar-cli/input.txt
First Second Third Fourth
examples/python-pulsar-cli/mytools.py
"""Shared helpers for the Pulsar example: logger factory and topic name."""
import logging
import os


def get_logger(name):
    """Return an INFO-level logger that writes to stderr and to ``<name>.log``.

    The logger is looked up by ``name``, so the ``%(name)s`` field of each
    record and the log file name both follow the argument. Handlers are
    attached only the first time a given name is requested; later calls
    return the same configured logger instead of stacking duplicate handlers
    (which would repeat every log line).

    Args:
        name: Logger name; also the stem of the log file created in the
            current working directory.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_file = name + '.log'
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s')

    sh = logging.StreamHandler()
    sh.setLevel(logging.INFO)
    sh.setFormatter(log_format)
    logger.addHandler(sh)

    # Left disabled by the original author: start each run with a fresh log file.
    #if os.path.exists(log_file):
    #    os.unlink(log_file)
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.INFO)
    fh.setFormatter(log_format)
    logger.addHandler(fh)

    return logger


# Pulsar topic shared by the producer and the consumer.
topic = 'text'
examples/python-pulsar-cli/producer.py
"""Pulsar producer example: publish every line of input.txt to the shared topic."""
import time

import pulsar

from mytools import get_logger, topic


def send():
    """Publish each line of ``input.txt`` to ``topic``, one message per second.

    Connects to the broker at ``pulsar://my-pulsar:6650``. Every line of the
    file (including its trailing newline) is logged and sent UTF-8 encoded.

    Raises:
        Exception: whatever the Pulsar client raised while connecting or
            creating the producer. The error is logged first and then
            re-raised, so the function never continues with a missing producer.
        OSError: if ``input.txt`` cannot be opened.
    """
    logger = get_logger('pulsar')
    logger.info("Producer starting")
    # Presumably gives the broker container time to come up before connecting.
    time.sleep(20)
    logger.info("Producer really starting")
    filename = 'input.txt'

    client = None
    try:
        try:
            client = pulsar.Client('pulsar://my-pulsar:6650')
            producer = client.create_producer(topic)
        except Exception:
            logger.exception("Producer could not connect to pulsar")
            # Without a producer there is nothing to do; falling through would
            # only hide the real error behind an UnboundLocalError.
            raise
        logger.info("Producer connected")

        # Read as UTF-8 explicitly, matching the encoding used for the payload.
        with open(filename, encoding='utf-8') as fh:
            for row in fh:
                logger.info(f"Sending {row}")
                producer.send(row.encode('utf-8'))
                time.sleep(1)
    finally:
        # Release the connection however the function exits.
        if client is not None:
            client.close()


if __name__ == "__main__":
    send()
examples/python-pulsar-cli/requirements.txt
pulsar-client
Run:
docker-compose up
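and then check the pulsar.log file, which the producer and consumer write in the project directory. (As listed, docker-compose.yml starts both containers with `tail -f /dev/null`; switch to the commented-out `command: python producer.py` and `command: python consumer.py` lines, or run the scripts inside the containers, so that the log is actually written.)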