From e05c15951d1951d40e50c066d7af9db190e592eb Mon Sep 17 00:00:00 2001 From: luochunsheng Date: Wed, 17 Aug 2022 16:22:19 +0800 Subject: [PATCH] optimize the execution time of check_hd by using multi-thread After tasks are split, we use the thread pool to process tasks concurrently, reducing the execution time. fix:#118 --- src/ids/Makefile | 7 +- src/ids/check_hd.c | 269 ++++++++++++++++++++++++++++++++++++------ src/ids/thread_pool.c | 202 +++++++++++++++++++++++++++++++ src/ids/thread_pool.h | 36 ++++++ 4 files changed, 473 insertions(+), 41 deletions(-) create mode 100644 src/ids/thread_pool.c create mode 100644 src/ids/thread_pool.h diff --git a/src/ids/Makefile b/src/ids/Makefile index 473ec3f5..479e0210 100644 --- a/src/ids/Makefile +++ b/src/ids/Makefile @@ -50,8 +50,11 @@ IDFILES += src/pci src/storage src/sound src/mouse src/braille $(LIBHD_D): hd_ids.o ar r $(LIBHD) $? -check_hd: check_hd.c - $(CC) $(CFLAGS) $< -o $@ +CHECK_HD_SRC = check_hd.c \ + thread_pool.c + +check_hd: $(CHECK_HD_SRC) + $(CC) $(CFLAGS) -lpthread $(CHECK_HD_SRC) -o $@ hd_ids.c: hd_ids.h hd_ids_tiny.h diff --git a/src/ids/check_hd.c b/src/ids/check_hd.c index 3b8d6f29..be9e0908 100644 --- a/src/ids/check_hd.c +++ b/src/ids/check_hd.c @@ -7,8 +7,11 @@ #include #include #include +#include +#include #include "../hd/hddb_int.h" +#include "thread_pool.h" typedef enum hw_item { hw_none = 0, hw_sys, hw_cpu, hw_keyboard, hw_braille, hw_mouse, @@ -189,12 +192,12 @@ unsigned driver_entry_types(hid_t *hid); void remove_items(list_t *hd); void remove_nops(list_t *hd); -void check_items(list_t *hd); +void check_items(thread_pool_t *th_pool, list_t *hd); void split_items(list_t *hd); void combine_driver(list_t *hd); void combine_requires(list_t *hd); -void join_items_by_value(list_t *hd); -void join_items_by_key(list_t *hd); +void join_items_by_value(thread_pool_t *th_pool, list_t *hd); +void join_items_by_key(thread_pool_t *th_pool, list_t *hd); void remove_unimportant_items(list_t *hd); void write_cfile(FILE *f, list_t *hd); @@ -251,7 +254,8 @@ struct { unsigned items_in, items_out; unsigned diffs, errors, errors_res; } stats; - +/* protect global value stats in multi-thread scene */ +pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ int main(int argc, char **argv) @@ -259,6 +263,8 @@ int main(int argc, char **argv) int i, close_log = 0, close_cfile = 0; item_t *item; FILE *cfile; + unsigned thread_num = sysconf(_SC_NPROCESSORS_ONLN); + thread_pool_t *th_pool = NULL; for(opterr = 0; (i = getopt_long(argc, argv, "", options, NULL)) != -1; ) { switch(i) { @@ -386,6 +392,9 @@ int main(int argc, char **argv) } if(opt.check) { + th_pool = threadpool_create(thread_num); + if (!th_pool) return 4; + fprintf(logfh, "- combining driver info\n"); fflush(logfh); combine_driver(&hd); @@ -396,20 +405,21 @@ int main(int argc, char **argv) fprintf(logfh, "- checking for consistency\n"); fflush(logfh); - check_items(&hd); + check_items(th_pool, &hd); fprintf(logfh, "- join items\n"); fflush(logfh); if(opt.join_keys_first) { - join_items_by_key(&hd); - join_items_by_value(&hd); + join_items_by_key(th_pool, &hd); + join_items_by_value(th_pool, &hd); } else { - join_items_by_value(&hd); - join_items_by_key(&hd); + join_items_by_value(th_pool, &hd); + join_items_by_key(th_pool, &hd); } if(opt.split) split_items(&hd); + threadpool_destroy(th_pool); } if(opt.sort) { @@ -2113,17 +2123,157 @@ void split_items(list_t *hd) *hd = hd_new; } +struct thread_input { + item_t *item_si0; + item_t *item_ei0; + unsigned *item_group0; + item_t *item_si1; + item_t *item_ei1; + unsigned *item_group1; + void (*routine)(item_t *, item_t *, item_t *, item_t *); + sem_t *done_ind; +}; + +struct items_point { + item_t *item_s; + item_t *item_e; +}; + +#define ITEMS_SPILT_SIZE 60 + +void *threadpool_map_thread(void *args) +{ + struct thread_input *in = (struct thread_input *)args; + item_t *item_si0 = in->item_si0; + item_t *item_ei0 = in->item_ei0; + item_t *item_si1 = in->item_si1; + item_t *item_ei1 = in->item_ei1; + unsigned *item_group0 = in->item_group0; + unsigned *item_group1 = in->item_group1; + sem_t *done_ind = in->done_ind; + + in->routine(item_si0, item_ei0, item_si1, item_ei1); + sem_post(done_ind); + + *item_group0 = 0; + *item_group1 = 0; + + return NULL; +} + +static void threadpool_map(thread_pool_t *th_pool, list_t *hd, void (*routine)(item_t *, item_t *, item_t *, item_t *)) +{ + unsigned total_items = 0; + unsigned average = 0; + unsigned count_total = 0; + unsigned spilt_count = 0; + item_t *item_tmp; + unsigned mk_calculated_Combine[ITEMS_SPILT_SIZE][ITEMS_SPILT_SIZE]; + unsigned mk_calculating_group[ITEMS_SPILT_SIZE]; + struct items_point items_group[ITEMS_SPILT_SIZE]; + int total_calculated = ITEMS_SPILT_SIZE * (ITEMS_SPILT_SIZE + 1)/ 2; + int cur_calculated = 0; + struct thread_input *input; + struct thread_input *cur_input; + sem_t done_indicator; + int i, j, found; + + memset(mk_calculated_Combine, 0, sizeof(mk_calculated_Combine)); + memset(mk_calculating_group, 0, sizeof(mk_calculating_group)); + memset(items_group, 0, sizeof(items_group)); + + input = (struct thread_input *)malloc(sizeof(struct thread_input) * total_calculated); + + for(item_tmp = hd->first; item_tmp; item_tmp = item_tmp->next) total_items++; + + average = total_items / (ITEMS_SPILT_SIZE - 1); + items_group[0].item_s = hd->first; + for(item_tmp = hd->first; item_tmp; item_tmp = item_tmp->next) { + count_total++; + if (count_total % average == 0) { + items_group[spilt_count].item_e = item_tmp; + spilt_count++; + items_group[spilt_count].item_s = item_tmp; + } + } + + sem_init(&done_indicator, 0, 0); + for (i = 0; i < ITEMS_SPILT_SIZE; i++) { + cur_input = &(input[cur_calculated]); + cur_input->item_si0 = items_group[i].item_s; + cur_input->item_ei0 = items_group[i].item_e; + cur_input->item_si1 = items_group[i].item_s; + cur_input->item_ei1 = items_group[i].item_e; + cur_input->item_group0 = &mk_calculating_group[i]; + cur_input->item_group1 = &mk_calculating_group[i]; + cur_input->routine = routine; + cur_input->done_ind = &done_indicator; + + mk_calculating_group[i] = 1; + mk_calculated_Combine[i][i]++; + threadpool_add_task(th_pool, threadpool_map_thread, cur_input); + cur_calculated++; + } + + for(;;) { + found = 0; + for (i = 0; i < ITEMS_SPILT_SIZE; i++) { + if (mk_calculating_group[i]) continue; + + for (j = i + 1; j < ITEMS_SPILT_SIZE; j++) { + if (mk_calculating_group[j]) continue; + if (mk_calculated_Combine[i][j]) continue; + + cur_input = &(input[cur_calculated]); + cur_input->item_si0 = items_group[i].item_s; + cur_input->item_ei0 = items_group[i].item_e; + cur_input->item_si1 = items_group[j].item_s; + cur_input->item_ei1 = items_group[j].item_e; + cur_input->item_group0 = &mk_calculating_group[i]; + cur_input->item_group1 = &mk_calculating_group[j]; + cur_input->routine = routine; + cur_input->done_ind = &done_indicator; + + mk_calculated_Combine[i][j]++; + mk_calculating_group[i] = 1; + mk_calculating_group[j] = 1; + threadpool_add_task(th_pool, threadpool_map_thread, cur_input); + cur_calculated++; + found = 1; + break; + } + if (found) break; + } + + if (cur_calculated >= total_calculated) break; + } + + /* wait thread_pool_t run over */ + for (i = 0; i < cur_calculated; i++) + sem_wait(&done_indicator); + + free(input); +} + -void check_items(list_t *hd) +void check_items_internal(item_t *item_si0, item_t *item_ei0, item_t *item_si1, item_t *item_ei1) { int i, j, k, m, mr, m_all, mr_all, c_ident, c_diff, c_crit; char *s; item_t *item0, *item1, *item_a, *item_b; unsigned *stat_cnt; + unsigned local = 0; - for(item0 = hd->first; item0; item0 = item0->next) { + if (!item_si0 || !item_si1) + return; + + if (item_si0 == item_si1 && item_ei0 == item_ei1) + local = 1; + + for(item0 = item_si0; item0 != item_ei0; item0 = item0->next) { if(item0->remove) continue; - for(item1 = item0->next; item1 && !item0->remove; item1 = item1->next) { + if (local) item_si1 = item0->next; + for(item1 = item_si1; item1 != item_ei1; item1 = item1->next) { if(item1->remove) continue; item_a = item0; item_b = item1; @@ -2230,14 +2380,18 @@ void check_items(list_t *hd) * else make it an error */ if(c_diff && !c_ident) { + pthread_mutex_lock(&stats_lock); (*stat_cnt)++; + pthread_mutex_unlock(&stats_lock); fprintf(logfh, "%s: info %s %s, info removed\n", item_a->pos, s, item_b->pos ); } else if(c_diff && c_ident) { + pthread_mutex_lock(&stats_lock); (*stat_cnt)++; + pthread_mutex_unlock(&stats_lock); fprintf(logfh, "%s: info %s/is identical to %s, info removed\n", item_a->pos, s, item_b->pos @@ -2271,7 +2425,9 @@ void check_items(list_t *hd) c_diff = (j >> 8) & 0xff; if(c_diff) { /* different keys, conflicting values --> error */ + pthread_mutex_lock(&stats_lock); stats.errors++; + pthread_mutex_unlock(&stats_lock); fprintf(logfh, "%s: info conflicts with %s\n", item_b->pos, item_a->pos @@ -2285,9 +2441,13 @@ void check_items(list_t *hd) } } - remove_items(hd); } +void check_items(thread_pool_t *th_pool, list_t *hd) +{ + threadpool_map(th_pool, hd, check_items_internal); + remove_items(hd); +} void combine_driver(list_t *hd) { @@ -2490,32 +2650,12 @@ void combine_requires(list_t *hd) remove_items(hd); } - -void join_items_by_value(list_t *hd) +static void sort_and_combine_key(list_t *hd) { - item_t *item0, *item1; + item_t *item0; skey_t *skey, *next; int i; - for(item0 = hd->first; item0; item0 = item0->next) { - if(item0->remove) continue; - for(item1 = item0->next; item1; item1 = item1->next) { - if(item1->remove) continue; - - if(!cmp_skey(item0->value, item1->value)) { - for(skey = item1->key.first; skey; skey = next) { - next = skey->next; - add_list(&item0->key, skey); - } - memset(&item1->key, 0, sizeof item1->key); - item1->remove = 1; - fprintf(logfh, "%s: info added to %s, item removed\n", item1->pos, item0->pos); - } - } - } - - remove_items(hd); - for(item0 = hd->first; item0; item0 = item0->next) { /* sort key entries */ @@ -2537,17 +2677,65 @@ void join_items_by_value(list_t *hd) } } +void join_items_by_value_internal(item_t *item_si0, item_t *item_ei0, item_t *item_si1, item_t *item_ei1) +{ + item_t *item0, *item1; + skey_t *skey, *next; + int local = 0; -void join_items_by_key(list_t *hd) + if (!item_si0 || !item_si1) + return; + + if (item_si0 == item_si1 && item_ei0 == item_ei1) + local = 1; + + for(item0 = item_si0; item0 != item_ei0; item0 = item0->next) { + + if(item0->remove) continue; + if (local) item_si1 = item0->next; + for(item1 = item_si1; item1 != item_ei1; item1 = item1->next) { + if(item1->remove) continue; + + if(!cmp_skey(item0->value, item1->value)) { + for(skey = item1->key.first; skey; skey = next) { + next = skey->next; + add_list(&item0->key, skey); + } + memset(&item1->key, 0, sizeof item1->key); + item1->remove = 1; + fprintf(logfh, "%s: info added to %s, item removed\n", item1->pos, item0->pos); + } + } + } +} + + +void join_items_by_value(thread_pool_t *th_pool, list_t *hd) +{ + threadpool_map(th_pool, hd, join_items_by_value_internal); + remove_items(hd); + sort_and_combine_key(hd); +} + +void join_items_by_key_internal(item_t *item_si0, item_t *item_ei0, item_t *item_si1, item_t *item_ei1) { item_t *item0, *item1; skey_t *val0, *val1; int i; + int local = 0; + + if (!item_si0 || !item_si1) + return; + + if (item_si0 == item_si1 && item_ei0 == item_ei1) + local = 1; + + for(item0 = item_si0; item0 != item_ei0; item0 = item0->next) { - for(item0 = hd->first; item0; item0 = item0->next) { if(item0->remove) continue; val0 = item0->value; - for(item1 = item0->next; item1; item1 = item1->next) { + if (local) item_si1 = item0->next; + for(item1 = item_si1; item1 != item_ei1; item1 = item1->next) { if(item1->remove) continue; i = cmp_item(item0, item1); @@ -2570,11 +2758,14 @@ void join_items_by_key(list_t *hd) } } } +} +void join_items_by_key(thread_pool_t *th_pool, list_t *hd) +{ + threadpool_map(th_pool, hd, join_items_by_key_internal); remove_items(hd); } - void remove_unimportant_items(list_t *hd) { item_t *item; diff --git a/src/ids/thread_pool.c b/src/ids/thread_pool.c new file mode 100644 index 00000000..6a1b3113 --- /dev/null +++ b/src/ids/thread_pool.c @@ -0,0 +1,202 @@ +#include +#include +#include + +#include "thread_pool.h" + +#define list_entry(ptr, type, member) \ + ((type *) ((char *) (ptr) - (unsigned long) (&((type *) 0)->member))) + +#define list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + +#define INIT_LIST_HEAD(ptr) (ptr)->next = (ptr)->prev = (ptr) + +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +static inline void list_add_tail(dlist_t *elem, dlist_t *head) +{ + head->prev->next = elem; + elem->next = head; + elem->prev = head->prev; + head->prev = elem; +} + +static inline void list_del(dlist_t *elem) +{ + elem->next->prev = elem->prev; + elem->prev->next = elem->next; +} + + +static void free_unsched_tasks(thread_pool_t *pool) +{ + thread_task_t *task = NULL; + dlist_t *pos; + + if (pool == NULL) + return; + + list_for_each(pos, &pool->task_list) { + task = list_entry(pos, struct thread_task, list); + free(task); + } + + return; +} + +static void threadpool_cancel_unlock(void *arg) +{ + thread_pool_t *pool = (thread_pool_t *)arg; + if (pool == NULL) return; + + pthread_mutex_trylock(&pool->lock); + pthread_mutex_unlock(&pool->lock); +} + +static void *threadpool_routine(void *arg) +{ + thread_pool_t *pool = (thread_pool_t *)arg; + task_functor functor = NULL; + void *func_arg = NULL; + + pthread_cleanup_push(threadpool_cancel_unlock, pool); + while (true) { + pthread_mutex_lock(&pool->lock); + while (__atomic_load_n(&pool->task_num, __ATOMIC_SEQ_CST) == 0) { + pthread_cond_wait(&(pool->cond), &(pool->lock)); + } + + struct list_head *task_head = &pool->task_list; + if (!list_empty(task_head)) { + thread_task_t *task = list_entry(task_head->next, struct thread_task, list); + list_del(task_head->next); + functor = task->functor; + func_arg = task->arg; + free(task); + task = NULL; + pthread_mutex_unlock(&(pool->lock)); + (*functor)(func_arg); + continue; + } + pthread_mutex_unlock(&(pool->lock)); + sleep(1); + } + pthread_cleanup_pop(0); + + return NULL; +} + +static int pool_pthread_init(thread_pool_t *pool) +{ + if (pthread_mutex_init(&(pool->lock), NULL) != 0) + return -1; + + if (pthread_cond_init(&(pool->cond), NULL) != 0) { + pthread_mutex_destroy(&(pool->lock)); + return -1; + } + + return 0; +} + +thread_pool_t *threadpool_create(uint64_t thread_num) +{ + uint64_t i; + + thread_pool_t *pool = (thread_pool_t *)malloc(sizeof(thread_pool_t)); + if (pool == NULL) { + printf("malloc for thread_pool_t fail\n"); + return NULL; + } + + pool->tid = NULL; + __atomic_store_n(&pool->task_num, 0, __ATOMIC_SEQ_CST); + + INIT_LIST_HEAD(&pool->task_list); + + pool->tid = (pthread_t *)malloc(thread_num * sizeof(pthread_t)); + if (pool->tid == NULL) + goto exit_tid; + + if (pool_pthread_init(pool) != 0) + goto exit_init; + + for (i = 0; i < thread_num; i++) { + if (pthread_create(&(pool->tid[i]), NULL, threadpool_routine, pool) != 0) { + printf("start threadpool fail\n"); + pool->thread_count = i; + threadpool_destroy(pool); + return NULL; + } + } + + pool->thread_count = i; + return pool; + +exit_init: + free(pool->tid); +exit_tid: + free(pool); + + return NULL; +} + +int threadpool_add_task(thread_pool_t *pool, void *(*executor)(void *arg), void *arg) +{ + if (executor == NULL || arg == NULL) { + printf("threadpool instance startup parameter exception.\n"); + return -1; + } + + thread_task_t *task = (thread_task_t *)malloc(sizeof(thread_task_t)); + if (task == NULL) + return -1; + + task->functor = executor; + task->arg = arg; + pthread_mutex_lock(&(pool->lock)); + list_add_tail(&task->list, &pool->task_list); + __atomic_add_fetch(&pool->task_num, 1, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(&(pool->cond)); + pthread_mutex_unlock(&(pool->lock)); + + return 0; +} + +static void threadpool_cancel_thread(const thread_pool_t *pool) +{ + int i; + + for (i = 0; i < pool->thread_count; i++) + pthread_cancel(pool->tid[i]); + + for (i = 0; i < pool->thread_count; i++) + pthread_join(pool->tid[i], NULL); +} + +static void threadpool_free(thread_pool_t *pool) +{ + if (pool == NULL) + return; + + free(pool->tid); + free_unsched_tasks(pool); + pthread_mutex_destroy(&(pool->lock)); + pthread_cond_destroy(&(pool->cond)); + + free(pool); +} + +void threadpool_destroy(thread_pool_t *pool) +{ + if (pool == NULL) + return; + + threadpool_cancel_thread(pool); + threadpool_free(pool); +} + diff --git a/src/ids/thread_pool.h b/src/ids/thread_pool.h new file mode 100644 index 00000000..f1157690 --- /dev/null +++ b/src/ids/thread_pool.h @@ -0,0 +1,36 @@ + +#ifndef _THREADPOOL_H +#define _THREADPOOL_H + +#include +#include +#include + +typedef struct list_head +{ + struct list_head *next; + struct list_head *prev; +} dlist_t; + +typedef void *(*task_functor)(void *); +typedef struct thread_task { + task_functor functor; + void *arg; + struct list_head list; +} thread_task_t; + +typedef struct thread_pool { + int thread_count; + int task_num; + struct list_head task_list; + pthread_t *tid; + pthread_mutex_t lock; + pthread_cond_t cond; + int stat; +} thread_pool_t; + +thread_pool_t *threadpool_create(uint64_t thread_num); +int threadpool_add_task(thread_pool_t *pool, void *(*executor)(void *arg), void *arg); +void threadpool_destroy(thread_pool_t *pool); + +#endif //_THREADPOOL_H \ No newline at end of file