FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
pthread_slice.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 /**
20  * @file
21  * Slice multithreading support functions
22  * @see doc/multithreading.txt
23  */
24 
25 #include "config.h"
26 
27 #if HAVE_PTHREADS
28 #include <pthread.h>
29 #elif HAVE_W32THREADS
30 #include "compat/w32pthreads.h"
31 #elif HAVE_OS2THREADS
32 #include "compat/os2threads.h"
33 #endif
34 
35 #include "avcodec.h"
36 #include "internal.h"
37 #include "pthread_internal.h"
38 #include "thread.h"
39 
40 #include "libavutil/common.h"
41 #include "libavutil/cpu.h"
42 #include "libavutil/mem.h"
43 
/** Job callback taking the codec context and one argument pointer; worker() calls it as c->func. */
44 typedef int (action_func)(AVCodecContext *c, void *arg);
/** Job callback that additionally receives a job number and a thread number; worker() calls it as c->func2. */
45 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
46 
/**
 * State shared between the execute()/execute2() caller and the slice
 * worker threads.
 *
 * NOTE(review): this listing is missing several member lines and the
 * closing brace of the struct. Code further down also accesses workers,
 * func, func2, rets_count, current_job, last_job_cond, entries_count,
 * thread_count, progress_mutex and progress_cond, presumably on this same
 * struct - confirm against the real file.
 */
