BigQueryService.java
001 /*
002  * SPDX-License-Identifier: Apache-2.0
003  *
004  * Copyright 2020-2022 Agorapulse.
005  *
006  * Licensed under the Apache License, Version 2.0 (the "License");
007  * you may not use this file except in compliance with the License.
008  * You may obtain a copy of the License at
009  *
010  *     https://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing, software
013  * distributed under the License is distributed on an "AS IS" BASIS,
014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015  * See the License for the specific language governing permissions and
016  * limitations under the License.
017  */
018 package com.agorapulse.micronaut.bigquery;
019 
020 import com.google.common.base.CaseFormat;
021 import io.micronaut.core.beans.BeanIntrospection;
022 import io.micronaut.core.beans.BeanIntrospector;
023 import io.micronaut.core.beans.BeanProperty;
024 import io.reactivex.Flowable;
025 
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.Collections;
029 import java.util.LinkedHashMap;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.Optional;
033 import java.util.function.Function;
034 import java.util.stream.Collectors;
035 
036 /**
037  * BigQuery service is used to execute SQL statements against BigQuery data warehouse.
038  */
039 public interface BigQueryService {
040 
041     /**
042      * Runs a SQL query against the BigQuery warehouse and map the results into an object.
043      @param namedParameters the named parameters for the SQL query
044      @param sql the SQL query, must contain <code>@</code> as named parameter prefix
045      @param builder the function mapping the result into an object
046      @param <T> type of the result objects
047      @return the flowable of objects mapped using the builder
048      */
049     <T> Flowable<T> query(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder);
050 
051     /**
052      * Runs a SQL statement against the BigQuery warehouse.
053      @param namedParameters the named parameters for the SQL statement
054      @param sql the SQL statement, must contain <code>@</code> as named parameter prefix
055      */
056     void execute(Map<String, ?> namedParameters, String sql);
057 
058     /**
059      * Run a SQL query against the BigQuery warehouse and map the results into an single object if present.
060      @param sql the SQL query, must contain <code>@</code> as named parameter prefix
061      @param builder the function mapping the result into an object
062      @param <T> type of the result objects
063      @return the optional holding the first returned result or an empty optinal
064      */
065     default <T> Optional<T> querySingle(String sql, Function<RowResult, T> builder) {
066         return querySingle(Collections.emptyMap(), sql, builder);
067     }
068 
069     /**
070      * Run a SQL query against the BigQuery warehouse and map the results into an single object if present.
071      @param namedParameters the named parameters for the SQL query
072      @param sql the SQL query, must contain <code>@</code> as named parameter prefix
073      @param builder the function mapping the result into an object
074      @param <T> type of the result objects
075      @return the optional holding the first returned result or an empty optinal
076      */
077     default <T> Optional<T> querySingle(Map<String, ?> namedParameters, String sql, Function<RowResult, T> builder) {
078         return Optional.ofNullable(query(namedParameters, sql, builder).blockingFirst(null));
079     }
080 
081     /**
082      * Run a SQL query against the BigQuery warehouse and map the results into an object.
083      @param sql the SQL query, must contain <code>@</code> as named parameter prefix
084      @param builder the function mapping the result into an object
085      @param <T> type of the result objects
086      @return the flowable of objects mapped using the builder
087      */
088     default <T> Flowable<T> query(String sql, Function<RowResult, T> builder) {
089         return query(Collections.emptyMap(), sql, builder);
090     }
091 
092     /**
093      * Runs a SQL statement against the BigQuery warehouse
094      @param sql the SQL statement, must contain <code>@</code> as named parameter prefix
095      */
096     default void execute(String sql) {
097         execute(Collections.emptyMap(), sql);
098     }
099 
100     /**
101      * Inserts the object into the database.
102      *
103      @param object the object to be inserted
104      @param dataset the name of the dataset
105      @param table the name of the table
106      @param <T> the type of the inserted object
107      @return the very same object as has been passed into this method
108      */
109     default <T> T insert(T object, String dataset, String table) {
110         ParameterizedSql insert = generateInsert(object, dataset, table);
111         execute(insert.getNamedParameters(), insert.getSql());
112         return object;
113     }
114 
115     /**
116      * Generates the insert statement with the named parameters prepared.
117      *
118      @param object the object to be inserted
119      @param dataset the name of the dataset
120      @param table the name of the table
121      @param <T> the type of the inserted object
122      @return the sql and named parameters for the insertion of the given object
123      */
124     @SuppressWarnings("unchecked")
125     default <T> ParameterizedSql generateInsert(T object, final String dataset, String table) {
126         BeanIntrospection<T> introspection = BeanIntrospector.SHARED.getIntrospection((Class<T>object.getClass());
127 
128         Collection<BeanProperty<T, Object>> fields = introspection.getBeanProperties();
129 
130         final List<String> keys = new ArrayList<>();
131         Map<String, Object> values = new LinkedHashMap<>();
132         for (BeanProperty<T, Object> field : fields) {
133             Object value = field.get(object);
134             String formattedName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
135 
136             if (value != null) {
137                 keys.add(formattedName);
138                 values.put(formattedName, convertIfNecessary(value));
139             }
140         }
141 
142         String builder = String.format(
143                 "insert into %s.%s (%s) values (%s)",
144                 dataset,
145                 table,
146                 String.join(", ", keys),
147                 keys.stream().map(k -> "@" + k).collect(Collectors.joining(", "))
148         );
149 
150         return ParameterizedSql.from(values, builder);
151     }
152 
153     /**
154      * Converts the object value to the value suitable for the underlying database if necessary.
155      @param object the object which might need conversion
156      @return either the original object or a representation of the object which is suitable for the database
157      */
158     default Object convertIfNecessary(Object object) {
159         if (object instanceof Enum) {
160             return object.toString();
161         }
162 
163         return object;
164     }
165 
166 }