Files
once-campfire/lib/web_push/pool.rb
Kevin McConnell df76a227dc Hello world
First open source release of Campfire 🎉
2025-08-21 09:31:59 +01:00

57 lines
1.9 KiB
Ruby

# This is in lib so we can use it in a thread pool without the Rails executor
class WebPush::Pool
attr_reader :delivery_pool, :invalidation_pool, :connection, :invalid_subscription_handler
def initialize(invalid_subscription_handler:)
@delivery_pool = Concurrent::ThreadPoolExecutor.new(max_threads: 50, queue_size: 10000)
@invalidation_pool = Concurrent::FixedThreadPool.new(1)
@connection = Net::HTTP::Persistent.new(name: "web_push", pool_size: 150)
@invalid_subscription_handler = invalid_subscription_handler
end
def queue(payload, subscriptions)
subscriptions.find_each do |subscription|
deliver_later(payload, subscription)
end
end
def shutdown
connection.shutdown
shutdown_pool(delivery_pool)
shutdown_pool(invalidation_pool)
end
private
def deliver_later(payload, subscription)
# Ensure any AR operations happen before we post to the thread pool
notification = subscription.notification(**payload)
subscription_id = subscription.id
delivery_pool.post do
deliver(notification, subscription_id)
rescue Exception => e
Rails.logger.error "Error in WebPush::Pool.deliver: #{e.class} #{e.message}"
end
rescue Concurrent::RejectedExecutionError
end
def deliver(notification, id)
notification.deliver(connection: connection)
rescue WebPush::ExpiredSubscription, OpenSSL::OpenSSLError => ex
invalidate_subscription_later(id) if invalid_subscription_handler
end
def invalidate_subscription_later(id)
invalidation_pool.post do
invalid_subscription_handler.call(id)
rescue Exception => e
Rails.logger.error "Error in WebPush::Pool.invalid_subscription_handler: #{e.class} #{e.message}"
end
end
def shutdown_pool(pool)
pool.shutdown
pool.kill unless pool.wait_for_termination(1)
end
end