47 typedef struct SliceThreadContext {
51  void *args; /* argument pointer handed to the job callbacks */
52  int *rets; /* where worker() stores each job's return value */
54  int job_count; /* number of jobs in the current batch */
55  int job_size; /* byte stride worker() applies to args per job when calling func */
56 
60  unsigned current_execute; /* incremented by thread_execute(); worker() compares it with its last seen value */
62  int done; /* set to 1 on teardown; worker() returns once it sees it */
63 
64  int *entries; /* progress counters used by the *_progress2 functions */
70 
/**
 * Thread entry point for one slice worker.
 *
 * Takes an id from c->current_job, then loops forever: while it has no job
 * inside the current batch it stays in the inner loop until
 * c->current_execute changes or c->done is set; otherwise it runs c->func
 * (or c->func2 when c->func is NULL) for job our_job, stores the result in
 * c->rets and fetches the next job number.
 *
 * NOTE(review): this listing drops several lines - the declaration of c and
 * what are presumably the mutex lock/unlock and condition wait/signal
 * calls - so the `if` and the inner `while` below show no body here.
 * Confirm against the real file before changing anything.
 *
 * @param v the AVCodecContext this worker belongs to
 * @return NULL, once c->done has been observed
 */
71 static void* attribute_align_arg worker(void *v)
72 {
73  AVCodecContext *avctx = v;
75  unsigned last_execute = 0;
76  int our_job = c->job_count;
77  int thread_count = avctx->thread_count;
78  int self_id;
79 
    /* claim a per-thread id from the shared counter */
81  self_id = c->current_job++;
82  for (;;){
    /* our_job is past the end of the batch: nothing to run right now */
83  while (our_job >= c->job_count) {
    /* same condition thread_park_workers() loops on */
84  if (c->current_job == thread_count + c->job_count)
86 
    /* stay here until a new batch is published or shutdown is requested */
87  while (last_execute == c->current_execute && !c->done)
89  last_execute = c->current_execute;
    /* in a new batch this thread starts with the job numbered like its id;
     * thread_execute() starts the shared counter at thread_count to match */
90  our_job = self_id;
91 
92  if (c->done) {
94  return NULL;
95  }
96  }
98 
    /* the result index wraps with rets_count, so a one-slot rets buffer
     * absorbs every job's return value */
99  c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
100  c->func2(avctx, c->args, our_job, self_id);
101 
    /* fetch the next unclaimed job number */
103  our_job = c->current_job++;
104  }
105 }
106 
/**
 * Stop the worker threads and free the slice-threading context.
 *
 * Sets c->done, joins avctx->thread_count workers, then frees c->workers
 * and avctx->internal->thread_ctx.
 *
 * NOTE(review): the function signature, the declaration of c and several
 * statements around the visible ones are missing from this listing -
 * presumably the wake-up of the workers and the destruction of the
 * synchronization objects. Confirm against the real file.
 */
108 {
110  int i;
111 
    /* worker() returns from its loop once it sees done set */
113  c->done = 1;
116 
    /* avctx->thread_count is lowered by the init code when pthread_create()
     * fails, so only threads that were actually created are joined */
117  for (i=0; i<avctx->thread_count; i++)
118  pthread_join(c->workers[i], NULL);
119 
123  av_freep(&c->workers);
124  av_freep(&avctx->internal->thread_ctx);
125 }
126 
/**
 * Loop until c->current_job equals thread_count + c->job_count, the same
 * condition worker() tests once it has no job left in the batch.
 *
 * NOTE(review): the signature and the loop body (presumably a wait on a
 * condition variable, plus one following statement) are missing from this
 * listing; the visible call site passes (c, thread_count).
 */
128 {
129  while (c->current_job != thread_count + c->job_count)
132 }
133 
/**
 * Publish a batch of job_count jobs for the worker threads.
 *
 * @param avctx     codec context owning the thread pool
 * @param func      callback run once per job; NULL makes worker() use
 *                  c->func2 instead
 * @param arg       base of the per-job argument data; with func set, job i
 *                  is called with (char*)arg + i * job_size
 * @param ret       optional array of job_count ints receiving each job's
 *                  return value; may be NULL
 * @param job_count number of jobs; values <= 0 return 0 without doing work
 * @param job_size  byte stride between consecutive job arguments
 * @return 0 on the threaded path; the result of avcodec_default_execute()
 *         when slice threading is not active or thread_count <= 1
 *
 * NOTE(review): the declaration of c and three statements (presumably the
 * mutex lock, the worker wake-up/wait and the unlock) are missing from this
 * listing. Confirm against the real file.
 */
134 static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
135 {
137  int dummy_ret;
138 
    /* no usable pool: run the jobs through the default executor */
139  if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
140  return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
141 
142  if (job_count <= 0)
143  return 0;
144 
146 
    /* each worker starts a batch with the job numbered like its own id, so
     * the shared counter begins right after those */
147  c->current_job = avctx->thread_count;
148  c->job_count = job_count;
149  c->job_size = job_size;
150  c->args = arg;
151  c->func = func;
152  if (ret) {
153  c->rets = ret;
154  c->rets_count = job_count;
155  } else {
    /* caller does not want results: every job writes into one local slot */
156  c->rets = &dummy_ret;
157  c->rets_count = 1;
158  }
    /* a changed current_execute is what lets worker() leave its idle loop */
159  c->current_execute++;
161 
163 
164  return 0;
165 }
166 
/**
 * Variant of thread_execute() for callbacks that take a job number and a
 * thread number.
 *
 * Stores func2 in the context and delegates to thread_execute() with
 * func == NULL, which makes worker() call c->func2. job_size is passed as 0;
 * worker() hands c->args to func2 unchanged.
 *
 * NOTE(review): the declaration of c is missing from this listing.
 *
 * @return the value returned by thread_execute()
 */
167 static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
168 {
170  c->func2 = func2;
171  return thread_execute(avctx, NULL, arg, ret, job_count, 0);
172 }
173 
/**
 * Set up slice threading for a codec context: choose a thread count,
 * allocate the SliceThreadContext, start the workers and install the
 * execute/execute2 callbacks.
 *
 * Returns 0 on success and also when threading is left disabled (thread
 * count <= 1); returns -1 when an allocation or pthread_create() fails.
 *
 * NOTE(review): the function signature, the declaration of c and several
 * statements (around pthread_cond_init() and inside the pthread_create()
 * failure branch) are missing from this listing. Confirm against the real
 * file.
 */
175 {
176  int i;
178  int thread_count = avctx->thread_count;
179 
180 #if HAVE_W32THREADS
181  w32thread_init();
182 #endif
183 
    /* thread_count == 0 means "choose automatically" */
184  if (!thread_count) {
185  int nb_cpus = av_cpu_count();
    /* cap at the number of 16-line rows in the picture, rounded up */
186  if (avctx->height)
187  nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16);
188  // use number of cores + 1 as thread count if there is more than one
189  if (nb_cpus > 1)
190  thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
191  else
192  thread_count = avctx->thread_count = 1;
193  }
194 
    /* a single thread needs no pool: clear the active threading type */
195  if (thread_count <= 1) {
196  avctx->active_thread_type = 0;
197  return 0;
198  }
199 
200  c = av_mallocz(sizeof(SliceThreadContext));
201  if (!c)
202  return -1;
203 
204  c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
205  if (!c->workers) {
206  av_free(c);
207  return -1;
208  }
209 
210  avctx->internal->thread_ctx = c;
211  c->current_job = 0;
212  c->job_count = 0;
213  c->job_size = 0;
214  c->done = 0;
216  pthread_cond_init(&c->last_job_cond, NULL);
219  for (i=0; i<thread_count; i++) {
220  if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
    /* only i threads exist; record that before the cleanup call */
221  avctx->thread_count = i;
223  ff_thread_free(avctx);
224  return -1;
225  }
226  }
227 
    /* presumably blocks until every worker has started and gone idle -
     * the body of thread_park_workers() is not visible in this listing */
228  thread_park_workers(c, thread_count);
229 
230  avctx->execute = thread_execute;
231  avctx->execute2 = thread_execute2;
232  return 0;
233 }
234 
/**
 * Add n to the progress counter entries[field] and signal the condition
 * variable belonging to the given thread index.
 *
 * NOTE(review): the declaration of p and the statement after the signal
 * (presumably the matching mutex unlock) are missing from this listing.
 *
 * @param field  index into the entries array
 * @param thread index of the mutex/condition pair to use
 * @param n      amount added to entries[field]
 */
235 void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
236 {
238  int *entries = p->entries;
239 
240  pthread_mutex_lock(&p->progress_mutex[thread]);
241  entries[field] +=n;
242  pthread_cond_signal(&p->progress_cond[thread]);
244 }
245 
/**
 * Wait until entries[field - 1] is at least shift ahead of entries[field].
 *
 * Returns immediately when no entries array exists or field is 0.
 *
 * NOTE(review): the declaration of p and the statement after the wait loop
 * (presumably the matching mutex unlock) are missing from this listing.
 *
 * @param field  index into the entries array; 0 never waits
 * @param thread index of the calling thread; the wait uses the
 *               mutex/condition pair of the preceding index
 * @param shift  required lead of entries[field - 1] over entries[field]
 */
246 void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
247 {
249  int *entries = p->entries;
250 
251  if (!entries || !field) return;
252 
    /* use the previous thread's pair, wrapping from index 0 to the last one */
253  thread = thread ? thread - 1 : p->thread_count - 1;
254 
255  pthread_mutex_lock(&p->progress_mutex[thread]);
256  while ((entries[field - 1] - entries[field]) < shift){
257  pthread_cond_wait(&p->progress_cond[thread], &p->progress_mutex[thread]);
258  }
260 }
261 
/**
 * Allocate the progress-tracking state when slice threading is active:
 * an array of count ints plus one mutex/condition pair per thread.
 *
 * Returns 0 on success, and also when FF_THREAD_SLICE is not set in
 * avctx->active_thread_type (nothing is allocated then); returns
 * AVERROR(ENOMEM) when an allocation fails.
 *
 * NOTE(review): the function signature, the declaration of p, the
 * allocations of progress_mutex and progress_cond, and one statement in the
 * failure branch are missing from this listing. Confirm against the real
 * file.
 */
263 {
264  int i;
265 
266  if (avctx->active_thread_type & FF_THREAD_SLICE) {
268  p->thread_count = avctx->thread_count;
269  p->entries = av_mallocz_array(count, sizeof(int));
270 
273 
    /* any failed allocation: release what was obtained and report ENOMEM */
274  if (!p->entries || !p->progress_mutex || !p->progress_cond) {
275  av_freep(&p->entries);
277  av_freep(&p->progress_cond);
278  return AVERROR(ENOMEM);
279  }
    /* remembered so the reset routine knows how many ints to clear */
280  p->entries_count = count;
281 
282  for (i = 0; i < p->thread_count; i++) {
283  pthread_mutex_init(&p->progress_mutex[i], NULL);
284  pthread_cond_init(&p->progress_cond[i], NULL);
285  }
286  }
287 
288  return 0;
289 }
290 
/**
 * Zero all p->entries_count progress counters in p->entries.
 *
 * NOTE(review): the function signature and the declaration of p are missing
 * from this listing; no NULL check on p->entries is visible here.
 */
292 {
294  memset(p->entries, 0, p->entries_count * sizeof(int));
295 }