| Name | Total Lines | Lines of Code | Total Coverage | Code Coverage |
|---|---|---|---|---|
| app/models/submission_collector.rb | 257 | 171 | 47.47% | 42.11% |
Code reported as executed by Ruby looks like this... and this: this line is also marked as covered. Lines considered as run by rcov, but not reported by Ruby, look like this, and this: these lines were inferred by rcov (using simple heuristics). Finally, here's a line marked as not executed.
# A singleton class responsible for collecting submissions from groupings.
#
# The SubmissionCollector keeps track of groupings it needs to collect
# submissions from via two queues: a regular queue to keep track of groupings
# whose submissions need to be collected, and a priority queue, whose members'
# submissions get collected ahead of the ones in the regular queue.
#
# The actual creation of the submissions for the grouping is done inside a
# forked process due to the length of time taken to collect submissions with
# pdf files in them.
#
# The collector updates the groupings to let them know if they are in queue for
# collection, in order to figure out if their current submission is the latest
# one, or there's a newer one waiting to be collected.
#
# Both queues are stored in the database to allow for easy parent-child
# process communication bypassing the need for pipes or signals.
class SubmissionCollector < ActiveRecord::Base

  has_many :grouping_queues, :dependent => :destroy

  validates_numericality_of :child_pid, :only_integer => true,
                            :allow_nil => true

  validates_inclusion_of :stop_child, :in => [true, false]

  # Always use this method to get an object of this class; never call new or
  # create directly. If no record exists yet, one is created and given two
  # fresh queues. Returns the first SubmissionCollector record.
  def self.instance
    if SubmissionCollector.first.nil?
      SubmissionCollector.create.init_queues
    end
    SubmissionCollector.first
  end

  # Discard any existing grouping_queues and create two fresh ones: one
  # regular queue and one priority queue.
  def init_queues
    self.grouping_queues.clear
    self.grouping_queues.create(:priority_queue => false)
    self.grouping_queues.create(:priority_queue => true)
  end

  # Add every grouping in +groupings+ that is not already in either queue to
  # the regular queue, marking it as not collected, then start the collection
  # process.
  def push_groupings_to_queue(groupings)
    priority_queue = queued_groupings(true)
    regular_queue = queued_groupings(false)

    groupings.each do |grouping|
      next if regular_queue.include?(grouping) ||
              priority_queue.include?(grouping)
      grouping.is_collected = false
      regular_queue.push(grouping)
    end
    start_collection_process
  end

  # Move +grouping+ to the priority queue (removing it from the regular queue
  # if it is there), marking it as not collected, then start the collection
  # process. A grouping already in the priority queue is left untouched.
  def push_grouping_to_priority_queue(grouping)
    priority_queue = queued_groupings(true)
    regular_queue = queued_groupings(false)

    regular_queue.delete(grouping) if regular_queue.include?(grouping)
    unless priority_queue.include?(grouping)
      grouping.is_collected = false
      priority_queue.push(grouping)
    end
    start_collection_process
  end

  # Remove grouping from the grouping queue if it exists there.
  # Returns the saved grouping, or nil when +grouping+ is nil or not queued.
  def remove_grouping_from_queue(grouping)
    return if grouping.nil? || grouping.grouping_queue.nil?
    grouping.grouping_queue.groupings.delete(grouping)
    grouping.grouping_queue = nil
    grouping.save
    grouping
  end

  # Get the next grouping for which to collect the submission, or return nil
  # if there are no more groupings. The priority queue is served first.
  def get_next_grouping_for_collection
    priority_queue = queued_groupings(true)
    regular_queue = queued_groupings(false)

    priority_queue.first || regular_queue.first
  end

  # Fork off a new process responsible for collecting all submissions.
  #
  # If a previously forked child is still running, nothing is done. An
  # optional block is evaluated by the child before it starts working through
  # the queues. On Windows the queues are drained in the calling process and
  # a given block is not evaluated.
  def start_collection_process

    # Since windows doesn't support fork, the main process will have to collect
    # the submissions.
    if windows_platform?
      until collect_next_submission.nil?
      end
      return
    end

    m_logger = MarkusLogger.instance

    # Check to see if there is still a process running
    m_logger.log("Checking to see if there is already a submission collection" +
                 " process running")
    begin
      unless self.child_pid.nil?
        # With WNOHANG, waitpid returns nil while the child has not exited
        # yet. If the child is still running do nothing, otherwise reset the
        # child_pid.
        if Process.waitpid(self.child_pid, Process::WNOHANG).nil?
          m_logger.log("Submission collection process still running, doing nothing")
          return
        end
        self.child_pid = nil
        self.save
      end
    rescue Errno::ESRCH, Errno::ECHILD
      # If for some reason there is no process with id self.child_pid, simply
      # proceed by forking a new process as usual.
    end

    # We have to re-establish a separate database connection for each process
    db_connection = ActiveRecord::Base.remove_connection

    pid = fork do
      begin
        ActiveRecord::Base.establish_connection(db_connection)
        m_logger.log("Submission collection process established database" +
                     " connection successfully")
        # Any custom tasks to be performed by the child can be given as a block
        if block_given?
          m_logger.log("Submission collection process now evaluating provided code block")
          yield
          m_logger.log("Submission collection process done evaluating provided code block")
        end

        until collect_next_submission.nil?
          # The parent asks the child to stop through the stop_child column.
          if SubmissionCollector.first.stop_child
            m_logger.log("Submission collection process now exiting because it was " +
                         "asked to stop by its parent")
            exit!(0)
          end
        end
        m_logger.log("Submission collection process done")
        exit!(0)
      ensure
        ActiveRecord::Base.remove_connection
      end
    end
    # parent
    if pid
      ActiveRecord::Base.establish_connection(db_connection)
      self.child_pid = pid
      self.save
    end
  end

  # Collect the next submission or return nil if there are none to be
  # collected. Otherwise returns the result of saving the collected grouping.
  def collect_next_submission
    grouping = get_next_grouping_for_collection
    return if grouping.nil?
    assignment = grouping.assignment
    m_logger = MarkusLogger.instance
    m_logger.log("Now collecting: #{assignment.short_identifier} for grouping: " +
                 "'#{grouping.id}'")
    time = assignment.submission_rule.calculate_collection_time.localtime
    # Create a new Submission by timestamp.
    # A Result is automatically attached to this Submission, thanks to some
    # callback logic inside the Submission model
    new_submission = Submission.create_by_timestamp(grouping, time)
    # Apply the SubmissionRule
    new_submission = assignment.submission_rule.apply_submission_rule(
      new_submission)
    convert_pdfs_to_jpgs(new_submission)
    grouping.is_collected = true
    remove_grouping_from_queue(grouping)
    grouping.save
  end

  # Use the database to communicate to the child to stop, manually collect
  # the submission of +grouping+ at revision +rev_num+, and restart the child.
  def manually_collect_submission(grouping, rev_num)

    # Since windows doesn't support fork, the main process will have to collect
    # the submissions.
    if windows_platform?
      new_submission = create_submission_for_revision(grouping, rev_num)
      convert_pdfs_to_jpgs(new_submission)
      grouping.is_collected = true
      grouping.save
      return
    end

    # Make the child process exit safely, to avoid both parent and child process
    # from calling the Magick::Image.from_blob function, this breaks future calls
    # of the method by the child.
    safely_stop_child

    new_submission = create_submission_for_revision(grouping, rev_num)

    # This is to help determine the progress of the method.
    self.safely_stop_child_exited = true
    self.save

    # Let the child process handle conversion, as things go wrong when both
    # parent and child do this.
    start_collection_process do
      if MarkusConfigurator.markus_config_pdf_support
        convert_pdfs_to_jpgs(new_submission)
        grouping.is_collected = true
        grouping.save
      end
    end
    # setting is_collected here will prevent an sqlite lockout error when pdfs
    # aren't supported
    if !MarkusConfigurator.markus_config_pdf_support
      grouping.is_collected = true
      grouping.save
    end

  end

  # Ask the child process (if any) to stop via the stop_child column and wait
  # for it to exit. The stop_child flag and child_pid are always reset
  # afterwards.
  def safely_stop_child
    return if self.child_pid.nil?
    begin
      self.stop_child = true
      self.save
      Process.waitpid(self.child_pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # Ignore case where no process with child_pid exists
    ensure
      self.stop_child = false
      self.child_pid = nil
      self.save
    end
  end

  private

  # True when RUBY_PLATFORM names a Windows build (mswin or mingw).
  def windows_platform?
    !(RUBY_PLATFORM =~ /(?:mswin|mingw)/).nil?
  end

  # Groupings held by the priority queue (+priority+ true) or by the regular
  # queue (+priority+ false).
  def queued_groupings(priority)
    grouping_queues.find_by_priority_queue(priority).groupings
  end

  # Convert any pdf submission files of +submission+ to jpgs.
  def convert_pdfs_to_jpgs(submission)
    submission.submission_files.each do |subm_file|
      subm_file.convert_pdf_to_jpg if subm_file.is_pdf?
    end
  end

  # Mark +grouping+ as not collected, remove it from its queue so it isn't
  # collected again, and create its submission from revision +rev_num+.
  # Returns the new submission.
  def create_submission_for_revision(grouping, rev_num)
    grouping.is_collected = false
    remove_grouping_from_queue(grouping)
    grouping.save
    Submission.create_by_revision_number(grouping, rev_num)
  end

end
Generated on Tue Feb 07 00:07:36 -0500 2012 with rcov 0.9.10