FFmpeg  4.3
slicethread.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "slicethread.h"
21 #include "mem.h"
22 #include "thread.h"
23 #include "avassert.h"
24 
25 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
26 
27 typedef struct WorkerContext {
31  pthread_t thread;
32  int done;
33 } WorkerContext;
34 
35 struct AVSliceThread {
36  WorkerContext *workers;
37  int nb_threads;
38  int nb_active_threads;
39  int nb_jobs;
40 
41  atomic_uint first_job;
42  atomic_uint current_job;
43  pthread_mutex_t done_mutex;
44  pthread_cond_t done_cond;
45  int done;
46  int finished;
47 
48  void *priv;
49  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
50  void (*main_func)(void *priv);
51 };
52 
53 static int run_jobs(AVSliceThread *ctx)
54 {
55  unsigned nb_jobs = ctx->nb_jobs;
56  unsigned nb_active_threads = ctx->nb_active_threads;
57  unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
58  unsigned current_job = first_job;
59 
60  do {
61  ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
62  } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
63 
64  return current_job == nb_jobs + nb_active_threads - 1;
65 }
66 
67 static void *attribute_align_arg thread_worker(void *v)
68 {
69  WorkerContext *w = v;
70  AVSliceThread *ctx = w->ctx;
71 
72  pthread_mutex_lock(&w->mutex);
73  pthread_cond_signal(&w->cond);
74 
75  while (1) {
76  w->done = 1;
77  while (w->done)
78  pthread_cond_wait(&w->cond, &w->mutex);
79 
80  if (ctx->finished) {
81  pthread_mutex_unlock(&w->mutex);
82  return NULL;
83  }
84 
85  if (run_jobs(ctx)) {
86  pthread_mutex_lock(&ctx->done_mutex);
87  ctx->done = 1;
88  pthread_cond_signal(&ctx->done_cond);
89  pthread_mutex_unlock(&ctx->done_mutex);
90  }
91  }
92 }
93 
94 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
95  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
96  void (*main_func)(void *priv),
97  int nb_threads)
98 {
100  int nb_workers, i;
101 
102  av_assert0(nb_threads >= 0);
103  if (!nb_threads) {
104  int nb_cpus = av_cpu_count();
105  if (nb_cpus > 1)
106  nb_threads = nb_cpus + 1;
107  else
108  nb_threads = 1;
109  }
110 
111  nb_workers = nb_threads;
112  if (!main_func)
113  nb_workers--;
114 
115  *pctx = ctx = av_mallocz(sizeof(*ctx));
116  if (!ctx)
117  return AVERROR(ENOMEM);
118 
119  if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
120  av_freep(pctx);
121  return AVERROR(ENOMEM);
122  }
123 
124  ctx->priv = priv;
125  ctx->worker_func = worker_func;
126  ctx->main_func = main_func;
127  ctx->nb_threads = nb_threads;
128  ctx->nb_active_threads = 0;
129  ctx->nb_jobs = 0;
130  ctx->finished = 0;
131 
132  atomic_init(&ctx->first_job, 0);
133  atomic_init(&ctx->current_job, 0);
134  pthread_mutex_init(&ctx->done_mutex, NULL);
135  pthread_cond_init(&ctx->done_cond, NULL);
136  ctx->done = 0;
137 
138  for (i = 0; i < nb_workers; i++) {
139  WorkerContext *w = &ctx->workers[i];
140  int ret;
141  w->ctx = ctx;
142  pthread_mutex_init(&w->mutex, NULL);
143  pthread_cond_init(&w->cond, NULL);
144  pthread_mutex_lock(&w->mutex);
145  w->done = 0;
146 
147  if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
148  ctx->nb_threads = main_func ? i : i + 1;
149  pthread_mutex_unlock(&w->mutex);
150  pthread_cond_destroy(&w->cond);
151  pthread_mutex_destroy(&w->mutex);
153  return AVERROR(ret);
154  }
155 
156  while (!w->done)
157  pthread_cond_wait(&w->cond, &w->mutex);
158  pthread_mutex_unlock(&w->mutex);
159  }
160 
161  return nb_threads;
162 }
163 
164 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
165 {
166  int nb_workers, i, is_last = 0;
167 
168  av_assert0(nb_jobs > 0);
169  ctx->nb_jobs = nb_jobs;
170  ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
171  atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
172  atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
173  nb_workers = ctx->nb_active_threads;
174  if (!ctx->main_func || !execute_main)
175  nb_workers--;
176 
177  for (i = 0; i < nb_workers; i++) {
178  WorkerContext *w = &ctx->workers[i];
179  pthread_mutex_lock(&w->mutex);
180  w->done = 0;
181  pthread_cond_signal(&w->cond);
182  pthread_mutex_unlock(&w->mutex);
183  }
184 
185  if (ctx->main_func && execute_main)
186  ctx->main_func(ctx->priv);
187  else
188  is_last = run_jobs(ctx);
189 
190  if (!is_last) {
191  pthread_mutex_lock(&ctx->done_mutex);
192  while (!ctx->done)
193  pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
194  ctx->done = 0;
195  pthread_mutex_unlock(&ctx->done_mutex);
196  }
197 }
198 
200 {
202  int nb_workers, i;
203 
204  if (!pctx || !*pctx)
205  return;
206 
207  ctx = *pctx;
208  nb_workers = ctx->nb_threads;
209  if (!ctx->main_func)
210  nb_workers--;
211 
212  ctx->finished = 1;
213  for (i = 0; i < nb_workers; i++) {
214  WorkerContext *w = &ctx->workers[i];
215  pthread_mutex_lock(&w->mutex);
216  w->done = 0;
217  pthread_cond_signal(&w->cond);
218  pthread_mutex_unlock(&w->mutex);
219  }
220 
221  for (i = 0; i < nb_workers; i++) {
222  WorkerContext *w = &ctx->workers[i];
223  pthread_join(w->thread, NULL);
224  pthread_cond_destroy(&w->cond);
225  pthread_mutex_destroy(&w->mutex);
226  }
227 
228  pthread_cond_destroy(&ctx->done_cond);
229  pthread_mutex_destroy(&ctx->done_mutex);
230  av_freep(&ctx->workers);
231  av_freep(pctx);
232 }
233 
234 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
235 
237  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
238  void (*main_func)(void *priv),
239  int nb_threads)
240 {
241  *pctx = NULL;
242  return AVERROR(EINVAL);
243 }
244 
245 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
246 {
247  av_assert0(0);
248 }
249 
251 {
252  av_assert0(!pctx || !*pctx);
253 }
254 
255 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
avpriv_slicethread_execute
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
Execute slice threading.
Definition: slicethread.c:245
AVSliceThread
struct AVSliceThread AVSliceThread
Definition: slicethread.h:22
avpriv_slicethread_create
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void(*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void(*main_func)(void *priv), int nb_threads)
Create slice threading context.
Definition: slicethread.c:236
avassert.h
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
ctx
AVFormatContext * ctx
Definition: movenc.c:48
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:237
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
NULL
#define NULL
Definition: coverity.c:32
worker_func
static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
Definition: pthread_slice.c:65
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:66
av_cpu_count
int av_cpu_count(void)
Definition: cpu.c:267
attribute_align_arg
#define attribute_align_arg
Definition: internal.h:62
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
FFMIN
#define FFMIN(a, b)
Definition: common.h:96
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
slicethread.h
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
main_func
int() main_func(AVCodecContext *c)
Definition: pthread_slice.c:41
pthread_cond_t
Definition: os2threads.h:58
ret
ret
Definition: filter_design.txt:187
w
FFmpeg Automated Testing Environment ************************************Introduction Using FATE from your FFmpeg source directory Submitting the results to the FFmpeg result aggregation server Uploading new samples to the fate suite FATE makefile targets and variables Makefile targets Makefile variables Examples Introduction **************FATE is an extended regression suite on the client side and a means for results aggregation and presentation on the server side The first part of this document explains how you can use FATE from your FFmpeg source directory to test your ffmpeg binary The second part describes how you can run FATE to submit the results to FFmpeg’s FATE server In any way you can have a look at the publicly viewable FATE results by visiting this as it can be seen if some test on some platform broke with their recent contribution This usually happens on the platforms the developers could not test on The second part of this document describes how you can run FATE to submit your results to FFmpeg’s FATE server If you want to submit your results be sure to check that your combination of OS and compiler is not already listed on the above mentioned website In the third part you can find a comprehensive listing of FATE makefile targets and variables Using FATE from your FFmpeg source directory **********************************************If you want to run FATE on your machine you need to have the samples in place You can get the samples via the build target fate rsync Use this command from the top level source this will cause FATE to fail NOTE To use a custom wrapper to run the pass ‘ target exec’ to ‘configure’ or set the TARGET_EXEC Make variable Submitting the results to the FFmpeg result aggregation server ****************************************************************To submit your results to the server you should run fate through the shell script ‘tests fate sh’ from the FFmpeg sources This script needs to be invoked with a configuration file as its first argument tests fate sh path to fate_config A configuration file template with comments describing the individual configuration variables can be found at ‘doc fate_config sh template’ Create a configuration that suits your based on the configuration template The ‘slot’ configuration variable can be any string that is not yet but it is suggested that you name it adhering to the following pattern ‘ARCH OS COMPILER COMPILER VERSION’ The configuration file itself will be sourced in a shell therefore all shell features may be used This enables you to setup the environment as you need it for your build For your first test runs the ‘fate_recv’ variable should be empty or commented out This will run everything as normal except that it will omit the submission of the results to the server The following files should be present in $workdir as specified in the configuration it may help to try out the ‘ssh’ command with one or more ‘ v’ options You should get detailed output concerning your SSH configuration and the authentication process The only thing left is to automate the execution of the fate sh script and the synchronisation of the samples directory Uploading new samples to the fate suite *****************************************If you need a sample uploaded send a mail to samples request This is for developers who have an account on the fate suite server If you upload new please make sure they are as small as space on each network bandwidth and so on benefit from smaller test cases Also keep in mind older checkouts use existing sample that means in practice generally do not remove or overwrite files as it likely would break older checkouts or releases Also all needed samples for a commit should be ideally before the push If you need an account for frequently uploading samples or you wish to help others by doing that send a mail to ffmpeg devel rsync vauL Duo ug o o w
Definition: fate.txt:150
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
void
typedef void(RENAME(mix_any_func_type))
Definition: rematrix_template.c:52
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Non-inlined equivalent of av_mallocz_array().
Definition: mem.c:245
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
avpriv_slicethread_free
void avpriv_slicethread_free(AVSliceThread **pctx)
Destroy slice threading context.
Definition: slicethread.c:250
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
cond
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28
mutex
static AVMutex mutex
Definition: log.c:44
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:62