forked from OpenNeo/impress
Create new DTIRequests.load_many
helper, to make parallel requests
Just a wrapper for the barrier/semaphore thing we're doing constantly! I only applied it in the importer rake tasks for now. There's other call sites to move over to it, too!
This commit is contained in:
parent
7eb209e206
commit
1d4771ecc5
5 changed files with 31 additions and 22 deletions
|
@ -31,7 +31,7 @@ module Neopets::NCMall
|
||||||
]) do |response|
|
]) do |response|
|
||||||
if response.status != 200
|
if response.status != 200
|
||||||
raise ResponseNotOK.new(response.status),
|
raise ResponseNotOK.new(response.status),
|
||||||
"expected status 200 but got #{response.status} (#{url})"
|
"expected status 200 but got #{response.status} (#{ROOT_DOCUMENT_URL})"
|
||||||
end
|
end
|
||||||
|
|
||||||
response.read
|
response.read
|
||||||
|
|
|
@ -16,13 +16,15 @@
|
||||||
# end
|
# end
|
||||||
|
|
||||||
ActiveSupport::Inflector.inflections(:en) do |inflect|
|
ActiveSupport::Inflector.inflections(:en) do |inflect|
|
||||||
# Teach Zeitwerk that `RocketAMF` is what to expect in `lib/rocketamf`.
|
# `lib/rocketamf` => `RocketAMF`
|
||||||
inflect.acronym "RocketAMF"
|
inflect.acronym "RocketAMF"
|
||||||
|
|
||||||
# Teach Zeitwerk that `NeoPass` is what to expect in `neopass.rb`.
|
# `neopass.rb` => `NeoPass`
|
||||||
inflect.acronym "NeoPass"
|
inflect.acronym "NeoPass"
|
||||||
|
|
||||||
# Teach Zeitwerk that "NCMall" is what to expect in `nc_mall.rb`.
|
# `nc_mall.rb` => `NCMall`
|
||||||
# (We do this by teaching it the word "NC".)
|
|
||||||
inflect.acronym "NC"
|
inflect.acronym "NC"
|
||||||
|
|
||||||
|
# `dti_requests.rb` => `DTIRequests`
|
||||||
|
inflect.acronym "DTI"
|
||||||
end
|
end
|
||||||
|
|
19
lib/dti_requests.rb
Normal file
19
lib/dti_requests.rb
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
require "async"
|
||||||
|
require "async/barrier"
|
||||||
|
|
||||||
|
module DTIRequests
|
||||||
|
class << self
|
||||||
|
def load_many(max_at_once: 10)
|
||||||
|
barrier = Async::Barrier.new
|
||||||
|
semaphore = Async::Semaphore.new(max_at_once, parent: barrier)
|
||||||
|
|
||||||
|
Sync do
|
||||||
|
block_return_value = yield semaphore
|
||||||
|
barrier.wait # Load all the subtasks.
|
||||||
|
block_return_value
|
||||||
|
ensure
|
||||||
|
barrier.stop # If any subtasks failed, cancel the rest.
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -85,15 +85,10 @@ def load_all_nc_mall_pages
|
||||||
links = Neopets::NCMall.load_page_links
|
links = Neopets::NCMall.load_page_links
|
||||||
|
|
||||||
# Next, load the linked pages, 10 at a time.
|
# Next, load the linked pages, 10 at a time.
|
||||||
barrier = Async::Barrier.new
|
linked_page_tasks = DTIRequests.load_many(max_at_once: 10) do |task|
|
||||||
semaphore = Async::Semaphore.new(10, parent: barrier)
|
links.map do |link|
|
||||||
begin
|
task.async { Neopets::NCMall.load_page link[:type], link[:cat] }
|
||||||
linked_page_tasks = links.map do |link|
|
|
||||||
semaphore.async { Neopets::NCMall.load_page link[:type], link[:cat] }
|
|
||||||
end
|
end
|
||||||
barrier.wait # Load all the pages.
|
|
||||||
ensure
|
|
||||||
barrier.stop # If any pages failed, cancel the rest.
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Finally, return all the pages: the homepage, and the linked pages.
|
# Finally, return all the pages: the homepage, and the linked pages.
|
||||||
|
|
|
@ -6,16 +6,14 @@ namespace "neopets:import" do
|
||||||
all_species = Species.order(:name).to_a
|
all_species = Species.order(:name).to_a
|
||||||
|
|
||||||
# Load 10 species pages from the NC Mall at a time.
|
# Load 10 species pages from the NC Mall at a time.
|
||||||
barrier = Async::Barrier.new
|
|
||||||
semaphore = Async::Semaphore.new(10, parent: barrier)
|
|
||||||
styles_by_species_id = {}
|
styles_by_species_id = {}
|
||||||
Sync do
|
DTIRequests.load_many(max_at_once: 10) do |task|
|
||||||
num_loaded = 0
|
num_loaded = 0
|
||||||
num_total = all_species.size
|
num_total = all_species.size
|
||||||
print "0/#{num_total} species loaded"
|
print "0/#{num_total} species loaded"
|
||||||
|
|
||||||
all_species.each do |species|
|
all_species.each do |species|
|
||||||
semaphore.async {
|
task.async {
|
||||||
begin
|
begin
|
||||||
styles_by_species_id[species.id] = Neopets::NCMall.load_styles(
|
styles_by_species_id[species.id] = Neopets::NCMall.load_styles(
|
||||||
species_id: species.id,
|
species_id: species.id,
|
||||||
|
@ -28,11 +26,6 @@ namespace "neopets:import" do
|
||||||
print "\r#{num_loaded}/#{num_total} species loaded"
|
print "\r#{num_loaded}/#{num_total} species loaded"
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
# Wait until all tasks are done.
|
|
||||||
barrier.wait
|
|
||||||
ensure
|
|
||||||
barrier.stop # If something goes wrong, clean up all tasks.
|
|
||||||
end
|
end
|
||||||
print "\n"
|
print "\n"
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue