mirror of
https://github.com/basecamp/once-campfire.git
synced 2026-02-21 12:10:34 +09:00
57 lines
1.9 KiB
Ruby
57 lines
1.9 KiB
Ruby
# This is in lib so we can use it in a thread pool without the Rails executor
|
|
class WebPush::Pool
|
|
attr_reader :delivery_pool, :invalidation_pool, :connection, :invalid_subscription_handler
|
|
|
|
def initialize(invalid_subscription_handler:)
|
|
@delivery_pool = Concurrent::ThreadPoolExecutor.new(max_threads: 50, queue_size: 10000)
|
|
@invalidation_pool = Concurrent::FixedThreadPool.new(1)
|
|
@connection = Net::HTTP::Persistent.new(name: "web_push", pool_size: 150)
|
|
@invalid_subscription_handler = invalid_subscription_handler
|
|
end
|
|
|
|
def queue(payload, subscriptions)
|
|
subscriptions.find_each do |subscription|
|
|
deliver_later(payload, subscription)
|
|
end
|
|
end
|
|
|
|
def shutdown
|
|
connection.shutdown
|
|
shutdown_pool(delivery_pool)
|
|
shutdown_pool(invalidation_pool)
|
|
end
|
|
|
|
private
|
|
def deliver_later(payload, subscription)
|
|
# Ensure any AR operations happen before we post to the thread pool
|
|
notification = subscription.notification(**payload)
|
|
subscription_id = subscription.id
|
|
|
|
delivery_pool.post do
|
|
deliver(notification, subscription_id)
|
|
rescue Exception => e
|
|
Rails.logger.error "Error in WebPush::Pool.deliver: #{e.class} #{e.message}"
|
|
end
|
|
rescue Concurrent::RejectedExecutionError
|
|
end
|
|
|
|
def deliver(notification, id)
|
|
notification.deliver(connection: connection)
|
|
rescue WebPush::ExpiredSubscription, OpenSSL::OpenSSLError => ex
|
|
invalidate_subscription_later(id) if invalid_subscription_handler
|
|
end
|
|
|
|
def invalidate_subscription_later(id)
|
|
invalidation_pool.post do
|
|
invalid_subscription_handler.call(id)
|
|
rescue Exception => e
|
|
Rails.logger.error "Error in WebPush::Pool.invalid_subscription_handler: #{e.class} #{e.message}"
|
|
end
|
|
end
|
|
|
|
def shutdown_pool(pool)
|
|
pool.shutdown
|
|
pool.kill unless pool.wait_for_termination(1)
|
|
end
|
|
end
|